Skip to content

Commit

Permalink
ISPN-4996 Update test to avoid sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
danberindei authored and ryanemerson committed Apr 1, 2021
1 parent 76e72e4 commit a3fd9a6
Showing 1 changed file with 41 additions and 10 deletions.
Expand Up @@ -3,12 +3,19 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.infinispan.commons.test.Exceptions.expectException;
import static org.infinispan.test.TestingUtil.extractCacheTopology;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;

import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.Cache;
Expand All @@ -31,7 +38,10 @@
import org.infinispan.test.op.TestFunctionalWriteOperation;
import org.infinispan.test.op.TestOperation;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.CacheStatusResponse;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.util.concurrent.TimeoutException;
import org.mockito.stubbing.Answer;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -156,33 +166,53 @@ private void createCache(ConfigurationBuilder cb, String cacheName) {
}

public void testZeroCapacityFactorNodeStartsFirst() throws Exception {
String cacheName = "testCache";
Queue<CacheStatusResponse> joinResponses = new LinkedBlockingQueue<>();

assertTrue(node1.isCoordinator());
ClusterTopologyManager current = TestingUtil.extractGlobalComponent(node1, ClusterTopologyManager.class);
Answer<?> delegateAnswer = invocation -> invocation.getMethod().invoke(current, invocation.getArguments());
ClusterTopologyManager trackingCTM = mock(ClusterTopologyManager.class, delegateAnswer);
when(trackingCTM.handleJoin(eq(cacheName), any(), any(), anyInt()))
.thenAnswer(invocation -> {
return current.handleJoin(cacheName, invocation.getArgument(1),
invocation.getArgument(2), invocation.getArgument(3))
.thenApply(r -> {
joinResponses.offer(r);
return r;
});
});
TestingUtil.replaceComponent(node1, ClusterTopologyManager.class, trackingCTM, true);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.clustering().cacheMode(CacheMode.DIST_SYNC);
cb.clustering().hash().numSegments(NUM_SEGMENTS);
cb.clustering().remoteTimeout(100);
cb.clustering().cacheMode(CacheMode.DIST_SYNC).remoteTimeout(100)
.hash().numSegments(NUM_SEGMENTS);

ConfigurationBuilder cbZero = new ConfigurationBuilder();
cbZero.clustering().cacheMode(CacheMode.DIST_SYNC);
cbZero.clustering().hash().numSegments(NUM_SEGMENTS);
cbZero.clustering().hash().capacityFactor(0f);
cbZero.clustering().remoteTimeout(100);
cbZero.clustering().cacheMode(CacheMode.DIST_SYNC).remoteTimeout(100)
.hash().numSegments(NUM_SEGMENTS).capacityFactor(0f);

String cacheName = "testCache";
Future<Cache<Object, Object>> zeroCapacityNodeFuture =
fork(() -> zeroCapacityNode.createCache(cacheName, cb.build()));
Future<Cache<Object, Object>> node1Future =
fork(() -> node1.createCache(cacheName, cbZero.build()));

TestingUtil.sleepThread(10);
assertFalse(zeroCapacityNodeFuture.isDone());
assertFalse(node1Future.isDone());
assertEquals(0, joinResponses.size());

// Node2 is the only one that can create the initial topology
node2.createCache(cacheName, cb.build());

node1Future.get(10, SECONDS);
zeroCapacityNodeFuture.get(10, SECONDS);

// 2 join responses: for node2 and zeroCapacityNode
assertEquals(3, joinResponses.size());
while (!joinResponses.isEmpty()) {
CacheStatusResponse joinResponse = joinResponses.poll();
assertTrue(joinResponse.getCacheTopology().getMembers().contains(node2.getAddress()));
}

waitForClusterToForm(cacheName);
ConsistentHash ch3 = consistentHash(0, cacheName);
assertEquals(0f, capacityFactor(ch3, zeroCapacityNode), 0.0);
Expand All @@ -200,7 +230,8 @@ public void testZeroCapacityFactorNodeStartsFirst() throws Exception {
}

private ConsistentHash consistentHash(int managerIndex, String cacheName) {
return cache(managerIndex, cacheName).getAdvancedCache().getDistributionManager().getCacheTopology().getReadConsistentHash();
return cache(managerIndex, cacheName).getAdvancedCache().getDistributionManager()
.getCacheTopology().getReadConsistentHash();
}

private Float capacityFactor(ConsistentHash ch, EmbeddedCacheManager node) {
Expand Down

0 comments on commit a3fd9a6

Please sign in to comment.