Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-3129 & ISPN-3130 #1846

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -794,12 +794,14 @@ private void cancelTransfers(Set<Integer> removedSegments) {
List<Integer> segmentsToCancel = new ArrayList<Integer>(removedSegments);
while (!segmentsToCancel.isEmpty()) {
int segmentId = segmentsToCancel.remove(0);
InboundTransferTask inboundTransfer = transfersBySegment.remove(segmentId);
InboundTransferTask inboundTransfer = transfersBySegment.get(segmentId);
if (inboundTransfer != null) { // we need to check the transfer was not already completed
Set<Integer> cancelledSegments = new HashSet<Integer>(removedSegments);
cancelledSegments.retainAll(inboundTransfer.getSegments());
segmentsToCancel.removeAll(cancelledSegments);
inboundTransfer.cancelSegments(cancelledSegments); //this will also remove it from transfersBySource if the entire task gets cancelled
transfersBySegment.keySet().removeAll(cancelledSegments);
//this will also remove it from transfersBySource if the entire task gets cancelled
inboundTransfer.cancelSegments(cancelledSegments);
}
}
}
Expand Down
Expand Up @@ -234,10 +234,14 @@ protected void handleNewView(List<Address> ignored, boolean mergeView, int newVi
try {
Map<String, List<CacheTopology>> clusterCacheMap = recoverClusterStatus(newViewId);

for (Map.Entry<String, List<CacheTopology>> e : clusterCacheMap.entrySet()) {
String cacheName = e.getKey();
List<CacheTopology> topologyList = e.getValue();
updateCacheStatusAfterMerge(cacheName, transport.getMembers(), topologyList);
for (Map.Entry<String, List<CacheTopology>> entry : clusterCacheMap.entrySet()) {
String cacheName = entry.getKey();
List<CacheTopology> topologyList = entry.getValue();
try {
updateCacheStatusAfterMerge(cacheName, transport.getMembers(), topologyList);
} catch (Exception e) {
log.failedToRecoverCacheState(cacheName, e);
}
}
} catch (InterruptedException e) {
log.tracef("Cluster state recovery interrupted because the coordinator is shutting down");
Expand All @@ -247,6 +251,7 @@ protected void handleNewView(List<Address> ignored, boolean mergeView, int newVi
// TODO Retry?
log.failedToRecoverClusterState(e);
}

} else if (isCoordinator) {
try {
updateClusterMembers(transport.getMembers());
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/infinispan/util/logging/Log.java
Expand Up @@ -880,5 +880,10 @@ void asyncStoreShutdownTimeoutTooHigh(long configuredAsyncStopTimeout,
@LogMessage(level = WARN)
@Message(value = "Support for concurrent updates can no longer be configured (it is always enabled by default)", id = 227)
void warnConcurrentUpdateSupportCannotBeConfigured();

@LogMessage(level = ERROR)
@Message(value = "Failed to recover cache %s state after the current node became the coordinator", id = 228)
void failedToRecoverCacheState(String cacheName, @Cause Throwable cause);

}

111 changes: 74 additions & 37 deletions core/src/test/java/org/infinispan/statetransfer/StateConsumerTest.java
Expand Up @@ -25,7 +25,6 @@

import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commons.hash.MurmurHash3;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
Expand All @@ -51,11 +50,14 @@
import org.infinispan.remoting.rpc.RpcOptionsBuilder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.LocalTransaction;
import org.infinispan.transaction.RemoteTransaction;
import org.infinispan.transaction.TransactionTable;
import org.infinispan.transaction.totalorder.TotalOrderManager;
import org.infinispan.util.CollectionFactory;
import org.infinispan.util.InfinispanCollections;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
Expand All @@ -69,18 +71,25 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

/**
* Tests StateConsumerImpl.
Expand All @@ -89,7 +98,7 @@
* @since 5.2
*/
@Test(groups = "functional", testName = "statetransfer.StateConsumerTest")
public class StateConsumerTest {
public class StateConsumerTest extends AbstractInfinispanTest {

private static final Log log = LogFactory.getLog(StateConsumerTest.class);

Expand All @@ -116,17 +125,17 @@ public void test1() throws Exception {
Configuration configuration = cb.build();

// create list of 6 members
Address[] addresses = new Address[10];
for (int i = 0; i < 10; i++) {
Address[] addresses = new Address[4];
for (int i = 0; i < 4; i++) {
addresses[i] = new TestAddress(i);
}
List<Address> members1 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3], addresses[4]);
List<Address> members2 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3]);
List<Address> members1 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3]);
List<Address> members2 = Arrays.asList(addresses[0], addresses[1], addresses[2]);

// create CHes
DefaultConsistentHashFactory chf = new DefaultConsistentHashFactory();
DefaultConsistentHash ch1 = chf.create(new MurmurHash3(), 2, 4, members1);
DefaultConsistentHash ch2 = chf.updateMembers(ch1, members2);
DefaultConsistentHash ch1 = chf.create(new MurmurHash3(), 2, 40, members1);
final DefaultConsistentHash ch2 = chf.updateMembers(ch1, members2);
DefaultConsistentHash ch3 = chf.rebalance(ch2);

log.debug(ch1);
Expand All @@ -139,7 +148,7 @@ public void test1() throws Exception {
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
String name = "PooledExecutorThread-" + getClass().getSimpleName() + "-" + r.hashCode();
String name = "PooledExecutorThread-" + StateConsumerTest.class.getSimpleName() + "-" + r.hashCode();
return new Thread(r, name);
}
};
Expand Down Expand Up @@ -168,32 +177,31 @@ public StateRequestCommand answer(InvocationOnMock invocation) {
});

when(transport.getViewId()).thenReturn(1);
when(rpcManager.getAddress()).thenReturn(new TestAddress(0));
when(rpcManager.getAddress()).thenReturn(addresses[0]);
when(rpcManager.getTransport()).thenReturn(transport);

when(rpcManager.invokeRemotely(any(Collection.class), any(ReplicableCommand.class), any(RpcOptions.class)))
final Map<Address, Set<Integer>> requestedSegments = CollectionFactory.makeConcurrentMap();
final Set<Integer> flatRequestedSegments = new ConcurrentSkipListSet<Integer>();
when(rpcManager.invokeRemotely(any(Collection.class), any(StateRequestCommand.class), any(RpcOptions.class)))
.thenAnswer(new Answer<Map<Address, Response>>() {
@Override
public Map<Address, Response> answer(InvocationOnMock invocation) {
Collection<Address> recipients = (Collection<Address>) invocation.getArguments()[0];
ReplicableCommand rpcCommand = (ReplicableCommand) invocation.getArguments()[1];
if (rpcCommand instanceof StateRequestCommand) {
StateRequestCommand cmd = (StateRequestCommand) rpcCommand;
Map<Address, Response> results = new HashMap<Address, Response>();
if (cmd.getType().equals(StateRequestCommand.Type.GET_TRANSACTIONS)) {
for (Address recipient : recipients) {
@Override
public Map<Address, Response> answer(InvocationOnMock invocation) {
Collection<Address> recipients = (Collection<Address>) invocation.getArguments()[0];
Address recipient = recipients.iterator().next();
StateRequestCommand cmd = (StateRequestCommand) invocation.getArguments()[1];
Map<Address, Response> results = new HashMap<Address, Response>(1);
if (cmd.getType().equals(StateRequestCommand.Type.GET_TRANSACTIONS)) {
results.put(recipient, SuccessfulResponse.create(new ArrayList<TransactionInfo>()));
}
} else if (cmd.getType().equals(StateRequestCommand.Type.START_STATE_TRANSFER) || cmd.getType().equals(StateRequestCommand.Type.CANCEL_STATE_TRANSFER)) {
for (Address recipient : recipients) {
Set<Integer> segments = (Set<Integer>) cmd.getParameters()[3];
requestedSegments.put(recipient, segments);
flatRequestedSegments.addAll(segments);
} else if (cmd.getType().equals(StateRequestCommand.Type.START_STATE_TRANSFER)
|| cmd.getType().equals(StateRequestCommand.Type.CANCEL_STATE_TRANSFER)) {
results.put(recipient, SuccessfulResponse.SUCCESSFUL_EMPTY_RESPONSE);
}
return results;
}
return results;
}
return Collections.emptyMap();
}
});
});

when(rpcManager.getRpcOptionsBuilder(any(ResponseMode.class))).thenAnswer(new Answer<RpcOptionsBuilder>() {
public RpcOptionsBuilder answer(InvocationOnMock invocation) {
Expand All @@ -204,7 +212,7 @@ public RpcOptionsBuilder answer(InvocationOnMock invocation) {


// create state provider
StateConsumerImpl stateConsumer = new StateConsumerImpl();
final StateConsumerImpl stateConsumer = new StateConsumerImpl();
stateConsumer.init(cache, pooledExecutorService, stateTransferManager, interceptorChain, icc, configuration, rpcManager, null,
commandsFactory, cacheLoaderManager, dataContainer, transactionTable, stateTransferLock, cacheNotifier, totalOrderManager);
stateConsumer.start();
Expand All @@ -223,19 +231,48 @@ public Iterator<InternalCacheEntry> answer(InvocationOnMock invocation) {
when(transactionTable.getLocalTransactions()).thenReturn(Collections.<LocalTransaction>emptyList());
when(transactionTable.getRemoteTransactions()).thenReturn(Collections.<RemoteTransaction>emptyList());

// create segments
Set<Integer> segments = new HashSet<Integer>(Arrays.asList(0, 1, 2, 3, 4));

Set<Integer> seg = new HashSet<Integer>(Arrays.asList(0));

assertFalse(stateConsumer.hasActiveTransfers());

// node 481 leaves
stateConsumer.onTopologyUpdate(new CacheTopology(1, ch2, null), false);
assertFalse(stateConsumer.hasActiveTransfers());

// start a rebalance
stateConsumer.onTopologyUpdate(new CacheTopology(2, ch2, ch3), true);
assertTrue(stateConsumer.hasActiveTransfers());

// check that all segments have been requested
Set<Integer> oldSegments = ch2.getSegmentsForOwner(addresses[0]);
final Set<Integer> newSegments = ch3.getSegmentsForOwner(addresses[0]);
newSegments.removeAll(oldSegments);
log.debugf("Rebalancing. Added segments=%s, old segments=%s", newSegments, oldSegments);
assertEquals(flatRequestedSegments, newSegments);

// simulate a cluster state recovery and return to ch2
fork(new Callable<Object>() {
@Override
public Object call() throws Exception {
stateConsumer.onTopologyUpdate(new CacheTopology(3, ch2, null), false);
return null;
}
});
stateConsumer.onTopologyUpdate(new CacheTopology(3, ch2, null), false);
assertFalse(stateConsumer.hasActiveTransfers());


// restart the rebalance
requestedSegments.clear();
stateConsumer.onTopologyUpdate(new CacheTopology(4, ch2, ch3), true);
assertTrue(stateConsumer.hasActiveTransfers());
assertEquals(flatRequestedSegments, newSegments);

// apply state
ArrayList<StateChunk> stateChunks = new ArrayList<StateChunk>();
for (Integer segment : newSegments) {
stateChunks.add(new StateChunk(segment, InfinispanCollections.<InternalCacheEntry>emptyList(), true));
}
stateConsumer.applyState(addresses[1], 2, stateChunks);

stateConsumer.stop();
assertFalse(stateConsumer.hasActiveTransfers());
}
Expand Down