ISPN-7127 Broken InboundTransferTask may lose segments on restart
Only retry an inbound transfer when we receive a new cache topology.

danberindei authored and tristantarrant committed Feb 1, 2017
1 parent 1f02aa4 commit eb2e3ad

Showing 4 changed files with 51 additions and 155 deletions.
@@ -18,7 +18,6 @@
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.filter.CollectionKeyFilter;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.rpc.ResponseMode;
@@ -51,7 +50,7 @@ public class OutboundTransferTask implements Runnable {

private final Address destination;

private final Set<Integer> segments = new CopyOnWriteArraySet<Integer>();
private final Set<Integer> segments = new CopyOnWriteArraySet<>();

private final int stateTransferChunkSize;

@@ -157,20 +156,17 @@ public void run() {
if (stProvider != null) {
try {
CollectionKeyFilter filter = new CollectionKeyFilter(new ReadOnlyDataContainerBackedKeySet(dataContainer));
AdvancedCacheLoader.CacheLoaderTask task = new AdvancedCacheLoader.CacheLoaderTask() {
@Override
public void processEntry(MarshalledEntry me, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
int segmentId = readCh.getSegment(me.getKey());
if (segments.contains(segmentId)) {
try {
InternalCacheEntry icv = entryFactory.create(me.getKey(), me.getValue(), me.getMetadata());
sendEntry(icv, segmentId);
} catch (CacheException e) {
log.failedLoadingValueFromCacheStore(me.getKey(), e);
}
}
AdvancedCacheLoader.CacheLoaderTask task = (me, taskContext) -> {
int segmentId = readCh.getSegment(me.getKey());
if (segments.contains(segmentId)) {
try {
InternalCacheEntry icv = entryFactory.create(me.getKey(), me.getValue(), me.getMetadata());
sendEntry(icv, segmentId);
} catch (CacheException e) {
log.failedLoadingValueFromCacheStore(me.getKey(), e);
}
}
};
}
};
stProvider.process(filter, task, new WithinThreadExecutor(), true, true);
} catch (CacheException e) {
log.failedLoadingKeysFromCacheStore(e);
@@ -182,7 +178,10 @@ public void processEntry(MarshalledEntry me, AdvancedCacheLoader.TaskContext taskContext)
} catch (Throwable t) {
// ignore eventual exceptions caused by cancellation (have InterruptedException as the root cause)
if (isCancelled()) {
log.tracef("Ignoring error in already cancelled transfer to node %s, segments %s", destination, segments);
if (trace) {
log.tracef("Ignoring error in already cancelled transfer to node %s, segments %s", destination,
segments);
}
} else {
log.failedOutBoundTransferExecution(t);
}
@@ -199,21 +198,17 @@ private void sendEntry(InternalCacheEntry ice, int segmentId) {
accumulatedEntries = 0;
}

List<InternalCacheEntry> entries = entriesBySegment.get(segmentId);
if (entries == null) {
entries = new ArrayList<InternalCacheEntry>();
entriesBySegment.put(segmentId, entries);
}
List<InternalCacheEntry> entries = entriesBySegment.computeIfAbsent(segmentId, k -> new ArrayList<>());
entries.add(ice);
accumulatedEntries++;
}
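The null-check-then-put idiom in sendEntry() is replaced by Map.computeIfAbsent, which does the lookup and the conditional insert in a single call. A minimal standalone illustration of the equivalence (placeholder map and values, not Infinispan types):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentDemo {
   public static void main(String[] args) {
      Map<Integer, List<String>> entriesBySegment = new HashMap<>();

      // Pre-Java-8 idiom (the old code): look up, null-check, insert.
      List<String> entries = entriesBySegment.get(1);
      if (entries == null) {
         entries = new ArrayList<>();
         entriesBySegment.put(1, entries);
      }
      entries.add("a");

      // Java 8 idiom (the new code): the mapping function runs only when
      // the key is absent, and the list is returned either way.
      entriesBySegment.computeIfAbsent(2, k -> new ArrayList<>()).add("b");

      System.out.println(entriesBySegment); // prints {1=[a], 2=[b]}
   }
}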

private void sendEntries(boolean isLast) {
List<StateChunk> chunks = new ArrayList<StateChunk>();
List<StateChunk> chunks = new ArrayList<>();
for (Map.Entry<Integer, List<InternalCacheEntry>> e : entriesBySegment.entrySet()) {
List<InternalCacheEntry> entries = e.getValue();
if (!entries.isEmpty() || isLast) {
chunks.add(new StateChunk(e.getKey(), new ArrayList<InternalCacheEntry>(entries), isLast));
chunks.add(new StateChunk(e.getKey(), new ArrayList<>(entries), isLast));
entries.clear();
}
}
@@ -222,7 +217,7 @@ private void sendEntries(boolean isLast) {
for (int segmentId : segments) {
List<InternalCacheEntry> entries = entriesBySegment.get(segmentId);
if (entries == null) {
chunks.add(new StateChunk(segmentId, Collections.<InternalCacheEntry>emptyList(), true));
chunks.add(new StateChunk(segmentId, Collections.emptyList(), true));
}
}
}
@@ -860,42 +860,6 @@ private void requestSegments(Set<Integer> segments, Map<Address, Set<Integer>> s
}
}


private void retryTransferTask(InboundTransferTask task) {
if (trace) log.tracef("Retrying failed task: %s", task);
task.cancel();

LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));

// look for other sources for the failed segments and replace all failed tasks with new tasks to be retried
// remove+add needs to be atomic
synchronized (transferMapsLock) {
Set<Integer> failedSegments = new HashSet<Integer>();
Set<Address> excludedSources = new HashSet<>();
if (removeTransfer(task)) {
excludedSources.add(task.getSource());
failedSegments.addAll(task.getSegments());
} else {
log.debugf("Failed to remove the failed inbound transfer %s", task);
}

// should re-add only segments we still own and are not already in
failedSegments.retainAll(getOwnedSegments(cacheTopology.getWriteConsistentHash()));
// When the cache stops, the write CH stays unchanged, but the transfers map is cleared
failedSegments.retainAll(transfersBySegment.keySet());

if (!failedSegments.isEmpty()) {
Map<Address, Set<Integer>> sources = new HashMap<Address, Set<Integer>>();
findSources(failedSegments, sources, excludedSources);
for (Map.Entry<Address, Set<Integer>> e : sources.entrySet()) {
addTransfer(e.getKey(), e.getValue());
}
} else {
log.debugf("No unfinished segments to retry for inbound transfer %s", task);
}
}
}
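With retryTransferTask gone, a failed (but not cancelled) inbound transfer is no longer cancelled and immediately re-requested from this code path; per the commit message, segments are re-requested only when a new cache topology arrives. A minimal self-contained sketch of that shape, with illustrative names (received, inFlight, onTopologyUpdate and the fields below are placeholders, not the actual StateConsumerImpl members):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch only: retries are driven by topology updates, never by the failing
// task itself, so a broken transfer cannot drop segments between retries.
class RetryOnTopologySketch {
   private final Set<Integer> received = new HashSet<>();          // segments fully transferred
   private final Map<Integer, String> inFlight = new HashMap<>();  // segment -> source node

   void onTransferFailed(int segment) {
      // Old behaviour (the method removed above): park 50ms, pick another
      // source and re-add the task right here. New behaviour: drop the
      // bookkeeping and let the next topology update re-request the segment.
      inFlight.remove(segment);
   }

   void onTransferCompleted(int segment) {
      inFlight.remove(segment);
      received.add(segment);
   }

   void onTopologyUpdate(Set<Integer> ownedSegments, String source) {
      for (int segment : ownedSegments) {
         if (!received.contains(segment) && !inFlight.containsKey(segment)) {
            inFlight.put(segment, source); // stands in for sending a state request
         }
      }
   }
}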

/**
* Cancel transfers for segments we no longer own.
*
@@ -1066,14 +1030,8 @@ private boolean removeTransfer(InboundTransferTask inboundTransfer) {
}

void onTaskCompletion(final InboundTransferTask inboundTransfer) {
retryOrNotifyCompletion(inboundTransfer);
}

private void retryOrNotifyCompletion(InboundTransferTask inboundTransfer) {
if (!inboundTransfer.isCompletedSuccessfully() && !inboundTransfer.isCancelled()) {
retryTransferTask(inboundTransfer);
} else {
if (trace) log.tracef("Inbound transfer finished: %s", inboundTransfer);
if (trace) log.tracef("Inbound transfer finished: %s", inboundTransfer);
if (inboundTransfer.isCompletedSuccessfully()) {
removeTransfer(inboundTransfer);
notifyEndOfRebalanceIfNeeded(cacheTopology.getTopologyId(), cacheTopology.getRebalanceId());
}
@@ -44,26 +44,6 @@ public class StateTransferRestart2Test extends MultipleCacheManagersTest {

private ConfigurationBuilder cfgBuilder;

private void waitForStateTransfer(Cache... caches) throws InterruptedException {
StateTransferManager[] stm = new StateTransferManager[caches.length];
for (int i = 0; i < stm.length; i++) {
stm[i] = TestingUtil.extractComponent(caches[i], StateTransferManager.class);
}
while (true) {
boolean inProgress = false;
for (StateTransferManager aStm : stm) {
if (aStm.isStateTransferInProgress()) {
inProgress = true;
break;
}
}
if (!inProgress) {
break;
}
wait(100);
}
}
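Besides duplicating what TestingUtil.waitForRehashToComplete already does, the removed helper was broken: it called wait(100) on the test instance without holding its monitor, so it would throw IllegalMonitorStateException whenever state transfer was still in progress. A corrected hand-rolled loop would have needed Thread.sleep, roughly as follows (illustration only; the commit rightly delegates to TestingUtil instead):

private void waitForStateTransfer(Cache... caches) throws InterruptedException {
   StateTransferManager[] stms = new StateTransferManager[caches.length];
   for (int i = 0; i < stms.length; i++) {
      stms[i] = TestingUtil.extractComponent(caches[i], StateTransferManager.class);
   }
   boolean inProgress = true;
   while (inProgress) {
      inProgress = false;
      for (StateTransferManager stm : stms) {
         if (stm.isStateTransferInProgress()) {
            inProgress = true;
            break;
         }
      }
      if (inProgress) {
         Thread.sleep(100); // wait(100) needs the monitor; sleep does not
      }
   }
}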

@Override
protected void createCacheManagers() throws Throwable {
cfgBuilder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
@@ -90,7 +70,7 @@ public void testStateTransferRestart() throws Throwable {
for (int k = 0; k < numKeys; k++) {
c0.put(k, k);
}
waitForStateTransfer(c0, c1);
TestingUtil.waitForRehashToComplete(c0, c1);

assertEquals(numKeys, c0.entrySet().size());
assertEquals(numKeys, c1.entrySet().size());
@@ -113,17 +93,14 @@ public CompletableFuture<Map<Address, Response>> invokeRemotelyAsync(Collection<
d1.setDiscardAll(true);
d1.setExcludeItself(true);

fork(new Callable<Void>() {
@Override
public Void call() throws Exception {
log.info("KILLING the c1 cache");
try {
TestingUtil.killCacheManagers(manager(c1));
} catch (Exception e) {
log.info("there was some exception while killing cache");
}
return null;
}
fork((Callable<Void>) () -> {
log.info("KILLING the c1 cache");
try {
TestingUtil.killCacheManagers(manager(c1));
} catch (Exception e) {
log.info("there was some exception while killing cache");
}
return null;
});
}
return super.invokeRemotelyAsync(recipients, rpcCommand, mode, timeout, responseFilter, deliverOrder,
@@ -142,12 +119,8 @@ public Void call() throws Exception {
log.infof("c0 entrySet size before : %d", c0.entrySet().size());
log.infof("c2 entrySet size before : %d", c2.entrySet().size());

eventually(new Condition() {
@Override
public boolean isSatisfied() throws Exception {
return c0.entrySet().size() == numKeys && c2.entrySet().size() == numKeys;
}
});
eventuallyEquals(numKeys, () -> c0.entrySet().size());
eventuallyEquals(numKeys, () -> c2.entrySet().size());

log.info("Ending the test");
}
@@ -70,26 +70,6 @@ public CompletableFuture<Map<Address, Response>> invokeRemotelyAsync(Collection<

private MockTransport mockTransport = new MockTransport();

private void waitForStateTransfer(Cache... caches) throws InterruptedException {
StateTransferManager[] stm = new StateTransferManager[caches.length];
for (int i = 0; i < stm.length; i++) {
stm[i] = TestingUtil.extractComponent(caches[i], StateTransferManager.class);
}
while (true) {
boolean inProgress = false;
for (StateTransferManager aStm : stm) {
if (aStm.isStateTransferInProgress()) {
inProgress = true;
break;
}
}
if (!inProgress) {
break;
}
wait(100);
}
}

@Override
protected void createCacheManagers() throws Throwable {
cfgBuilder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true);
@@ -116,38 +96,32 @@ public void testStateTransferRestart() throws Throwable {
for (int k = 0; k < numKeys; k++) {
c0.put(k, k);
}
waitForStateTransfer(c0, c1);
TestingUtil.waitForRehashToComplete(c0, c1);

assertEquals(numKeys, c0.entrySet().size());
assertEquals(numKeys, c1.entrySet().size());

mockTransport.callOnStateResponseCommand = new Callable<Void>() {
   @Override
   public Void call() throws Exception {
      fork(new Callable<Void>() {
         @Override
         public Void call() throws Exception {
            log.info("KILLING the c1 cache");
            try {
               DISCARD d3 = TestingUtil.getDiscardForCache(c1);
               d3.setDiscardAll(true);
               d3.setExcludeItself(true);
               TestingUtil.killCacheManagers(manager(c1));
            } catch (Exception e) {
               log.info("there was some exception while killing cache");
            }
            return null;
         }
      });
      try {
         // sleep and wait to be killed
         Thread.sleep(25000);
      } catch (InterruptedException e) {
         log.info("Interrupted as expected.");
         Thread.currentThread().interrupt();
      }
      return null;
   }
};
mockTransport.callOnStateResponseCommand = () -> {
   fork((Callable<Void>) () -> {
      log.info("KILLING the c1 cache");
      try {
         DISCARD d3 = TestingUtil.getDiscardForCache(c1);
         d3.setDiscardAll(true);
         d3.setExcludeItself(true);
         TestingUtil.killCacheManagers(manager(c1));
      } catch (Exception e) {
         log.info("there was some exception while killing cache");
      }
      return null;
   });
   try {
      // sleep and wait to be killed
      Thread.sleep(25000);
   } catch (InterruptedException e) {
      log.info("Interrupted as expected.");
      Thread.currentThread().interrupt();
   }
   return null;
};

log.info("adding cache c2");
@@ -161,12 +135,8 @@ public Void call() throws Exception {
log.infof("c0 entrySet size before : %d", c0.entrySet().size());
log.infof("c2 entrySet size before : %d", c2.entrySet().size());

eventually(new Condition() {
@Override
public boolean isSatisfied() throws Exception {
return c0.entrySet().size() == numKeys && c2.entrySet().size() == numKeys;
}
});
eventuallyEquals(numKeys, () -> c0.entrySet().size());
eventuallyEquals(numKeys, () -> c2.entrySet().size());

log.info("Ending the test");
}
