Skip to content

Commit

Permalink
ISPN-8708 Remote iteration with limited batch sends finished segments
Browse files Browse the repository at this point in the history
* Use KeyWatchingListener for each publisher
  • Loading branch information
wburns authored and rvansa committed Feb 1, 2018
1 parent 32a4db4 commit ddc4f55
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 130 deletions.
@@ -0,0 +1,65 @@
package org.infinispan.stream.impl;

import java.util.PrimitiveIterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.SmallIntSet;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.reactivestreams.Publisher;

import io.reactivex.Flowable;

/**
* Abstract publisher decorator that is used to notify segment listener of loss of segments while entries are
* being retrieved.
* @author wburns
* @since 9.0
*/
public abstract class AbstractRehashPublisherDecorator<S> implements PublisherDecorator<S> {
final AbstractCacheStream.IteratorOperation iteratorOperation;
final DistributionManager dm;
final Address localAddress;
final Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments;
final Consumer<Object> keyConsumer;

AbstractRehashPublisherDecorator(AbstractCacheStream.IteratorOperation iteratorOperation, DistributionManager dm,
Address localAddress, Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments,
Consumer<Object> keyConsumer) {
this.iteratorOperation = iteratorOperation;
this.dm = dm;
this.localAddress = localAddress;
this.lostSegments = lostSegments;
this.keyConsumer = keyConsumer;
}

abstract Log getLog();

Publisher<S> decorateLocal(Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedSegments,
ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter,
Publisher<S> localPublisher) {
Publisher<S> convertedPublisher = Flowable.fromPublisher(localPublisher).doOnComplete(() -> {
IntSet ourSegments;
if (onlyLocal) {
ourSegments = SmallIntSet.from(beginningCh.getSegmentsForOwner(localAddress));
} else {
ourSegments = SmallIntSet.from(beginningCh.getPrimarySegmentsForOwner(localAddress));
}
ourSegments.retainAll(segmentsToFilter);
// This will notify both completed and suspect of segments that may not even exist or were completed before
// on a rehash
if (dm.getReadConsistentHash().equals(beginningCh)) {
getLog().tracef("Local iterator has completed segments %s", ourSegments);
completedSegments.accept((Supplier<PrimitiveIterator.OfInt>) ourSegments::iterator);
} else {
getLog().tracef("Local iterator segments %s are all suspect as consistent hash has changed", ourSegments);
lostSegments.accept((Supplier<PrimitiveIterator.OfInt>) ourSegments::iterator);
}
});
return iteratorOperation.handlePublisher(convertedPublisher, keyConsumer);
}
}
@@ -1,33 +1,95 @@
package org.infinispan.stream.impl; package org.infinispan.stream.impl;


import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PrimitiveIterator; import java.util.PrimitiveIterator;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.infinispan.commons.util.IntSet;
import org.infinispan.distribution.DistributionManager; import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;


import io.reactivex.Flowable; import io.reactivex.Flowable;


/** /**
* PublisherDecorator that only notifies a user listener of segment completion after the last entry for a given
* segment has been retrieved from iteration.
* @author wburns * @author wburns
* @since 9.0 * @since 9.0
*/ */
public class CompletionRehashPublisherDecorator<S> extends RehashPublisherDecorator<S> { public class CompletionRehashPublisherDecorator<S> extends RehashPublisherDecorator<S> {
private final KeyWatchingCompletionListener completionListener; private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> userListener;
private final List<KeyWatchingCompletionListener> completionListeners;


CompletionRehashPublisherDecorator(AbstractCacheStream.IteratorOperation iteratorOperation, DistributionManager dm, CompletionRehashPublisherDecorator(AbstractCacheStream.IteratorOperation iteratorOperation, DistributionManager dm,
Address localAddress, KeyWatchingCompletionListener completionListener, Address localAddress, Consumer<? super Supplier<PrimitiveIterator.OfInt>> userListener,
Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedSegments, Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedSegments,
Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments, Consumer<Object> keyConsumer) { Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments, Consumer<Object> keyConsumer) {
super(iteratorOperation, dm, localAddress, completedSegments, lostSegments, keyConsumer); super(iteratorOperation, dm, localAddress, completedSegments, lostSegments, keyConsumer);
this.completionListener = completionListener; this.userListener = userListener;
this.completionListeners = Collections.synchronizedList(new ArrayList<>(4));
}

public void valueIterated(Object obj) {
for (KeyWatchingCompletionListener kwcl : completionListeners) {
kwcl.valueIterated(obj);
}
}

public void complete() {
completionListeners.forEach(KeyWatchingCompletionListener::completed);
} }


@Override @Override
protected Publisher<S> decorateBeforeReturn(Publisher<S> publisher) { Log getLog() {
return Flowable.fromPublisher(super.decorateBeforeReturn(publisher)).doOnNext( return log;
completionListener::valueAdded); }

@Override
public Publisher<S> decorateRemote(ClusterStreamManager.RemoteIteratorPublisher<S> remotePublisher) {
// We have to have a listener per remote publisher, since we receive results concurrently and we
// can't properly track the completion of keys per segment without them being separated
KeyWatchingCompletionListener kwcl = new KeyWatchingCompletionListener(userListener);
completionListeners.add(kwcl);

Publisher<S> convertedPublisher = s -> remotePublisher.subscribe(s, i -> {
// Remote we always notify the provided completed segments as it tracks segment completion
// for retries
completedSegments.accept(i);
// however we have to wait before notifying the user segment completion until
// we iterate upon the last key of the block of segments
kwcl.accept(i);
}, lostSegments);
// We have to track each key received from this publisher as it would map to all segments when completed
return Flowable.fromPublisher(iteratorOperation.handlePublisher(convertedPublisher, keyConsumer)).doOnNext(
kwcl::valueAdded);
}

@Override
public Publisher<S> decorateLocal(ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter,
Publisher<S> localPublisher) {
KeyWatchingCompletionListener kwcl = new KeyWatchingCompletionListener(userListener);
completionListeners.add(kwcl);

Publisher<S> convertedLocalPublisher = decorateLocal(i -> {
// Remote we always notify the provided completed segments as it tracks segment completion
// for retries
completedSegments.accept(i);
// however we have to wait before notifying the user segment completion until
// we iterate upon the last key of the block of segments
kwcl.accept(i);
}, beginningCh, onlyLocal, segmentsToFilter, localPublisher);
// We have to track each key received from this publisher as it would map to all segments when completed
return Flowable.fromPublisher(iteratorOperation.handlePublisher(convertedLocalPublisher, keyConsumer))
.doOnNext(kwcl::valueAdded);
} }
} }
Expand Up @@ -576,34 +576,34 @@ public void close() {
* @param <S> * @param <S>
*/ */
private class CompletionListenerRehashIterator<S> extends RehashIterator<S> { private class CompletionListenerRehashIterator<S> extends RehashIterator<S> {
private final KeyWatchingCompletionListener completionListener; private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> userListener;

private volatile CompletionRehashPublisherDecorator completionRehashPublisherDecorator;


private CompletionListenerRehashIterator(Iterable<IntermediateOperation> intermediateOperations, private CompletionListenerRehashIterator(Iterable<IntermediateOperation> intermediateOperations,
Consumer<? super Supplier<PrimitiveIterator.OfInt>> completionListener) { Consumer<? super Supplier<PrimitiveIterator.OfInt>> userListener) {
super(intermediateOperations); super(intermediateOperations);
this.completionListener = new KeyWatchingCompletionListener(completionListener); this.userListener = userListener;
} }


@Override @Override
protected S getNext() { protected S getNext() {
S next = super.getNext(); S next = super.getNext();
if (next != null) { if (next != null) {
completionListener.valueIterated(next); completionRehashPublisherDecorator.valueIterated(next);
} else { } else {
completionListener.completed(); completionRehashPublisherDecorator.complete();
} }
return next; return next;
} }


@Override @Override
PublisherDecorator<S> publisherDecorator(Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedSegments, PublisherDecorator<S> publisherDecorator(Consumer<? super Supplier<PrimitiveIterator.OfInt>> completedSegments,
Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments, Consumer<Object> keyConsumer) { Consumer<? super Supplier<PrimitiveIterator.OfInt>> lostSegments, Consumer<Object> keyConsumer) {
Consumer<? super Supplier<PrimitiveIterator.OfInt>> ourCompleted = i -> { completionRehashPublisherDecorator = new CompletionRehashPublisherDecorator<>(iteratorOperation, dm,
completionListener.segmentsEncountered(i); localAddress, userListener, completedSegments, lostSegments, keyConsumer);
completedSegments.accept(i);
}; return completionRehashPublisherDecorator;
return new CompletionRehashPublisherDecorator<>(iteratorOperation, dm, localAddress, completionListener,
ourCompleted, lostSegments, keyConsumer);
} }
} }


Expand Down
Expand Up @@ -5,6 +5,7 @@
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;


/** /**
* PublishDecorator that just returns the publisher provided by the caller
* @author wburns * @author wburns
* @since 9.0 * @since 9.0
*/ */
Expand Down
Expand Up @@ -4,78 +4,87 @@
import java.util.PrimitiveIterator; import java.util.PrimitiveIterator;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.infinispan.commons.util.ByRef;

/** /**
* Only notifies a completion listener when the last key for a segment has been found. The last key for a segment * Only notifies a completion listener when the last key for a segment has been found. The last key for a segment
* is assumed to be the last key seen {@link KeyWatchingCompletionListener#valueAdded(Object)} before segments * is assumed to be the last key seen {@link #valueAdded(Object)} before segments
* are encountered {@link KeyWatchingCompletionListener#segmentsEncountered(Supplier)}. * are encountered {@link #accept(Supplier)}. Note that this listener can be used for multiple calls for segments
* but it will always follow {0 - N} valueAdded invocations and then {0 - 1} accept method invocations. The
* accept method could be invoked 0 times if all segments are lost on the remote node. Also this invocation chain
* of valueAdded and accept may be done multiple times if there are multiple nodes such that they outnumber the
* number of remote publishers created.
*/ */
class KeyWatchingCompletionListener { class KeyWatchingCompletionListener {
private final AtomicReference<Object> currentKey = new AtomicReference<>(); private AtomicReference<Object> currentKey = new AtomicReference<>();
private final Map<Object, Supplier<PrimitiveIterator.OfInt>> pendingSegments = new ConcurrentHashMap<>();
private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> completionListener; private final Consumer<? super Supplier<PrimitiveIterator.OfInt>> completionListener;
// The next 2 variables are possible assuming that the iterator is not used concurrently. This way we don't private final Map<Object, Supplier<PrimitiveIterator.OfInt>> pendingSegments = new ConcurrentHashMap<>();
// have to allocate them on every entry iterated
private final ByRef<Supplier<PrimitiveIterator.OfInt>> ref = new ByRef<>(null);
private final BiFunction<Object, Supplier<PrimitiveIterator.OfInt>, Supplier<PrimitiveIterator.OfInt>> iteratorMapping;


KeyWatchingCompletionListener(Consumer<? super Supplier<PrimitiveIterator.OfInt>> completionListener) { KeyWatchingCompletionListener(Consumer<? super Supplier<PrimitiveIterator.OfInt>> completionListener) {
this.completionListener = completionListener; this.completionListener = completionListener;
this.iteratorMapping = (k, v) -> {
if (v != null) {
ref.set(v);
}
currentKey.compareAndSet(k, null);
return null;
};
} }


/**
* Method invoked for each entry added to the stream passing only the key
* @param key key of entry added to stream
*/
public void valueAdded(Object key) { public void valueAdded(Object key) {
currentKey.set(key); currentKey.set(key);
} }


public void valueIterated(Object key) { /**
pendingSegments.compute(key, iteratorMapping); * Method to be invoked after all entries have been passed to the stream that belong to these segments
Supplier<PrimitiveIterator.OfInt> segments = ref.get(); * @param segments the segments that had all entries passed down
if (segments != null) { */
ref.set(null); public void accept(Supplier<PrimitiveIterator.OfInt> segments) {
completionListener.accept(segments); Supplier<PrimitiveIterator.OfInt> notifyThese;
}
}

public void segmentsEncountered(Supplier<PrimitiveIterator.OfInt> segments) {
// This code assumes that valueAdded and segmentsEncountered are not invoked concurrently and that all values
// added for a response before the segments are completed.
// The valueIterated method can be invoked at any point however.
// See ClusterStreamManagerImpl$ClusterStreamSubscription.sendRequest where the added and segments are called into
ByRef<Supplier<PrimitiveIterator.OfInt>> segmentsToNotify = new ByRef<>(segments);
Object key = currentKey.get(); Object key = currentKey.get();
if (key != null) { if (key != null) {
pendingSegments.compute(key, (k, v) -> { pendingSegments.put(key, segments);
// The iterator has consumed all the keys, so there is no reason to wait: just notify of segment // We now try to go back and set current key to null
// completion immediately if (currentKey.getAndSet(null) == null) {
if (currentKey.get() == null) { // If it was already null that means we returned our key via the iterator below
return null; // In this case they may or may not have seen the pendingSegments so if they didn't we have to
} // notify ourselves
// This means we didn't iterate on a key before segments were completed - means it was empty. The notifyThese = pendingSegments.remove(key);
// valueIterated notifies the completion of non-empty segments, but we need to notify the completion of } else {
// empty segments here // Means that the iteration will see this
segmentsToNotify.set(v); notifyThese = null;
return segments; }
}); } else {
// This means that we got a set of segments that had no entries in them or the iterator
// consumed all entries, so just notify right away
notifyThese = segments;
} }
Supplier<PrimitiveIterator.OfInt> notifyThese = segmentsToNotify.get();
if (notifyThese != null) { if (notifyThese != null) {
completionListener.accept(notifyThese); completionListener.accept(notifyThese);
} }
} }


/**
* This method is to be invoked on possibly a different thread at any point which states that a key has
* been iterated upon. This is the signal that if a set of segments is waiting for a key to be iterated upon
* to notify the iteration
* @param key the key just returning
*/
public void valueIterated(Object key) {
// If we set to null that tells segment completion to just notify above in accept
if (!currentKey.compareAndSet(key, null)) {
// Otherwise we have to check if this key was linked to a group of pending segments
Supplier<PrimitiveIterator.OfInt> segments = pendingSegments.remove(key);
if (segments != null) {
completionListener.accept(segments);
}
}
}

/**
* Invoked after the iterator has completed iterating upon all entries
*/
public void completed() { public void completed() {
pendingSegments.forEach((k, v) -> completionListener.accept(v)); // This should always be empty
assert pendingSegments.isEmpty() : "pendingSegments should be empty but was: " + pendingSegments;
} }
} }
Expand Up @@ -5,12 +5,27 @@
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;


/** /**
* Decorator that decorates publishers based on if it is local or remote to handle segment completions
* @author wburns * @author wburns
* @since 9.0 * @since 9.0
*/ */
interface PublisherDecorator<S> { interface PublisherDecorator<S> {
/**
* Invoked for each remote publisher to provide additional functionality
* @param remotePublisher the provided remote publisher
* @return the resulting publisher (usually wrapped in some way)
*/
Publisher<S> decorateRemote(ClusterStreamManager.RemoteIteratorPublisher<S> remotePublisher); Publisher<S> decorateRemote(ClusterStreamManager.RemoteIteratorPublisher<S> remotePublisher);


/**
* Invoked for a local publisher, which only completes segments if the consistent hash after completion is the same
* as the one provided
* @param beginningCh the consistent has to test against
* @param onlyLocal whether this publisher is only done locally (that is there are no other remote publishers)
* @param segmentsToFilter the segments to use for this invocation
* @param localPublisher the internal local publisher
* @return the resulting publisher (usually wrapped in some way)
*/
Publisher<S> decorateLocal(ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter, Publisher<S> decorateLocal(ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter,
Publisher<S> localPublisher); Publisher<S> localPublisher);
} }

0 comments on commit ddc4f55

Please sign in to comment.