From 08060f5583dbf239739d9cee7c6888f12b51207a Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Thu, 24 Aug 2017 16:25:56 +0200 Subject: [PATCH] ISPN-8218 ScatteredStreamIteratorTest.waitUntilProcessingResults random failure Reverts most of the stream related changes from ISPN-6645, fixing the consistent hash instance used for both remote and local iteration. The retry with newer topology is moved to xCacheStream instead. Non-rehash-aware iteration through scattered cache may miss segments that don't have owner during rehash. --- .../stream/impl/AbstractCacheStream.java | 58 ++++++++++--- .../stream/impl/ClusterStreamManager.java | 38 +++++---- .../stream/impl/ClusterStreamManagerImpl.java | 83 ++++++------------- .../stream/impl/DistributedCacheStream.java | 27 ++++-- .../PartitionAwareClusterStreamManager.java | 33 ++++---- .../impl/tx/TxClusterStreamManager.java | 30 +++---- 6 files changed, 148 insertions(+), 121 deletions(-) diff --git a/core/src/main/java/org/infinispan/stream/impl/AbstractCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/AbstractCacheStream.java index 0c4768562e9d..a3a5e53a3a1f 100644 --- a/core/src/main/java/org/infinispan/stream/impl/AbstractCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/AbstractCacheStream.java @@ -25,11 +25,13 @@ import org.infinispan.commons.util.concurrent.ConcurrentHashSet; import org.infinispan.container.entries.CacheEntry; import org.infinispan.distribution.DistributionManager; +import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.distribution.ch.KeyPartitioner; import org.infinispan.factories.ComponentRegistry; import org.infinispan.partitionhandling.impl.PartitionHandlingManager; import org.infinispan.remoting.transport.Address; +import org.infinispan.statetransfer.StateTransferLock; import org.infinispan.stream.impl.intops.IntermediateOperation; import org.infinispan.stream.impl.termop.SegmentRetryingOperation; import org.infinispan.stream.impl.termop.SingleRunOperation; @@ -60,6 +62,7 @@ public abstract class AbstractCacheStream, S2 exte protected final ComponentRegistry registry; protected final PartitionHandlingManager partition; protected final KeyPartitioner keyPartitioner; + protected final StateTransferLock stateTransferLock; protected Runnable closeRunnable = null; @@ -94,6 +97,7 @@ protected AbstractCacheStream(Address localAddress, boolean parallel, Distributi this.registry = registry; this.partition = registry.getComponent(PartitionHandlingManager.class); this.keyPartitioner = registry.getComponent(KeyPartitioner.class); + this.stateTransferLock = registry.getComponent(StateTransferLock.class); intermediateOperations = new ArrayDeque<>(); } @@ -108,6 +112,7 @@ protected AbstractCacheStream(AbstractCacheStream other) { this.registry = other.registry; this.partition = other.partition; this.keyPartitioner = other.keyPartitioner; + this.stateTransferLock = other.stateTransferLock; this.closeRunnable = other.closeRunnable; @@ -206,7 +211,7 @@ R performOperation(Function function, ResultsAccumu ConsistentHash ch = dm.getWriteConsistentHash(); TerminalOperation op = new SingleRunOperation(intermediateOperations, supplierForSegments(ch, segmentsToFilter, null), function); - Object id = csm.remoteStreamOperation(getParallelDistribution(), parallel, segmentsToFilter, keysToFilter, + Object id = csm.remoteStreamOperation(getParallelDistribution(), parallel, ch, segmentsToFilter, keysToFilter, Collections.emptyMap(), includeLoader, op, remoteResults, earlyTerminatePredicate); try { R localValue = op.performOperation(); @@ -235,7 +240,8 @@ R performOperationRehashAware(Function function, bo Set segmentsToProcess = segmentsToFilter; TerminalOperation op; do { - ConsistentHash ch = dm.getReadConsistentHash(); + LocalizedCacheTopology cacheTopology = dm.getCacheTopology(); + ConsistentHash ch = cacheTopology.getReadConsistentHash(); if (retryOnRehash) { op = new SegmentRetryingOperation(intermediateOperations, supplierForSegments(ch, segmentsToProcess, null), function); @@ -243,7 +249,7 @@ R performOperationRehashAware(Function function, bo op = new SingleRunOperation(intermediateOperations, supplierForSegments(ch, segmentsToProcess, null), function); } - Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, segmentsToProcess, + Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, ch, segmentsToProcess, keysToFilter, Collections.emptyMap(), includeLoader, op, remoteResults, earlyTerminatePredicate); try { R localValue; @@ -285,6 +291,13 @@ R performOperationRehashAware(Function function, bo segmentsToProcess = new HashSet<>(remoteResults.lostSegments); remoteResults.lostSegments.clear(); log.tracef("Found %s lost segments for identifier %s", segmentsToProcess, id); + if (remoteResults.requiresNextTopology) { + try { + stateTransferLock.waitForTopology(cacheTopology.getTopologyId(), timeout, timeoutUnit); + } catch (InterruptedException | java.util.concurrent.TimeoutException e) { + throw new CacheException(e); + } + } } else { // If we didn't lose any segments we don't need to process anymore if (segmentsToProcess != null) { @@ -305,12 +318,13 @@ void performRehashKeyTrackingOperation( final AtomicBoolean complete = new AtomicBoolean(); ConsistentHash segmentInfoCH = dm.getReadConsistentHash(); - KeyTrackingConsumer results = new KeyTrackingConsumer<>(keyPartitioner, segmentInfoCH, (c) -> {}, - c -> c, null); + KeyTrackingConsumer results = new KeyTrackingConsumer<>(keyPartitioner, segmentInfoCH.getNumSegments(), c -> {}, + c -> c, null); Set segmentsToProcess = segmentsToFilter == null ? new RangeSet(segmentInfoCH.getNumSegments()) : segmentsToFilter; do { - ConsistentHash ch = dm.getReadConsistentHash(); + LocalizedCacheTopology cacheTopology = dm.getCacheTopology(); + ConsistentHash ch = cacheTopology.getReadConsistentHash(); boolean localRun = ch.getMembers().contains(localAddress); Set segments; Set excludedKeys; @@ -328,7 +342,7 @@ void performRehashKeyTrackingOperation( KeyTrackingTerminalOperation op = function.apply(supplierForSegments(ch, segmentsToProcess, excludedKeys)); op.handleInjection(registry); - Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, segmentsToProcess, + Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, ch, segmentsToProcess, keysToFilter, new AtomicReferenceArrayToMap<>(results.referenceArray), includeLoader, op, results); try { @@ -360,6 +374,14 @@ void performRehashKeyTrackingOperation( segmentsToProcess = new HashSet<>(results.lostSegments); results.lostSegments.clear(); log.tracef("Found %s lost segments for identifier %s", segmentsToProcess, id); + if (results.requiresNextTopology) { + try { + stateTransferLock.waitForTopology(cacheTopology.getTopologyId() + 1, timeout, timeoutUnit); + results.requiresNextTopology = false; + } catch (InterruptedException | java.util.concurrent.TimeoutException e) { + throw new CacheException(e); + } + } } else { log.tracef("Finished rehash aware operation for id %s", id); complete.set(true); @@ -425,26 +447,25 @@ public Set> entrySet() { class KeyTrackingConsumer implements ClusterStreamManager.ResultsCallback>>, KeyTrackingTerminalOperation.IntermediateCollector>> { final KeyPartitioner keyPartitioner; - final ConsistentHash ch; final Consumer consumer; final Set lostSegments = new ConcurrentHashSet<>(); final Function, V> valueFunction; + boolean requiresNextTopology; final AtomicReferenceArray> referenceArray; final DistributedCacheStream.SegmentListenerNotifier listenerNotifier; - KeyTrackingConsumer(KeyPartitioner keyPartitioner, ConsistentHash ch, Consumer consumer, + KeyTrackingConsumer(KeyPartitioner keyPartitioner, int numSegments, Consumer consumer, Function, V> valueFunction, DistributedCacheStream.SegmentListenerNotifier completedSegments) { this.keyPartitioner = keyPartitioner; - this.ch = ch; this.consumer = consumer; this.valueFunction = valueFunction; this.listenerNotifier = completedSegments; - this.referenceArray = new AtomicReferenceArray<>(ch.getNumSegments()); + this.referenceArray = new AtomicReferenceArray<>(numSegments); for (int i = 0; i < referenceArray.length(); ++i) { // We only allow 1 request per id referenceArray.set(i, new HashSet<>()); @@ -515,6 +536,11 @@ public void onSegmentsLost(Set segments) { } } + @Override + public void requestFutureTopology() { + requiresNextTopology = true; + } + @Override public void sendDataResonse(Collection> response) { onIntermediateResult(null, response); @@ -525,6 +551,7 @@ static class ResultsAccumulator implements ClusterStreamManager.ResultsCallba private final BinaryOperator binaryOperator; private final Set lostSegments = new ConcurrentHashSet<>(); R currentValue; + boolean requiresNextTopology; ResultsAccumulator(BinaryOperator binaryOperator) { this.binaryOperator = binaryOperator; @@ -556,6 +583,11 @@ public void onSegmentsLost(Set segments) { lostSegments.add(segment); } } + + @Override + public void requestFutureTopology() { + requiresNextTopology = true; + } } static class CollectionConsumer implements ClusterStreamManager.ResultsCallback>, @@ -583,6 +615,10 @@ public void onCompletion(Address address, Set completedSegments, Collec public void onSegmentsLost(Set segments) { } + @Override + public void requestFutureTopology() { + } + @Override public void sendDataResonse(Collection response) { onIntermediateResult(null, response); diff --git a/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManager.java b/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManager.java index 6e118f166ba3..098de4623252 100644 --- a/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManager.java +++ b/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManager.java @@ -8,6 +8,7 @@ import java.util.function.Predicate; import org.infinispan.CacheStream; +import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.remoting.transport.Address; /** @@ -47,6 +48,11 @@ interface ResultsCallback { * @param segments The segments that were requested but are now local */ void onSegmentsLost(Set segments); + + /** + * Called when a an owner of a segment is not available in the provided {@link ConsistentHash} + */ + void requestFutureTopology(); } /** @@ -54,6 +60,7 @@ interface ResultsCallback { * @param the type of response * @param parallelDistribution whether or not parallel distribution is enabled * @param parallelStream whether or not the stream is paralllel + * @param ch the consistent hash to use when determining segment ownership * @param segments the segments that this request should utilize * @param keysToInclude which keys to include in the request * @param keysToExclude which keys to exclude in the request @@ -63,15 +70,16 @@ interface ResultsCallback { * @param earlyTerminatePredicate a predicate to determine if this operation should stop based on intermediate results * @return the operation id to be used for further calls */ - Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate); + Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate); /** * Performs the remote stream operation with rehash awareness. * @param the type of response * @param parallelDistribution whether or not parallel distribution is enabled * @param parallelStream whether or not the stream is paralllel + * @param ch the consistent hash to use when determining segment ownership * @param segments the segments that this request should utilize * @param keysToInclude which keys to include in the request * @param keysToExclude which keys to exclude in the request @@ -81,15 +89,16 @@ Object remoteStreamOperation(boolean parallelDistribution, boolean parallelS * @param earlyTerminatePredicate a predicate to determine if this operation should stop based on intermediate results * @return the operation id to be used for further calls */ - Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate); + Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate); /** * Key tracking remote operation that doesn't have rehash enabled. * @param the type of response * @param parallelDistribution whether or not parallel distribution is enabled * @param parallelStream whether or not the stream is paralllel + * @param ch the consistent hash to use when determining segment ownership * @param segments the segments that this request should utilize * @param keysToInclude which keys to include in the request * @param keysToExclude which keys to exclude in the request @@ -98,15 +107,16 @@ Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolea * @param callback the callback to collect individual node results * @return the operation id to be used for further calls */ - Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - KeyTrackingTerminalOperation operation, ResultsCallback> callback); + Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + KeyTrackingTerminalOperation operation, ResultsCallback> callback); /** * Key tracking remote operation that has rehash enabled * @param the type of response * @param parallelDistribution whether or not parallel distribution is enabled * @param parallelStream whether or not the stream is paralllel + * @param ch the consistent hash to use when determining segment ownership * @param segments the segments that this request should utilize * @param keysToInclude which keys to include in the request * @param keysToExclude which keys to exclude in the request @@ -115,11 +125,11 @@ Object remoteStreamOperation(boolean parallelDistribution, boolean parallelS * @param callback the callback to collect individual node results * @return the operation id to be used for further calls */ - Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, - Map> keysToExclude, boolean includeLoader, - KeyTrackingTerminalOperation operation, - ResultsCallback> callback); + Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, + Map> keysToExclude, boolean includeLoader, + KeyTrackingTerminalOperation operation, + ResultsCallback> callback); /** * Tests whether this operation is still pending or not. diff --git a/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java b/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java index 4489b114df22..43f91520845c 100644 --- a/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java +++ b/core/src/main/java/org/infinispan/stream/impl/ClusterStreamManagerImpl.java @@ -2,14 +2,12 @@ import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -20,9 +18,6 @@ import org.infinispan.commands.CommandsFactory; import org.infinispan.commons.CacheException; import org.infinispan.commons.util.SmallIntSet; -import org.infinispan.configuration.cache.Configuration; -import org.infinispan.distribution.DistributionManager; -import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; @@ -30,7 +25,6 @@ import org.infinispan.remoting.rpc.RpcManager; import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.jgroups.SuspectException; -import org.infinispan.statetransfer.StateTransferLock; import org.infinispan.util.RangeSet; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -44,22 +38,15 @@ public class ClusterStreamManagerImpl implements ClusterStreamManager { protected final AtomicInteger requestId = new AtomicInteger(); protected RpcManager rpc; protected CommandsFactory factory; - protected DistributionManager dm; - protected StateTransferLock stateTransferLock; - protected Configuration configuration; protected Address localAddress; protected final static Log log = LogFactory.getLog(ClusterStreamManagerImpl.class); @Inject - public void inject(RpcManager rpc, CommandsFactory factory, DistributionManager dm, - StateTransferLock stateTransferLock, Configuration configuration) { + public void inject(RpcManager rpc, CommandsFactory factory) { this.rpc = rpc; this.factory = factory; - this.dm = dm; - this.stateTransferLock = stateTransferLock; - this.configuration = configuration; } @Start @@ -68,29 +55,29 @@ public void start() { } @Override - public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate) { - return commonRemoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, + public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate) { + return commonRemoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback, StreamRequestCommand.Type.TERMINAL, earlyTerminatePredicate); } @Override public Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, - boolean includeLoader, TerminalOperation operation, ResultsCallback callback, - Predicate earlyTerminatePredicate) { - return commonRemoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, + ConsistentHash ch, Set segments, Set keysToInclude, Map> keysToExclude, + boolean includeLoader, TerminalOperation operation, ResultsCallback callback, + Predicate earlyTerminatePredicate) { + return commonRemoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback, StreamRequestCommand.Type.TERMINAL_REHASH, earlyTerminatePredicate); } private Object commonRemoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, - boolean includeLoader, SegmentAwareOperation operation, ResultsCallback callback, - StreamRequestCommand.Type type, Predicate earlyTerminatePredicate) { - Map> targets = determineTargets(segments); + ConsistentHash ch, Set segments, Set keysToInclude, Map> keysToExclude, + boolean includeLoader, SegmentAwareOperation operation, ResultsCallback callback, + StreamRequestCommand.Type type, Predicate earlyTerminatePredicate) { + Map> targets = determineTargets(ch, segments, callback); String id; if (!targets.isEmpty()) { id = localAddress.toString() + requestId.getAndIncrement(); @@ -111,26 +98,26 @@ private Object commonRemoteStreamOperation(boolean parallelDistribution, boo } } } else { - log.tracef("Not performing remote operation for request as no valid targets found"); + log.tracef("Not performing remote operation for request as no valid targets for segments %s found", segments); id = null; } return id; } @Override - public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - KeyTrackingTerminalOperation operation, ResultsCallback> callback) { - return commonRemoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, + public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + KeyTrackingTerminalOperation operation, ResultsCallback> callback) { + return commonRemoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback, StreamRequestCommand.Type.TERMINAL_KEY, null); } @Override public Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, - boolean includeLoader, KeyTrackingTerminalOperation operation, - ResultsCallback> callback) { - Map> targets = determineTargets(segments); + ConsistentHash ch, Set segments, Set keysToInclude, Map> keysToExclude, + boolean includeLoader, KeyTrackingTerminalOperation operation, + ResultsCallback> callback) { + Map> targets = determineTargets(ch, segments, callback); String id; if (!targets.isEmpty()) { id = localAddress.toString() + "-" + requestId.getAndIncrement(); @@ -172,7 +159,7 @@ public Object remoteStreamOperationRehashAware(boolean parallelDistribution } } } else { - log.tracef("Not performing remote rehash key aware operation for request as no valid targets found"); + log.tracef("Not performing remote rehash key aware operation for request as no valid targets for segments %s found", segments); id = null; } return id; @@ -265,34 +252,18 @@ private Set determineExcludedKeys(Map> keysToExclude, Set> determineTargets(Set segments) { - LocalizedCacheTopology cacheTopology = dm.getCacheTopology(); - ConsistentHash ch = cacheTopology.getReadConsistentHash(); + private Map> determineTargets(ConsistentHash ch, Set segments, ResultsCallback callback) { if (segments == null) { segments = new RangeSet(ch.getNumSegments()); } // This has to be a concurrent hash map in case if a node completes operation while we are still iterating // over the map and submitting to others Map> targets = new ConcurrentHashMap<>(); - for (Iterator iterator = segments.iterator(); iterator.hasNext(); ) { - Integer segment = iterator.next(); + for (Integer segment : segments) { Address owner = ch.locatePrimaryOwnerForSegment(segment); if (owner == null) { - try { - // TODO: this is not the final solution as this makes invocation of some commands blocking - stateTransferLock.waitForTopology(cacheTopology.getTopologyId() + 1, - configuration.clustering().stateTransfer().timeout(), TimeUnit.MILLISECONDS); - // start from the beginning - cacheTopology = dm.getCacheTopology(); - ch = cacheTopology.getReadConsistentHash(); - iterator = segments.iterator(); - targets.clear(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new CacheException("Stream operation was interrupted", e); - } catch (TimeoutException e) { - throw new CacheException("Timed out waiting for a topology with owners for segment " + segment); - } + callback.onSegmentsLost(Collections.singleton(segment)); + callback.requestFutureTopology(); } else if (!owner.equals(localAddress)) { targets.computeIfAbsent(owner, t -> new SmallIntSet()).add(segment); } diff --git a/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java index d93d7872e70f..8de8658c222f 100644 --- a/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java @@ -54,6 +54,7 @@ import org.infinispan.commons.util.CloseableIterator; import org.infinispan.container.entries.CacheEntry; import org.infinispan.distribution.DistributionManager; +import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.factories.ComponentRegistry; import org.infinispan.remoting.transport.Address; @@ -595,7 +596,7 @@ private void ignoreRehashIteration(Consumer consumer, IteratorSupplier sup log.tracef("Thread %s submitted iterator request for stream", thread); if (!stayLocal) { - Object id = csm.remoteStreamOperation(iteratorParallelDistribute, parallel, segmentsToFilter, + Object id = csm.remoteStreamOperation(iteratorParallelDistribute, parallel, ch, segmentsToFilter, keysToFilter, Collections.>emptyMap(), includeLoader, op, remoteResults); // Make sure to run this after we submit to the manager so it can process the other nodes // asynchronously with the local operation @@ -641,8 +642,8 @@ private void rehashAwareIteration(AtomicBoolean complete, Consumer consumer, } else { listenerNotifier = null; } - KeyTrackingConsumer results = new KeyTrackingConsumer<>(keyPartitioner, segmentInfoCH, - iteratorOperation.wrapConsumer(consumer), iteratorOperation.getFunction(), + KeyTrackingConsumer results = new KeyTrackingConsumer<>(keyPartitioner, + segmentInfoCH.getNumSegments(), iteratorOperation.wrapConsumer(consumer), iteratorOperation.getFunction(), listenerNotifier); Thread thread = Thread.currentThread(); executor.execute(() -> { @@ -651,7 +652,8 @@ private void rehashAwareIteration(AtomicBoolean complete, Consumer consumer, Set segmentsToProcess = segmentsToFilter == null ? new RangeSet(segmentInfoCH.getNumSegments()) : segmentsToFilter; do { - ConsistentHash ch = dm.getReadConsistentHash(); + LocalizedCacheTopology cacheTopology = dm.getCacheTopology(); + ConsistentHash ch = cacheTopology.getReadConsistentHash(); boolean runLocal = ch.getMembers().contains(localAddress); Set segments; Set excludedKeys; @@ -677,8 +679,8 @@ private void rehashAwareIteration(AtomicBoolean complete, Consumer consumer, intermediateOperations, supplierForSegments(ch, segmentsToProcess, excludedKeys, !stayLocal), distributedBatchSize); if (!stayLocal) { - Object id = csm.remoteStreamOperationRehashAware(iteratorParallelDistribute, parallel, - segmentsToProcess, keysToFilter, new AtomicReferenceArrayToMap<>(results.referenceArray), + Object id = csm.remoteStreamOperationRehashAware(iteratorParallelDistribute, parallel, ch, + segmentsToProcess, keysToFilter, new AtomicReferenceArrayToMap<>(results.referenceArray), includeLoader, op, results); if (id != null) { supplier.pending = id; @@ -697,14 +699,14 @@ intermediateOperations, supplierForSegments(ch, segmentsToProcess, excludedKeys, throw new CacheException(e); } } - segmentsToProcess = segmentsToProcess(supplier, results, segmentsToProcess, id); + segmentsToProcess = segmentsToProcess(supplier, results, segmentsToProcess, id, cacheTopology.getTopologyId()); } finally { csm.forgetOperation(id); } } else { performLocalRehashAwareOperation(results, segmentsToProcess, ch, segments, op, () -> ch.getSegmentsForOwner(localAddress), null); - segmentsToProcess = segmentsToProcess(supplier, results, segmentsToProcess, null); + segmentsToProcess = segmentsToProcess(supplier, results, segmentsToProcess, null, cacheTopology.getTopologyId()); } } while (!complete.get()); } catch (CacheException e) { @@ -718,12 +720,19 @@ intermediateOperations, supplierForSegments(ch, segmentsToProcess, excludedKeys, } private Set segmentsToProcess(IteratorSupplier supplier, KeyTrackingConsumer results, - Set segmentsToProcess, Object id) { + Set segmentsToProcess, Object id, int topologyId) { String strId = id == null ? "local" : id.toString(); if (!results.lostSegments.isEmpty()) { segmentsToProcess = new HashSet<>(results.lostSegments); results.lostSegments.clear(); log.tracef("Found %s lost segments for %s", segmentsToProcess, strId); + if (results.requiresNextTopology) { + try { + stateTransferLock.waitForTopology(topologyId + 1, timeout, timeoutUnit); + } catch (InterruptedException | java.util.concurrent.TimeoutException e) { + throw new CacheException(e); + } + } } else { supplier.close(); log.tracef("Finished rehash aware operation for %s", strId); diff --git a/core/src/main/java/org/infinispan/stream/impl/PartitionAwareClusterStreamManager.java b/core/src/main/java/org/infinispan/stream/impl/PartitionAwareClusterStreamManager.java index 653ee777ae88..52f6870ae0fe 100644 --- a/core/src/main/java/org/infinispan/stream/impl/PartitionAwareClusterStreamManager.java +++ b/core/src/main/java/org/infinispan/stream/impl/PartitionAwareClusterStreamManager.java @@ -8,6 +8,7 @@ import org.infinispan.Cache; import org.infinispan.configuration.cache.Configuration; +import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; import org.infinispan.notifications.Listener; @@ -70,40 +71,40 @@ public boolean awaitCompletion(Object id, long time, TimeUnit unit) throws Inter } @Override - public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate) { + public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate) { checkPartitionStatus(); - return super.remoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, + return super.remoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback, earlyTerminatePredicate); } @Override - public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - KeyTrackingTerminalOperation operation, ResultsCallback> callback) { + public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + KeyTrackingTerminalOperation operation, ResultsCallback> callback) { checkPartitionStatus(); - return super.remoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, + return super.remoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback); } @Override public Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, - boolean includeLoader, TerminalOperation operation, ResultsCallback callback, - Predicate earlyTerminatePredicate) { + ConsistentHash ch, Set segments, Set keysToInclude, Map> keysToExclude, + boolean includeLoader, TerminalOperation operation, ResultsCallback callback, + Predicate earlyTerminatePredicate) { checkPartitionStatus(); - return super.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, segments, keysToInclude, + return super.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback, earlyTerminatePredicate); } @Override public Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, - boolean includeLoader, KeyTrackingTerminalOperation operation, - ResultsCallback> callback) { + ConsistentHash ch, Set segments, Set keysToInclude, Map> keysToExclude, + boolean includeLoader, KeyTrackingTerminalOperation operation, + ResultsCallback> callback) { checkPartitionStatus(); - return super.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, segments, keysToInclude, + return super.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, ch, segments, keysToInclude, keysToExclude, includeLoader, operation, callback); } diff --git a/core/src/main/java/org/infinispan/stream/impl/tx/TxClusterStreamManager.java b/core/src/main/java/org/infinispan/stream/impl/tx/TxClusterStreamManager.java index 61ddaf5da38c..c704e797dda9 100644 --- a/core/src/main/java/org/infinispan/stream/impl/tx/TxClusterStreamManager.java +++ b/core/src/main/java/org/infinispan/stream/impl/tx/TxClusterStreamManager.java @@ -34,39 +34,39 @@ public TxClusterStreamManager(ClusterStreamManager manager, LocalTxInvocation } @Override - public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate) { + public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + TerminalOperation operation, ResultsCallback callback, Predicate earlyTerminatePredicate) { TxExcludedKeys txExcludedKeys = new TxExcludedKeys<>(keysToExclude, ctx, hash); - return manager.remoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, + return manager.remoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, txExcludedKeys, includeLoader, operation, callback, earlyTerminatePredicate); } @Override public Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, - boolean includeLoader, TerminalOperation operation, ResultsCallback callback, - Predicate earlyTerminatePredicate) { + ConsistentHash ch, Set segments, Set keysToInclude, Map> keysToExclude, + boolean includeLoader, TerminalOperation operation, ResultsCallback callback, + Predicate earlyTerminatePredicate) { TxExcludedKeys txExcludedKeys = new TxExcludedKeys<>(keysToExclude, ctx, hash); - return manager.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, segments, keysToInclude, + return manager.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, ch, segments, keysToInclude, txExcludedKeys, includeLoader, operation, callback, earlyTerminatePredicate); } @Override - public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, - KeyTrackingTerminalOperation operation, ResultsCallback> callback) { + public Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch, + Set segments, Set keysToInclude, Map> keysToExclude, boolean includeLoader, + KeyTrackingTerminalOperation operation, ResultsCallback> callback) { TxExcludedKeys txExcludedKeys = new TxExcludedKeys<>(keysToExclude, ctx, hash); - return manager.remoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, + return manager.remoteStreamOperation(parallelDistribution, parallelStream, ch, segments, keysToInclude, txExcludedKeys, includeLoader, operation, callback); } @Override public Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, - Set segments, Set keysToInclude, Map> keysToExclude, - boolean includeLoader, KeyTrackingTerminalOperation operation, ResultsCallback> callback) { + ConsistentHash ch, Set segments, Set keysToInclude, Map> keysToExclude, + boolean includeLoader, KeyTrackingTerminalOperation operation, ResultsCallback> callback) { TxExcludedKeys txExcludedKeys = new TxExcludedKeys<>(keysToExclude, ctx, hash); - return manager.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, segments, keysToInclude, + return manager.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, ch, segments, keysToInclude, txExcludedKeys, includeLoader, operation, callback); }