Skip to content

Commit

Permalink
ISPN-1123 - Extend the TCPPING port_range to make the initial member …
Browse files Browse the repository at this point in the history
…discovery more reliable

The old port_range was 2, so if the first two ports were busy the coordinator would not be in the port range and a second node wouldn't find it.
I changed it to 30 to match the TCP port_range.

I also moved waitForRehashToComplete/waitForInitRehash to TestingUtil and I modified most tests to use MultipleCacheManagersTest.waitForClusterToComplete instead.
  • Loading branch information
Dan Berindei committed Jun 26, 2011
1 parent f996013 commit 55843e4
Show file tree
Hide file tree
Showing 29 changed files with 103 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

import java.util.ArrayList;
Expand All @@ -39,14 +39,4 @@ public class DistTopologyChangeTest extends ReplTopologyChangeTest {
protected Configuration.CacheMode getCacheMode() {
return Configuration.CacheMode.DIST_SYNC;
}

@Override
protected void waitForClusterToForm(int memberCount) {
super.waitForClusterToForm(memberCount);
List<Cache> caches = new ArrayList<Cache>();
for (int i = 0; i < memberCount; i++) {
caches.add(manager(i).getCache());
}
BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(caches);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ protected void createCacheManagers() throws Throwable {
hotRodServer1 = TestHelper.startHotRodServer(manager(0));
hotRodServer2 = TestHelper.startHotRodServer(manager(1));

manager(0).getCache();
manager(1).getCache();

waitForClusterToForm(2);
waitForClusterToForm();

manager(0).getCache().put("k_test", "v");
manager(0).getCache().get("k_test").equals("v");
Expand Down Expand Up @@ -127,7 +124,7 @@ public void testAddNewServer() {
hotRodServer3 = TestHelper.startHotRodServer(manager(2));
manager(2).getCache();

waitForClusterToForm(3);
waitForClusterToForm();

try {
expectTopologyChange(new InetSocketAddress("localhost", hotRodServer3.getPort()), true);
Expand Down Expand Up @@ -171,13 +168,6 @@ private void expectTopologyChange(InetSocketAddress server1Address, boolean adde
assertEquals(server1Address + " not found in " + addresses, added, addresses.contains(server1Address));
}

protected void waitForClusterToForm(int memberCount) {
TestingUtil.blockUntilViewReceived(manager(0).getCache(), memberCount, 30000);
for (int i = 0; i < memberCount; i++) {
TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 30000);
}
}

protected void waitForServerToDie(int memberCount) {
TestingUtil.blockUntilViewReceived(manager(0).getCache(), memberCount, 30000, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ protected void createCacheManagers() throws Throwable {
hotRodServer3 = TestHelper.startHotRodServer(manager(2));
hrServ2CacheManager.put(getAddress(hotRodServer3), cm3);

manager(0).getCache();
manager(1).getCache();
manager(2).getCache();

waitForClusterToForm();

Properties clientConfig = new Properties();
Expand All @@ -104,11 +100,4 @@ protected void createCacheManagers() throws Throwable {
}

protected abstract Configuration getCacheConfig();

protected void waitForClusterToForm() {
TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransport;
import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
import org.infinispan.config.Configuration;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.marshall.Marshaller;
import org.infinispan.marshall.jboss.JBossMarshaller;
import org.infinispan.remoting.transport.Address;
Expand Down Expand Up @@ -58,12 +57,6 @@ protected Configuration getCacheConfig() {
return config;
}

@Override
protected void waitForClusterToForm() {
super.waitForClusterToForm();
BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
}

public void testGet() throws Exception {
log.info("Starting actual test");
Object key = generateKeyAndShutdownServer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected void assertKeyAffinityCorrectness(Collection<Address> addressList) {

protected void waitForClusterToResize() {
TestingUtil.blockUntilViewsReceived(10000, caches);
RehashWaiter.waitForRehashToComplete(new HashSet<Cache>(caches));
TestingUtil.waitForRehashToComplete(caches);
assertEquals(caches.size(), topology().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.infinispan.config.Configuration;
import static org.infinispan.context.Flag.SKIP_REMOTE_LOOKUP;
import org.infinispan.replication.AsyncAPISyncReplTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.data.Key;
import org.infinispan.util.Util;
import org.testng.annotations.Test;
Expand All @@ -47,7 +48,7 @@ protected void createCacheManagers() throws Throwable {
c2 = l.get(1);

// wait for any rehashing to complete
BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(c1, c2);
waitForClusterToForm();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,18 @@
import org.infinispan.distribution.ch.DefaultConsistentHash;
import org.infinispan.distribution.ch.UnionConsistentHash;
import org.infinispan.distribution.group.Grouper;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.Test;

import javax.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -115,9 +107,6 @@ protected void createCacheManagers() throws Throwable {
EmbeddedCacheManager cacheManager = cache.getCacheManager();
cacheAddresses.add(cacheManager.getAddress());
}

RehashWaiter.waitForInitRehashToComplete(caches.toArray(new Cache[INIT_CLUSTER_SIZE]));

}

protected static ConsistentHash createNewConsistentHash(Collection<Address> servers) {
Expand All @@ -132,68 +121,14 @@ protected static ConsistentHash createNewConsistentHash(Collection<Address> serv
}
}

/**
* This is a separate class because some tools try and run this method as a test
*/
public static class RehashWaiter {
private static final Log log = LogFactory.getLog(RehashWaiter.class);
public static void waitForInitRehashToComplete(Cache... caches) {
int gracetime = 60000; // 60 seconds?
long giveup = System.currentTimeMillis() + gracetime;
for (Cache c : caches) {
DistributionManagerImpl dmi = (DistributionManagerImpl) TestingUtil.extractComponent(c, DistributionManager.class);
while (!dmi.isJoinComplete()) {
if (System.currentTimeMillis() > giveup) {
String message = "Timed out waiting for initial join sequence to complete on node " + dmi.getRpcManager().getAddress() + " !";
log.error(message);
throw new RuntimeException(message);
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}
log.trace("Node " + dmi.getRpcManager().getAddress() + " finished rehash task.");
}
}

public static void waitForRehashToComplete(Cache... caches) {
int gracetime = 120000; // 120 seconds?
long giveup = System.currentTimeMillis() + gracetime;
for (Cache c : caches) {
DistributionManagerImpl dmi = (DistributionManagerImpl) TestingUtil.extractComponent(c, DistributionManager.class);
while (dmi.isRehashInProgress()) {
if (System.currentTimeMillis() > giveup) {
String message = "Timed out waiting for rehash to complete on node " + dmi.getRpcManager().getAddress() + " !";
log.error(message);
throw new RuntimeException(message);
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
}
log.trace("Node " + dmi.getRpcManager().getAddress() + " finished rehash task.");
}
}

public static void waitForInitRehashToComplete(Collection<Cache> caches) {
Set<Cache> cachesSet = new HashSet<Cache>();
cachesSet.addAll(caches);
waitForInitRehashToComplete(cachesSet.toArray(new Cache[cachesSet.size()]));
}

public static void waitForRehashToComplete(Collection<Cache> caches) {
Set<Cache> cachesSet = new HashSet<Cache>();
cachesSet.addAll(caches);
waitForRehashToComplete(cachesSet.toArray(new Cache[cachesSet.size()]));
}

}

// only used if the CH impl does not order the hash ring based on the order of the view.
// in the case of the DefaultConsistentHash, the order is based on a has code of the addres modded by
// the hash space. So this will not adhere to the positions in the view, but it is deterministic.
// so this function orders things such that the test can predict where keys get mapped to.
private void reorderBasedOnCHPositions() {
// wait for all joiners to join
List<Cache> clist = new ArrayList<Cache>(caches);
assert clist.size() == INIT_CLUSTER_SIZE;
waitForJoinTasksToComplete(SECONDS.toMillis(480), clist.toArray(new Cache[clist.size()]));
assert caches.size() == INIT_CLUSTER_SIZE;
TestingUtil.waitForRehashToComplete(caches);

// seed this with an initial cache. Any one will do.
Cache seed = caches.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

import java.util.HashSet;
import java.util.Set;

import static java.lang.String.format;
import static org.infinispan.distribution.BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete;
import static org.infinispan.distribution.DistributionTestHelper.addressOf;
import static org.infinispan.distribution.DistributionTestHelper.isOwner;

Expand All @@ -59,10 +59,7 @@ protected void createCacheManagers() throws Throwable {
c.setCacheLoaderManagerConfig(new CacheLoaderManagerConfig(new DummyInMemoryCacheStore.Cfg("DistCacheStoreTxDisjointSetTest2")));
addClusterEnabledCacheManager(c, true);

waitForInitRehashToComplete(cache(0),
cache(1),
cache(2)
);
waitForClusterToForm();
}

public void testDisjointSetTransaction() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public boolean isSatisfied() throws Exception {

System.out.println("MultipleNodesLeavingTest.testMultipleLeaves");

BaseDistFunctionalTest.RehashWaiter.waitForRehashToComplete(cache(0));
TestingUtil.blockUntilViewsReceived(60000, cache(0));
TestingUtil.waitForRehashToComplete(cache(0));
Set<Address> caches = advancedCache(0).getDistributionManager().getConsistentHash().getCaches();
System.out.println("caches = " + caches);
int size = caches.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.infinispan.config.Configuration;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.concurrent.IsolationLevel;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -70,8 +71,6 @@ protected void createCacheManagers() throws Throwable {
EmbeddedCacheManager cacheManager = cache.getCacheManager();
cacheAddresses.add(cacheManager.getAddress());
}

RehashWaiter.waitForInitRehashToComplete(c1, c2);
}

public void testPutOnKeyOwner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.infinispan.config.Configuration;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.testng.TestException;
import org.testng.annotations.AfterTest;
Expand Down Expand Up @@ -81,7 +82,7 @@ public void testStartingUnknownCaches() throws Throwable {
assert "v".equals(c2.get("k"));
assert "v".equals(c2_new.get("k"));

BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(c2, c2_new);
TestingUtil.waitForRehashToComplete(c2, c2_new);

assert false : "Should have thrown an exception!";
} catch (CacheException expected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public synchronized Thread newThread(Runnable r) {
Thread.sleep(25000);

// lets make sure any rehashing work has completed
RehashTestBase.RehashWaiter.waitForRehashToComplete(cacheMap.values().toArray(new Cache[NUM_NODES - 1]));
TestingUtil.waitForRehashToComplete(cacheMap.values());
AbstractWheelConsistentHash hash = (AbstractWheelConsistentHash) cache(1).getAdvancedCache().getDistributionManager().getConsistentHash();

for (int i = 0; i < NUM_NODES; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.config.Configuration;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.interceptors.DistTxInterceptor;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.interceptors.base.CommandInterceptor;
Expand All @@ -51,10 +50,8 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -145,7 +142,7 @@ public Object call() throws Exception {

for (Thread t : threads) t.join();

BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(1));
TestingUtil.waitForInitRehashToComplete(cache(1));

for (int i = 0; i < 10; i++) {
Object key = "OLD" + i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testCachePartition() {
// wait till we see the view change
while (ai.get() < 2) TestingUtil.sleepThread(500);

BaseDistFunctionalTest.RehashWaiter.waitForRehashToComplete(c1, c2);
TestingUtil.waitForRehashToComplete(c1, c2);

c1.put("5", "value");
c2.put("6", "value");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
package org.infinispan.distribution.rehash;

import org.infinispan.Cache;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "distribution.rehash.RehashLeaveTestBase")
public abstract class RehashLeaveTestBase extends RehashTestBase {
void waitForRehashCompletion() {
RehashWaiter.waitForRehashToComplete(caches.toArray(new Cache[caches.size()]));
TestingUtil.waitForRehashToComplete(caches);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public int prepare(Xid id) {
//only check for these values if tx was not rolled back
if (!rollback.get()) {
//ownership can only be verified after the rehashing has completed
RehashWaiter.waitForRehashToComplete(new ArrayList<Cache>(caches));
TestingUtil.waitForRehashToComplete(caches);
// the ownership of k1 might change during the tx and a cache might end up with it in L1
assertOwnershipAndNonOwnership(keys.get(0), true);
assertOwnershipAndNonOwnership(keys.get(1), l1OnRehash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -93,7 +94,7 @@ public void testRehashes() throws CacheLoaderException {
primaryOwner.getCacheManager().stop();


RehashWaiter.waitForRehashToComplete(caches.toArray(new Cache[INIT_CLUSTER_SIZE - 1]));
TestingUtil.waitForRehashToComplete(caches);



Expand Down
Loading

0 comments on commit 55843e4

Please sign in to comment.