Skip to content

Commit

Permalink
ISPN-10080 Test cache managers created in other threads cannot with UDP
Browse files Browse the repository at this point in the history
  • Loading branch information
danberindei committed May 24, 2019
1 parent 93ae5c2 commit eb71d4b
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 19 deletions.
@@ -0,0 +1,46 @@
package org.infinispan.statetransfer;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.Future;

import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestResourceTracker;
import org.testng.annotations.Test;

/**
* Test that a node started in a different thread can join the cluster.
*
* @author Dan Berindei
* @since 10.0
*/
@Test(testName = "statetransfer.JoinInNewThreadTest", groups = "functional")
@CleanupAfterMethod
public class JoinInNewThreadTest extends MultipleCacheManagersTest {
@Override
protected void createCacheManagers() throws Throwable {
// Do nothing here
}

public void testJoinInNewThread() throws Exception {
TestResourceTracker.setThreadTestName(JoinInNewThreadTest.class.getName());

ConfigurationBuilder replCfg = new ConfigurationBuilder();
replCfg.clustering().cacheMode(CacheMode.REPL_SYNC).stateTransfer().timeout(30, SECONDS);

// Connect 2 channels
addClusterEnabledCacheManager(replCfg);
addClusterEnabledCacheManager(replCfg);
waitForClusterToForm();

Future<Void> future = fork(() -> {
TestResourceTracker.testThreadStarted(this);
addClusterEnabledCacheManager(replCfg);
waitForClusterToForm();
});
future.get(30, SECONDS);
}
}
Expand Up @@ -17,6 +17,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.commons.util.LegacyKeySupportSystemProperties;
Expand Down Expand Up @@ -52,27 +54,17 @@ protected Integer initialValue() {
/**
* Holds unique mcast_addr for each thread used for JGroups channel construction.
*/
private static final ThreadLocal<String> threadMcastIP = new ThreadLocal<String>() {
private final AtomicInteger uniqueAddr = new AtomicInteger(11);

@Override
protected String initialValue() {
return "228.10.10." + uniqueAddr.getAndIncrement();
}
};

/**
* Holds unique mcast_port for each thread used for JGroups channel construction.
*/
private static final ThreadLocal<Integer> threadMcastPort = new ThreadLocal<Integer>() {
private final AtomicInteger uniquePort = new AtomicInteger(45589);
private static final ThreadLocal<Integer> threadUdpIndex = new ThreadLocal<Integer>() {
private final AtomicInteger counter = new AtomicInteger(0);

@Override
protected Integer initialValue() {
return uniquePort.getAndIncrement();
return counter.getAndIncrement();
}
};

private static final ConcurrentMap<String, Integer> testUdpIndex = new ConcurrentHashMap<>();

static {
JGROUPS_STACK = LegacyKeySupportSystemProperties.getProperty("infinispan.test.jgroups.protocol", "protocol.stack", "udp");
System.out.println("Transport protocol stack used = " + JGROUPS_STACK);
Expand Down Expand Up @@ -100,7 +92,7 @@ public static String getJGroupsConfig(String fullTestName, TransportFlags flags)

configureTestPing(fullTestName, jgroupsCfg);
replaceTcpStartPort(jgroupsCfg, flags);
replaceMCastAddressAndPort(jgroupsCfg);
replaceMCastAddressAndPort(jgroupsCfg, fullTestName);
return jgroupsCfg.toString();
}

Expand Down Expand Up @@ -141,13 +133,15 @@ private static void configureTestPing(String fullTestName, JGroupsProtocolCfg jg
replaceProperties(jgroupsCfg, props, TEST_PING);
}

private static void replaceMCastAddressAndPort(JGroupsProtocolCfg jgroupsCfg) {
private static void replaceMCastAddressAndPort(JGroupsProtocolCfg jgroupsCfg, String fullTestName) {
ProtocolConfiguration udp = jgroupsCfg.getProtocol(UDP);
if (udp == null) return;

Integer udpIndex = testUdpIndex.computeIfAbsent(fullTestName, k -> threadUdpIndex.get());

Map<String, String> props = udp.getProperties();
props.put("mcast_addr", threadMcastIP.get());
props.put("mcast_port", threadMcastPort.get().toString());
props.put("mcast_addr", "228.10.10." + udpIndex);
props.put("mcast_port", String.valueOf(46000 + udpIndex));
replaceProperties(jgroupsCfg, props, UDP);
}

Expand Down

0 comments on commit eb71d4b

Please sign in to comment.