Skip to content

Commit

Permalink
ISPN-8218 ScatteredStreamIteratorTest.waitUntilProcessingResults rand…
Browse files Browse the repository at this point in the history
…om failure

Reverts most of the stream related changes from ISPN-6645, fixing the
consistent hash instance used for both remote and local iteration. The
retry with newer topology is moved to xCacheStream instead.
Non-rehash-aware iteration through scattered cache may miss segments
that don't have owner during rehash.
  • Loading branch information
rvansa authored and slaskawi committed Aug 25, 2017
1 parent 8c00b86 commit 08060f5
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 121 deletions.
Expand Up @@ -25,11 +25,13 @@
import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.stream.impl.termop.SegmentRetryingOperation;
import org.infinispan.stream.impl.termop.SingleRunOperation;
Expand Down Expand Up @@ -60,6 +62,7 @@ public abstract class AbstractCacheStream<T, S extends BaseStream<T, S>, S2 exte
protected final ComponentRegistry registry;
protected final PartitionHandlingManager partition;
protected final KeyPartitioner keyPartitioner;
protected final StateTransferLock stateTransferLock;

protected Runnable closeRunnable = null;

Expand Down Expand Up @@ -94,6 +97,7 @@ protected AbstractCacheStream(Address localAddress, boolean parallel, Distributi
this.registry = registry;
this.partition = registry.getComponent(PartitionHandlingManager.class);
this.keyPartitioner = registry.getComponent(KeyPartitioner.class);
this.stateTransferLock = registry.getComponent(StateTransferLock.class);
intermediateOperations = new ArrayDeque<>();
}

Expand All @@ -108,6 +112,7 @@ protected AbstractCacheStream(AbstractCacheStream<T, S, S2> other) {
this.registry = other.registry;
this.partition = other.partition;
this.keyPartitioner = other.keyPartitioner;
this.stateTransferLock = other.stateTransferLock;

this.closeRunnable = other.closeRunnable;

Expand Down Expand Up @@ -206,7 +211,7 @@ <R> R performOperation(Function<? super S2, ? extends R> function, ResultsAccumu
ConsistentHash ch = dm.getWriteConsistentHash();
TerminalOperation<R> op = new SingleRunOperation(intermediateOperations,
supplierForSegments(ch, segmentsToFilter, null), function);
Object id = csm.remoteStreamOperation(getParallelDistribution(), parallel, segmentsToFilter, keysToFilter,
Object id = csm.remoteStreamOperation(getParallelDistribution(), parallel, ch, segmentsToFilter, keysToFilter,
Collections.emptyMap(), includeLoader, op, remoteResults, earlyTerminatePredicate);
try {
R localValue = op.performOperation();
Expand Down Expand Up @@ -235,15 +240,16 @@ <R> R performOperationRehashAware(Function<? super S2, ? extends R> function, bo
Set<Integer> segmentsToProcess = segmentsToFilter;
TerminalOperation<R> op;
do {
ConsistentHash ch = dm.getReadConsistentHash();
LocalizedCacheTopology cacheTopology = dm.getCacheTopology();
ConsistentHash ch = cacheTopology.getReadConsistentHash();
if (retryOnRehash) {
op = new SegmentRetryingOperation(intermediateOperations, supplierForSegments(ch, segmentsToProcess,
null), function);
} else {
op = new SingleRunOperation(intermediateOperations, supplierForSegments(ch, segmentsToProcess, null),
function);
}
Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, segmentsToProcess,
Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, ch, segmentsToProcess,
keysToFilter, Collections.emptyMap(), includeLoader, op, remoteResults, earlyTerminatePredicate);
try {
R localValue;
Expand Down Expand Up @@ -285,6 +291,13 @@ <R> R performOperationRehashAware(Function<? super S2, ? extends R> function, bo
segmentsToProcess = new HashSet<>(remoteResults.lostSegments);
remoteResults.lostSegments.clear();
log.tracef("Found %s lost segments for identifier %s", segmentsToProcess, id);
if (remoteResults.requiresNextTopology) {
try {
stateTransferLock.waitForTopology(cacheTopology.getTopologyId(), timeout, timeoutUnit);
} catch (InterruptedException | java.util.concurrent.TimeoutException e) {
throw new CacheException(e);
}
}
} else {
// If we didn't lose any segments we don't need to process anymore
if (segmentsToProcess != null) {
Expand All @@ -305,12 +318,13 @@ void performRehashKeyTrackingOperation(
final AtomicBoolean complete = new AtomicBoolean();

ConsistentHash segmentInfoCH = dm.getReadConsistentHash();
KeyTrackingConsumer<Object, Object> results = new KeyTrackingConsumer<>(keyPartitioner, segmentInfoCH, (c) -> {},
c -> c, null);
KeyTrackingConsumer<Object, Object> results = new KeyTrackingConsumer<>(keyPartitioner, segmentInfoCH.getNumSegments(), c -> {},
c -> c, null);
Set<Integer> segmentsToProcess = segmentsToFilter == null ?
new RangeSet(segmentInfoCH.getNumSegments()) : segmentsToFilter;
do {
ConsistentHash ch = dm.getReadConsistentHash();
LocalizedCacheTopology cacheTopology = dm.getCacheTopology();
ConsistentHash ch = cacheTopology.getReadConsistentHash();
boolean localRun = ch.getMembers().contains(localAddress);
Set<Integer> segments;
Set<Object> excludedKeys;
Expand All @@ -328,7 +342,7 @@ void performRehashKeyTrackingOperation(
KeyTrackingTerminalOperation<Object, ? extends T, Object> op = function.apply(supplierForSegments(ch,
segmentsToProcess, excludedKeys));
op.handleInjection(registry);
Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, segmentsToProcess,
Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, ch, segmentsToProcess,
keysToFilter, new AtomicReferenceArrayToMap<>(results.referenceArray), includeLoader, op,
results);
try {
Expand Down Expand Up @@ -360,6 +374,14 @@ void performRehashKeyTrackingOperation(
segmentsToProcess = new HashSet<>(results.lostSegments);
results.lostSegments.clear();
log.tracef("Found %s lost segments for identifier %s", segmentsToProcess, id);
if (results.requiresNextTopology) {
try {
stateTransferLock.waitForTopology(cacheTopology.getTopologyId() + 1, timeout, timeoutUnit);
results.requiresNextTopology = false;
} catch (InterruptedException | java.util.concurrent.TimeoutException e) {
throw new CacheException(e);
}
}
} else {
log.tracef("Finished rehash aware operation for id %s", id);
complete.set(true);
Expand Down Expand Up @@ -425,26 +447,25 @@ public Set<Entry<Integer, R>> entrySet() {
class KeyTrackingConsumer<K, V> implements ClusterStreamManager.ResultsCallback<Collection<CacheEntry<K, Object>>>,
KeyTrackingTerminalOperation.IntermediateCollector<Collection<CacheEntry<K, Object>>> {
final KeyPartitioner keyPartitioner;
final ConsistentHash ch;
final Consumer<V> consumer;
final Set<Integer> lostSegments = new ConcurrentHashSet<>();
final Function<CacheEntry<K, Object>, V> valueFunction;
boolean requiresNextTopology;

final AtomicReferenceArray<Set<K>> referenceArray;

final DistributedCacheStream.SegmentListenerNotifier listenerNotifier;

KeyTrackingConsumer(KeyPartitioner keyPartitioner, ConsistentHash ch, Consumer<V> consumer,
KeyTrackingConsumer(KeyPartitioner keyPartitioner, int numSegments, Consumer<V> consumer,
Function<CacheEntry<K, Object>, V> valueFunction,
DistributedCacheStream.SegmentListenerNotifier completedSegments) {
this.keyPartitioner = keyPartitioner;
this.ch = ch;
this.consumer = consumer;
this.valueFunction = valueFunction;

this.listenerNotifier = completedSegments;

this.referenceArray = new AtomicReferenceArray<>(ch.getNumSegments());
this.referenceArray = new AtomicReferenceArray<>(numSegments);
for (int i = 0; i < referenceArray.length(); ++i) {
// We only allow 1 request per id
referenceArray.set(i, new HashSet<>());
Expand Down Expand Up @@ -515,6 +536,11 @@ public void onSegmentsLost(Set<Integer> segments) {
}
}

@Override
public void requestFutureTopology() {
requiresNextTopology = true;
}

@Override
public void sendDataResonse(Collection<CacheEntry<K, Object>> response) {
onIntermediateResult(null, response);
Expand All @@ -525,6 +551,7 @@ static class ResultsAccumulator<R> implements ClusterStreamManager.ResultsCallba
private final BinaryOperator<R> binaryOperator;
private final Set<Integer> lostSegments = new ConcurrentHashSet<>();
R currentValue;
boolean requiresNextTopology;

ResultsAccumulator(BinaryOperator<R> binaryOperator) {
this.binaryOperator = binaryOperator;
Expand Down Expand Up @@ -556,6 +583,11 @@ public void onSegmentsLost(Set<Integer> segments) {
lostSegments.add(segment);
}
}

@Override
public void requestFutureTopology() {
requiresNextTopology = true;
}
}

static class CollectionConsumer<R> implements ClusterStreamManager.ResultsCallback<Collection<R>>,
Expand Down Expand Up @@ -583,6 +615,10 @@ public void onCompletion(Address address, Set<Integer> completedSegments, Collec
public void onSegmentsLost(Set<Integer> segments) {
}

@Override
public void requestFutureTopology() {
}

@Override
public void sendDataResonse(Collection<R> response) {
onIntermediateResult(null, response);
Expand Down
Expand Up @@ -8,6 +8,7 @@
import java.util.function.Predicate;

import org.infinispan.CacheStream;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.remoting.transport.Address;

/**
Expand Down Expand Up @@ -47,13 +48,19 @@ interface ResultsCallback<R> {
* @param segments The segments that were requested but are now local
*/
void onSegmentsLost(Set<Integer> segments);

/**
* Called when a an owner of a segment is not available in the provided {@link ConsistentHash}
*/
void requestFutureTopology();
}

/**
* Performs the remote stream operation without rehash awareness.
* @param <R> the type of response
* @param parallelDistribution whether or not parallel distribution is enabled
* @param parallelStream whether or not the stream is paralllel
* @param ch the consistent hash to use when determining segment ownership
* @param segments the segments that this request should utilize
* @param keysToInclude which keys to include in the request
* @param keysToExclude which keys to exclude in the request
Expand All @@ -63,15 +70,16 @@ interface ResultsCallback<R> {
* @param earlyTerminatePredicate a predicate to determine if this operation should stop based on intermediate results
* @return the operation id to be used for further calls
*/
<R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream,
Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader,
TerminalOperation<R> operation, ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate);
<R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch,
Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader,
TerminalOperation<R> operation, ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate);

/**
* Performs the remote stream operation with rehash awareness.
* @param <R> the type of response
* @param parallelDistribution whether or not parallel distribution is enabled
* @param parallelStream whether or not the stream is paralllel
* @param ch the consistent hash to use when determining segment ownership
* @param segments the segments that this request should utilize
* @param keysToInclude which keys to include in the request
* @param keysToExclude which keys to exclude in the request
Expand All @@ -81,15 +89,16 @@ <R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelS
* @param earlyTerminatePredicate a predicate to determine if this operation should stop based on intermediate results
* @return the operation id to be used for further calls
*/
<R> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream,
Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader,
TerminalOperation<R> operation, ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate);
<R> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch,
Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader,
TerminalOperation<R> operation, ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate);

/**
* Key tracking remote operation that doesn't have rehash enabled.
* @param <R> the type of response
* @param parallelDistribution whether or not parallel distribution is enabled
* @param parallelStream whether or not the stream is paralllel
* @param ch the consistent hash to use when determining segment ownership
* @param segments the segments that this request should utilize
* @param keysToInclude which keys to include in the request
* @param keysToExclude which keys to exclude in the request
Expand All @@ -98,15 +107,16 @@ <R> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolea
* @param callback the callback to collect individual node results
* @return the operation id to be used for further calls
*/
<R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream,
Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader,
KeyTrackingTerminalOperation<K, R, ?> operation, ResultsCallback<Collection<R>> callback);
<R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch,
Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader,
KeyTrackingTerminalOperation<K, R, ?> operation, ResultsCallback<Collection<R>> callback);

/**
* Key tracking remote operation that has rehash enabled
* @param <R2> the type of response
* @param parallelDistribution whether or not parallel distribution is enabled
* @param parallelStream whether or not the stream is paralllel
* @param ch the consistent hash to use when determining segment ownership
* @param segments the segments that this request should utilize
* @param keysToInclude which keys to include in the request
* @param keysToExclude which keys to exclude in the request
Expand All @@ -115,11 +125,11 @@ <R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelS
* @param callback the callback to collect individual node results
* @return the operation id to be used for further calls
*/
<R2> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream,
Set<Integer> segments, Set<K> keysToInclude,
Map<Integer, Set<K>> keysToExclude, boolean includeLoader,
KeyTrackingTerminalOperation<K, ?, R2> operation,
ResultsCallback<Map<K, R2>> callback);
<R2> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, ConsistentHash ch,
Set<Integer> segments, Set<K> keysToInclude,
Map<Integer, Set<K>> keysToExclude, boolean includeLoader,
KeyTrackingTerminalOperation<K, ?, R2> operation,
ResultsCallback<Map<K, R2>> callback);

/**
* Tests whether this operation is still pending or not.
Expand Down

0 comments on commit 08060f5

Please sign in to comment.