From a269dc67764072f4556b718edb290c6e5fbcb2d6 Mon Sep 17 00:00:00 2001 From: Dan Berindei Date: Thu, 23 May 2013 15:50:24 +0300 Subject: [PATCH 1/2] ISPN-3129 If the status recovery fails for a cache, it stops recovery for all the caches --- .../topology/ClusterTopologyManagerImpl.java | 13 +++++++++---- .../main/java/org/infinispan/util/logging/Log.java | 5 +++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/infinispan/topology/ClusterTopologyManagerImpl.java b/core/src/main/java/org/infinispan/topology/ClusterTopologyManagerImpl.java index 342d6d0204c2..6a26c86c93df 100644 --- a/core/src/main/java/org/infinispan/topology/ClusterTopologyManagerImpl.java +++ b/core/src/main/java/org/infinispan/topology/ClusterTopologyManagerImpl.java @@ -234,10 +234,14 @@ protected void handleNewView(List
ignored, boolean mergeView, int newVi try { Map> clusterCacheMap = recoverClusterStatus(newViewId); - for (Map.Entry> e : clusterCacheMap.entrySet()) { - String cacheName = e.getKey(); - List topologyList = e.getValue(); - updateCacheStatusAfterMerge(cacheName, transport.getMembers(), topologyList); + for (Map.Entry> entry : clusterCacheMap.entrySet()) { + String cacheName = entry.getKey(); + List topologyList = entry.getValue(); + try { + updateCacheStatusAfterMerge(cacheName, transport.getMembers(), topologyList); + } catch (Exception e) { + log.failedToRecoverCacheState(cacheName, e); + } } } catch (InterruptedException e) { log.tracef("Cluster state recovery interrupted because the coordinator is shutting down"); @@ -247,6 +251,7 @@ protected void handleNewView(List
ignored, boolean mergeView, int newVi // TODO Retry? log.failedToRecoverClusterState(e); } + } else if (isCoordinator) { try { updateClusterMembers(transport.getMembers()); diff --git a/core/src/main/java/org/infinispan/util/logging/Log.java b/core/src/main/java/org/infinispan/util/logging/Log.java index e83160183c3a..47ad005dd5b2 100644 --- a/core/src/main/java/org/infinispan/util/logging/Log.java +++ b/core/src/main/java/org/infinispan/util/logging/Log.java @@ -880,5 +880,10 @@ void asyncStoreShutdownTimeoutTooHigh(long configuredAsyncStopTimeout, @LogMessage(level = WARN) @Message(value = "Support for concurrent updates can no longer be configured (it is always enabled by default)", id = 227) void warnConcurrentUpdateSupportCannotBeConfigured(); + + @LogMessage(level = ERROR) + @Message(value = "Failed to recover cache %s state after the current node became the coordinator", id = 228) + void failedToRecoverCacheState(String cacheName, @Cause Throwable cause); + } From 0d92da13b74d84ff050c4e5e677176df9a54ae75 Mon Sep 17 00:00:00 2001 From: Dan Berindei Date: Mon, 27 May 2013 09:58:58 +0300 Subject: [PATCH 2/2] ISPN-3130 Cancelling an InboundTransferTask should be idempotent Remove all the cancelled segments from the transfersBySegment map. 
--- .../statetransfer/StateConsumerImpl.java | 6 +- .../statetransfer/StateConsumerTest.java | 111 ++++++++++++------ 2 files changed, 78 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/org/infinispan/statetransfer/StateConsumerImpl.java b/core/src/main/java/org/infinispan/statetransfer/StateConsumerImpl.java index 6e89bbfbcfd9..b7229022b6e5 100644 --- a/core/src/main/java/org/infinispan/statetransfer/StateConsumerImpl.java +++ b/core/src/main/java/org/infinispan/statetransfer/StateConsumerImpl.java @@ -794,12 +794,14 @@ private void cancelTransfers(Set removedSegments) { List segmentsToCancel = new ArrayList(removedSegments); while (!segmentsToCancel.isEmpty()) { int segmentId = segmentsToCancel.remove(0); - InboundTransferTask inboundTransfer = transfersBySegment.remove(segmentId); + InboundTransferTask inboundTransfer = transfersBySegment.get(segmentId); if (inboundTransfer != null) { // we need to check the transfer was not already completed Set cancelledSegments = new HashSet(removedSegments); cancelledSegments.retainAll(inboundTransfer.getSegments()); segmentsToCancel.removeAll(cancelledSegments); - inboundTransfer.cancelSegments(cancelledSegments); //this will also remove it from transfersBySource if the entire task gets cancelled + transfersBySegment.keySet().removeAll(cancelledSegments); + //this will also remove it from transfersBySource if the entire task gets cancelled + inboundTransfer.cancelSegments(cancelledSegments); } } } diff --git a/core/src/test/java/org/infinispan/statetransfer/StateConsumerTest.java b/core/src/test/java/org/infinispan/statetransfer/StateConsumerTest.java index d0d69d9877ad..67ef0207fc58 100644 --- a/core/src/test/java/org/infinispan/statetransfer/StateConsumerTest.java +++ b/core/src/test/java/org/infinispan/statetransfer/StateConsumerTest.java @@ -25,7 +25,6 @@ import org.infinispan.Cache; import org.infinispan.commands.CommandsFactory; -import org.infinispan.commands.ReplicableCommand; import 
org.infinispan.commons.hash.MurmurHash3; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.Configuration; @@ -51,11 +50,14 @@ import org.infinispan.remoting.rpc.RpcOptionsBuilder; import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Transport; +import org.infinispan.test.AbstractInfinispanTest; import org.infinispan.topology.CacheTopology; import org.infinispan.transaction.LocalTransaction; import org.infinispan.transaction.RemoteTransaction; import org.infinispan.transaction.TransactionTable; import org.infinispan.transaction.totalorder.TotalOrderManager; +import org.infinispan.util.CollectionFactory; +import org.infinispan.util.InfinispanCollections; import org.infinispan.util.concurrent.IsolationLevel; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -69,18 +71,25 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; /** * Tests StateConsumerImpl. 
@@ -89,7 +98,7 @@ * @since 5.2 */ @Test(groups = "functional", testName = "statetransfer.StateConsumerTest") -public class StateConsumerTest { +public class StateConsumerTest extends AbstractInfinispanTest { private static final Log log = LogFactory.getLog(StateConsumerTest.class); @@ -116,17 +125,17 @@ public void test1() throws Exception { Configuration configuration = cb.build(); // create list of 6 members - Address[] addresses = new Address[10]; - for (int i = 0; i < 10; i++) { + Address[] addresses = new Address[4]; + for (int i = 0; i < 4; i++) { addresses[i] = new TestAddress(i); } - List
<Address> members1 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3], addresses[4]); - List
<Address> members2 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3]); + List
<Address> members1 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3]); + List<Address>
members2 = Arrays.asList(addresses[0], addresses[1], addresses[2]); // create CHes DefaultConsistentHashFactory chf = new DefaultConsistentHashFactory(); - DefaultConsistentHash ch1 = chf.create(new MurmurHash3(), 2, 4, members1); - DefaultConsistentHash ch2 = chf.updateMembers(ch1, members2); + DefaultConsistentHash ch1 = chf.create(new MurmurHash3(), 2, 40, members1); + final DefaultConsistentHash ch2 = chf.updateMembers(ch1, members2); DefaultConsistentHash ch3 = chf.rebalance(ch2); log.debug(ch1); @@ -139,7 +148,7 @@ public void test1() throws Exception { ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { - String name = "PooledExecutorThread-" + getClass().getSimpleName() + "-" + r.hashCode(); + String name = "PooledExecutorThread-" + StateConsumerTest.class.getSimpleName() + "-" + r.hashCode(); return new Thread(r, name); } }; @@ -168,32 +177,31 @@ public StateRequestCommand answer(InvocationOnMock invocation) { }); when(transport.getViewId()).thenReturn(1); - when(rpcManager.getAddress()).thenReturn(new TestAddress(0)); + when(rpcManager.getAddress()).thenReturn(addresses[0]); when(rpcManager.getTransport()).thenReturn(transport); - when(rpcManager.invokeRemotely(any(Collection.class), any(ReplicableCommand.class), any(RpcOptions.class))) + final Map> requestedSegments = CollectionFactory.makeConcurrentMap(); + final Set flatRequestedSegments = new ConcurrentSkipListSet(); + when(rpcManager.invokeRemotely(any(Collection.class), any(StateRequestCommand.class), any(RpcOptions.class))) .thenAnswer(new Answer>() { - @Override - public Map answer(InvocationOnMock invocation) { - Collection
<Address> recipients = (Collection<Address>
) invocation.getArguments()[0]; - ReplicableCommand rpcCommand = (ReplicableCommand) invocation.getArguments()[1]; - if (rpcCommand instanceof StateRequestCommand) { - StateRequestCommand cmd = (StateRequestCommand) rpcCommand; - Map results = new HashMap(); - if (cmd.getType().equals(StateRequestCommand.Type.GET_TRANSACTIONS)) { - for (Address recipient : recipients) { + @Override + public Map answer(InvocationOnMock invocation) { + Collection
<Address> recipients = (Collection<Address>
) invocation.getArguments()[0]; + Address recipient = recipients.iterator().next(); + StateRequestCommand cmd = (StateRequestCommand) invocation.getArguments()[1]; + Map results = new HashMap(1); + if (cmd.getType().equals(StateRequestCommand.Type.GET_TRANSACTIONS)) { results.put(recipient, SuccessfulResponse.create(new ArrayList())); - } - } else if (cmd.getType().equals(StateRequestCommand.Type.START_STATE_TRANSFER) || cmd.getType().equals(StateRequestCommand.Type.CANCEL_STATE_TRANSFER)) { - for (Address recipient : recipients) { + Set segments = (Set) cmd.getParameters()[3]; + requestedSegments.put(recipient, segments); + flatRequestedSegments.addAll(segments); + } else if (cmd.getType().equals(StateRequestCommand.Type.START_STATE_TRANSFER) + || cmd.getType().equals(StateRequestCommand.Type.CANCEL_STATE_TRANSFER)) { results.put(recipient, SuccessfulResponse.SUCCESSFUL_EMPTY_RESPONSE); } + return results; } - return results; - } - return Collections.emptyMap(); - } - }); + }); when(rpcManager.getRpcOptionsBuilder(any(ResponseMode.class))).thenAnswer(new Answer() { public RpcOptionsBuilder answer(InvocationOnMock invocation) { @@ -204,7 +212,7 @@ public RpcOptionsBuilder answer(InvocationOnMock invocation) { // create state provider - StateConsumerImpl stateConsumer = new StateConsumerImpl(); + final StateConsumerImpl stateConsumer = new StateConsumerImpl(); stateConsumer.init(cache, pooledExecutorService, stateTransferManager, interceptorChain, icc, configuration, rpcManager, null, commandsFactory, cacheLoaderManager, dataContainer, transactionTable, stateTransferLock, cacheNotifier, totalOrderManager); stateConsumer.start(); @@ -223,19 +231,48 @@ public Iterator answer(InvocationOnMock invocation) { when(transactionTable.getLocalTransactions()).thenReturn(Collections.emptyList()); when(transactionTable.getRemoteTransactions()).thenReturn(Collections.emptyList()); - // create segments - Set segments = new HashSet(Arrays.asList(0, 1, 2, 3, 4)); - - Set seg = new 
HashSet(Arrays.asList(0)); - assertFalse(stateConsumer.hasActiveTransfers()); + // node 481 leaves stateConsumer.onTopologyUpdate(new CacheTopology(1, ch2, null), false); assertFalse(stateConsumer.hasActiveTransfers()); + // start a rebalance stateConsumer.onTopologyUpdate(new CacheTopology(2, ch2, ch3), true); assertTrue(stateConsumer.hasActiveTransfers()); + // check that all segments have been requested + Set oldSegments = ch2.getSegmentsForOwner(addresses[0]); + final Set newSegments = ch3.getSegmentsForOwner(addresses[0]); + newSegments.removeAll(oldSegments); + log.debugf("Rebalancing. Added segments=%s, old segments=%s", newSegments, oldSegments); + assertEquals(flatRequestedSegments, newSegments); + + // simulate a cluster state recovery and return to ch2 + fork(new Callable() { + @Override + public Object call() throws Exception { + stateConsumer.onTopologyUpdate(new CacheTopology(3, ch2, null), false); + return null; + } + }); + stateConsumer.onTopologyUpdate(new CacheTopology(3, ch2, null), false); + assertFalse(stateConsumer.hasActiveTransfers()); + + + // restart the rebalance + requestedSegments.clear(); + stateConsumer.onTopologyUpdate(new CacheTopology(4, ch2, ch3), true); + assertTrue(stateConsumer.hasActiveTransfers()); + assertEquals(flatRequestedSegments, newSegments); + + // apply state + ArrayList stateChunks = new ArrayList(); + for (Integer segment : newSegments) { + stateChunks.add(new StateChunk(segment, InfinispanCollections.emptyList(), true)); + } + stateConsumer.applyState(addresses[1], 2, stateChunks); + stateConsumer.stop(); assertFalse(stateConsumer.hasActiveTransfers()); }