From ddc4f5508fed85496448447820b7d1b8c0c87a4d Mon Sep 17 00:00:00 2001 From: William Burns Date: Wed, 31 Jan 2018 14:01:22 -0500 Subject: [PATCH] ISPN-8708 Remote iteration with limited batch sends finished segments * Use KeyWatchingListener for each publisher --- .../AbstractRehashPublisherDecorator.java | 65 +++++++++++ .../CompletionRehashPublisherDecorator.java | 74 ++++++++++++- .../stream/impl/DistributedCacheStream.java | 22 ++-- .../impl/IdentityPublisherDecorator.java | 1 + .../impl/KeyWatchingCompletionListener.java | 103 ++++++++++-------- .../stream/impl/PublisherDecorator.java | 15 +++ .../stream/impl/RehashPublisherDecorator.java | 48 ++------ ...ompletionRehashPublisherDecoratorTest.java | 58 +++++----- .../impl/TestRemoteIteratorPublisher.java | 1 - 9 files changed, 257 insertions(+), 130 deletions(-) create mode 100644 core/src/main/java/org/infinispan/stream/impl/AbstractRehashPublisherDecorator.java diff --git a/core/src/main/java/org/infinispan/stream/impl/AbstractRehashPublisherDecorator.java b/core/src/main/java/org/infinispan/stream/impl/AbstractRehashPublisherDecorator.java new file mode 100644 index 000000000000..b7e9829f8209 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/AbstractRehashPublisherDecorator.java @@ -0,0 +1,65 @@ +package org.infinispan.stream.impl; + +import java.util.PrimitiveIterator; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.infinispan.commons.util.IntSet; +import org.infinispan.commons.util.SmallIntSet; +import org.infinispan.distribution.DistributionManager; +import org.infinispan.distribution.ch.ConsistentHash; +import org.infinispan.remoting.transport.Address; +import org.infinispan.util.logging.Log; +import org.reactivestreams.Publisher; + +import io.reactivex.Flowable; + +/** + * Abstract publisher decorator that is used to notify segment listener of loss of segments while entries are + * being retrieved. + * @author wburns + * @since 9.0 + */ +public abstract class AbstractRehashPublisherDecorator implements PublisherDecorator { + final AbstractCacheStream.IteratorOperation iteratorOperation; + final DistributionManager dm; + final Address localAddress; + final Consumer> lostSegments; + final Consumer keyConsumer; + + AbstractRehashPublisherDecorator(AbstractCacheStream.IteratorOperation iteratorOperation, DistributionManager dm, + Address localAddress, Consumer> lostSegments, + Consumer keyConsumer) { + this.iteratorOperation = iteratorOperation; + this.dm = dm; + this.localAddress = localAddress; + this.lostSegments = lostSegments; + this.keyConsumer = keyConsumer; + } + + abstract Log getLog(); + + Publisher decorateLocal(Consumer> completedSegments, + ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter, + Publisher localPublisher) { + Publisher convertedPublisher = Flowable.fromPublisher(localPublisher).doOnComplete(() -> { + IntSet ourSegments; + if (onlyLocal) { + ourSegments = SmallIntSet.from(beginningCh.getSegmentsForOwner(localAddress)); + } else { + ourSegments = SmallIntSet.from(beginningCh.getPrimarySegmentsForOwner(localAddress)); + } + ourSegments.retainAll(segmentsToFilter); + // This will notify both completed and suspect of segments that may not even exist or were completed before + // on a rehash + if (dm.getReadConsistentHash().equals(beginningCh)) { + getLog().tracef("Local iterator has completed segments %s", ourSegments); + completedSegments.accept((Supplier) ourSegments::iterator); + } else { + getLog().tracef("Local iterator segments %s are all suspect as consistent hash has changed", ourSegments); + lostSegments.accept((Supplier) ourSegments::iterator); + } + }); + return iteratorOperation.handlePublisher(convertedPublisher, keyConsumer); + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/CompletionRehashPublisherDecorator.java b/core/src/main/java/org/infinispan/stream/impl/CompletionRehashPublisherDecorator.java index 8fb907f4f4a0..34bd6a4485ae 100644 --- a/core/src/main/java/org/infinispan/stream/impl/CompletionRehashPublisherDecorator.java +++ b/core/src/main/java/org/infinispan/stream/impl/CompletionRehashPublisherDecorator.java @@ -1,33 +1,95 @@ package org.infinispan.stream.impl; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.PrimitiveIterator; import java.util.function.Consumer; import java.util.function.Supplier; +import org.infinispan.commons.util.IntSet; import org.infinispan.distribution.DistributionManager; +import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.remoting.transport.Address; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; import org.reactivestreams.Publisher; import io.reactivex.Flowable; /** + * PublisherDecorator that only notifies a user listener of segment completion after the last entry for a given + * segment has been retrieved from iteration. * @author wburns * @since 9.0 */ public class CompletionRehashPublisherDecorator extends RehashPublisherDecorator { - private final KeyWatchingCompletionListener completionListener; + private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass()); + + private final Consumer> userListener; + private final List completionListeners; CompletionRehashPublisherDecorator(AbstractCacheStream.IteratorOperation iteratorOperation, DistributionManager dm, - Address localAddress, KeyWatchingCompletionListener completionListener, + Address localAddress, Consumer> userListener, Consumer> completedSegments, Consumer> lostSegments, Consumer keyConsumer) { super(iteratorOperation, dm, localAddress, completedSegments, lostSegments, keyConsumer); - this.completionListener = completionListener; + this.userListener = userListener; + this.completionListeners = Collections.synchronizedList(new ArrayList<>(4)); + } + + public void valueIterated(Object obj) { + for (KeyWatchingCompletionListener kwcl : completionListeners) { + kwcl.valueIterated(obj); + } + } + + public void complete() { + completionListeners.forEach(KeyWatchingCompletionListener::completed); } @Override - protected Publisher decorateBeforeReturn(Publisher publisher) { - return Flowable.fromPublisher(super.decorateBeforeReturn(publisher)).doOnNext( - completionListener::valueAdded); + Log getLog() { + return log; + } + + @Override + public Publisher decorateRemote(ClusterStreamManager.RemoteIteratorPublisher remotePublisher) { + // We have to have a listener per remote publisher, since we receive results concurrently and we + // can't properly track the completion of keys per segment without them being separated + KeyWatchingCompletionListener kwcl = new KeyWatchingCompletionListener(userListener); + completionListeners.add(kwcl); + + Publisher convertedPublisher = s -> remotePublisher.subscribe(s, i -> { + // Remote we always notify the provided completed segments as it tracks segment completion + // for retries + completedSegments.accept(i); + // however we have to wait before notifying the user segment completion until + // we iterate upon the last key of the block of segments + kwcl.accept(i); + }, lostSegments); + // We have to track each key received from this publisher as it would map to all segments when completed + return Flowable.fromPublisher(iteratorOperation.handlePublisher(convertedPublisher, keyConsumer)).doOnNext( + kwcl::valueAdded); + } + + @Override + public Publisher decorateLocal(ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter, + Publisher localPublisher) { + KeyWatchingCompletionListener kwcl = new KeyWatchingCompletionListener(userListener); + completionListeners.add(kwcl); + + Publisher convertedLocalPublisher = decorateLocal(i -> { + // Remote we always notify the provided completed segments as it tracks segment completion + // for retries + completedSegments.accept(i); + // however we have to wait before notifying the user segment completion until + // we iterate upon the last key of the block of segments + kwcl.accept(i); + }, beginningCh, onlyLocal, segmentsToFilter, localPublisher); + // We have to track each key received from this publisher as it would map to all segments when completed + return Flowable.fromPublisher(iteratorOperation.handlePublisher(convertedLocalPublisher, keyConsumer)) + .doOnNext(kwcl::valueAdded); } } diff --git a/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java index d01f73e7cba8..7a7f022256de 100644 --- a/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java @@ -576,21 +576,23 @@ public void close() { * @param */ private class CompletionListenerRehashIterator extends RehashIterator { - private final KeyWatchingCompletionListener completionListener; + private final Consumer> userListener; + + private volatile CompletionRehashPublisherDecorator completionRehashPublisherDecorator; private CompletionListenerRehashIterator(Iterable intermediateOperations, - Consumer> completionListener) { + Consumer> userListener) { super(intermediateOperations); - this.completionListener = new KeyWatchingCompletionListener(completionListener); + this.userListener = userListener; } @Override protected S getNext() { S next = super.getNext(); if (next != null) { - completionListener.valueIterated(next); + completionRehashPublisherDecorator.valueIterated(next); } else { - completionListener.completed(); + completionRehashPublisherDecorator.complete(); } return next; } @@ -598,12 +600,10 @@ protected S getNext() { @Override PublisherDecorator publisherDecorator(Consumer> completedSegments, Consumer> lostSegments, Consumer keyConsumer) { - Consumer> ourCompleted = i -> { - completionListener.segmentsEncountered(i); - completedSegments.accept(i); - }; - return new CompletionRehashPublisherDecorator<>(iteratorOperation, dm, localAddress, completionListener, - ourCompleted, lostSegments, keyConsumer); + completionRehashPublisherDecorator = new CompletionRehashPublisherDecorator<>(iteratorOperation, dm, + localAddress, userListener, completedSegments, lostSegments, keyConsumer); + + return completionRehashPublisherDecorator; } } diff --git a/core/src/main/java/org/infinispan/stream/impl/IdentityPublisherDecorator.java b/core/src/main/java/org/infinispan/stream/impl/IdentityPublisherDecorator.java index 10497bc9f05b..dc144f8c8e47 100644 --- a/core/src/main/java/org/infinispan/stream/impl/IdentityPublisherDecorator.java +++ b/core/src/main/java/org/infinispan/stream/impl/IdentityPublisherDecorator.java @@ -5,6 +5,7 @@ import org.reactivestreams.Publisher; /** + * PublishDecorator that just returns the publisher provided by the caller * @author wburns * @since 9.0 */ diff --git a/core/src/main/java/org/infinispan/stream/impl/KeyWatchingCompletionListener.java b/core/src/main/java/org/infinispan/stream/impl/KeyWatchingCompletionListener.java index 766291d40091..d3384b707f55 100644 --- a/core/src/main/java/org/infinispan/stream/impl/KeyWatchingCompletionListener.java +++ b/core/src/main/java/org/infinispan/stream/impl/KeyWatchingCompletionListener.java @@ -4,78 +4,87 @@ import java.util.PrimitiveIterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Supplier; -import org.infinispan.commons.util.ByRef; - /** * Only notifies a completion listener when the last key for a segment has been found. The last key for a segment - * is assumed to be the last key seen {@link KeyWatchingCompletionListener#valueAdded(Object)} before segments - * are encountered {@link KeyWatchingCompletionListener#segmentsEncountered(Supplier)}. + * is assumed to be the last key seen {@link #valueAdded(Object)} before segments + * are encountered {@link #accept(Supplier)}. Note that this listener can be used for multiple calls for segments + * but it will always follow {0 - N} valueAdded invocations and then {0 - 1} accept method invocations. The + * accept method could be invoked 0 times if all segments are lost on the remote node. Also this invocation chain + * of valueAdded and accept may be done multiple times if there are multiple nodes such that they outnumber the + * number of remote publishers created. */ class KeyWatchingCompletionListener { - private final AtomicReference currentKey = new AtomicReference<>(); - private final Map> pendingSegments = new ConcurrentHashMap<>(); + private AtomicReference currentKey = new AtomicReference<>(); + private final Consumer> completionListener; - // The next 2 variables are possible assuming that the iterator is not used concurrently. This way we don't - // have to allocate them on every entry iterated - private final ByRef> ref = new ByRef<>(null); - private final BiFunction, Supplier> iteratorMapping; + private final Map> pendingSegments = new ConcurrentHashMap<>(); KeyWatchingCompletionListener(Consumer> completionListener) { this.completionListener = completionListener; - this.iteratorMapping = (k, v) -> { - if (v != null) { - ref.set(v); - } - currentKey.compareAndSet(k, null); - return null; - }; } + /** + * Method invoked for each entry added to the stream passing only the key + * @param key key of entry added to stream + */ public void valueAdded(Object key) { currentKey.set(key); } - public void valueIterated(Object key) { - pendingSegments.compute(key, iteratorMapping); - Supplier segments = ref.get(); - if (segments != null) { - ref.set(null); - completionListener.accept(segments); - } - } - - public void segmentsEncountered(Supplier segments) { - // This code assumes that valueAdded and segmentsEncountered are not invoked concurrently and that all values - // added for a response before the segments are completed. - // The valueIterated method can be invoked at any point however. - // See ClusterStreamManagerImpl$ClusterStreamSubscription.sendRequest where the added and segments are called into - ByRef> segmentsToNotify = new ByRef<>(segments); + /** + * Method to be invoked after all entries have been passed to the stream that belong to these segments + * @param segments the segments that had all entries passed down + */ + public void accept(Supplier segments) { + Supplier notifyThese; Object key = currentKey.get(); if (key != null) { - pendingSegments.compute(key, (k, v) -> { - // The iterator has consumed all the keys, so there is no reason to wait: just notify of segment - // completion immediately - if (currentKey.get() == null) { - return null; - } - // This means we didn't iterate on a key before segments were completed - means it was empty. The - // valueIterated notifies the completion of non-empty segments, but we need to notify the completion of - // empty segments here - segmentsToNotify.set(v); - return segments; - }); + pendingSegments.put(key, segments); + // We now try to go back and set current key to null + if (currentKey.getAndSet(null) == null) { + // If it was already null that means we returned our key via the iterator below + // In this case they may or may not have seen the pendingSegments so if they didn't we have to + // notify ourselves + notifyThese = pendingSegments.remove(key); + } else { + // Means that the iteration will see this + notifyThese = null; + } + } else { + // This means that we got a set of segments that had no entries in them or the iterator + // consumed all entries, so just notify right away + notifyThese = segments; } - Supplier notifyThese = segmentsToNotify.get(); if (notifyThese != null) { completionListener.accept(notifyThese); } } + /** + * This method is to be invoked on possibly a different thread at any point which states that a key has + * been iterated upon. This is the signal that if a set of segments is waiting for a key to be iterated upon + * to notify the iteration + * @param key the key just returning + */ + public void valueIterated(Object key) { + // If we set to null that tells segment completion to just notify above in accept + if (!currentKey.compareAndSet(key, null)) { + // Otherwise we have to check if this key was linked to a group of pending segments + Supplier segments = pendingSegments.remove(key); + if (segments != null) { + completionListener.accept(segments); + } + } + } + + /** + * Invoked after the iterator has completed iterating upon all entries + */ public void completed() { - pendingSegments.forEach((k, v) -> completionListener.accept(v)); + // This should always be empty + assert pendingSegments.isEmpty() : "pendingSegments should be empty but was: " + pendingSegments; } } diff --git a/core/src/main/java/org/infinispan/stream/impl/PublisherDecorator.java b/core/src/main/java/org/infinispan/stream/impl/PublisherDecorator.java index 211c86600f99..51c765ccc51d 100644 --- a/core/src/main/java/org/infinispan/stream/impl/PublisherDecorator.java +++ b/core/src/main/java/org/infinispan/stream/impl/PublisherDecorator.java @@ -5,12 +5,27 @@ import org.reactivestreams.Publisher; /** + * Decorator that decorates publishers based on if it is local or remote to handle segment completions * @author wburns * @since 9.0 */ interface PublisherDecorator { + /** + * Invoked for each remote publisher to provide additional functionality + * @param remotePublisher the provided remote publisher + * @return the resulting publisher (usually wrapped in some way) + */ Publisher decorateRemote(ClusterStreamManager.RemoteIteratorPublisher remotePublisher); + /** + * Invoked for a local publisher, which only completes segments if the consistent hash after completion is the same + * as the one provided + * @param beginningCh the consistent has to test against + * @param onlyLocal whether this publisher is only done locally (that is there are no other remote publishers) + * @param segmentsToFilter the segments to use for this invocation + * @param localPublisher the internal local publisher + * @return the resulting publisher (usually wrapped in some way) + */ Publisher decorateLocal(ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter, Publisher localPublisher); } diff --git a/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java b/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java index 687c26c96d01..d453a39ff449 100644 --- a/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java +++ b/core/src/main/java/org/infinispan/stream/impl/RehashPublisherDecorator.java @@ -6,7 +6,6 @@ import java.util.function.Supplier; import org.infinispan.commons.util.IntSet; -import org.infinispan.commons.util.SmallIntSet; import org.infinispan.distribution.DistributionManager; import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.remoting.transport.Address; @@ -14,64 +13,37 @@ import org.infinispan.util.logging.LogFactory; import org.reactivestreams.Publisher; -import io.reactivex.Flowable; - /** + * PublisherDecorator that decorates the publisher to notify of when segments are completed for these invocations * @author wburns * @since 9.0 */ -class RehashPublisherDecorator implements PublisherDecorator { +class RehashPublisherDecorator extends AbstractRehashPublisherDecorator { private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass()); - final AbstractCacheStream.IteratorOperation iteratorOperation; - final DistributionManager dm; - final Address localAddress; final Consumer> completedSegments; - final Consumer> lostSegments; - final Consumer keyConsumer; RehashPublisherDecorator(AbstractCacheStream.IteratorOperation iteratorOperation, DistributionManager dm, Address localAddress, Consumer> completedSegments, Consumer> lostSegments, Consumer keyConsumer) { - this.iteratorOperation = iteratorOperation; - this.dm = dm; - this.localAddress = localAddress; + super(iteratorOperation, dm, localAddress, lostSegments, keyConsumer); this.completedSegments = completedSegments; - this.lostSegments = lostSegments; - this.keyConsumer = keyConsumer; + } + + @Override + Log getLog() { + return log; } @Override public Publisher decorateRemote(ClusterStreamManager.RemoteIteratorPublisher remotePublisher) { Publisher convertedPublisher = s -> remotePublisher.subscribe(s, completedSegments, lostSegments); - return decorateBeforeReturn(convertedPublisher); + return iteratorOperation.handlePublisher(convertedPublisher, keyConsumer); } @Override public Publisher decorateLocal(ConsistentHash beginningCh, boolean onlyLocal, IntSet segmentsToFilter, Publisher localPublisher) { - Publisher convertedPublisher = Flowable.fromPublisher(localPublisher).doOnComplete(() -> { - IntSet ourSegments; - if (onlyLocal) { - ourSegments = SmallIntSet.from(beginningCh.getSegmentsForOwner(localAddress)); - } else { - ourSegments = SmallIntSet.from(beginningCh.getPrimarySegmentsForOwner(localAddress)); - } - ourSegments.retainAll(segmentsToFilter); - // This will notify both completed and suspect of segments that may not even exist or were completed before - // on a rehash - if (dm.getReadConsistentHash().equals(beginningCh)) { - log.tracef("Local iterator has completed segments %s", ourSegments); - completedSegments.accept((Supplier) ourSegments::iterator); - } else { - log.tracef("Local iterator segments %s are all suspect as consistent hash has changed", ourSegments); - lostSegments.accept((Supplier) ourSegments::iterator); - } - }); - return decorateBeforeReturn(convertedPublisher); - } - - protected Publisher decorateBeforeReturn(Publisher publisher) { - return iteratorOperation.handlePublisher(publisher, keyConsumer); + return decorateLocal(completedSegments, beginningCh, onlyLocal, segmentsToFilter, localPublisher); } } diff --git a/core/src/test/java/org/infinispan/stream/impl/CompletionRehashPublisherDecoratorTest.java b/core/src/test/java/org/infinispan/stream/impl/CompletionRehashPublisherDecoratorTest.java index 013f1ab300ca..fe4dca88abbc 100644 --- a/core/src/test/java/org/infinispan/stream/impl/CompletionRehashPublisherDecoratorTest.java +++ b/core/src/test/java/org/infinispan/stream/impl/CompletionRehashPublisherDecoratorTest.java @@ -31,19 +31,23 @@ import io.reactivex.subscribers.TestSubscriber; /** + * Test to verify that user listener is notified at proper time for segment completions * @author wburns * @since 9.0 */ @Test(testName = "stream.impl.AbstractWriteSkewStressTest", groups = "functional") public class CompletionRehashPublisherDecoratorTest { - CompletionRehashPublisherDecorator createDecorator(KeyWatchingCompletionListener kwcl, Consumer entryConsumer) { - return new CompletionRehashPublisherDecorator<>(AbstractCacheStream.IteratorOperation.NO_MAP, null, null, kwcl, - i -> kwcl.segmentsEncountered(i), i -> {}, entryConsumer); + CompletionRehashPublisherDecorator createDecorator(Consumer> userListener, + Consumer entryConsumer) { + return new CompletionRehashPublisherDecorator<>(AbstractCacheStream.IteratorOperation.NO_MAP, null, null, userListener, + // Just ignore early completed segments and lost ones + i -> {}, i -> {}, entryConsumer); } CompletionRehashPublisherDecorator createDecorator(ConsistentHash ch, Set segmentsForOwner, - Set primarySegmentsForOwner, KeyWatchingCompletionListener kwcl, Consumer entryConsumer) { + Set primarySegmentsForOwner, Consumer> internalListener, + Consumer entryConsumer) { Address address = Mockito.mock(Address.class); if (segmentsForOwner != null) { @@ -55,12 +59,13 @@ CompletionRehashPublisherDecorator createDecorator(ConsistentHash ch, Set DistributionManager dm = when(mock(DistributionManager.class).getReadConsistentHash()).thenReturn(ch).getMock(); - return new CompletionRehashPublisherDecorator<>(AbstractCacheStream.IteratorOperation.NO_MAP, dm, address, kwcl, - i -> kwcl.segmentsEncountered(i), i -> {}, entryConsumer); + return new CompletionRehashPublisherDecorator<>(AbstractCacheStream.IteratorOperation.NO_MAP, dm, address, + // Just ignore early completed segments and lost ones + internalListener, i -> {}, i -> {}, entryConsumer); } void simpleAssert(Publisher resultingPublisher, PublishProcessor valuePublisher, - Consumer> segmentConsumer, KeyWatchingCompletionListener kwcl, + Consumer> segmentConsumer, CompletionRehashPublisherDecorator crpd, Consumer notifySegmentsCompleted, IntSet segments) { // This will store the result once the stream is done TestSubscriber test = Flowable.fromPublisher(resultingPublisher).test(); @@ -90,11 +95,11 @@ void simpleAssert(Publisher resultingPublisher, PublishProcessor Mockito.verify(segmentConsumer, Mockito.never()).accept(Mockito.any()); // Now let our iterate over the entries - kwcl.valueIterated(entry1); + crpd.valueIterated(entry1); Mockito.verify(segmentConsumer, Mockito.never()).accept(Mockito.any()); - kwcl.valueIterated(entry2); + crpd.valueIterated(entry2); Mockito.verify(segmentConsumer, Mockito.never()).accept(Mockito.any()); - kwcl.valueIterated(entry3); + crpd.valueIterated(entry3); notifySegmentsCompleted.accept(segments); @@ -115,35 +120,34 @@ public void testLocalOnlyStreamCompletes() { IntSet segments = SmallIntSet.of(1, 4); Consumer> segmentConsumer = mock(Consumer.class); - KeyWatchingCompletionListener kwcl = new KeyWatchingCompletionListener(segmentConsumer); Consumer entryConsumer = mock(Consumer.class); ConsistentHash ch = mock(ConsistentHash.class); - CompletionRehashPublisherDecorator crpd = createDecorator(ch, segments, null, kwcl, entryConsumer); + CompletionRehashPublisherDecorator crpd = createDecorator(ch, segments, null, segmentConsumer, + entryConsumer); PublishProcessor localPublisherProcessor = PublishProcessor.create(); // This is local only iteration Publisher localPublisher = crpd.decorateLocal(ch, true, segments, localPublisherProcessor); - simpleAssert(localPublisher, localPublisherProcessor, segmentConsumer, kwcl, s -> { }, segments); + simpleAssert(localPublisher, localPublisherProcessor, segmentConsumer, crpd, s -> { }, segments); } public void testRemoteOnlyStreamCompletes() { IntSet segments = SmallIntSet.of(1, 4); Consumer> segmentConsumer = mock(Consumer.class); - KeyWatchingCompletionListener kwcl = new KeyWatchingCompletionListener(segmentConsumer); Consumer entryConsumer = mock(Consumer.class); - CompletionRehashPublisherDecorator crpd = createDecorator(kwcl, entryConsumer); + CompletionRehashPublisherDecorator crpd = createDecorator(segmentConsumer, entryConsumer); TestRemoteIteratorPublisher remoteIteratorPublisher = new TestRemoteIteratorPublisher<>(); // Remote only iteration Publisher resultingPublisher = crpd.decorateRemote(remoteIteratorPublisher); - simpleAssert(resultingPublisher, remoteIteratorPublisher.publishProcessor(), segmentConsumer, kwcl, s -> + simpleAssert(resultingPublisher, remoteIteratorPublisher.publishProcessor(), segmentConsumer, crpd, s -> remoteIteratorPublisher.notifyCompleted(s::iterator), segments); } @@ -151,18 +155,18 @@ public void testRemoteAndLocal() { IntSet segments = SmallIntSet.of(1, 4); Consumer> segmentConsumer = mock(Consumer.class); - KeyWatchingCompletionListener kwcl = new KeyWatchingCompletionListener(segmentConsumer); Consumer entryConsumer = mock(Consumer.class); ConsistentHash ch = mock(ConsistentHash.class); - CompletionRehashPublisherDecorator crpd = createDecorator(ch, segments, null, kwcl, entryConsumer); + CompletionRehashPublisherDecorator crpd = createDecorator(ch, segments, null, segmentConsumer, + entryConsumer); PublishProcessor localPublisherProcessor = PublishProcessor.create(); Publisher localPublisher = crpd.decorateLocal(ch, true, segments, localPublisherProcessor); - simpleAssert(localPublisher, localPublisherProcessor, segmentConsumer, kwcl, s -> { }, segments); + simpleAssert(localPublisher, localPublisherProcessor, segmentConsumer, crpd, s -> { }, segments); // Reset the mock so remote can test it now reset(segmentConsumer); @@ -170,7 +174,7 @@ public void testRemoteAndLocal() { Publisher remotePublisher = crpd.decorateRemote(remoteIteratorPublisher); - simpleAssert(remotePublisher, remoteIteratorPublisher.publishProcessor(), segmentConsumer, kwcl, s -> + simpleAssert(remotePublisher, remoteIteratorPublisher.publishProcessor(), segmentConsumer, crpd, s -> remoteIteratorPublisher.notifyCompleted(s::iterator), segments); } @@ -179,11 +183,11 @@ public void testRemoteAndLocalCompleteSameTime() { IntSet remoteSegments = SmallIntSet.of(2, 3); Consumer> segmentConsumer = mock(Consumer.class); - KeyWatchingCompletionListener kwcl = new KeyWatchingCompletionListener(segmentConsumer); Consumer entryConsumer = mock(Consumer.class); ConsistentHash ch = mock(ConsistentHash.class); - CompletionRehashPublisherDecorator crpd = createDecorator(ch, localSegments, null, kwcl, entryConsumer); + CompletionRehashPublisherDecorator crpd = createDecorator(ch, localSegments, null, segmentConsumer, + entryConsumer); PublishProcessor localPublisherProcessor = PublishProcessor.create(); @@ -234,10 +238,10 @@ public void testRemoteAndLocalCompleteSameTime() { verify(segmentConsumer, never()).accept(any()); // Now we finally iterate upon them - note that 4 wasn't iterated upon yet since remote took priority - kwcl.valueIterated(entry1); - kwcl.valueIterated(entry2); - kwcl.valueIterated(entry3); - kwcl.valueIterated(entry5); + crpd.valueIterated(entry1); + crpd.valueIterated(entry2); + crpd.valueIterated(entry3); + crpd.valueIterated(entry5); ArgumentCaptor> captor = ArgumentCaptor.forClass(Supplier.class); @@ -249,7 +253,7 @@ public void testRemoteAndLocalCompleteSameTime() { reset(segmentConsumer); - kwcl.valueIterated(entry4); + crpd.valueIterated(entry4); Mockito.verify(segmentConsumer, Mockito.times(1)).accept(captor.capture()); diff --git a/core/src/test/java/org/infinispan/stream/impl/TestRemoteIteratorPublisher.java b/core/src/test/java/org/infinispan/stream/impl/TestRemoteIteratorPublisher.java index 55082bbc86cc..6cf361d4a1a2 100644 --- a/core/src/test/java/org/infinispan/stream/impl/TestRemoteIteratorPublisher.java +++ b/core/src/test/java/org/infinispan/stream/impl/TestRemoteIteratorPublisher.java @@ -8,7 +8,6 @@ import org.reactivestreams.Subscriber; import io.reactivex.processors.PublishProcessor; -import io.reactivex.subscribers.TestSubscriber; /** * Publisher that also allows signaling completed or lost signals. Note this publisher only allows a single subscription