From a3fd9a6e48031e70082fc9f1bd17814f8c8a1736 Mon Sep 17 00:00:00 2001 From: Dan Berindei Date: Wed, 31 Mar 2021 21:02:21 +0300 Subject: [PATCH] ISPN-4996 Update test to avoid sleep --- .../distribution/ZeroCapacityNodeTest.java | 51 +++++++++++++++---- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/core/src/test/java/org/infinispan/distribution/ZeroCapacityNodeTest.java b/core/src/test/java/org/infinispan/distribution/ZeroCapacityNodeTest.java index ec0775ae018c..b41ff7b85744 100644 --- a/core/src/test/java/org/infinispan/distribution/ZeroCapacityNodeTest.java +++ b/core/src/test/java/org/infinispan/distribution/ZeroCapacityNodeTest.java @@ -3,12 +3,19 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.infinispan.commons.test.Exceptions.expectException; import static org.infinispan.test.TestingUtil.extractCacheTopology; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; import java.util.Collections; +import java.util.Queue; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import org.infinispan.Cache; @@ -31,7 +38,10 @@ import org.infinispan.test.op.TestFunctionalWriteOperation; import org.infinispan.test.op.TestOperation; import org.infinispan.test.op.TestWriteOperation; +import org.infinispan.topology.CacheStatusResponse; +import org.infinispan.topology.ClusterTopologyManager; import org.infinispan.util.concurrent.TimeoutException; +import org.mockito.stubbing.Answer; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -156,26 +166,39 @@ private void createCache(ConfigurationBuilder cb, String cacheName) { } public void testZeroCapacityFactorNodeStartsFirst() throws Exception { + String cacheName = "testCache"; + Queue joinResponses = new LinkedBlockingQueue<>(); + + assertTrue(node1.isCoordinator()); + ClusterTopologyManager current = TestingUtil.extractGlobalComponent(node1, ClusterTopologyManager.class); + Answer delegateAnswer = invocation -> invocation.getMethod().invoke(current, invocation.getArguments()); + ClusterTopologyManager trackingCTM = mock(ClusterTopologyManager.class, delegateAnswer); + when(trackingCTM.handleJoin(eq(cacheName), any(), any(), anyInt())) + .thenAnswer(invocation -> { + return current.handleJoin(cacheName, invocation.getArgument(1), + invocation.getArgument(2), invocation.getArgument(3)) + .thenApply(r -> { + joinResponses.offer(r); + return r; + }); + }); + TestingUtil.replaceComponent(node1, ClusterTopologyManager.class, trackingCTM, true); ConfigurationBuilder cb = new ConfigurationBuilder(); - cb.clustering().cacheMode(CacheMode.DIST_SYNC); - cb.clustering().hash().numSegments(NUM_SEGMENTS); - cb.clustering().remoteTimeout(100); + cb.clustering().cacheMode(CacheMode.DIST_SYNC).remoteTimeout(100) + .hash().numSegments(NUM_SEGMENTS); ConfigurationBuilder cbZero = new ConfigurationBuilder(); - cbZero.clustering().cacheMode(CacheMode.DIST_SYNC); - cbZero.clustering().hash().numSegments(NUM_SEGMENTS); - cbZero.clustering().hash().capacityFactor(0f); - cbZero.clustering().remoteTimeout(100); + cbZero.clustering().cacheMode(CacheMode.DIST_SYNC).remoteTimeout(100) + .hash().numSegments(NUM_SEGMENTS).capacityFactor(0f); - String cacheName = "testCache"; Future> zeroCapacityNodeFuture = fork(() -> zeroCapacityNode.createCache(cacheName, cb.build())); Future> node1Future = fork(() -> node1.createCache(cacheName, cbZero.build())); - TestingUtil.sleepThread(10); assertFalse(zeroCapacityNodeFuture.isDone()); assertFalse(node1Future.isDone()); + assertEquals(0, joinResponses.size()); // Node2 is the only one that can create the initial topology node2.createCache(cacheName, cb.build()); @@ -183,6 +206,13 @@ public void testZeroCapacityFactorNodeStartsFirst() throws Exception { node1Future.get(10, SECONDS); zeroCapacityNodeFuture.get(10, SECONDS); + // 2 join responses: for node2 and zeroCapacityNode + assertEquals(3, joinResponses.size()); + while (!joinResponses.isEmpty()) { + CacheStatusResponse joinResponse = joinResponses.poll(); + assertTrue(joinResponse.getCacheTopology().getMembers().contains(node2.getAddress())); + } + waitForClusterToForm(cacheName); ConsistentHash ch3 = consistentHash(0, cacheName); assertEquals(0f, capacityFactor(ch3, zeroCapacityNode), 0.0); @@ -200,7 +230,8 @@ public void testZeroCapacityFactorNodeStartsFirst() throws Exception { } private ConsistentHash consistentHash(int managerIndex, String cacheName) { - return cache(managerIndex, cacheName).getAdvancedCache().getDistributionManager().getCacheTopology().getReadConsistentHash(); + return cache(managerIndex, cacheName).getAdvancedCache().getDistributionManager() + .getCacheTopology().getReadConsistentHash(); } private Float capacityFactor(ConsistentHash ch, EmbeddedCacheManager node) {