From 21617ac9b976377544a63a5fad21bb808a61ec7d Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Thu, 17 May 2018 17:10:03 +0200 Subject: [PATCH] ISPN-9128 RehashWithSharedStoreTest.testRehashes random failures --- .../impl/AvailabilityStrategy.java | 8 +- .../impl/PreferAvailabilityStrategy.java | 4 +- .../impl/PreferConsistencyStrategy.java | 6 +- .../impl/ScatteredStateConsumerImpl.java | 5 + .../transport/ControlledTransport.java | 79 +++++-- .../statetransfer/CoordinatorStopTest.java | 216 ++++++++++++++++++ .../java/org/infinispan/test/TestingUtil.java | 8 + .../test/TopologyChangeListener.java | 35 +++ .../infinispan/test/ViewChangeListener.java | 2 +- .../DelayedViewJGroupsTransport.java | 29 +++ .../topology/TestClusterCacheStatus.java | 2 +- .../tx/InfinispanNodeFailureTest.java | 24 +- .../util/BlockingLocalTopologyManager.java | 3 +- .../statetransfer/BaseStateTransferTest.java | 10 +- 14 files changed, 364 insertions(+), 67 deletions(-) create mode 100644 core/src/test/java/org/infinispan/scattered/statetransfer/CoordinatorStopTest.java create mode 100644 core/src/test/java/org/infinispan/test/TopologyChangeListener.java create mode 100644 core/src/test/java/org/infinispan/test/transport/DelayedViewJGroupsTransport.java diff --git a/core/src/main/java/org/infinispan/partitionhandling/impl/AvailabilityStrategy.java b/core/src/main/java/org/infinispan/partitionhandling/impl/AvailabilityStrategy.java index 2204a5be373a..640ceb0c8510 100644 --- a/core/src/main/java/org/infinispan/partitionhandling/impl/AvailabilityStrategy.java +++ b/core/src/main/java/org/infinispan/partitionhandling/impl/AvailabilityStrategy.java @@ -26,13 +26,17 @@ public interface AvailabilityStrategy { /** * Compute the read consistent hash for a topology with a {@code null} union consistent hash. + * Originally a copy of {@link CacheTopology#getReadConsistentHash()} but differs in case of scattered cache. */ - static ConsistentHash readConsistentHash(CacheTopology topology, ConsistentHashFactory chFactory) { + static ConsistentHash ownersConsistentHash(CacheTopology topology, ConsistentHashFactory chFactory) { switch (topology.getPhase()) { case NO_REBALANCE: return topology.getCurrentCH(); case TRANSITORY: - return topology.getPendingCH(); + // This is used to determine nodes that own the entries. In scattered cache (which uses transitory topology) + // the pendingCH is used for reading but the nodes in there are not guaranteed to have the data yet. + // CurrentCH should be safe - the nodes either have the data or the owner is unknown. + return topology.getCurrentCH(); case CONFLICT_RESOLUTION: case READ_OLD_WRITE_ALL: return topology.getCurrentCH(); diff --git a/core/src/main/java/org/infinispan/partitionhandling/impl/PreferAvailabilityStrategy.java b/core/src/main/java/org/infinispan/partitionhandling/impl/PreferAvailabilityStrategy.java index 27d740f3a13f..79e913a17009 100644 --- a/core/src/main/java/org/infinispan/partitionhandling/impl/PreferAvailabilityStrategy.java +++ b/core/src/main/java/org/infinispan/partitionhandling/impl/PreferAvailabilityStrategy.java @@ -1,6 +1,6 @@ package org.infinispan.partitionhandling.impl; -import static org.infinispan.partitionhandling.impl.AvailabilityStrategy.readConsistentHash; +import static org.infinispan.partitionhandling.impl.AvailabilityStrategy.ownersConsistentHash; import static org.infinispan.util.logging.events.Messages.MESSAGES; import java.util.ArrayList; @@ -262,7 +262,7 @@ private List computePartitions(Map stat // The node hasn't properly joined yet, so it can't be part of a partition continue; } - ConsistentHash readCH = readConsistentHash(topology, response.getCacheJoinInfo().getConsistentHashFactory()); + ConsistentHash readCH = ownersConsistentHash(topology, response.getCacheJoinInfo().getConsistentHashFactory()); Partition p = new Partition(sender, topology, response.getStableTopology(), readCH); partitions.add(p); } diff --git a/core/src/main/java/org/infinispan/partitionhandling/impl/PreferConsistencyStrategy.java b/core/src/main/java/org/infinispan/partitionhandling/impl/PreferConsistencyStrategy.java index 27fd42f2eb36..78fc23ef8c4f 100644 --- a/core/src/main/java/org/infinispan/partitionhandling/impl/PreferConsistencyStrategy.java +++ b/core/src/main/java/org/infinispan/partitionhandling/impl/PreferConsistencyStrategy.java @@ -1,6 +1,6 @@ package org.infinispan.partitionhandling.impl; -import static org.infinispan.partitionhandling.impl.AvailabilityStrategy.readConsistentHash; +import static org.infinispan.partitionhandling.impl.AvailabilityStrategy.ownersConsistentHash; import static org.infinispan.util.logging.events.Messages.MESSAGES; import java.util.ArrayList; @@ -208,13 +208,13 @@ public void onPartitionMerge(AvailabilityStrategyContext context, Map keys, Response response) for (int i = 0; i < keys.size(); ++i) { Object key = keys.get(i); InternalCacheValue icv = values[i]; + if (icv == null) { + // The entry got lost in the meantime - this can happen when the container is cleared concurrently to processing + // the GetAllCommand. We'll just avoid NPEs here: data is lost as > 1 nodes have left. + continue; + } PutKeyValueCommand put = commandsFactory.buildPutKeyValueCommand(key, icv.getValue(), keyPartitioner.getSegment(key), icv.getMetadata(), STATE_TRANSFER_FLAGS); try { diff --git a/core/src/test/java/org/infinispan/remoting/transport/ControlledTransport.java b/core/src/test/java/org/infinispan/remoting/transport/ControlledTransport.java index 5253a080bb36..609e1012ae37 100644 --- a/core/src/test/java/org/infinispan/remoting/transport/ControlledTransport.java +++ b/core/src/test/java/org/infinispan/remoting/transport/ControlledTransport.java @@ -1,16 +1,20 @@ package org.infinispan.remoting.transport; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; +import static org.infinispan.test.TestingUtil.wrapGlobalComponent; +import static org.testng.AssertJUnit.assertTrue; + import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.infinispan.Cache; import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.remote.SingleRpcCommand; import org.infinispan.remoting.responses.Response; import org.infinispan.util.concurrent.ReclosableLatch; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; import org.infinispan.xsite.XSiteReplicateCommand; /** @@ -21,55 +25,73 @@ * @since 7.0 */ public class ControlledTransport extends AbstractDelegatingTransport { + private static final Log log = LogFactory.getLog(ControlledTransport.class); + private static final Predicate NEVER = cmd -> false; private final ReclosableLatch replicationLatch = new ReclosableLatch(true); private final ReclosableLatch blockingLatch = new ReclosableLatch(true); - private volatile Set blockBeforeFilter = Collections.emptySet(); - private volatile Set blockAfterFilter = Collections.emptySet(); - private volatile Set failFilter = Collections.emptySet(); + private volatile Predicate blockBeforeFilter = NEVER; + private volatile Predicate blockAfterFilter = NEVER; + private volatile Predicate failFilter = NEVER; - public ControlledTransport(Transport realOne) { + private ControlledTransport(Transport realOne) { super(realOne); } + public static ControlledTransport replace(Cache cache) { + return wrapGlobalComponent(cache.getCacheManager(), Transport.class, ControlledTransport::new, true); + } + @Override public void start() { //skip start it again. } public void failFor(Class... filter) { - this.failFilter = new HashSet<>(Arrays.asList(filter)); + failFor(classListToFilter(filter)); + } + + private void failFor(Predicate filter) { + this.failFilter = filter; blockingLatch.open(); } public void stopFailing() { - this.failFilter = Collections.emptySet(); + this.failFilter = NEVER; blockingLatch.open(); } public void blockBefore(Class... filter) { - this.blockBeforeFilter = new HashSet<>(Arrays.asList(filter)); + blockBefore(classListToFilter(filter)); + } + + public void blockBefore(Predicate filter) { + this.blockBeforeFilter = filter; replicationLatch.close(); blockingLatch.close(); } public void blockAfter(Class... filter) { - this.blockAfterFilter = new HashSet<>(Arrays.asList(filter)); + blockAfter(classListToFilter(filter)); + } + + public void blockAfter(Predicate filter) { + this.blockAfterFilter = filter; replicationLatch.close(); blockingLatch.close(); } public void stopBlocking() { - getLog().tracef("Stop blocking commands"); - blockBeforeFilter = Collections.emptySet(); - blockAfterFilter = Collections.emptySet(); + log.tracef("Stop blocking commands"); + blockBeforeFilter = NEVER; + blockAfterFilter = NEVER; replicationLatch.open(); blockingLatch.open(); } public void waitForCommandToBlock() throws InterruptedException { - getLog().tracef("Waiting for at least one command to block"); - blockingLatch.await(30, TimeUnit.SECONDS); + log.tracef("Waiting for at least one command to block"); + assertTrue(blockingLatch.await(30, TimeUnit.SECONDS)); } public boolean waitForCommandToBlock(long time, TimeUnit unit) throws InterruptedException { @@ -77,7 +99,7 @@ public boolean waitForCommandToBlock(long time, TimeUnit unit) throws Interrupte } public void failIfNeeded(ReplicableCommand rpcCommand) { - if (failFilter.contains(getActualClass(rpcCommand))) { + if (failFilter.test(rpcCommand)) { throw new IllegalStateException("Induced failure!"); } } @@ -90,21 +112,21 @@ protected void waitAfter(ReplicableCommand rpcCommand) { waitForReplicationLatch(rpcCommand, blockAfterFilter); } - protected void waitForReplicationLatch(ReplicableCommand rpcCommand, Set filter) { - Class cmdClass = getActualClass(rpcCommand); - if (!filter.contains(cmdClass)) { + protected void waitForReplicationLatch(ReplicableCommand rpcCommand, Predicate filter) { + if (!filter.test(rpcCommand)) { + log.tracef("Not blocking command %s", rpcCommand); return; } try { if (!blockingLatch.isOpened()) { - getLog().debugf("Replication trigger called, releasing any waiters for command to block."); + log.debugf("Replication trigger called, releasing any waiters for command to block."); blockingLatch.open(); } - getLog().debugf("Replication trigger called, waiting for latch to open."); - replicationLatch.await(30, TimeUnit.SECONDS); - getLog().trace("Replication latch opened, continuing."); + log.debugf("Replication trigger called, waiting for latch to open."); + assertTrue(replicationLatch.await(30, TimeUnit.SECONDS)); + log.trace("Replication latch opened, continuing."); } catch (Exception e) { throw new RuntimeException("Unexpected exception!", e); } @@ -134,6 +156,13 @@ protected BackupResponse afterBackupRemotely(ReplicableCommand command, BackupRe return response; } + private Predicate classListToFilter(Class[] filter) { + return cmd -> { + Class actualClass = getActualClass(cmd); + return Stream.of(filter).anyMatch(clazz -> clazz.isAssignableFrom(actualClass)); + }; + } + private Class getActualClass(ReplicableCommand rpcCommand) { Class cmdClass = rpcCommand.getClass(); if (cmdClass.equals(SingleRpcCommand.class)) { diff --git a/core/src/test/java/org/infinispan/scattered/statetransfer/CoordinatorStopTest.java b/core/src/test/java/org/infinispan/scattered/statetransfer/CoordinatorStopTest.java new file mode 100644 index 000000000000..efe097e77c1b --- /dev/null +++ b/core/src/test/java/org/infinispan/scattered/statetransfer/CoordinatorStopTest.java @@ -0,0 +1,216 @@ +package org.infinispan.scattered.statetransfer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.infinispan.commands.read.GetKeyValueCommand; +import org.infinispan.commands.remote.ClusteredGetCommand; +import org.infinispan.configuration.cache.BiasAcquisition; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.distribution.BlockingInterceptor; +import org.infinispan.distribution.MagicKey; +import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.remoting.transport.Address; +import org.infinispan.remoting.transport.ControlledTransport; +import org.infinispan.statetransfer.StateRequestCommand; +import org.infinispan.statetransfer.StateResponseCommand; +import org.infinispan.statetransfer.StateTransferInterceptor; +import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.TopologyChangeListener; +import org.infinispan.test.fwk.CleanupAfterMethod; +import org.infinispan.test.transport.DelayedViewJGroupsTransport; +import org.infinispan.topology.CacheTopology; +import org.infinispan.topology.CacheTopologyControlCommand; +import org.infinispan.util.BlockingLocalTopologyManager; +import org.infinispan.util.BlockingLocalTopologyManager.BlockedTopology; +import org.infinispan.util.ControlledConsistentHashFactory; +import org.infinispan.util.ControlledRpcManager; +import org.testng.annotations.Test; + +@Test(groups = "functional", testName = "scattered.statetransfer.CoordinatorStopTest") +@CleanupAfterMethod +public class CoordinatorStopTest extends MultipleCacheManagersTest { + private CountDownLatch viewLatch; + private ControlledConsistentHashFactory.Scattered chf; + + @Override + public Object[] factory() { + return new Object[] { + new CoordinatorStopTest().biasAcquisition(BiasAcquisition.NEVER), + new CoordinatorStopTest().biasAcquisition(BiasAcquisition.ON_WRITE) + }; + } + + @Override + protected void createCacheManagers() throws Throwable { + ConfigurationBuilder cb = new ConfigurationBuilder(); + chf = new ControlledConsistentHashFactory.Scattered(new int[] {0, 1, 2}); + cb.clustering().cacheMode(CacheMode.SCATTERED_SYNC).hash().numSegments(3).consistentHashFactory(chf); + if (biasAcquisition != null) { + cb.clustering().biasAcquisition(biasAcquisition); + } + + addClusterEnabledCacheManager(cb); + // If the updated topologies from old coord come when it's no longer in the view these are ignored. + // Therefore we have to delay the view. + viewLatch = new CountDownLatch(1); + GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder(); + gcb.transport().transport(new DelayedViewJGroupsTransport(viewLatch)); + addClusterEnabledCacheManager(gcb, cb); + // we need distinct transport instances on manager(1) and (2) + gcb.transport().transport(new DelayedViewJGroupsTransport(viewLatch)); + addClusterEnabledCacheManager(gcb, cb); + + assertTrue(cache(0).getCacheManager().isCoordinator()); + // start other caches + cache(1); + cache(2); + waitForClusterToForm(); + } + + // Reproducer for ISPN-9128 + public void testCoordinatorLeaves() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException { + String cacheName = cache(1).getName(); + MagicKey key = new MagicKey(cache(0)); + cache(1).put(key, "value"); + + int stableTopologyId = cache(1).getAdvancedCache().getDistributionManager().getCacheTopology().getTopologyId(); + + BlockingLocalTopologyManager ltm2 = BlockingLocalTopologyManager.replaceTopologyManager(manager(2), cacheName); + ControlledTransport transport0 = ControlledTransport.replace(cache(0)); + ControlledTransport transport1 = ControlledTransport.replace(cache(1)); + // Block sending REBALANCE_START until the CH_UPDATE is delivered to make the test deterministic + transport0.blockBefore(cmd -> { + if (cmd instanceof CacheTopologyControlCommand) { + CacheTopologyControlCommand command = (CacheTopologyControlCommand) cmd; + if (command.getCacheName().equals(cacheName) && + command.getTopologyId() == stableTopologyId + 2 && + command.getType() == CacheTopologyControlCommand.Type.REBALANCE_START) { + return true; + } + } + return false; + }); + // Also block rebalance initiated by the new coord until we test with topology + 3 + transport1.blockBefore(cmd -> { + if (cmd instanceof CacheTopologyControlCommand) { + CacheTopologyControlCommand command = (CacheTopologyControlCommand) cmd; + if (command.getCacheName().equals(cacheName) && + command.getTopologyId() == stableTopologyId + 4 && + command.getType() == CacheTopologyControlCommand.Type.REBALANCE_START) { + return true; + } + } + return false; + }); + + ControlledRpcManager rpcManager2 = ControlledRpcManager.replaceRpcManager(cache(2)); + // Ignore push transfer of segment 2 + // Ignore the remote get which does not happen without the fix + rpcManager2.excludeCommands(StateResponseCommand.class, ClusteredGetCommand.class); + + // segment 0 will be moved to cache(2). Since we've lost coord cache(1) now -> 0 and cache(2) -> 1 + chf.setOwnerIndexes(new int[][] { {1}, {0}, {1} }); + + log.infof("Stopping coordinator %s, last stable topology is %d", manager(0), stableTopologyId); + Future stopFuture = fork(() -> manager(0).stop()); + + // topology + 1 is the one that just omits the leaving node + BlockedTopology t1 = ltm2.expectTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, stableTopologyId + 1); + if (t1.getCacheTopology().getTopologyId() == stableTopologyId + 1) + assertEquals(CacheTopology.Phase.NO_REBALANCE, t1.getPhase()); + assertEquals(2, t1.getCacheTopology().getActualMembers().size()); + assertEquals(null, t1.getCacheTopology().getPendingCH()); + assertOwners(t1, true, 0); + assertOwners(t1, true, 1, address(1)); + assertOwners(t1, true, 2, address(2)); + t1.unblock(); + + transport0.stopBlocking(); + stopFuture.get(10, TimeUnit.SECONDS); + // It is not guaranteed that all members got new view when stop() finishes - when the coord is leaving + // the members ack the view before installing it. + // We are delaying view 3 until topology + 2 is installed on cache(1) - therefore at this point manager(1) + // is not the coordinator yet, and we have 3 members in view + + // topology + 2 has TRANSITORY phase and all segments have an owner in pendingCH + BlockedTopology t2 = ltm2.expectTopologyUpdate(CacheTopology.Phase.TRANSITORY, stableTopologyId + 2); + assertEquals(CacheTopology.Phase.TRANSITORY, t2.getPhase()); + assertEquals(2, t2.getCacheTopology().getActualMembers().size()); + assertNotNull(t2.getCacheTopology().getPendingCH()); + assertOwners(t2, false, 0, address(2)); + assertOwners(t2, false, 1, address(1)); + assertOwners(t2, false, 2, address(2)); + t2.unblock(); + + // Let the rebalance begin + rpcManager2.expectCommand(StateRequestCommand.class, + request -> assertEquals(StateRequestCommand.Type.CONFIRM_REVOKED_SEGMENTS, request.getType())) + .send().receiveAll(); + // Allow both nodes to receive the view. If we did not block (1), too, topology + 2 could be ignored + // on cache(1) and the CONFIRM_REVOKED_SEGMENTS would get blocked until topology + 3 arrives - and this + // does not happen before the test times out. + viewLatch.countDown(); + + ControlledRpcManager.BlockedRequest keyTransferRequest = rpcManager2.expectCommand(StateRequestCommand.class, + request -> assertEquals(StateRequestCommand.Type.START_KEYS_TRANSFER, request.getType())); + + // topology + 3 should have null pendingCH + // Before the fix the topology would recover transitory topologies from above and base current CH on them + BlockedTopology t3 = ltm2.expectTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, stableTopologyId + 3); + assertEquals(2, t3.getCacheTopology().getActualMembers().size()); + assertEquals(null, t3.getCacheTopology().getPendingCH()); + + TopologyChangeListener topologyChangeListener = TopologyChangeListener.install(cache(2)); + ltm2.stopBlocking(); + t3.unblock(); + + // Cancel command is sent only with the fix in + if (t3.getCacheTopology().getCurrentCH().locatePrimaryOwnerForSegment(0) == null) { + ControlledRpcManager.BlockedRequest cancelStateTransfer = rpcManager2.expectCommand(StateRequestCommand.class, + request -> assertEquals(StateRequestCommand.Type.CANCEL_STATE_TRANSFER, request.getType())); + cancelStateTransfer.sendWithoutResponses(); + } + + // Wait until topology + 3 is installed + topologyChangeListener.await(10, TimeUnit.SECONDS); + + // unblock outdated keys transfer + keyTransferRequest.send().receiveAll(); + + CyclicBarrier oteBarrier = new CyclicBarrier(2); + BlockingInterceptor oteInterceptor = new BlockingInterceptor(oteBarrier, GetKeyValueCommand.class, true, true); + cache(2).getAdvancedCache().getAsyncInterceptorChain().addInterceptorAfter(oteInterceptor, StateTransferInterceptor.class); + + // The get is supposed to retry as the primary owner is null in topology + 3 + Future future = fork(() -> cache(2).get(key)); + + // This barrier will wait until the command returns, in any way. Without the fix it should just return null, + // with the fix it should throw OTE and we'll be waiting for the next topology - that's why we have to unblock it. + oteBarrier.await(10, TimeUnit.SECONDS); + oteInterceptor.suspend(true); + + rpcManager2.stopBlocking(); + transport1.stopBlocking(); + oteBarrier.await(10, TimeUnit.SECONDS); + + assertEquals("value", future.get()); + } + + private void assertOwners(BlockedTopology t, boolean current, int segmentId, Address... address) { + ConsistentHash ch = current ? t.getCacheTopology().getCurrentCH() : t.getCacheTopology().getPendingCH(); + assertEquals("Topology: " + t.getCacheTopology(), Arrays.asList(address), ch.locateOwnersForSegment(segmentId)); + } +} diff --git a/core/src/test/java/org/infinispan/test/TestingUtil.java b/core/src/test/java/org/infinispan/test/TestingUtil.java index 4eac58189eed..53494bba90ca 100644 --- a/core/src/test/java/org/infinispan/test/TestingUtil.java +++ b/core/src/test/java/org/infinispan/test/TestingUtil.java @@ -1776,6 +1776,14 @@ public static W wrapGlobalComponent(CacheContainer cacheContain return wrap; } + public static W wrapGlobalComponent(CacheContainer cacheContainer, Class tClass, + Function ctor, boolean rewire) { + T current = extractGlobalComponent(cacheContainer, tClass); + W wrap = ctor.apply(current); + replaceComponent(cacheContainer, tClass, wrap, rewire); + return wrap; + } + public static W wrapComponent(Cache cache, Class tClass, WrapFactory> factory, boolean rewire) { T current = extractComponent(cache, tClass); diff --git a/core/src/test/java/org/infinispan/test/TopologyChangeListener.java b/core/src/test/java/org/infinispan/test/TopologyChangeListener.java new file mode 100644 index 000000000000..24325bfb74d1 --- /dev/null +++ b/core/src/test/java/org/infinispan/test/TopologyChangeListener.java @@ -0,0 +1,35 @@ +package org.infinispan.test; + +import static org.testng.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.infinispan.Cache; +import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.TopologyChanged; +import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent; + +@Listener(observation = Listener.Observation.POST) +public class TopologyChangeListener { + private final CountDownLatch latch = new CountDownLatch(1); + + public static TopologyChangeListener install(Cache cache) { + TopologyChangeListener listener = new TopologyChangeListener(); + cache.addListener(listener); + return listener; + } + + @TopologyChanged + public void onTopologyChange(TopologyChangedEvent event) { + latch.countDown(); + } + + public void await() throws InterruptedException { + await(10, TimeUnit.SECONDS); + } + + public void await(long time, TimeUnit unit) throws InterruptedException { + assertTrue(latch.await(time, unit), "View change not seen after " + time + " " + unit); + } +} diff --git a/core/src/test/java/org/infinispan/test/ViewChangeListener.java b/core/src/test/java/org/infinispan/test/ViewChangeListener.java index 4165d710e89d..a34804cfbfcd 100644 --- a/core/src/test/java/org/infinispan/test/ViewChangeListener.java +++ b/core/src/test/java/org/infinispan/test/ViewChangeListener.java @@ -14,7 +14,7 @@ * Listens for view changes. Note that you do NOT have to register this listener; it does so automatically when * constructed. */ -@Listener +@Listener(observation = Listener.Observation.POST) public class ViewChangeListener { CacheContainer cm; final CountDownLatch latch = new CountDownLatch(1); diff --git a/core/src/test/java/org/infinispan/test/transport/DelayedViewJGroupsTransport.java b/core/src/test/java/org/infinispan/test/transport/DelayedViewJGroupsTransport.java new file mode 100644 index 000000000000..3ebccf0db25c --- /dev/null +++ b/core/src/test/java/org/infinispan/test/transport/DelayedViewJGroupsTransport.java @@ -0,0 +1,29 @@ +package org.infinispan.test.transport; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.infinispan.remoting.transport.jgroups.JGroupsTransport; +import org.jgroups.View; + +public final class DelayedViewJGroupsTransport extends JGroupsTransport { + + private final CountDownLatch waitLatch; + + public DelayedViewJGroupsTransport(CountDownLatch waitLatch) { + this.waitLatch = waitLatch; + } + + @Override + public void receiveClusterView(View newView) { + // check if this is an event of node going down, and if so wait for a signal to apply new view + if (waitLatch != null && getMembers().size() > newView.getMembers().size()) { + try { + waitLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + super.receiveClusterView(newView); + } +} diff --git a/core/src/test/java/org/infinispan/topology/TestClusterCacheStatus.java b/core/src/test/java/org/infinispan/topology/TestClusterCacheStatus.java index d0cbe61956ac..a50e22e49a04 100644 --- a/core/src/test/java/org/infinispan/topology/TestClusterCacheStatus.java +++ b/core/src/test/java/org/infinispan/topology/TestClusterCacheStatus.java @@ -98,7 +98,7 @@ public void cancelRebalance() { * {@link CacheTopology#getReadConsistentHash()} doesn't work. */ public ConsistentHash readConsistentHash() { - return AvailabilityStrategy.readConsistentHash(topology, joinInfo.getConsistentHashFactory()); + return AvailabilityStrategy.ownersConsistentHash(topology, joinInfo.getConsistentHashFactory()); } public void updateStableTopology() { diff --git a/core/src/test/java/org/infinispan/tx/InfinispanNodeFailureTest.java b/core/src/test/java/org/infinispan/tx/InfinispanNodeFailureTest.java index fdca88ee9711..4211c6fa2512 100644 --- a/core/src/test/java/org/infinispan/tx/InfinispanNodeFailureTest.java +++ b/core/src/test/java/org/infinispan/tx/InfinispanNodeFailureTest.java @@ -15,12 +15,11 @@ import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.distribution.MagicKey; import org.infinispan.interceptors.BaseCustomAsyncInterceptor; -import org.infinispan.remoting.transport.jgroups.JGroupsTransport; import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.transport.DelayedViewJGroupsTransport; import org.infinispan.transaction.LockingMode; import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup; import org.infinispan.util.concurrent.IsolationLevel; -import org.jgroups.View; import org.testng.annotations.Test; /** @@ -162,25 +161,4 @@ protected void createCacheManagers() throws Throwable { addClusterEnabledCacheManager(configuration); } - private static final class DelayedViewJGroupsTransport extends JGroupsTransport { - - private final CountDownLatch waitLatch; - - DelayedViewJGroupsTransport(CountDownLatch waitLatch) { - this.waitLatch = waitLatch; - } - - @Override - public void receiveClusterView(View newView) { - // check if this is an event of node going down, and if so wait for a signal to apply new view - if (waitLatch != null && getMembers().size() > newView.getMembers().size()) { - try { - waitLatch.await(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - super.receiveClusterView(newView); - } - } } diff --git a/core/src/test/java/org/infinispan/util/BlockingLocalTopologyManager.java b/core/src/test/java/org/infinispan/util/BlockingLocalTopologyManager.java index d62ba617dcee..a25ee30f938c 100644 --- a/core/src/test/java/org/infinispan/util/BlockingLocalTopologyManager.java +++ b/core/src/test/java/org/infinispan/util/BlockingLocalTopologyManager.java @@ -177,6 +177,7 @@ public void stopBlocking() { " blocked updates in the queue: " + queuedTopologies); } enabled = false; + log.debugf("Stopped blocking topology updates"); } @Override @@ -194,7 +195,7 @@ protected final void beforeHandleTopologyUpdate(String cacheName, CacheTopology @Override protected final void beforeHandleRebalance(String cacheName, CacheTopology cacheTopology, int viewId) { - if (!expectedCacheName.equals(cacheName)) + if (!enabled || !expectedCacheName.equals(cacheName)) return; Event event = new Event(cacheTopology, cacheTopology.getTopologyId(), viewId, diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/BaseStateTransferTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/BaseStateTransferTest.java index e7e2aa00314a..d0bf778733f7 100644 --- a/core/src/test/java/org/infinispan/xsite/statetransfer/BaseStateTransferTest.java +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/BaseStateTransferTest.java @@ -39,7 +39,6 @@ import org.infinispan.manager.CacheContainer; import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.ControlledTransport; -import org.infinispan.remoting.transport.Transport; import org.infinispan.test.ExceptionRunnable; import org.infinispan.test.fwk.CheckPoint; import org.infinispan.xsite.BackupReceiver; @@ -100,7 +99,7 @@ public void testCancelStateTransfer(Method method) throws InterruptedException { //check if NYC is empty assertInSite(NYC, cache -> assertTrue(cache.isEmpty())); - ControlledTransport controllerTransport = replaceTransport(cache(LON, 0)); + ControlledTransport controllerTransport = ControlledTransport.replace(cache(LON, 0)); controllerTransport.blockBefore(XSiteStatePushCommand.class); startStateTransfer(); @@ -484,13 +483,6 @@ private boolean containsKey(XSiteState[] states, Object key) { return false; } - private ControlledTransport replaceTransport(Cache cache) { - Transport current = extractGlobalComponent(cache.getCacheManager(), Transport.class); - ControlledTransport controlled = new ControlledTransport(current); - replaceComponent(cache.getCacheManager(), Transport.class, controlled, true); - return controlled; - } - private enum Operation { PUT("v0", "v1") { @Override