From 216fc58ee7ae210f7d14569277872e9e297956f8 Mon Sep 17 00:00:00 2001 From: Pedro Ruivo Date: Fri, 8 Nov 2013 17:55:26 +0000 Subject: [PATCH] ISPN-3315 Retry remote get after topology change if all the targets are no longer owners * Added test case to cover all possible inter-leaving scenarios between a remote get and state transfer * changed the remote get algorithm to retry when the topology id changes --- .../BaseDistributionInterceptor.java | 75 ++- .../RemoteGetDuringStateTransferTest.java | 510 ++++++++++++++++++ .../TxReadAfterLosingOwnershipTest.java | 43 +- ...bstractControlledLocalTopologyManager.java | 75 +++ .../SingleSegmentConsistentHashFactory.java | 63 +++ 5 files changed, 712 insertions(+), 54 deletions(-) create mode 100644 core/src/test/java/org/infinispan/statetransfer/RemoteGetDuringStateTransferTest.java create mode 100644 core/src/test/java/org/infinispan/util/AbstractControlledLocalTopologyManager.java create mode 100644 core/src/test/java/org/infinispan/util/SingleSegmentConsistentHashFactory.java diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java index a67c1809416d..bf70fcb0a1bc 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java @@ -23,8 +23,10 @@ import org.infinispan.remoting.rpc.ResponseFilter; import org.infinispan.remoting.rpc.ResponseMode; import org.infinispan.remoting.rpc.RpcOptions; +import org.infinispan.remoting.rpc.RpcOptionsBuilder; import org.infinispan.remoting.transport.Address; import org.infinispan.statetransfer.OutdatedTopologyException; +import org.infinispan.topology.CacheTopology; import org.infinispan.transaction.xa.GlobalTransaction; import org.infinispan.util.logging.Log; import 
org.infinispan.util.logging.LogFactory; @@ -53,6 +55,7 @@ public abstract class BaseDistributionInterceptor extends ClusteringInterceptor protected RemoteValueRetrievedListener rvrl; private static final Log log = LogFactory.getLog(BaseDistributionInterceptor.class); + private static final boolean trace = log.isTraceEnabled(); @Override protected Log getLog() { @@ -73,24 +76,70 @@ protected final InternalCacheEntry retrieveFromRemoteSource(Object key, Invocati ClusteredGetCommand get = cf.buildClusteredGetCommand(key, command.getFlags(), acquireRemoteLock, gtx); get.setWrite(isWrite); - List
<Address> targets = new ArrayList<Address>
(stateTransferManager.getCacheTopology().getReadConsistentHash().locateOwners(key)); - // if any of the recipients has left the cluster since the command was issued, just don't wait for its response - targets.retainAll(rpcManager.getTransport().getMembers()); + + RpcOptionsBuilder rpcOptionsBuilder = rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, false); + int lastTopologyId = -1; + InternalCacheEntry value = null; + while (value == null) { + final CacheTopology cacheTopology = stateTransferManager.getCacheTopology(); + final int currentTopologyId = cacheTopology.getTopologyId(); + + if (trace) { + log.tracef("Perform remote get for key %s. topologyId=%s, currentTopologyId=%s", + key, lastTopologyId, currentTopologyId); + } + List
<Address> targets; + if (lastTopologyId < currentTopologyId) { + // Cache topology has changed or it is the first time. + lastTopologyId = currentTopologyId; + targets = new ArrayList<Address>
(cacheTopology.getReadConsistentHash().locateOwners(key)); + } else if (lastTopologyId == currentTopologyId) { + // Same topologyId, but the owners could have already installed the next topology + // Lets try with write consistent owners (the read owners in the next topology) + lastTopologyId = currentTopologyId + 1; + targets = new ArrayList<Address>
(cacheTopology.getWriteConsistentHash().locateOwners(key)); + // Remove already contacted nodes + targets.removeAll(cacheTopology.getReadConsistentHash().locateOwners(key)); + if (targets.isEmpty()) { + if (trace) { + log.tracef("No valid values found for key '%s' (topologyId=%s).", key, currentTopologyId); + } + break; + } + } else { // lastTopologyId > currentTopologyId + // We have not received a valid value from the write CH owners either, and the topology id hasn't changed + if (trace) { + log.tracef("No valid values found for key '%s' (topologyId=%s).", key, currentTopologyId); + } + break; + } + + value = invokeClusterGetCommandRemotely(targets, rpcOptionsBuilder, get, key); + if (trace) { + log.tracef("Remote get of key '%s' (topologyId=%s) returns %s", key, currentTopologyId, value); + } + } + return value; + } + + private InternalCacheEntry invokeClusterGetCommandRemotely(List
<Address> targets, RpcOptionsBuilder rpcOptionsBuilder, + ClusteredGetCommand get, Object key) { ResponseFilter filter = new ClusteredGetResponseValidityFilter(targets, rpcManager.getAddress()); - RpcOptions options = rpcManager.getRpcOptionsBuilder(ResponseMode.WAIT_FOR_VALID_RESPONSE, false) - .responseFilter(filter).build(); + RpcOptions options = rpcOptionsBuilder.responseFilter(filter).build(); Map<Address, Response> responses = rpcManager.invokeRemotely(targets, get, options); if (!responses.isEmpty()) { for (Response r : responses.values()) { if (r instanceof SuccessfulResponse) { - + // The response value might be null. - SuccessfulResponse response = (SuccessfulResponse)r; - if( response.getResponseValue() == null ) - return null; - - InternalCacheValue cacheValue = (InternalCacheValue) response.getResponseValue(); + SuccessfulResponse response = (SuccessfulResponse) r; + Object responseValue = response.getResponseValue(); + if (responseValue == null) { + continue; + } + + InternalCacheValue cacheValue = (InternalCacheValue) responseValue; InternalCacheEntry ice = cacheValue.toInternalCacheEntry(key); if (rvrl != null) { rvrl.remoteValueFound(ice); @@ -99,10 +148,6 @@ protected final InternalCacheEntry retrieveFromRemoteSource(Object key, Invocati } } } - - // TODO If everyone returned null, and the read CH has changed, retry the remote get. 
- // Otherwise our get command might be processed by the old owners after they have invalidated their data - // and we'd return a null even though the key exists on return null; } diff --git a/core/src/test/java/org/infinispan/statetransfer/RemoteGetDuringStateTransferTest.java b/core/src/test/java/org/infinispan/statetransfer/RemoteGetDuringStateTransferTest.java new file mode 100644 index 000000000000..6fa2a4bc08ce --- /dev/null +++ b/core/src/test/java/org/infinispan/statetransfer/RemoteGetDuringStateTransferTest.java @@ -0,0 +1,510 @@ +package org.infinispan.statetransfer; + +import org.infinispan.Cache; +import org.infinispan.commands.remote.ClusteredGetCommand; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.manager.CacheContainer; +import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.remoting.transport.Address; +import org.infinispan.test.MultipleCacheManagersTest; +import org.infinispan.test.TestingUtil; +import org.infinispan.test.fwk.CleanupAfterMethod; +import org.infinispan.topology.CacheTopology; +import org.infinispan.topology.LocalTopologyManager; +import org.infinispan.tx.dld.ControlledRpcManager; +import org.infinispan.util.AbstractControlledLocalTopologyManager; +import org.infinispan.util.SingleSegmentConsistentHashFactory; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; + +import static org.infinispan.distribution.DistributionTestHelper.isFirstOwner; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + +/** + * Test multiple possible situations of interleaving between a remote get and state transfer. 
+ * + * @author Pedro Ruivo + * @since 6.0 + */ +@Test(groups = "functional", testName = "statetransfer.RemoteGetDuringStateTransferTest") +@CleanupAfterMethod +public class RemoteGetDuringStateTransferTest extends MultipleCacheManagersTest { + + /* + summary (0: node which requests the remote get, 1: old owner) + + sc | currentTopologyId | currentTopologyId + 1 (rebalance) | currentTopologyId + 2 (finish) + 1 | 0:remoteGet+receiveReply | 1:sendReply | + 2 | 0:remoteGet | 1:sendReply, 0:receiveReply | + 3 | | 0:remoteGet+receiveReply, 1:sendReply | + 4 | 0:remoteGet | 0:receiveReply | 1:sendReply + 5 | 0:remoteGet | | 0:receiveReply, 1:sendReply + 6 | | 0:remoteGet+receiveReply | 1: sendReply + 7 | | 0:remoteGet | 0:receiveReply, 1:sendReply + */ + + /** + * ISPN-3315: In this scenario, a remote get is triggered and the reply received in a stable state. the old owner + * receives the request after the rebalance_start command. + */ + public void testScenario1() throws Exception { + assertClusterSize("Wrong cluster size.", 2); + final Object key = "key_s1"; + ownerCheckAndInit(cache(1), key, "v"); + + final ControlledRpcManager rpcManager = replaceRpcManager(cache(0)); + final ControlledLocalTopologyManager topologyManager = replaceTopologyManager(manager(0)); + final int currentTopologyId = currentTopologyId(cache(0)); + + rpcManager.blockBefore(ClusteredGetCommand.class); + topologyManager.startBlocking(LatchType.REBALANCE); + + //remote get is processed in current topology id. 
+ Future remoteGetFuture = remoteGet(cache(0), key); + rpcManager.waitForCommandToBlock(); + + Future joinerFuture = addNode(); + topologyManager.waitToBlock(LatchType.REBALANCE); + + //wait until the rebalance_start arrives in old owner and let the remote get go + awaitForTopology(currentTopologyId + 1, cache(1)); + rpcManager.stopBlocking(); + + //check the value returned and make sure that the requestor is still in currentTopologyId (consistency check) + assertEquals("Wrong value from remote get.", "v", remoteGetFuture.get()); + assertTopologyId(currentTopologyId, cache(0)); + + topologyManager.stopBlocking(LatchType.REBALANCE); + joinerFuture.get(); + } + + /** + * ISPN-3315: similar to scenario 1, the remote get is triggered in stable state but reply is received after the + * rebalance_start command. As in scenario 1, the owner receives the request after the rebalance_start command. + */ + public void testScenario2() throws Exception { + assertClusterSize("Wrong cluster size.", 2); + final Object key = "key_s2"; + ownerCheckAndInit(cache(1), key, "v"); + final ControlledRpcManager rpcManager = replaceRpcManager(cache(0)); + final ControlledLocalTopologyManager topologyManager = replaceTopologyManager(manager(0)); + final int currentTopologyId = currentTopologyId(cache(0)); + + rpcManager.blockBefore(ClusteredGetCommand.class); + topologyManager.startBlocking(LatchType.CONFIRM_REBALANCE); + + //the remote get is triggered in the current topology id. + Future remoteGetFuture = remoteGet(cache(0), key); + rpcManager.waitForCommandToBlock(); + + Future joinerFuture = addNode(); + topologyManager.waitToBlock(LatchType.CONFIRM_REBALANCE); + + //wait until the rebalance start arrives in old owner and in the requestor. then let the remote get go. 
+ awaitForTopology(currentTopologyId + 1, cache(1)); + awaitForTopology(currentTopologyId + 1, cache(0)); + rpcManager.stopBlocking(); + + //check the value returned and make sure that the requestor is in the correct topology id (consistency check) + assertEquals("Wrong value from remote get.", "v", remoteGetFuture.get()); + assertTopologyId(currentTopologyId + 1, cache(0)); + + topologyManager.stopBlocking(LatchType.CONFIRM_REBALANCE); + joinerFuture.get(); + } + + /** + * ISPN-3315: the remote get is triggered and the reply received after the rebalance_start command. As in previous + * scenario, the old owner receives the request after the rebalance_start command. + */ + public void testScenario3() throws Exception { + assertClusterSize("Wrong cluster size.", 2); + final Object key = "key_s3"; + ownerCheckAndInit(cache(1), key, "v"); + final ControlledRpcManager rpcManager = replaceRpcManager(cache(0)); + final ControlledLocalTopologyManager topologyManager = replaceTopologyManager(manager(0)); + final int currentTopologyId = currentTopologyId(cache(0)); + + rpcManager.blockBefore(ClusteredGetCommand.class); + topologyManager.startBlocking(LatchType.CONFIRM_REBALANCE); + + Future joinerFuture = addNode(); + topologyManager.waitToBlock(LatchType.CONFIRM_REBALANCE); + + //consistency check + awaitForTopology(currentTopologyId + 1, cache(0)); + + //the remote get is triggered after the rebalance_start and before the confirm_rebalance. 
+ Future remoteGetFuture = remoteGet(cache(0), key); + rpcManager.waitForCommandToBlock(); + + //wait until the rebalance_start arrives in old owner + awaitForTopology(currentTopologyId + 1, cache(1)); + rpcManager.stopBlocking(); + + //check the value returned and make sure that the requestor is in the correct topology id (consistency check) + assertEquals("Wrong value from remote get.", "v", remoteGetFuture.get()); + assertTopologyId(currentTopologyId + 1, cache(0)); + + topologyManager.stopBlocking(LatchType.CONFIRM_REBALANCE); + joinerFuture.get(); + } + + /** + * ISPN-3315: the remote get is trigger in stable state and the reply received after the rebalance_start command. + * However, the old owner will receive the request after the state transfer and he no longer has the key. + */ + public void testScenario4() throws Exception { + assertClusterSize("Wrong cluster size.", 2); + final Object key = "key_s4"; + ownerCheckAndInit(cache(1), key, "v"); + final ControlledRpcManager rpcManager = replaceRpcManager(cache(0)); + final ControlledLocalTopologyManager topologyManager = replaceTopologyManager(manager(0)); + final int currentTopologyId = currentTopologyId(cache(0)); + + rpcManager.blockBefore(ClusteredGetCommand.class); + topologyManager.startBlocking(LatchType.CONSISTENT_HASH_UPDATE); + + //consistency check. the remote get is triggered + assertTopologyId(currentTopologyId, cache(0)); + Future remoteGetFuture = remoteGet(cache(0), key); + rpcManager.waitForCommandToBlock(); + + Future joinerFuture = addNode(); + topologyManager.waitToBlock(LatchType.CONSISTENT_HASH_UPDATE); + + //wait until the consistent_hash_update arrives in old owner. Also, awaits until the requestor receives the + //rebalance_start. 
+ awaitForTopology(currentTopologyId + 2, cache(1)); + awaitForTopology(currentTopologyId + 1, cache(0)); + awaitUntilNotInDataContainer(cache(1), key); + rpcManager.stopBlocking(); + + //check the value returned and make sure that the requestor is in the correct topology id (consistency check) + assertEquals("Wrong value from remote get.", "v", remoteGetFuture.get()); + assertTopologyId(currentTopologyId + 1, cache(0)); + + topologyManager.stopBlocking(LatchType.CONSISTENT_HASH_UPDATE); + joinerFuture.get(); + } + + /** + * ISPN-3315: similar to scenario 4, but this time the reply arrives after the state transfer. + */ + public void testScenario5() throws Exception { + assertClusterSize("Wrong cluster size.", 2); + final Object key = "key_s5"; + ownerCheckAndInit(cache(1), key, "v"); + final ControlledRpcManager rpcManager = replaceRpcManager(cache(0)); + final int currentTopologyId = currentTopologyId(cache(0)); + + rpcManager.blockBefore(ClusteredGetCommand.class); + + //consistency check. trigger the remote get + assertTopologyId(currentTopologyId, cache(0)); + Future remoteGetFuture = remoteGet(cache(0), key); + rpcManager.waitForCommandToBlock(); + + Future joinerFuture = addNode(); + + //wait until the state transfer ends in old owner and requestor. then let the remote get go. + awaitForTopology(currentTopologyId + 2, cache(1)); + awaitForTopology(currentTopologyId + 2, cache(0)); + awaitUntilNotInDataContainer(cache(1), key); + rpcManager.stopBlocking(); + + //check the value returned and make sure that the requestor is in the correct topology id (consistency check) + assertEquals("Wrong value from remote get.", "v", remoteGetFuture.get()); + assertTopologyId(currentTopologyId + 2, cache(0)); + + joinerFuture.get(); + } + + /** + * ISPN-3315: the remote get and the reply are done after the rebalance_start command. 
The old owner receives the + * request after the consistent_hash_update and no longer has the key + */ + public void testScenario6() throws Exception { + assertClusterSize("Wrong cluster size.", 2); + final Object key = "key_s6"; + ownerCheckAndInit(cache(1), key, "v"); + final ControlledRpcManager rpcManager = replaceRpcManager(cache(0)); + final ControlledLocalTopologyManager topologyManager = replaceTopologyManager(manager(0)); + final int currentTopologyId = currentTopologyId(cache(0)); + + rpcManager.blockBefore(ClusteredGetCommand.class); + topologyManager.startBlocking(LatchType.CONSISTENT_HASH_UPDATE); + + Future joinerFuture = addNode(); + topologyManager.waitToBlock(LatchType.CONSISTENT_HASH_UPDATE); + + //consistency check. trigger the remote get. + assertTopologyId(currentTopologyId + 1, cache(0)); + Future remoteGetFuture = remoteGet(cache(0), key); + rpcManager.waitForCommandToBlock(); + + //wait until the consistent_hash_update arrives in old owner + awaitForTopology(currentTopologyId + 2, cache(1)); + awaitUntilNotInDataContainer(cache(1), key); + rpcManager.stopBlocking(); + + //check the value returned and make sure that the requestor is in the correct topology id (consistency check) + assertEquals("Wrong value from remote get.", "v", remoteGetFuture.get()); + assertTopologyId(currentTopologyId + 1, cache(0)); + + topologyManager.stopBlocking(LatchType.CONSISTENT_HASH_UPDATE); + joinerFuture.get(); + } + + /** + * ISPN-3315: the remote get is triggered after the rebalance_start command and the reply is received after the + * consistent_hash_update command. The old owner receives the request after the consistent_hash_update command and no + * longer has the key. + */ + public void testScenario7() throws Exception { + //events: + //0: remote get target list obtained in topology i+1. 
reply obtained in topology i+2 + //1: remote get received in topology i+2 (no longer a owner) + assertClusterSize("Wrong cluster size.", 2); + final Object key = "key_s7"; + ownerCheckAndInit(cache(1), key, "v"); + final ControlledRpcManager rpcManager = replaceRpcManager(cache(0)); + final ControlledLocalTopologyManager topologyManager = replaceTopologyManager(manager(0)); + final int currentTopologyId = currentTopologyId(cache(0)); + + rpcManager.blockBefore(ClusteredGetCommand.class); + topologyManager.startBlocking(LatchType.CONSISTENT_HASH_UPDATE); + + Future joinerFuture = addNode(); + topologyManager.waitToBlock(LatchType.CONSISTENT_HASH_UPDATE); + + //consistency check. trigger the remote get. + assertTopologyId(currentTopologyId + 1, cache(0)); + Future remoteGetFuture = remoteGet(cache(0), key); + rpcManager.waitForCommandToBlock(); + + topologyManager.stopBlocking(LatchType.CONSISTENT_HASH_UPDATE); + + //wait until the consistent_hash_update arrives in old owner and in the requestor. 
+ awaitForTopology(currentTopologyId + 2, cache(1)); + awaitForTopology(currentTopologyId + 2, cache(0)); + awaitUntilNotInDataContainer(cache(1), key); + rpcManager.stopBlocking(); + + //check the value returned and make sure that the requestor is in the correct topology id (consistency check) + assertEquals("Wrong value from remote get.", "v", remoteGetFuture.get()); + assertTopologyId(currentTopologyId + 2, cache(0)); + + joinerFuture.get(); + } + + @Override + protected void createCacheManagers() throws Throwable { + createClusteredCaches(2, configuration()); + } + + private Future remoteGet(Cache cache, Object key) { + return fork(new RemoteGetCallable(cache, key)); + } + + private int currentTopologyId(Cache cache) { + return TestingUtil.extractComponent(cache, StateTransferManager.class).getCacheTopology().getTopologyId(); + } + + private void assertTopologyId(final int expectedTopologyId, final Cache cache) { + assertEquals(expectedTopologyId, currentTopologyId(cache)); + } + + private void awaitForTopology(final int expectedTopologyId, final Cache cache) { + eventually(new Condition() { + @Override + public boolean isSatisfied() throws Exception { + return expectedTopologyId == currentTopologyId(cache); + } + }); + } + + private void awaitUntilNotInDataContainer(final Cache cache, final Object key) { + eventually(new Condition() { + @Override + public boolean isSatisfied() throws Exception { + return !cache.getAdvancedCache().getDataContainer().containsKey(key); + } + }); + } + + private Future addNode() { + addClusterEnabledCacheManager(configuration()); + return fork(new Callable() { + @Override + public Void call() throws Exception { + waitForClusterToForm(); + return null; + } + }); + } + + private void ownerCheckAndInit(Cache owner, Object key, Object value) { + assertTrue(address(owner) + " should be the owner of " + key + ".", isFirstOwner(cache(1), key)); + owner.put(key, value); + assertCacheValue(key, value); + } + + private void 
assertCacheValue(Object key, Object value) { + for (Cache cache : caches()) { + assertEquals("Wrong value for key " + key + " on " + address(cache) + ".", value, cache.get(key)); + } + } + + private ConfigurationBuilder configuration() { + ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false); + builder.clustering() + .hash() + .numSegments(1) + .numOwners(1) + .consistentHashFactory(new SingleKeyConsistentHashFactory()) + .stateTransfer() + .fetchInMemoryState(true); + return builder; + } + + private ControlledLocalTopologyManager replaceTopologyManager(CacheContainer cacheContainer) { + LocalTopologyManager manager = TestingUtil.extractGlobalComponent(cacheContainer, LocalTopologyManager.class); + ControlledLocalTopologyManager controlledLocalTopologyManager = new ControlledLocalTopologyManager(manager); + TestingUtil.replaceComponent(cacheContainer, LocalTopologyManager.class, controlledLocalTopologyManager, true); + return controlledLocalTopologyManager; + } + + private ControlledRpcManager replaceRpcManager(Cache cache) { + RpcManager manager = TestingUtil.extractComponent(cache, RpcManager.class); + ControlledRpcManager controlledRpcManager = new ControlledRpcManager(manager); + TestingUtil.replaceComponent(cache, RpcManager.class, controlledRpcManager, true); + return controlledRpcManager; + } + + private static enum LatchType { + CONSISTENT_HASH_UPDATE, + CONFIRM_REBALANCE, + REBALANCE + } + + @SuppressWarnings("unchecked") + public static class SingleKeyConsistentHashFactory extends SingleSegmentConsistentHashFactory { + + @Override + protected List
<Address> createOwnersCollection(List<Address>
members, int numberOfOwners) { + assertEquals("Wrong number of owners.", 1, numberOfOwners); + return Collections.singletonList(members.get(members.size() - 1)); + } + } + + private class RemoteGetCallable implements Callable { + + private final Cache cache; + private final Object key; + + private RemoteGetCallable(Cache cache, Object key) { + this.cache = cache; + this.key = key; + } + + @Override + public Object call() throws Exception { + return cache.get(key); + } + } + + private class ControlledLocalTopologyManager extends AbstractControlledLocalTopologyManager { + + private final Latch blockConfirmRebalance; + private final Latch blockConsistentHashUpdate; + private final Latch blockRebalanceStart; + + public ControlledLocalTopologyManager(LocalTopologyManager delegate) { + super(delegate); + blockRebalanceStart = new Latch(); + blockConsistentHashUpdate = new Latch(); + blockConfirmRebalance = new Latch(); + } + + public void startBlocking(LatchType type) { + getLatch(type).enable(); + } + + public void stopBlocking(LatchType type) { + getLatch(type).disable(); + } + + public void waitToBlock(LatchType type) throws InterruptedException { + getLatch(type).waitToBlock(); + } + + @Override + protected final void beforeHandleConsistentHashUpdate(String cacheName, CacheTopology cacheTopology, int viewId) { + getLatch(LatchType.CONSISTENT_HASH_UPDATE).blockIfNeeded(); + } + + @Override + protected final void beforeHandleRebalance(String cacheName, CacheTopology cacheTopology, int viewId) { + getLatch(LatchType.REBALANCE).blockIfNeeded(); + } + + @Override + protected final void beforeConfirmRebalance(String cacheName, int topologyId, Throwable throwable) { + getLatch(LatchType.CONFIRM_REBALANCE).blockIfNeeded(); + } + + private Latch getLatch(LatchType type) { + switch (type) { + case CONSISTENT_HASH_UPDATE: + return blockConsistentHashUpdate; + case CONFIRM_REBALANCE: + return blockConfirmRebalance; + case REBALANCE: + return blockRebalanceStart; + } + throw new 
IllegalStateException("Should never happen!"); + } + } + + private class Latch { + + private boolean enabled = false; + private boolean blocked = false; + + public final synchronized void enable() { + this.enabled = true; + } + + public final synchronized void disable() { + this.enabled = false; + notifyAll(); + } + + public final synchronized void blockIfNeeded() { + blocked = true; + notifyAll(); + while (enabled) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + public final synchronized void waitToBlock() throws InterruptedException { + while (!blocked) { + wait(); + } + } + + } +} diff --git a/core/src/test/java/org/infinispan/statetransfer/TxReadAfterLosingOwnershipTest.java b/core/src/test/java/org/infinispan/statetransfer/TxReadAfterLosingOwnershipTest.java index b7fae0f23a43..00fcaa19da62 100644 --- a/core/src/test/java/org/infinispan/statetransfer/TxReadAfterLosingOwnershipTest.java +++ b/core/src/test/java/org/infinispan/statetransfer/TxReadAfterLosingOwnershipTest.java @@ -1,24 +1,20 @@ package org.infinispan.statetransfer; import org.infinispan.Cache; -import org.infinispan.commons.hash.Hash; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.distribution.ch.ConsistentHashFactory; -import org.infinispan.distribution.ch.DefaultConsistentHash; import org.infinispan.remoting.transport.Address; import org.infinispan.test.MultipleCacheManagersTest; import org.infinispan.test.TestingUtil; import org.infinispan.test.fwk.CleanupAfterMethod; +import org.infinispan.util.SingleSegmentConsistentHashFactory; import org.testng.AssertJUnit; import org.testng.annotations.Test; -import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import 
java.util.concurrent.CountDownLatch; @@ -52,7 +48,7 @@ protected void createCacheManagers() throws Throwable { protected final ConfigurationBuilder createConfigurationBuilder() { ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, transactional()); builder.clustering() - .hash().numOwners(2).consistentHashFactory(new SingleKeyConsistentHashFactory()) + .hash().numOwners(2).consistentHashFactory(new SingleKeyConsistentHashFactory()).numSegments(1) .l1().enabled(l1()) .stateTransfer().fetchInMemoryState(true); return builder; @@ -138,40 +134,9 @@ public Object finalValue() { } } - public static class SingleKeyConsistentHashFactory implements ConsistentHashFactory, Serializable { + public static class SingleKeyConsistentHashFactory extends SingleSegmentConsistentHashFactory { - - @Override - public DefaultConsistentHash create(Hash hashFunction, int numOwners, int numSegments, List
<Address> members, - Map capacityFactors) { - return new DefaultConsistentHash(hashFunction, numOwners, 1, members, null, - new List[]{createOwnersCollection(members, numOwners)}); - } - - @Override - public DefaultConsistentHash updateMembers(DefaultConsistentHash baseCH, List
<Address> newMembers, - Map capacityFactors) { - final int numOwners = baseCH.getNumOwners(); - DefaultConsistentHash updated = new DefaultConsistentHash(baseCH.getHashFunction(), numOwners, 1, newMembers, null, - new List[]{createOwnersCollection(baseCH.getMembers(), numOwners)}); - return baseCH.equals(updated) ? baseCH : updated; - } - - @Override - public DefaultConsistentHash rebalance(DefaultConsistentHash baseCH) { - final List
<Address> members = baseCH.getMembers(); - final int numOwners = baseCH.getNumOwners(); - DefaultConsistentHash rebalanced = new DefaultConsistentHash(baseCH.getHashFunction(), numOwners, 1, members, null, - new List[]{createOwnersCollection(members, numOwners)}); - return baseCH.equals(rebalanced) ? baseCH : rebalanced; - } - - @Override - public DefaultConsistentHash union(DefaultConsistentHash ch1, DefaultConsistentHash ch2) { - return ch1.union(ch2); - } - - private static List
<Address> createOwnersCollection(List
<Address> members, int numberOfOwners) { + protected final List
<Address> createOwnersCollection(List
<Address> members, int numberOfOwners) { //the owners will be the first member and the last (numberOfOwners - 1)-th members List
<Address> owners = new ArrayList<Address>
(numberOfOwners); owners.add(members.get(0)); diff --git a/core/src/test/java/org/infinispan/util/AbstractControlledLocalTopologyManager.java b/core/src/test/java/org/infinispan/util/AbstractControlledLocalTopologyManager.java new file mode 100644 index 000000000000..474005ebcb10 --- /dev/null +++ b/core/src/test/java/org/infinispan/util/AbstractControlledLocalTopologyManager.java @@ -0,0 +1,75 @@ +package org.infinispan.util; + +import org.infinispan.topology.CacheJoinInfo; +import org.infinispan.topology.CacheTopology; +import org.infinispan.topology.CacheTopologyHandler; +import org.infinispan.topology.LocalTopologyManager; + +import java.util.Map; + +/** + * Class to be extended to allow some control over the local topology manager when testing Infinispan. + * + * Note: create before/after method lazily when need. + * + * @author Pedro Ruivo + * @since 6.0 + */ +public abstract class AbstractControlledLocalTopologyManager implements LocalTopologyManager { + + private final LocalTopologyManager delegate; + + protected AbstractControlledLocalTopologyManager(LocalTopologyManager delegate) { + this.delegate = delegate; + } + + @Override + public final CacheTopology join(String cacheName, CacheJoinInfo joinInfo, CacheTopologyHandler stm) throws Exception { + return delegate.join(cacheName, joinInfo, stm); + } + + @Override + public final void leave(String cacheName) { + delegate.leave(cacheName); + } + + @Override + public final void confirmRebalance(String cacheName, int topologyId, Throwable throwable) { + beforeConfirmRebalance(cacheName, topologyId, throwable); + delegate.confirmRebalance(cacheName, topologyId, throwable); + } + + @Override + public final Map handleStatusRequest(int viewId) { + return delegate.handleStatusRequest(viewId); + } + + @Override + public final void handleConsistentHashUpdate(String cacheName, CacheTopology cacheTopology, int viewId) throws InterruptedException { + beforeHandleConsistentHashUpdate(cacheName, cacheTopology, viewId); + 
delegate.handleConsistentHashUpdate(cacheName, cacheTopology, viewId); + } + + @Override + public final void handleRebalance(String cacheName, CacheTopology cacheTopology, int viewId) throws InterruptedException { + beforeHandleRebalance(cacheName, cacheTopology, viewId); + delegate.handleRebalance(cacheName, cacheTopology, viewId); + } + + @Override + public final CacheTopology getCacheTopology(String cacheName) { + return delegate.getCacheTopology(cacheName); + } + + protected void beforeHandleConsistentHashUpdate(String cacheName, CacheTopology cacheTopology, int viewId) { + //no-op by default + } + + protected void beforeHandleRebalance(String cacheName, CacheTopology cacheTopology, int viewId) { + //no-op by default + } + + protected void beforeConfirmRebalance(String cacheName, int topologyId, Throwable throwable) { + //no-op by default + } +} diff --git a/core/src/test/java/org/infinispan/util/SingleSegmentConsistentHashFactory.java b/core/src/test/java/org/infinispan/util/SingleSegmentConsistentHashFactory.java new file mode 100644 index 000000000000..f20aadb91ca1 --- /dev/null +++ b/core/src/test/java/org/infinispan/util/SingleSegmentConsistentHashFactory.java @@ -0,0 +1,63 @@ +package org.infinispan.util; + +import org.infinispan.commons.hash.Hash; +import org.infinispan.distribution.ch.ConsistentHashFactory; +import org.infinispan.distribution.ch.DefaultConsistentHash; +import org.infinispan.remoting.transport.Address; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import static org.testng.AssertJUnit.assertEquals; + +/** + * Base consistent hash factory that contains a single segments + * + * @author Pedro Ruivo + * @since 6.0 + */ +@SuppressWarnings("unchecked") +public abstract class SingleSegmentConsistentHashFactory implements ConsistentHashFactory, + Serializable { + + @Override + public DefaultConsistentHash create(Hash hashFunction, int numOwners, int numSegments, List
<Address> members, + Map capacityFactors) { + assertNumberOfSegments(numSegments); + return new DefaultConsistentHash(hashFunction, numOwners, 1, members, null, new List[]{createOwnersCollection(members, numOwners)}); + } + + @Override + public DefaultConsistentHash updateMembers(DefaultConsistentHash baseCH, List
<Address> newMembers, + Map capacityFactors) { + assertNumberOfSegments(baseCH.getNumSegments()); + final int numOwners = baseCH.getNumOwners(); + DefaultConsistentHash updated = new DefaultConsistentHash(baseCH.getHashFunction(), numOwners, 1, newMembers, null, + new List[]{createOwnersCollection(baseCH.getMembers(), numOwners)}); + return baseCH.equals(updated) ? baseCH : updated; + } + + @Override + public DefaultConsistentHash rebalance(DefaultConsistentHash baseCH) { + assertNumberOfSegments(baseCH.getNumSegments()); + final List
<Address> members = baseCH.getMembers(); + final int numOwners = baseCH.getNumOwners(); + DefaultConsistentHash rebalanced = new DefaultConsistentHash(baseCH.getHashFunction(), numOwners, 1, members, null, + new List[]{createOwnersCollection(members, numOwners)}); + return baseCH.equals(rebalanced) ? baseCH : rebalanced; + } + + @Override + public DefaultConsistentHash union(DefaultConsistentHash ch1, DefaultConsistentHash ch2) { + assertNumberOfSegments(ch1.getNumSegments()); + assertNumberOfSegments(ch2.getNumSegments()); + return ch1.union(ch2); + } + + protected abstract List
<Address> createOwnersCollection(List
<Address> members, int numberOfOwners); + + private void assertNumberOfSegments(int numSegments) { + assertEquals("Wrong number of segments.", 1, numSegments); + } +}