Skip to content

Commit

Permalink
ISPN-6239 Fix TEST_PING race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
danberindei authored and Sebastian Laskawiec committed Mar 21, 2016
1 parent e34a4c9 commit e080bac
Showing 1 changed file with 63 additions and 55 deletions.
118 changes: 63 additions & 55 deletions core/src/test/java/org/infinispan/test/fwk/TEST_PING.java
Expand Up @@ -53,52 +53,73 @@ public class TEST_PING extends Discovery {

@Override
protected void findMembers(List<Address> addresses, boolean initialDiscovery, Responses pingDatas) {
if (!stopped) {
Map<Address, TEST_PING> discoveries = registerInDiscoveries();

// Only send message if DISCARD is not used, or if DISCARD is
// configured but it's not discarding messages.
boolean discardEnabled = isDiscardEnabled(this);
if (!discardEnabled) {
// Make sure that concurrent startups within a test won't mess up discovery
synchronized (discoveries) {
if (!discoveries.isEmpty()) {
boolean traceEnabled = log.isTraceEnabled();
for (TEST_PING discovery : discoveries.values()) {
// Avoid sending to self! Since there are single instances of
// discovery protocol in each node, just compare them by ref.
if (discovery != this) {
boolean remoteDiscardEnabled = isDiscardEnabled(discovery);
if (!remoteDiscardEnabled && !discovery.stopped) {
addPingRsp(pingDatas, discovery);
} else if (discovery.stopped) {
log.debug(String.format(
"%s is stopped, so no ping responses will be received",
discovery.getLocalAddr()));
} else {
if (traceEnabled)
log.trace("Skipping sending response cos DISCARD is on");
// If discard is enabled, add an empty response
addPingRsp(null, discovery);
}
} else {
if (traceEnabled)
log.trace("Skipping sending discovery to self");
}
}
doFindMembers(initialDiscovery, pingDatas);

pingDatas.done();
}

private void doFindMembers(boolean initialDiscovery, Responses pingDatas) {
boolean trace = log.isTraceEnabled();
if (stopped) {
log.debug("Discovery protocol already stopped, so don't look for members");
return;
}

// Only send message if DISCARD is not used, or if DISCARD is
// configured but it's not discarding messages.
boolean discardEnabled = isDiscardEnabled(this);
if (discardEnabled) {
log.debug("Not sending discovery because DISCARD is on");
return;
}

Map<Address, TEST_PING> discoveries = getTestDiscoveries();

// Make sure that concurrent startups within a test won't mess up discovery
synchronized (discoveries) {
if (initialDiscovery) {
boolean firstNode = discoveries.isEmpty();

TEST_PING prev = discoveries.putIfAbsent(local_addr, this);
if (prev == null) {
if (trace)
log.trace("Added discovery for %s. Registered discoveries: %s", local_addr, discoveries);
} else {
if (prev != this) {
throw new IllegalStateException(
"Trying to add two discoveries for the same address: " + local_addr);
}
}

if (firstNode) {
log.debug("No other nodes yet, marking this node as coord");
is_coord = true;
return;
}
}

for (TEST_PING discovery : discoveries.values()) {
// Avoid sending to self! Since there are single instances of
// discovery protocol in each node, just compare them by ref.
if (discovery != this) {
boolean remoteDiscardEnabled = isDiscardEnabled(discovery);
if (!remoteDiscardEnabled && !discovery.stopped) {
addPingRsp(pingDatas, discovery);
} else if (discovery.stopped) {
log.debug(String.format("%s is stopped, so no ping responses will be received",
discovery.getLocalAddr()));
} else {
log.debug("No other nodes yet, marking this node as coord");
is_coord = true;
if (trace)
log.trace("Ignoring discovery for %s because DISCARD is on", discovery.getLocalAddr());
// If discard is enabled, add an empty response
addPingRsp(null, discovery);
}
} else {
if (trace)
log.trace("Skipping sending discovery to self");
}
} else {
log.debug("Not sending discovery because DISCARD is on");
}
} else {
log.debug("Discovery protocol already stopped, so don't look for members");
}

pingDatas.done();
}

private boolean isDiscardEnabled(TEST_PING discovery) {
Expand Down Expand Up @@ -150,7 +171,7 @@ private void mapAddrWithPhysicalAddr(TEST_PING local, TEST_PING remote) {
remote.getLocalAddr(), physical_addr, local));
}

private Map<Address, TEST_PING> registerInDiscoveries() {
private Map<Address, TEST_PING> getTestDiscoveries() {
DiscoveryKey key = new DiscoveryKey(testName, cluster_name);
ConcurrentMap<Address, TEST_PING> discoveries = all.get(key);
if (discoveries == null) {
Expand All @@ -159,15 +180,6 @@ private Map<Address, TEST_PING> registerInDiscoveries() {
if (ret != null)
discoveries = ret;
}
boolean traceEnabled = log.isTraceEnabled();
if (traceEnabled)
log.trace(sf("Discoveries for %s are : %s", key, discoveries));

TEST_PING prev = discoveries.putIfAbsent(local_addr, this);
if (prev == null && traceEnabled)
log.trace(sf("Add discovery for %s to cache. The cache now contains: %s",
local_addr, discoveries));

return discoveries;
}

Expand Down Expand Up @@ -241,10 +253,6 @@ public String toString() {
return "TEST_PING@" + local_addr;
}

private static String sf(String format, Object ... args) {
return String.format(format, args);
}

static private class DiscoveryKey {
final String testName;
final String clusterName;
Expand Down

0 comments on commit e080bac

Please sign in to comment.