Skip to content

Commit

Permalink
Fix bug in previous patch for avoiding notifying for initial data
Browse files Browse the repository at this point in the history
Now we will instead return a specially constructed empty set that
can be used to distinguish empty result from missing result.

There is a new method in ChangeNotifers that consumers can use to
detect this.

I apologize for the hacky code, if the major version is ever
bumped this should be revert so that the API can return an
Optional<Set<T>> instead, or avoid firing in ChangeNotifier.setListener(listener, true)
if no change has ever happened
  • Loading branch information
spkrka committed Aug 10, 2016
1 parent fcc3c6e commit 3d54084
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 23 deletions.
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@
<version>1.7.5</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.jayway.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>1.7.0</version>
<scope>test</scope>
</dependency>

<!-- Need explicit dependency on this to avoid conflict between awaitility and mockito -->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<distributionManagement>
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/com/spotify/dns/AbstractChangeNotifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.spotify.dns;

import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -117,12 +118,22 @@ protected ChangeNotificationImpl(Set<T> current, Set<T> previous) {

@Override
public Set<T> current() {
return Collections.unmodifiableSet(current);
return unmodifiable(current);
}

private Set<T> unmodifiable(Set<T> set) {
if (ChangeNotifiers.isInitialEmptyData(set)) {
return set;
}
if (set instanceof ImmutableSet) {
return set;
}
return Collections.unmodifiableSet(set);
}

@Override
public Set<T> previous() {
return Collections.unmodifiableSet(previous);
return unmodifiable(previous);
}
}
}
19 changes: 15 additions & 4 deletions src/main/java/com/spotify/dns/AggregatingChangeNotifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ class AggregatingChangeNotifier<T> extends AbstractChangeNotifier<T> {

private final List<ChangeNotifier<T>> changeNotifiers;

private volatile Set<T> records = ImmutableSet.of();
private volatile boolean waitingForFirstEvent = true;
private volatile Set<T> records = ChangeNotifiers.initialEmptyDataInstance();

/**
* Create a new aggregating {@link ChangeNotifier}.
Expand Down Expand Up @@ -68,8 +67,7 @@ protected void closeImplementation() {
private synchronized void checkChange() {
Set<T> currentRecords = aggregateSet();

if (waitingForFirstEvent || !currentRecords.equals(records)) {
waitingForFirstEvent = false;
if (ChangeNotifiers.isNoLongerInitial(currentRecords, records) || !currentRecords.equals(records)) {
final ChangeNotification<T> changeNotification =
newChangeNotification(currentRecords, records);
records = currentRecords;
Expand All @@ -79,11 +77,24 @@ private synchronized void checkChange() {
}

private Set<T> aggregateSet() {
if (areAllInitial(changeNotifiers)) {
return ChangeNotifiers.initialEmptyDataInstance();
}

ImmutableSet.Builder<T> records = ImmutableSet.builder();
for (final ChangeNotifier<T> changeNotifier : changeNotifiers) {
records.addAll(changeNotifier.current());
}
return records.build();
}

private boolean areAllInitial(List<ChangeNotifier<T>> changeNotifiers) {
for (final ChangeNotifier<T> changeNotifier : changeNotifiers) {
if (!ChangeNotifiers.isInitialEmptyData(changeNotifier.current())) {
return false;
}
}
return true;
}
}

30 changes: 30 additions & 0 deletions src/main/java/com/spotify/dns/ChangeNotifiers.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.common.collect.Sets;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -28,9 +30,37 @@

public final class ChangeNotifiers {

/**
* Ensure that we get a unique empty set that you can't create any other way
* (i.e. ImmutableSet.of() always returns the same instance).
*
* This is needed to distinguishing the initial state of change notifiers from
* when they have gotten proper data.
*/
private static final Set INITIAL_EMPTY_DATA = Collections.unmodifiableSet(new HashSet());

private ChangeNotifiers() {
}

/**
* Use this to determine if the data you get back from a notifier is the initial result of the result of a proper
* DNS lookup. This is useful for distinguishing a proper but empty DNS result from the case
* where a lookup has not completed yet.
* @param set
* @return true if the input is an initially empty set.
*/
public static <T> boolean isInitialEmptyData(Set<T> set) {
return set == INITIAL_EMPTY_DATA;
}

static <T> Set<T> initialEmptyDataInstance() {
return INITIAL_EMPTY_DATA;
}

static <T> boolean isNoLongerInitial(Set<T> current, Set<T> previous) {
return isInitialEmptyData(previous) && !isInitialEmptyData(current);
}

/**
* Creates a {@link ChangeNotifier} that aggregates the records provided by a list of notifiers.
*
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/com/spotify/dns/DirectChangeNotifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.spotify.dns;

import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

Expand All @@ -28,8 +27,7 @@ class DirectChangeNotifier<T> extends AbstractChangeNotifier<T>

private final Supplier<Set<T>> recordsSupplier;

private volatile Set<T> records = ImmutableSet.of();
private volatile boolean waitingForFirstEvent = true;
private volatile Set<T> records = ChangeNotifiers.initialEmptyDataInstance();
private volatile boolean run = true;

public DirectChangeNotifier(Supplier<Set<T>> recordsSupplier) {
Expand All @@ -53,13 +51,13 @@ public void run() {
}

final Set<T> current = recordsSupplier.get();
if (waitingForFirstEvent || !current.equals(records)) {
waitingForFirstEvent = false;
if (ChangeNotifiers.isNoLongerInitial(current, records) || !current.equals(records)) {
final ChangeNotification<T> changeNotification =
newChangeNotification(current, records);
records = current;

fireRecordsUpdated(changeNotification);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Set;

import javax.annotation.Nullable;

import static com.google.common.base.Preconditions.checkNotNull;

/**
Expand All @@ -46,7 +44,7 @@ class ServiceResolvingChangeNotifier<T> extends AbstractChangeNotifier<T>
@Nullable
private final ErrorHandler errorHandler;

private volatile Set<T> records = ImmutableSet.of();
private volatile Set<T> records = ChangeNotifiers.initialEmptyDataInstance();
private volatile boolean waitingForFirstEvent = true;

private volatile boolean run = true;
Expand Down Expand Up @@ -123,7 +121,8 @@ public void run() {
return;
}

if (waitingForFirstEvent || !current.equals(records)) {
if (ChangeNotifiers.isNoLongerInitial(current, records) || !current.equals(records)) {
// This means that any subsequent DNS error will be ignored and the existing result will be kept
waitingForFirstEvent = false;
final ChangeNotification<T> changeNotification =
newChangeNotification(current, records);
Expand All @@ -136,7 +135,9 @@ public void run() {
private void fireIfFirstError() {
if (waitingForFirstEvent) {
waitingForFirstEvent = false;
fireRecordsUpdated(newChangeNotification(current(), current()));
Set<T> previous = current();
records = ImmutableSet.of();
fireRecordsUpdated(newChangeNotification(records, previous));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand All @@ -40,14 +39,12 @@ public void testEmptySet() throws Exception {
verify(listener, never()).onChange(any(ChangeNotifier.ChangeNotification.class));

childNotifier.set(ImmutableSet.<String>of());

verify(listener, times(1)).onChange(any(ChangeNotifier.ChangeNotification.class));
verifyNoMoreInteractions(listener);

}

private static class MyNotifier extends AbstractChangeNotifier<String> {
private volatile Set<String> records = ImmutableSet.of();
private volatile Set<String> records = ChangeNotifiers.initialEmptyDataInstance();

@Override
protected void closeImplementation() {
Expand Down
40 changes: 39 additions & 1 deletion src/test/java/com/spotify/dns/DnsSrvResolversIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,23 @@

package com.spotify.dns;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.jayway.awaitility.Awaitility;
import com.spotify.dns.statistics.DnsReporter;
import com.spotify.dns.statistics.DnsTimingContext;
import org.junit.Before;
import org.junit.Test;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.isA;
Expand All @@ -46,7 +56,35 @@ public void setUp() throws Exception {

@Test
public void shouldReturnResultsForValidQuery() throws Exception {
assertThat(resolver.resolve("_spotify-client._tcp.sto.spotify.net").isEmpty(), is(false));
assertThat(resolver.resolve("_spotify-client._tcp.spotify.com").isEmpty(), is(false));
}

@Test
public void testCorrectSequenceOfNotifications() throws Exception {
ChangeNotifier<LookupResult> notifier = ChangeNotifiers.aggregate(
DnsSrvWatchers.newBuilder(resolver)
.polling(100, TimeUnit.MILLISECONDS)
.build().watch("_spotify-client._tcp.spotify.com"));

final List<String> changes = Collections.synchronizedList(Lists.<String>newArrayList());

notifier.setListener(new ChangeNotifier.Listener<LookupResult>() {
@Override
public void onChange(ChangeNotifier.ChangeNotification<LookupResult> changeNotification) {
Set<LookupResult> current = changeNotification.current();
if (!ChangeNotifiers.isInitialEmptyData(current)) {
changes.add(current.isEmpty() ? "empty" : "data");
}
}
}, true);
assertEquals(ImmutableList.of(), changes);
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return changes.size() >= 1;
}
});
assertEquals(ImmutableList.of("data"), changes);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.spotify.dns;

import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down

0 comments on commit 3d54084

Please sign in to comment.