diff --git a/core/src/main/java/org/infinispan/CacheStream.java b/core/src/main/java/org/infinispan/CacheStream.java index 6454358d85cb..29c9a1ed837f 100644 --- a/core/src/main/java/org/infinispan/CacheStream.java +++ b/core/src/main/java/org/infinispan/CacheStream.java @@ -217,6 +217,27 @@ interface SegmentCompletionListener { */ void forEach(SerializableConsumer action); + /** + * Same as {@link CacheStream#forEach(Consumer)} except that it takes a {@link BiConsumer} that provides access + * to the underlying {@link Cache} that is backing this stream. + *

+ * Note that the CacheAware interface is not supported for injection using this method as the cache + * is provided in the consumer directly. + * @param action consumer to be ran for each element in the stream + * @param key type of the cache + * @param value type of the cache + */ + void forEach(BiConsumer, ? super R> action); + + /** + * Same as {@link CacheStream#forEach(BiConsumer)} except that the BiConsumer must also implement + * Serializable + * @param action consumer to be ran for each element in the stream + * @param key type of the cache + * @param value type of the cache + */ + void forEach(SerializableBiConsumer, ? super R> action); + /** * {@inheritDoc} *

Usage of this operator requires closing this stream after you are done with the iterator. The preferred diff --git a/core/src/main/java/org/infinispan/DoubleCacheStream.java b/core/src/main/java/org/infinispan/DoubleCacheStream.java index 6de3bf444954..0cba19b562a4 100644 --- a/core/src/main/java/org/infinispan/DoubleCacheStream.java +++ b/core/src/main/java/org/infinispan/DoubleCacheStream.java @@ -1,5 +1,6 @@ package org.infinispan; +import org.infinispan.util.function.SerializableBiConsumer; import org.infinispan.util.function.SerializableDoubleBinaryOperator; import org.infinispan.util.function.SerializableDoubleConsumer; import org.infinispan.util.function.SerializableDoubleFunction; @@ -8,7 +9,6 @@ import org.infinispan.util.function.SerializableDoubleToLongFunction; import org.infinispan.util.function.SerializableDoubleUnaryOperator; import org.infinispan.util.function.SerializableObjDoubleConsumer; -import org.infinispan.util.function.SerializableBiConsumer; import org.infinispan.util.function.SerializableSupplier; import java.util.OptionalDouble; @@ -198,6 +198,27 @@ public interface DoubleCacheStream extends DoubleStream { */ void forEach(SerializableDoubleConsumer action); + /** + * Same as {@link DoubleCacheStream#forEach(DoubleConsumer)} except that it takes an {@link ObjDoubleConsumer} that + * provides access to the underlying {@link Cache} that is backing this stream. + *

+ * Note that the CacheAware interface is not supported for injection using this method as the cache + * is provided in the consumer directly. + * @param action consumer to be ran for each element in the stream + * @param key type of the cache + * @param value type of the cache + */ + void forEach(ObjDoubleConsumer> action); + + /** + * Same as {@link DoubleCacheStream#forEach(ObjDoubleConsumer)} except that the BiConsumer must also implement + * Serializable + * @param action consumer to be ran for each element in the stream + * @param key type of the cache + * @param value type of the cache + */ + void forEach(SerializableObjDoubleConsumer> action); + /** * Same as {@link DoubleCacheStream#reduce(double, DoubleBinaryOperator)} except that the DoubleBinaryOperator must * also implement Serializable diff --git a/core/src/main/java/org/infinispan/IntCacheStream.java b/core/src/main/java/org/infinispan/IntCacheStream.java index 5965149c05e7..9d57b7cf6632 100644 --- a/core/src/main/java/org/infinispan/IntCacheStream.java +++ b/core/src/main/java/org/infinispan/IntCacheStream.java @@ -198,6 +198,27 @@ public interface IntCacheStream extends IntStream { */ void forEach(SerializableIntConsumer action); + /** + * Same as {@link IntCacheStream#forEach(IntConsumer)} except that it takes an {@link ObjIntConsumer} that + * provides access to the underlying {@link Cache} that is backing this stream. + *

+ * Note that the CacheAware interface is not supported for injection using this method as the cache + * is provided in the consumer directly. + * @param action consumer to be ran for each element in the stream + * @param key type of the cache + * @param value type of the cache + */ + void forEach(ObjIntConsumer> action); + + /** + * Same as {@link IntCacheStream#forEach(ObjIntConsumer)} except that the BiConsumer must also implement + * Serializable + * @param action consumer to be ran for each element in the stream + * @param key type of the cache + * @param value type of the cache + */ + void forEach(SerializableObjIntConsumer> action); + /** * Same as {@link IntCacheStream#reduce(int, IntBinaryOperator)} except that the IntBinaryOperator * must also implement Serializable diff --git a/core/src/main/java/org/infinispan/LongCacheStream.java b/core/src/main/java/org/infinispan/LongCacheStream.java index 70847ff197e9..a0975fa53566 100644 --- a/core/src/main/java/org/infinispan/LongCacheStream.java +++ b/core/src/main/java/org/infinispan/LongCacheStream.java @@ -206,6 +206,27 @@ public interface LongCacheStream extends LongStream { */ void forEach(SerializableLongConsumer action); + /** + * Same as {@link LongCacheStream#forEach(LongConsumer)} except that it takes an {@link ObjLongConsumer} that + * provides access to the underlying {@link Cache} that is backing this stream. + *

+ * Note that the CacheAware interface is not supported for injection using this method as the cache + * is provided in the consumer directly. + * @param action consumer to be ran for each element in the stream + * @param key type of the cache + * @param value type of the cache + */ + void forEach(ObjLongConsumer> action); + + /** + * Same as {@link LongCacheStream#forEach(ObjLongConsumer)} except that the BiConsumer must also implement + * Serializable + * @param action consumer to be ran for each element in the stream + * @param key type of the cache + * @param value type of the cache + */ + void forEach(SerializableObjLongConsumer> action); + /** * Same as {@link LongCacheStream#reduce(long, LongBinaryOperator)} except that the LongBinaryOperator must * also implement Serializable. diff --git a/core/src/main/java/org/infinispan/stream/impl/AbstractCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/AbstractCacheStream.java index 0b9064b55230..e1f2e4013695 100644 --- a/core/src/main/java/org/infinispan/stream/impl/AbstractCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/AbstractCacheStream.java @@ -43,9 +43,8 @@ * primitive types. * @param The type returned by the stream * @param The stream interface - * @param The consumer for this stream */ -public abstract class AbstractCacheStream, S2 extends S, T_CONS> implements BaseStream { +public abstract class AbstractCacheStream, S2 extends S> implements BaseStream { protected final Log log = LogFactory.getLog(getClass()); protected final Queue intermediateOperations; @@ -100,7 +99,7 @@ protected AbstractCacheStream(Address localAddress, boolean parallel, Distributi intermediateOperations = new ArrayDeque<>(); } - protected AbstractCacheStream(AbstractCacheStream other) { + protected AbstractCacheStream(AbstractCacheStream other) { this.intermediateOperations = other.intermediateOperations; this.localIntermediateOperations = other.localIntermediateOperations; this.localAddress = other.localAddress; @@ -234,12 +233,12 @@ public void close() { } } - R performOperation(Function function, boolean retryOnRehash, BinaryOperator accumulator, + R performOperation(Function function, boolean retryOnRehash, BinaryOperator accumulator, Predicate earlyTerminatePredicate) { return performOperation(function, retryOnRehash, accumulator, earlyTerminatePredicate, true); } - R performOperation(Function function, boolean retryOnRehash, BinaryOperator accumulator, + R performOperation(Function function, boolean retryOnRehash, BinaryOperator accumulator, Predicate earlyTerminatePredicate, boolean ignoreSorting) { // These operations are not affected by sorting, only by distinct if (intermediateType.shouldUseIntermediate(!ignoreSorting && sorted, distinct)) { @@ -254,7 +253,7 @@ R performOperation(Function function, boolean retryOnRehash, } } - R performOperation(Function function, ResultsAccumulator remoteResults, + R performOperation(Function function, ResultsAccumulator remoteResults, Predicate earlyTerminatePredicate) { ConsistentHash ch = dm.getConsistentHash(); TerminalOperation op = new SingleRunOperation<>(intermediateOperations, @@ -283,7 +282,7 @@ R performOperation(Function function, ResultsAccumulator } } - R performOperationRehashAware(Function function, boolean retryOnRehash, + R performOperationRehashAware(Function function, boolean retryOnRehash, ResultsAccumulator remoteResults, Predicate earlyTerminatePredicate) { Set segmentsToProcess = segmentsToFilter; TerminalOperation op; @@ -353,10 +352,8 @@ R performOperationRehashAware(Function function, boolean ret return remoteResults.currentValue; } - abstract KeyTrackingTerminalOperation getForEach(T_CONS consumer, - Supplier> supplier); - - void performRehashForEach(T_CONS consumer) { + void performRehashKeyTrackingOperation( + Function>, KeyTrackingTerminalOperation> function) { final AtomicBoolean complete = new AtomicBoolean(); ConsistentHash segmentInfoCH = dm.getReadConsistentHash(); @@ -380,7 +377,7 @@ void performRehashForEach(T_CONS consumer) { segments = null; excludedKeys = Collections.emptySet(); } - KeyTrackingTerminalOperation op = getForEach(consumer, supplierForSegments(ch, + KeyTrackingTerminalOperation op = function.apply(supplierForSegments(ch, segmentsToProcess, excludedKeys)); op.handleInjection(registry); Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, ch, segmentsToProcess, @@ -782,7 +779,7 @@ public boolean shouldUseIntermediate(boolean sorted, boolean distinct) { } } - R performIntermediateRemoteOperation(Function function) { + R performIntermediateRemoteOperation(Function function) { switch (intermediateType) { case OBJ: return performObjIntermediateRemoteOperation(function); @@ -797,39 +794,39 @@ R performIntermediateRemoteOperation(Function function) { } } - R performIntegerIntermediateRemoteOperation(Function function) { + R performIntegerIntermediateRemoteOperation(Function function) { // TODO: once we don't have to box for primitive iterators we can remove this copy Queue copyOperations = new ArrayDeque<>(localIntermediateOperations); PrimitiveIterator.OfInt iterator = new DistributedIntCacheStream(this).remoteIterator(); - SingleRunOperation op = new SingleRunOperation<>(copyOperations, + SingleRunOperation op = new SingleRunOperation<>(copyOperations, () -> StreamSupport.intStream(Spliterators.spliteratorUnknownSize( iterator, Spliterator.CONCURRENT), parallel), function); return op.performOperation(); } - R performDoubleIntermediateRemoteOperation(Function function) { + R performDoubleIntermediateRemoteOperation(Function function) { // TODO: once we don't have to box for primitive iterators we can remove this copy Queue copyOperations = new ArrayDeque<>(localIntermediateOperations); PrimitiveIterator.OfDouble iterator = new DistributedDoubleCacheStream(this).remoteIterator(); - SingleRunOperation op = new SingleRunOperation<>(copyOperations, + SingleRunOperation op = new SingleRunOperation<>(copyOperations, () -> StreamSupport.doubleStream(Spliterators.spliteratorUnknownSize( iterator, Spliterator.CONCURRENT), parallel), function); return op.performOperation(); } - R performLongIntermediateRemoteOperation(Function function) { + R performLongIntermediateRemoteOperation(Function function) { // TODO: once we don't have to box for primitive iterators we can remove this copy Queue copyOperations = new ArrayDeque<>(localIntermediateOperations); PrimitiveIterator.OfLong iterator = new DistributedLongCacheStream(this).remoteIterator(); - SingleRunOperation op = new SingleRunOperation<>(copyOperations, + SingleRunOperation op = new SingleRunOperation<>(copyOperations, () -> StreamSupport.longStream(Spliterators.spliteratorUnknownSize( iterator, Spliterator.CONCURRENT), parallel), function); return op.performOperation(); } - R performObjIntermediateRemoteOperation(Function function) { + R performObjIntermediateRemoteOperation(Function function) { Iterator iterator = new DistributedCacheStream<>(this).remoteIterator(); - SingleRunOperation op = new SingleRunOperation<>(localIntermediateOperations, + SingleRunOperation op = new SingleRunOperation<>(localIntermediateOperations, () -> StreamSupport.stream(Spliterators.spliteratorUnknownSize( iterator, Spliterator.CONCURRENT), parallel), function); return op.performOperation(); diff --git a/core/src/main/java/org/infinispan/stream/impl/AbstractDelegatingCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/AbstractDelegatingCacheStream.java index c3f4b5da3455..cf73fa125d7d 100644 --- a/core/src/main/java/org/infinispan/stream/impl/AbstractDelegatingCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/AbstractDelegatingCacheStream.java @@ -1,5 +1,6 @@ package org.infinispan.stream.impl; +import org.infinispan.Cache; import org.infinispan.CacheStream; import org.infinispan.util.function.SerializableBiConsumer; import org.infinispan.util.function.SerializableBiFunction; @@ -100,7 +101,17 @@ public void forEach(Consumer action) { @Override public void forEach(SerializableConsumer action) { - forEach((Consumer) action); + castStream(underlyingStream).forEach(action); + } + + @Override + public void forEach(BiConsumer, ? super R> action) { + castStream(underlyingStream).forEach(action); + } + + @Override + public void forEach(SerializableBiConsumer, ? super R> action) { + castStream(underlyingStream).forEach(action); } @Override @@ -120,7 +131,7 @@ public A[] toArray(IntFunction generator) { @Override public A[] toArray(SerializableIntFunction generator) { - return toArray((IntFunction) generator); + return underlyingStream.toArray(generator); } @Override @@ -130,7 +141,7 @@ public R reduce(R identity, BinaryOperator accumulator) { @Override public R reduce(R identity, SerializableBinaryOperator accumulator) { - return reduce(identity, (BinaryOperator) accumulator); + return castStream(underlyingStream).reduce(identity, accumulator); } @Override @@ -140,7 +151,7 @@ public Optional reduce(BinaryOperator accumulator) { @Override public Optional reduce(SerializableBinaryOperator accumulator) { - return reduce((BinaryOperator) accumulator); + return castStream(underlyingStream).reduce(accumulator); } @Override @@ -150,7 +161,7 @@ public U reduce(U identity, BiFunction accumulator, BinaryO @Override public U reduce(U identity, SerializableBiFunction accumulator, SerializableBinaryOperator combiner) { - return reduce(identity, (BiFunction) accumulator, combiner); + return castStream(underlyingStream).reduce(identity, accumulator, combiner); } @Override @@ -160,7 +171,7 @@ public R1 collect(Supplier supplier, BiConsumer accumula @Override public R1 collect(SerializableSupplier supplier, SerializableBiConsumer accumulator, SerializableBiConsumer combiner) { - return collect((Supplier) supplier, accumulator, combiner); + return castStream(underlyingStream).collect(supplier, accumulator, combiner); } @Override @@ -221,7 +232,8 @@ public CacheStream sorted(Comparator comparator) { @Override public CacheStream sorted(SerializableComparator comparator) { - return sorted((Comparator) comparator); + underlyingStream = castStream(underlyingStream).sorted(comparator); + return this; } @Override @@ -232,7 +244,8 @@ public CacheStream peek(Consumer action) { @Override public CacheStream peek(SerializableConsumer action) { - return peek((Consumer) action); + underlyingStream = castStream(underlyingStream).peek(action); + return this; } @Override @@ -255,7 +268,8 @@ public CacheStream filter(Predicate predicate) { @Override public CacheStream filter(SerializablePredicate predicate) { - return filter((Predicate) predicate); + underlyingStream = castStream(underlyingStream).filter(predicate); + return this; } @Override @@ -266,7 +280,8 @@ public CacheStream map(Function mapper) { @Override public CacheStream map(SerializableFunction mapper) { - return map((Function) mapper); + underlyingStream = castStream(underlyingStream).map(mapper); + return (CacheStream) this; } @Override @@ -277,7 +292,8 @@ public CacheStream flatMap(Function CacheStream flatMap(SerializableFunction> mapper) { - return flatMap((Function>) mapper); + underlyingStream = castStream(underlyingStream).flatMap(mapper); + return (CacheStream) this; } @Override @@ -298,7 +314,7 @@ public Optional min(Comparator comparator) { @Override public Optional min(SerializableComparator comparator) { - return min((Comparator) comparator); + return castStream(underlyingStream).min(comparator); } @Override @@ -308,7 +324,7 @@ public Optional max(Comparator comparator) { @Override public Optional max(SerializableComparator comparator) { - return max((Comparator) comparator); + return castStream(underlyingStream).max(comparator); } @Override @@ -323,7 +339,7 @@ public boolean anyMatch(Predicate predicate) { @Override public boolean anyMatch(SerializablePredicate predicate) { - return anyMatch((Predicate) predicate); + return castStream(underlyingStream).anyMatch(predicate); } @Override @@ -333,7 +349,7 @@ public boolean allMatch(Predicate predicate) { @Override public boolean allMatch(SerializablePredicate predicate) { - return allMatch((Predicate) predicate); + return castStream(underlyingStream).allMatch(predicate); } @Override @@ -343,7 +359,7 @@ public boolean noneMatch(Predicate predicate) { @Override public boolean noneMatch(SerializablePredicate predicate) { - return noneMatch((Predicate) predicate); + return castStream(underlyingStream).noneMatch(predicate); } @Override diff --git a/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java index 5e951fa85383..f1ed0938e9bc 100644 --- a/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/DistributedCacheStream.java @@ -1,5 +1,6 @@ package org.infinispan.stream.impl; +import org.infinispan.Cache; import org.infinispan.CacheStream; import org.infinispan.DoubleCacheStream; import org.infinispan.IntCacheStream; @@ -16,6 +17,7 @@ import org.infinispan.remoting.transport.Address; import org.infinispan.stream.impl.intops.object.*; import org.infinispan.stream.impl.termop.SingleRunOperation; +import org.infinispan.stream.impl.termop.object.ForEachBiOperation; import org.infinispan.stream.impl.termop.object.ForEachOperation; import org.infinispan.stream.impl.termop.object.NoMapIteratorOperation; import org.infinispan.util.*; @@ -60,7 +62,7 @@ * nodes * @param The type of the stream */ -public class DistributedCacheStream extends AbstractCacheStream, CacheStream, Consumer> +public class DistributedCacheStream extends AbstractCacheStream, CacheStream> implements CacheStream { // This is a hack to allow for cast to work properly, since Java doesn't work as well with nested generics @@ -506,9 +508,10 @@ public boolean noneMatch(SerializablePredicate predicate) { public Optional findFirst() { if (intermediateType.shouldUseIntermediate(sorted, distinct)) { Iterator iterator = iterator(); - SingleRunOperation, R, Stream> op = new SingleRunOperation<>(localIntermediateOperations, + SingleRunOperation, R, Stream, Stream> op = new SingleRunOperation<>(localIntermediateOperations, () -> StreamSupport.stream(Spliterators.spliteratorUnknownSize( - iterator, Spliterator.CONCURRENT | Spliterator.NONNULL), parallel), Stream::findFirst); + iterator, Spliterator.CONCURRENT | Spliterator.NONNULL), parallel), + (Stream r) -> r.findFirst()); return op.performOperation(); } else { return findAny(); @@ -533,7 +536,7 @@ public long count() { @Override public Iterator iterator() { if (intermediateType.shouldUseIntermediate(sorted, distinct)) { - return performIntermediateRemoteOperation(Stream::iterator); + return performIntermediateRemoteOperation((Stream s) -> s.iterator()); } else { return remoteIterator(); } @@ -924,7 +927,8 @@ public void forEach(Consumer action) { if (!rehashAware) { performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null); } else { - performRehashForEach(action); + performRehashKeyTrackingOperation(s -> new ForEachOperation<>(intermediateOperations, s, distributedBatchSize, + action)); } } @@ -934,17 +938,27 @@ public void forEach(SerializableConsumer action) { } @Override - KeyTrackingTerminalOperation getForEach(Consumer consumer, Supplier> supplier) { - return new ForEachOperation<>(intermediateOperations, supplier, distributedBatchSize, consumer); + public void forEach(BiConsumer, ? super R> action) { + if (!rehashAware) { + performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null); + } else { + performRehashKeyTrackingOperation(s -> new ForEachBiOperation(intermediateOperations, s, + distributedBatchSize, action)); + } + } + + @Override + public void forEach(SerializableBiConsumer, ? super R> action) { + forEach((BiConsumer, ? super R>) action); } @Override public void forEachOrdered(Consumer action) { if (sorted) { Iterator iterator = iterator(); - SingleRunOperation> op = new SingleRunOperation<>(localIntermediateOperations, + SingleRunOperation, Stream> op = new SingleRunOperation<>(localIntermediateOperations, () -> StreamSupport.stream(Spliterators.spliteratorUnknownSize( - iterator, Spliterator.CONCURRENT | Spliterator.NONNULL), parallel), s -> { + iterator, Spliterator.CONCURRENT | Spliterator.NONNULL), parallel), (Stream s) -> { s.forEachOrdered(action); return null; }); diff --git a/core/src/main/java/org/infinispan/stream/impl/DistributedDoubleCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/DistributedDoubleCacheStream.java index 0e52f674968e..e0231212eadd 100644 --- a/core/src/main/java/org/infinispan/stream/impl/DistributedDoubleCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/DistributedDoubleCacheStream.java @@ -1,5 +1,6 @@ package org.infinispan.stream.impl; +import org.infinispan.Cache; import org.infinispan.CacheStream; import org.infinispan.DoubleCacheStream; import org.infinispan.IntCacheStream; @@ -19,6 +20,8 @@ import org.infinispan.stream.impl.intops.primitive.d.SortedDoubleOperation; import org.infinispan.stream.impl.termop.primitive.ForEachDoubleOperation; import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapDoubleOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapObjDoubleOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachObjDoubleOperation; import org.infinispan.util.function.SerializableDoubleBinaryOperator; import org.infinispan.util.function.SerializableDoubleConsumer; import org.infinispan.util.function.SerializableDoubleFunction; @@ -57,7 +60,7 @@ * class is only able to be created using {@link org.infinispan.CacheStream#mapToDouble(ToDoubleFunction)} or similar * methods from the {@link org.infinispan.CacheStream} interface. */ -public class DistributedDoubleCacheStream extends AbstractCacheStream +public class DistributedDoubleCacheStream extends AbstractCacheStream implements DoubleCacheStream { /** * This constructor is to be used only when a user calls a map or flat map method changing to a DoubleStream @@ -191,7 +194,7 @@ public void forEach(DoubleConsumer action) { if (!rehashAware) { performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null); } else { - performRehashForEach(action); + performRehashKeyTrackingOperation(s -> getForEach(action, s)); } } @@ -201,6 +204,19 @@ public void forEach(SerializableDoubleConsumer action) { } @Override + public void forEach(ObjDoubleConsumer> action) { + if (!rehashAware) { + performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null); + } else { + performRehashKeyTrackingOperation(s -> getForEach(action, s)); + } + } + + @Override + public void forEach(SerializableObjDoubleConsumer> action) { + forEach((ObjDoubleConsumer>) action); + } + KeyTrackingTerminalOperation getForEach(DoubleConsumer consumer, Supplier> supplier) { if (iteratorOperation == IteratorOperation.FLAT_MAP) { @@ -210,10 +226,19 @@ KeyTrackingTerminalOperation getForEach(DoubleConsumer c } } + KeyTrackingTerminalOperation getForEach(ObjDoubleConsumer> consumer, + Supplier> supplier) { + if (iteratorOperation == IteratorOperation.FLAT_MAP) { + return new ForEachFlatMapObjDoubleOperation(intermediateOperations, supplier, distributedBatchSize, consumer); + } else { + return new ForEachObjDoubleOperation(intermediateOperations, supplier, distributedBatchSize, consumer); + } + } + @Override public void forEachOrdered(DoubleConsumer action) { if (intermediateType.shouldUseIntermediate(sorted, distinct)) { - performIntermediateRemoteOperation(s -> { + performIntermediateRemoteOperation((DoubleStream s) -> { s.forEachOrdered(action); return null; }); @@ -380,7 +405,7 @@ public boolean noneMatch(SerializableDoublePredicate predicate) { @Override public OptionalDouble findFirst() { if (intermediateType.shouldUseIntermediate(sorted, distinct)) { - return performIntermediateRemoteOperation(DoubleStream::findFirst); + return performIntermediateRemoteOperation((DoubleStream s) -> s.findFirst()); } else { return findAny(); } diff --git a/core/src/main/java/org/infinispan/stream/impl/DistributedIntCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/DistributedIntCacheStream.java index e21136e3f13e..feb1a0762f68 100644 --- a/core/src/main/java/org/infinispan/stream/impl/DistributedIntCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/DistributedIntCacheStream.java @@ -1,5 +1,6 @@ package org.infinispan.stream.impl; +import org.infinispan.Cache; import org.infinispan.CacheStream; import org.infinispan.DoubleCacheStream; import org.infinispan.IntCacheStream; @@ -20,7 +21,9 @@ import org.infinispan.stream.impl.intops.primitive.i.SkipIntOperation; import org.infinispan.stream.impl.intops.primitive.i.SortedIntOperation; import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapIntOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapObjIntOperation; import org.infinispan.stream.impl.termop.primitive.ForEachIntOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachObjIntOperation; import org.infinispan.util.function.SerializableIntBinaryOperator; import org.infinispan.util.function.SerializableIntConsumer; import org.infinispan.util.function.SerializableIntPredicate; @@ -60,7 +63,7 @@ * class is only able to be created using {@link org.infinispan.CacheStream#mapToInt(ToIntFunction)} or similar * methods from the {@link org.infinispan.CacheStream} interface. */ -public class DistributedIntCacheStream extends AbstractCacheStream +public class DistributedIntCacheStream extends AbstractCacheStream implements IntCacheStream { /** * This constructor is to be used only when a user calls a map or flat map method changing to an IntStream @@ -181,11 +184,6 @@ public IntCacheStream skip(long n) { return addIntermediateOperation(op); } - @Override - public void forEach(SerializableIntConsumer action) { - forEach((IntConsumer) action); - } - @Override public LongCacheStream asLongStream() { addIntermediateOperationMap(AsLongIntOperation.getInstance()); @@ -212,11 +210,29 @@ public void forEach(IntConsumer action) { if (!rehashAware) { performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null); } else { - performRehashForEach(action); + performRehashKeyTrackingOperation(s -> getForEach(action, s)); } } @Override + public void forEach(SerializableIntConsumer action) { + forEach((IntConsumer) action); + } + + @Override + public void forEach(ObjIntConsumer> action) { + if (!rehashAware) { + performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null); + } else { + performRehashKeyTrackingOperation(s -> getForEach(action, s)); + } + } + + @Override + public void forEach(SerializableObjIntConsumer> action) { + forEach((ObjIntConsumer>) action); + } + KeyTrackingTerminalOperation getForEach(IntConsumer consumer, Supplier> supplier) { if (iteratorOperation == IteratorOperation.FLAT_MAP) { @@ -226,10 +242,19 @@ KeyTrackingTerminalOperation getForEach(IntConsumer con } } + KeyTrackingTerminalOperation getForEach(ObjIntConsumer> consumer, + Supplier> supplier) { + if (iteratorOperation == IteratorOperation.FLAT_MAP) { + return new ForEachFlatMapObjIntOperation(intermediateOperations, supplier, distributedBatchSize, consumer); + } else { + return new ForEachObjIntOperation(intermediateOperations, supplier, distributedBatchSize, consumer); + } + } + @Override public void forEachOrdered(IntConsumer action) { if (intermediateType.shouldUseIntermediate(sorted, distinct)) { - performIntermediateRemoteOperation(s -> { + performIntermediateRemoteOperation((IntStream s) -> { s.forEachOrdered(action); return null; }); @@ -396,7 +421,7 @@ public boolean noneMatch(SerializableIntPredicate predicate) { @Override public OptionalInt findFirst() { if (intermediateType.shouldUseIntermediate(sorted, distinct)) { - return performIntermediateRemoteOperation(IntStream::findFirst); + return performIntermediateRemoteOperation((IntStream s) -> s.findFirst()); } else { return findAny(); } diff --git a/core/src/main/java/org/infinispan/stream/impl/DistributedLongCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/DistributedLongCacheStream.java index d2ccc537c999..e1b03d3b4e2d 100644 --- a/core/src/main/java/org/infinispan/stream/impl/DistributedLongCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/DistributedLongCacheStream.java @@ -1,5 +1,6 @@ package org.infinispan.stream.impl; +import org.infinispan.Cache; import org.infinispan.CacheStream; import org.infinispan.DoubleCacheStream; import org.infinispan.IntCacheStream; @@ -18,7 +19,9 @@ import org.infinispan.stream.impl.intops.primitive.l.PeekLongOperation; import org.infinispan.stream.impl.intops.primitive.l.SortedLongOperation; import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapLongOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapObjLongOperation; import org.infinispan.stream.impl.termop.primitive.ForEachLongOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachObjLongOperation; import org.infinispan.util.function.SerializableLongBinaryOperator; import org.infinispan.util.function.SerializableLongConsumer; import org.infinispan.util.function.SerializableLongFunction; @@ -58,7 +61,7 @@ * class is only able to be created using {@link org.infinispan.CacheStream#mapToInt(ToIntFunction)} or similar * methods from the {@link org.infinispan.CacheStream} interface. */ -public class DistributedLongCacheStream extends AbstractCacheStream +public class DistributedLongCacheStream extends AbstractCacheStream implements LongCacheStream { /** * This constructor is to be used only when a user calls a map or flat map method changing to an IntStream @@ -198,7 +201,7 @@ public void forEach(LongConsumer action) { if (!rehashAware) { performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null); } else { - performRehashForEach(action); + performRehashKeyTrackingOperation(s -> getForEach(action, s)); } } @@ -208,6 +211,19 @@ public void forEach(SerializableLongConsumer action) { } @Override + public void forEach(ObjLongConsumer> action) { + if (!rehashAware) { + performOperation(TerminalFunctions.forEachFunction(action), false, (v1, v2) -> null, null); + } else { + performRehashKeyTrackingOperation(s -> getForEach(action, s)); + } + } + + @Override + public void forEach(SerializableObjLongConsumer> action) { + forEach((ObjLongConsumer>) action); + } + KeyTrackingTerminalOperation getForEach(LongConsumer consumer, Supplier> supplier) { if (iteratorOperation == IteratorOperation.FLAT_MAP) { @@ -217,10 +233,19 @@ KeyTrackingTerminalOperation getForEach(LongConsumer consu } } + KeyTrackingTerminalOperation getForEach(ObjLongConsumer> consumer, + Supplier> supplier) { + if (iteratorOperation == IteratorOperation.FLAT_MAP) { + return new ForEachFlatMapObjLongOperation(intermediateOperations, supplier, distributedBatchSize, consumer); + } else { + return new ForEachObjLongOperation(intermediateOperations, supplier, distributedBatchSize, consumer); + } + } + @Override public void forEachOrdered(LongConsumer action) { if (intermediateType.shouldUseIntermediate(sorted, distinct)) { - performIntermediateRemoteOperation(s -> { + performIntermediateRemoteOperation((LongStream s) -> { s.forEachOrdered(action); return null; }); @@ -387,7 +412,7 @@ public boolean noneMatch(SerializableLongPredicate predicate) { @Override public OptionalLong findFirst() { if (intermediateType.shouldUseIntermediate(sorted, distinct)) { - return performIntermediateRemoteOperation(LongStream::findFirst); + return performIntermediateRemoteOperation((LongStream s) -> s.findFirst()); } else { return findAny(); } diff --git a/core/src/main/java/org/infinispan/stream/impl/TerminalFunctions.java b/core/src/main/java/org/infinispan/stream/impl/TerminalFunctions.java index cb444e37f220..6da3e0ebbe49 100644 --- a/core/src/main/java/org/infinispan/stream/impl/TerminalFunctions.java +++ b/core/src/main/java/org/infinispan/stream/impl/TerminalFunctions.java @@ -1,5 +1,10 @@ package org.infinispan.stream.impl; +import org.infinispan.Cache; +import org.infinispan.CacheStream; +import org.infinispan.DoubleCacheStream; +import org.infinispan.IntCacheStream; +import org.infinispan.LongCacheStream; import org.infinispan.commons.marshall.Externalizer; import org.infinispan.commons.marshall.SerializeWith; @@ -130,18 +135,34 @@ public static Function, Void> forEachFunction(Consumer return new ForEachFunction<>(consumer); } + public static Function, Void> forEachFunction(BiConsumer, ? super T> consumer) { + return new ForEachBiConsumerFunction<>(consumer); + } + public static Function forEachFunction(DoubleConsumer consumer) { return new ForEachDoubleFunction<>(consumer); } + public static Function forEachFunction(ObjDoubleConsumer> consumer) { + return new ForEachObjDoubleFunction<>(consumer); + } + public static Function forEachFunction(IntConsumer consumer) { return new ForEachIntFunction<>(consumer); } + public static Function forEachFunction(ObjIntConsumer> consumer) { + return new ForEachObjIntFunction<>(consumer); + } + public static Function forEachFunction(LongConsumer consumer) { return new ForEachLongFunction<>(consumer); } + public static Function forEachFunction(ObjLongConsumer> consumer) { + return new ForEachObjLongFunction<>(consumer); + } + public static Function, T> maxFunction(Comparator comparator) { return new MaxFunction<>(comparator); } @@ -1087,6 +1108,33 @@ public ForEachFunction readObject(ObjectInput input) throws IOException, ClassNo } } + @SerializeWith(value = ForEachBiConsumerFunction.ForEachBiConsumerFunctionExternalizer.class) + private static final class ForEachBiConsumerFunction implements Function, Void> { + private final BiConsumer, ? super T> consumer; + + private ForEachBiConsumerFunction(BiConsumer, ? super T> consumer) { + this.consumer = consumer; + } + + @Override + public Void apply(CacheStream stream) { + stream.forEach(consumer); + return null; + } + + public static final class ForEachBiConsumerFunctionExternalizer implements Externalizer { + @Override + public void writeObject(ObjectOutput output, ForEachBiConsumerFunction object) throws IOException { + output.writeObject(object.consumer); + } + + @Override + public ForEachBiConsumerFunction readObject(ObjectInput input) throws IOException, ClassNotFoundException { + return new ForEachBiConsumerFunction((BiConsumer) input.readObject()); + } + } + } + @SerializeWith(value = ForEachDoubleFunction.ForEachDoubleFunctionExternalizer.class) private static final class ForEachDoubleFunction implements Function { private final DoubleConsumer consumer; @@ -1114,6 +1162,33 @@ public ForEachDoubleFunction readObject(ObjectInput input) throws IOException, C } } + @SerializeWith(value = ForEachObjDoubleFunction.ForEachObjDoubleFunctionExternalizer.class) + private static final class ForEachObjDoubleFunction implements Function { + private final ObjDoubleConsumer> consumer; + + private ForEachObjDoubleFunction(ObjDoubleConsumer> consumer) { + this.consumer = consumer; + } + + @Override + public Void apply(DoubleCacheStream stream) { + stream.forEach(consumer); + return null; + } + + public static final class ForEachObjDoubleFunctionExternalizer implements Externalizer { + @Override + public void writeObject(ObjectOutput output, ForEachObjDoubleFunction object) throws IOException { + output.writeObject(object.consumer); + } + + @Override + public ForEachObjDoubleFunction readObject(ObjectInput input) throws IOException, ClassNotFoundException { + return new ForEachObjDoubleFunction((ObjDoubleConsumer) input.readObject()); + } + } + } + @SerializeWith(value = ForEachIntFunction.ForEachIntFunctionExternalizer.class) private static final class ForEachIntFunction implements Function { private final IntConsumer consumer; @@ -1141,6 +1216,33 @@ public ForEachIntFunction readObject(ObjectInput input) throws IOException, Clas } } + @SerializeWith(value = ForEachObjIntFunction.ForEachObjIntFunctionExternalizer.class) + private static final class ForEachObjIntFunction implements Function { + private final ObjIntConsumer> consumer; + + private ForEachObjIntFunction(ObjIntConsumer> consumer) { + this.consumer = consumer; + } + + @Override + public Void apply(IntCacheStream stream) { + stream.forEach(consumer); + return null; + } + + public static final class ForEachObjIntFunctionExternalizer implements Externalizer { + @Override + public void writeObject(ObjectOutput output, ForEachObjIntFunction object) throws IOException { + output.writeObject(object.consumer); + } + + @Override + public ForEachObjIntFunction readObject(ObjectInput input) throws IOException, ClassNotFoundException { + return new ForEachObjIntFunction((ObjIntConsumer) input.readObject()); + } + } + } + @SerializeWith(value = ForEachLongFunction.ForEachLongFunctionExternalizer.class) private static final class ForEachLongFunction implements Function { private final LongConsumer consumer; @@ -1168,6 +1270,33 @@ public ForEachLongFunction readObject(ObjectInput input) throws IOException, Cla } } + @SerializeWith(value = ForEachObjLongFunction.ForEachObjLongFunctionExternalizer.class) + private static final class ForEachObjLongFunction implements Function { + private final ObjLongConsumer> consumer; + + private ForEachObjLongFunction(ObjLongConsumer> consumer) { + this.consumer = consumer; + } + + @Override + public Void apply(LongCacheStream stream) { + stream.forEach(consumer); + return null; + } + + public static final class ForEachObjLongFunctionExternalizer implements Externalizer { + @Override + public void writeObject(ObjectOutput output, ForEachObjLongFunction object) throws IOException { + output.writeObject(object.consumer); + } + + @Override + public ForEachObjLongFunction readObject(ObjectInput input) throws IOException, ClassNotFoundException { + return new ForEachObjLongFunction((ObjLongConsumer) input.readObject()); + } + } + } + @SerializeWith(value = MaxFunction.MaxFunctionExternalizer.class) private static final class MaxFunction implements Function, T> { private final Comparator comparator; diff --git a/core/src/main/java/org/infinispan/stream/impl/local/LocalCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/local/LocalCacheStream.java index 194eecabf217..14bcfe9100c1 100644 --- a/core/src/main/java/org/infinispan/stream/impl/local/LocalCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/local/LocalCacheStream.java @@ -248,6 +248,17 @@ public void forEach(SerializableConsumer action) { forEach((Consumer) action); } + @Override + public void forEach(BiConsumer, ? super R> action) { + Cache cache = registry.getComponent(Cache.class); + createStream().forEach(e -> action.accept(cache, e)); + } + + @Override + public void forEach(SerializableBiConsumer, ? super R> action) { + forEach((BiConsumer, ? super R>) action); + } + @Override public void forEachOrdered(Consumer action) { injectCache(action); diff --git a/core/src/main/java/org/infinispan/stream/impl/local/LocalDoubleCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/local/LocalDoubleCacheStream.java index ee67681d81ca..5b122f337eeb 100644 --- a/core/src/main/java/org/infinispan/stream/impl/local/LocalDoubleCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/local/LocalDoubleCacheStream.java @@ -169,6 +169,17 @@ public void forEach(SerializableDoubleConsumer action) { forEach((DoubleConsumer) action); } + @Override + public void forEach(ObjDoubleConsumer> action) { + Cache cache = registry.getComponent(Cache.class); + createStream().forEach(d -> action.accept(cache, d)); + } + + @Override + public void forEach(SerializableObjDoubleConsumer> action) { + forEach((ObjDoubleConsumer>) action); + } + @Override public void forEachOrdered(DoubleConsumer action) { injectCache(action); diff --git a/core/src/main/java/org/infinispan/stream/impl/local/LocalIntCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/local/LocalIntCacheStream.java index 29f844d19894..e6fa9f4799e8 100644 --- a/core/src/main/java/org/infinispan/stream/impl/local/LocalIntCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/local/LocalIntCacheStream.java @@ -170,6 +170,17 @@ public void forEach(SerializableIntConsumer action) { forEach((IntConsumer) action); } + @Override + public void forEach(ObjIntConsumer> action) { + Cache cache = registry.getComponent(Cache.class); + createStream().forEach(i -> action.accept(cache, i)); + } + + @Override + public void forEach(SerializableObjIntConsumer> action) { + forEach((ObjIntConsumer>) action); + } + @Override public void forEachOrdered(IntConsumer action) { injectCache(action); diff --git a/core/src/main/java/org/infinispan/stream/impl/local/LocalLongCacheStream.java b/core/src/main/java/org/infinispan/stream/impl/local/LocalLongCacheStream.java index 45a80b03134a..165f6505d7a8 100644 --- a/core/src/main/java/org/infinispan/stream/impl/local/LocalLongCacheStream.java +++ b/core/src/main/java/org/infinispan/stream/impl/local/LocalLongCacheStream.java @@ -170,6 +170,17 @@ public void forEach(SerializableLongConsumer action) { forEach((LongConsumer) action); } + @Override + public void forEach(ObjLongConsumer> action) { + Cache cache = registry.getComponent(Cache.class); + createStream().forEach(l -> action.accept(cache, l)); + } + + @Override + public void forEach(SerializableObjLongConsumer> action) { + forEach((ObjLongConsumer>) action); + } + @Override public void forEachOrdered(LongConsumer action) { injectCache(action); diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/AbstractForEachOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/AbstractForEachOperation.java index fd51ce59af77..0dafe139053a 100644 --- a/core/src/main/java/org/infinispan/stream/impl/termop/AbstractForEachOperation.java +++ b/core/src/main/java/org/infinispan/stream/impl/termop/AbstractForEachOperation.java @@ -55,10 +55,10 @@ public List performOperation(IntermediateCollector> response) { @Override public Collection> performOperationRehashAware( IntermediateCollector>> response) { - // We only support sequential streams for iterator rehash aware + // We only support sequential streams for forEach rehash aware BaseStream stream = supplier.get().sequential(); - List> collectedValues = new ArrayList(batchSize); + List> collectedValues = new ArrayList<>(batchSize); List currentList = new ArrayList<>(); ByRef currentKey = new ByRef<>(null); @@ -79,7 +79,7 @@ public Collection> performOperationRehashAware( } S convertedStream = ((S) stream); - // We rely on the fact that iterator processes 1 entry at a time + // We rely on the fact that forEach processes 1 entry at a time handleStreamForEach(convertedStream, currentList); if (!currentList.isEmpty()) { handleList(currentList); diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/SegmentRetryingOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/SegmentRetryingOperation.java index fe192fa971fe..289b5e4bad54 100644 --- a/core/src/main/java/org/infinispan/stream/impl/termop/SegmentRetryingOperation.java +++ b/core/src/main/java/org/infinispan/stream/impl/termop/SegmentRetryingOperation.java @@ -21,18 +21,18 @@ * @param type of the stream entries * @param type of the stream itself */ -public class SegmentRetryingOperation> extends BaseTerminalOperation +public class SegmentRetryingOperation, S2 extends S> extends BaseTerminalOperation implements TerminalOperation { private static final Log log = LogFactory.getLog(SegmentRetryingOperation.class); private static final BaseStream EMPTY = Stream.empty(); - private final Function function; + private final Function function; private transient AtomicReference> streamRef = new AtomicReference<>(EMPTY); private transient AtomicBoolean continueTrying = new AtomicBoolean(true); public SegmentRetryingOperation(Iterable intermediateOperations, - Supplier> supplier, Function function) { + Supplier> supplier, Function function) { super(intermediateOperations, supplier); this.function = function; } @@ -65,7 +65,7 @@ private E innerPerformOperation(BaseStream stream) { for (IntermediateOperation intOp : intermediateOperations) { stream = intOp.perform(stream); } - return function.apply((S) stream); + return function.apply((S2) stream); } @Override @@ -83,7 +83,7 @@ public E performOperation() { return keepTrying ? value : null; } - public Function getFunction() { + public Function getFunction() { return function; } } diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/SingleRunOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/SingleRunOperation.java index ce2a9ad0d980..e46696f6f277 100644 --- a/core/src/main/java/org/infinispan/stream/impl/termop/SingleRunOperation.java +++ b/core/src/main/java/org/infinispan/stream/impl/termop/SingleRunOperation.java @@ -15,13 +15,13 @@ * @param type of the stream entries * @param type of the stream itself */ -public class SingleRunOperation> extends BaseTerminalOperation +public class SingleRunOperation, S2 extends S> extends BaseTerminalOperation implements TerminalOperation { - private final Function function; + private final Function function; private transient AtomicBoolean complete; public SingleRunOperation(Iterable intermediateOperations, - Supplier> supplier, Function function) { + Supplier> supplier, Function function) { super(intermediateOperations, supplier); this.function = function; this.complete = new AtomicBoolean(); @@ -38,12 +38,12 @@ public E performOperation() { for (IntermediateOperation intOp : intermediateOperations) { stream = intOp.perform(stream); } - E value = function.apply((S) stream); + E value = function.apply((S2) stream); complete.set(true); return value; } - public Function getFunction() { + public Function getFunction() { return function; } } diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/TerminalOperationExternalizer.java b/core/src/main/java/org/infinispan/stream/impl/termop/TerminalOperationExternalizer.java index b0da25f99a60..1ffdd48c855a 100644 --- a/core/src/main/java/org/infinispan/stream/impl/termop/TerminalOperationExternalizer.java +++ b/core/src/main/java/org/infinispan/stream/impl/termop/TerminalOperationExternalizer.java @@ -1,11 +1,13 @@ package org.infinispan.stream.impl.termop; +import org.infinispan.Cache; import org.infinispan.commons.io.UnsignedNumeric; import org.infinispan.commons.marshall.AdvancedExternalizer; import org.infinispan.commons.util.Util; import org.infinispan.marshall.core.Ids; import org.infinispan.stream.impl.intops.IntermediateOperation; import org.infinispan.stream.impl.termop.object.FlatMapIteratorOperation; +import org.infinispan.stream.impl.termop.object.ForEachBiOperation; import org.infinispan.stream.impl.termop.object.ForEachOperation; import org.infinispan.stream.impl.termop.object.MapIteratorOperation; import org.infinispan.stream.impl.termop.object.NoMapIteratorOperation; @@ -13,19 +15,29 @@ import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapDoubleOperation; import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapIntOperation; import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapLongOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapObjDoubleOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapObjIntOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachFlatMapObjLongOperation; import org.infinispan.stream.impl.termop.primitive.ForEachIntOperation; import org.infinispan.stream.impl.termop.primitive.ForEachLongOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachObjDoubleOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachObjIntOperation; +import org.infinispan.stream.impl.termop.primitive.ForEachObjLongOperation; import org.jboss.marshalling.util.IdentityIntMap; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Set; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.DoubleConsumer; import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.LongConsumer; +import java.util.function.ObjDoubleConsumer; +import java.util.function.ObjIntConsumer; +import java.util.function.ObjLongConsumer; /** * {@link AdvancedExternalizer} that provides functionality required for marshalling all of the various terminal @@ -44,6 +56,13 @@ public class TerminalOperationExternalizer implements AdvancedExternalizer> operations = new IdentityIntMap<>(); @@ -60,6 +79,13 @@ public TerminalOperationExternalizer() { operations.put(ForEachDoubleOperation.class, FOREACH_DOUBLE); operations.put(ForEachIntOperation.class, FOREACH_INT); operations.put(ForEachLongOperation.class, FOREACH_LONG); + operations.put(ForEachBiOperation.class, FOREACH_BI); + operations.put(ForEachObjDoubleOperation.class, FOREACH_OBJ_DOUBLE); + operations.put(ForEachObjIntOperation.class, FOREACH_OBJ_INT); + operations.put(ForEachObjLongOperation.class, FOREACH_OBJ_LONG); + operations.put(ForEachFlatMapObjDoubleOperation.class, FOREACH_FLAT_OBJ_DOUBLE); + operations.put(ForEachFlatMapObjIntOperation.class, FOREACH_FLAT_OBJ_INT); + operations.put(ForEachFlatMapObjLongOperation.class, FOREACH_FLAT_OBJ_LONG); } @Override @@ -68,7 +94,10 @@ public Set> getTypeClasses() { SegmentRetryingOperation.class, FlatMapIteratorOperation.class, MapIteratorOperation.class, NoMapIteratorOperation.class, ForEachOperation.class, ForEachFlatMapDoubleOperation.class, ForEachFlatMapIntOperation.class, ForEachFlatMapLongOperation.class, - ForEachDoubleOperation.class, ForEachIntOperation.class, ForEachLongOperation.class); + ForEachDoubleOperation.class, ForEachIntOperation.class, ForEachLongOperation.class, + ForEachBiOperation.class, ForEachObjDoubleOperation.class, ForEachObjIntOperation.class, + ForEachObjLongOperation.class, ForEachFlatMapObjDoubleOperation.class, + ForEachFlatMapObjIntOperation.class, ForEachFlatMapObjLongOperation.class); } @Override @@ -125,6 +154,36 @@ public void writeObject(ObjectOutput output, BaseTerminalOperation object) throw UnsignedNumeric.writeUnsignedInt(output, ((ForEachLongOperation) object).getBatchSize()); output.writeObject(((ForEachLongOperation) object).getConsumer()); break; + case FOREACH_BI: + UnsignedNumeric.writeUnsignedInt(output, ((ForEachBiOperation) object).getBatchSize()); + output.writeObject(((ForEachBiOperation) object).getConsumer()); + break; + case FOREACH_OBJ_DOUBLE: + UnsignedNumeric.writeUnsignedInt(output, ((ForEachObjDoubleOperation) object).getBatchSize()); + output.writeObject(((ForEachObjDoubleOperation) object).getConsumer()); + break; + case FOREACH_OBJ_INT: + UnsignedNumeric.writeUnsignedInt(output, ((ForEachObjIntOperation) object).getBatchSize()); + output.writeObject(((ForEachObjIntOperation) object).getConsumer()); + break; + case FOREACH_OBJ_LONG: + UnsignedNumeric.writeUnsignedInt(output, ((ForEachObjLongOperation) object).getBatchSize()); + output.writeObject(((ForEachObjLongOperation) object).getConsumer()); + break; + case FOREACH_FLAT_OBJ_DOUBLE: + UnsignedNumeric.writeUnsignedInt(output, ((ForEachFlatMapObjDoubleOperation) object).getBatchSize()); + output.writeObject(((ForEachFlatMapObjDoubleOperation) object).getConsumer()); + break; + case FOREACH_FLAT_OBJ_INT: + UnsignedNumeric.writeUnsignedInt(output, ((ForEachFlatMapObjIntOperation) object).getBatchSize()); + output.writeObject(((ForEachFlatMapObjIntOperation) object).getConsumer()); + break; + case FOREACH_FLAT_OBJ_LONG: + UnsignedNumeric.writeUnsignedInt(output, ((ForEachFlatMapObjLongOperation) object).getBatchSize()); + output.writeObject(((ForEachFlatMapObjLongOperation) object).getConsumer()); + break; + default: + throw new IllegalArgumentException(); } } @@ -168,6 +227,27 @@ public BaseTerminalOperation readObject(ObjectInput input) throws IOException, C case FOREACH_LONG: return new ForEachLongOperation<>((Iterable) input.readObject(), null, UnsignedNumeric.readUnsignedInt(input), (LongConsumer) input.readObject()); + case FOREACH_BI: + return new ForEachBiOperation<>((Iterable) input.readObject(), null, + UnsignedNumeric.readUnsignedInt(input), (BiConsumer) input.readObject()); + case FOREACH_OBJ_DOUBLE: + return new ForEachObjDoubleOperation<>((Iterable) input.readObject(), null, + UnsignedNumeric.readUnsignedInt(input), (ObjDoubleConsumer) input.readObject()); + case FOREACH_OBJ_INT: + return new ForEachObjIntOperation<>((Iterable) input.readObject(), null, + UnsignedNumeric.readUnsignedInt(input), (ObjIntConsumer) input.readObject()); + case FOREACH_OBJ_LONG: + return new ForEachObjLongOperation<>((Iterable) input.readObject(), null, + UnsignedNumeric.readUnsignedInt(input), (ObjLongConsumer) input.readObject()); + case FOREACH_FLAT_OBJ_DOUBLE: + return new ForEachFlatMapObjDoubleOperation<>((Iterable) input.readObject(), null, + UnsignedNumeric.readUnsignedInt(input), (ObjDoubleConsumer) input.readObject()); + case FOREACH_FLAT_OBJ_INT: + return new ForEachFlatMapObjIntOperation<>((Iterable) input.readObject(), null, + UnsignedNumeric.readUnsignedInt(input), (ObjIntConsumer) input.readObject()); + case FOREACH_FLAT_OBJ_LONG: + return new ForEachFlatMapObjLongOperation<>((Iterable) input.readObject(), null, + UnsignedNumeric.readUnsignedInt(input), (ObjLongConsumer) input.readObject()); default: throw new IllegalArgumentException("Found invalid number " + number); diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/object/ForEachBiOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/object/ForEachBiOperation.java new file mode 100644 index 000000000000..857adf19c868 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/object/ForEachBiOperation.java @@ -0,0 +1,50 @@ +package org.infinispan.stream.impl.termop.object; + +import org.infinispan.Cache; +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.stream.CacheAware; +import org.infinispan.stream.impl.intops.IntermediateOperation; +import org.infinispan.stream.impl.termop.AbstractForEachOperation; + +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * Terminal operation that handles for each where no map operations are defined + * @param key type of the supplied stream + * @param resulting value type + */ +public class ForEachBiOperation extends AbstractForEachOperation> { + private final BiConsumer, ? super V> consumer; + private transient Cache cache; + + public ForEachBiOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize, BiConsumer, ? super V> consumer) { + super(intermediateOperations, supplier, batchSize); + this.consumer = consumer; + } + + @Override + protected void handleList(List list) { + list.forEach(e -> consumer.accept(cache, e)); + } + + @Override + protected void handleStreamForEach(Stream stream, List list) { + stream.forEach(list::add); + } + + public BiConsumer, ? super V> getConsumer() { + return consumer; + } + + @Override + public void handleInjection(ComponentRegistry registry) { + super.handleInjection(registry); + cache = registry.getComponent(Cache.class); + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachDoubleOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachDoubleOperation.java new file mode 100644 index 000000000000..df4ee30da013 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachDoubleOperation.java @@ -0,0 +1,90 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.container.entries.ImmortalCacheEntry; +import org.infinispan.stream.impl.KeyTrackingTerminalOperation; +import org.infinispan.stream.impl.intops.IntermediateOperation; +import org.infinispan.stream.impl.termop.BaseTerminalOperation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.BaseStream; +import java.util.stream.DoubleStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where no flat map operations are defined on a + * {@link DoubleStream}. Note this means it is an implied map intermediate operation. + * @param key type of the supplied stream + */ +public abstract class AbstractForEachDoubleOperation extends BaseTerminalOperation implements KeyTrackingTerminalOperation { + private final int batchSize; + + public AbstractForEachDoubleOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize) { + super(intermediateOperations, supplier); + this.batchSize = batchSize; + } + + @Override + public boolean lostSegment(boolean stopIfLost) { + // TODO: stop this early + return true; + } + + @Override + public List performOperation(IntermediateCollector> response) { + /** + * This is for rehash only! {@link org.infinispan.stream.impl.termop.SingleRunOperation} should always be used for + * non rehash + */ + throw new UnsupportedOperationException(); + } + + protected abstract void handleArray(double[] array, int size); + + @Override + public Collection> performOperationRehashAware( + IntermediateCollector>> response) { + // We only support sequential streams for iterator rehash aware + BaseStream stream = supplier.get().sequential(); + + List> collectedValues = new ArrayList<>(batchSize); + + double[] list = new double[batchSize]; + AtomicInteger offset = new AtomicInteger(); + Object[] currentKey = new Object[1]; + stream = ((Stream>) stream).peek(e -> { + if (offset.get() > 0) { + collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); + if (collectedValues.size() >= batchSize) { + handleArray(list, offset.get()); + response.sendDataResonse(collectedValues); + collectedValues.clear(); + offset.set(0); + } + } + currentKey[0] = e.getKey(); + }); + for (IntermediateOperation intermediateOperation : intermediateOperations) { + stream = intermediateOperation.perform(stream); + } + + DoubleStream convertedStream = ((DoubleStream)stream); + // We rely on the fact that iterator processes 1 entry at a time when sequential + convertedStream.forEach(d -> list[offset.getAndIncrement()] = d); + if (offset.get() > 0) { + handleArray(list, offset.get()); + collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); + } + return collectedValues; + } + + public int getBatchSize() { + return batchSize; + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachIntOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachIntOperation.java new file mode 100644 index 000000000000..5cd3d08dd851 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachIntOperation.java @@ -0,0 +1,90 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.container.entries.ImmortalCacheEntry; +import org.infinispan.stream.impl.KeyTrackingTerminalOperation; +import org.infinispan.stream.impl.intops.IntermediateOperation; +import org.infinispan.stream.impl.termop.BaseTerminalOperation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.BaseStream; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where no flat map operations are defined on a + * {@link IntStream}. Note this means it is an implied map intermediate operation. + * @param key type of the supplied stream + */ +public abstract class AbstractForEachIntOperation extends BaseTerminalOperation implements KeyTrackingTerminalOperation { + private final int batchSize; + + public AbstractForEachIntOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize) { + super(intermediateOperations, supplier); + this.batchSize = batchSize; + } + + @Override + public boolean lostSegment(boolean stopIfLost) { + // TODO: stop this early + return true; + } + + @Override + public List performOperation(IntermediateCollector> response) { + /** + * This is for rehash only! {@link org.infinispan.stream.impl.termop.SingleRunOperation} should always be used for + * non rehash + */ + throw new UnsupportedOperationException(); + } + + protected abstract void handleArray(int[] array, int size); + + @Override + public Collection> performOperationRehashAware( + IntermediateCollector>> response) { + // We only support sequential streams for iterator rehash aware + BaseStream stream = supplier.get().sequential(); + + List> collectedValues = new ArrayList<>(batchSize); + + int[] list = new int[batchSize]; + AtomicInteger offset = new AtomicInteger(); + Object[] currentKey = new Object[1]; + stream = ((Stream>) stream).peek(e -> { + if (offset.get() > 0) { + collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); + if (collectedValues.size() >= batchSize) { + handleArray(list, offset.get()); + response.sendDataResonse(collectedValues); + collectedValues.clear(); + offset.set(0); + } + } + currentKey[0] = e.getKey(); + }); + for (IntermediateOperation intermediateOperation : intermediateOperations) { + stream = intermediateOperation.perform(stream); + } + + IntStream convertedStream = ((IntStream)stream); + // We rely on the fact that iterator processes 1 entry at a time when sequential + convertedStream.forEach(d -> list[offset.getAndIncrement()] = d); + if (offset.get() > 0) { + handleArray(list, offset.get()); + collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); + } + return collectedValues; + } + + public int getBatchSize() { + return batchSize; + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachLongOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachLongOperation.java new file mode 100644 index 000000000000..034849de6430 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/AbstractForEachLongOperation.java @@ -0,0 +1,90 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.container.entries.ImmortalCacheEntry; +import org.infinispan.stream.impl.KeyTrackingTerminalOperation; +import org.infinispan.stream.impl.intops.IntermediateOperation; +import org.infinispan.stream.impl.termop.BaseTerminalOperation; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.BaseStream; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where no flat map operations are defined on a + * {@link LongStream}. Note this means it is an implied map intermediate operation. + * @param key type of the supplied stream + */ +public abstract class AbstractForEachLongOperation extends BaseTerminalOperation implements KeyTrackingTerminalOperation { + private final int batchSize; + + public AbstractForEachLongOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize) { + super(intermediateOperations, supplier); + this.batchSize = batchSize; + } + + @Override + public boolean lostSegment(boolean stopIfLost) { + // TODO: stop this early + return true; + } + + @Override + public List performOperation(IntermediateCollector> response) { + /** + * This is for rehash only! {@link org.infinispan.stream.impl.termop.SingleRunOperation} should always be used for + * non rehash + */ + throw new UnsupportedOperationException(); + } + + protected abstract void handleArray(long[] array, int size); + + @Override + public Collection> performOperationRehashAware( + IntermediateCollector>> response) { + // We only support sequential streams for iterator rehash aware + BaseStream stream = supplier.get().sequential(); + + List> collectedValues = new ArrayList<>(batchSize); + + long[] list = new long[batchSize]; + AtomicInteger offset = new AtomicInteger(); + Object[] currentKey = new Object[1]; + stream = ((Stream>) stream).peek(e -> { + if (offset.get() > 0) { + collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); + if (collectedValues.size() >= batchSize) { + handleArray(list, offset.get()); + response.sendDataResonse(collectedValues); + collectedValues.clear(); + offset.set(0); + } + } + currentKey[0] = e.getKey(); + }); + for (IntermediateOperation intermediateOperation : intermediateOperations) { + stream = intermediateOperation.perform(stream); + } + + LongStream convertedStream = ((LongStream)stream); + // We rely on the fact that iterator processes 1 entry at a time when sequential + convertedStream.forEach(d -> list[offset.getAndIncrement()] = d); + if (offset.get() > 0) { + handleArray(list, offset.get()); + collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); + } + return collectedValues; + } + + public int getBatchSize() { + return batchSize; + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachDoubleOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachDoubleOperation.java index 469efc807c6e..2b0e516e74fe 100644 --- a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachDoubleOperation.java +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachDoubleOperation.java @@ -2,21 +2,12 @@ import org.infinispan.Cache; import org.infinispan.container.entries.CacheEntry; -import org.infinispan.container.entries.ImmortalCacheEntry; import org.infinispan.factories.ComponentRegistry; -import org.infinispan.stream.impl.KeyTrackingTerminalOperation; -import org.infinispan.stream.impl.intops.IntermediateOperation; -import org.infinispan.stream.impl.termop.BaseTerminalOperation; import org.infinispan.stream.CacheAware; +import org.infinispan.stream.impl.intops.IntermediateOperation; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.DoubleConsumer; import java.util.function.Supplier; -import java.util.stream.BaseStream; import java.util.stream.DoubleStream; import java.util.stream.Stream; @@ -25,75 +16,20 @@ * {@link DoubleStream}. Note this means it is an implied map intermediate operation. * @param key type of the supplied stream */ -public class ForEachDoubleOperation extends BaseTerminalOperation implements KeyTrackingTerminalOperation { - private final int batchSize; +public class ForEachDoubleOperation extends AbstractForEachDoubleOperation { private final DoubleConsumer consumer; public ForEachDoubleOperation(Iterable intermediateOperations, Supplier> supplier, int batchSize, DoubleConsumer consumer) { - super(intermediateOperations, supplier); - this.batchSize = batchSize; + super(intermediateOperations, supplier, batchSize); this.consumer = consumer; } @Override - public boolean lostSegment(boolean stopIfLost) { - // TODO: stop this early - return true; - } - - @Override - public List performOperation(IntermediateCollector> response) { - /** - * This is for rehash only! {@link org.infinispan.stream.impl.termop.SingleRunOperation} should always be used for - * non rehash - */ - throw new UnsupportedOperationException(); - } - - @Override - public Collection> performOperationRehashAware( - IntermediateCollector>> response) { - // We only support sequential streams for iterator rehash aware - BaseStream stream = supplier.get().sequential(); - - List> collectedValues = new ArrayList(batchSize); - - double[] list = new double[batchSize]; - AtomicInteger offset = new AtomicInteger(); - Object[] currentKey = new Object[1]; - stream = ((Stream>) stream).peek(e -> { - if (offset.get() > 0) { - collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); - if (collectedValues.size() >= batchSize) { - for (int i = 0; i < offset.get(); ++i) { - consumer.accept(list[i]); - } - response.sendDataResonse(collectedValues); - collectedValues.clear(); - offset.set(0); - } - } - currentKey[0] = e.getKey(); - }); - for (IntermediateOperation intermediateOperation : intermediateOperations) { - stream = intermediateOperation.perform(stream); + protected void handleArray(double[] array, int size) { + for (int i = 0; i < size; ++i) { + consumer.accept(array[i]); } - - DoubleStream convertedStream = ((DoubleStream)stream); - // We rely on the fact that iterator processes 1 entry at a time when sequential - convertedStream.forEach(d -> list[offset.getAndIncrement()] = d); - if (offset.get() > 0) { - for (int i = 0; i < offset.get(); ++i) { - consumer.accept(list[i]); - } - collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); - } - return collectedValues; - } - - public int getBatchSize() { - return batchSize; } public DoubleConsumer getConsumer() { diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjDoubleOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjDoubleOperation.java new file mode 100644 index 000000000000..c9e59a5f9447 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjDoubleOperation.java @@ -0,0 +1,51 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.Cache; +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.stream.CacheAware; +import org.infinispan.stream.impl.intops.IntermediateOperation; +import org.infinispan.stream.impl.termop.AbstractForEachOperation; + +import java.util.List; +import java.util.function.DoubleConsumer; +import java.util.function.ObjDoubleConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where flat map operation is performed on a + * {@link DoubleStream}. + * @param key type of the supplied stream + */ +public class ForEachFlatMapObjDoubleOperation extends AbstractForEachOperation { + private final ObjDoubleConsumer> consumer; + private transient Cache cache; + + public ForEachFlatMapObjDoubleOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize, ObjDoubleConsumer> consumer) { + super(intermediateOperations, supplier, batchSize); + this.consumer = consumer; + } + + @Override + protected void handleList(List list) { + list.forEach(d -> consumer.accept(cache, d)); + } + + @Override + protected void handleStreamForEach(DoubleStream stream, List list) { + stream.forEach(list::add); + } + + public ObjDoubleConsumer> getConsumer() { + return consumer; + } + + @Override + public void handleInjection(ComponentRegistry registry) { + super.handleInjection(registry); + cache = registry.getComponent(Cache.class); + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjIntOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjIntOperation.java new file mode 100644 index 000000000000..71128c7b8498 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjIntOperation.java @@ -0,0 +1,49 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.Cache; +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.stream.impl.intops.IntermediateOperation; +import org.infinispan.stream.impl.termop.AbstractForEachOperation; + +import java.util.List; +import java.util.function.ObjIntConsumer; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where flat map operation is performed on a + * {@link IntStream}. + * @param key type of the supplied stream + */ +public class ForEachFlatMapObjIntOperation extends AbstractForEachOperation { + private final ObjIntConsumer> consumer; + private transient Cache cache; + + public ForEachFlatMapObjIntOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize, ObjIntConsumer> consumer) { + super(intermediateOperations, supplier, batchSize); + this.consumer = consumer; + } + + @Override + protected void handleList(List list) { + list.forEach(d -> consumer.accept(cache, d)); + } + + @Override + protected void handleStreamForEach(IntStream stream, List list) { + stream.forEach(list::add); + } + + public ObjIntConsumer> getConsumer() { + return consumer; + } + + @Override + public void handleInjection(ComponentRegistry registry) { + super.handleInjection(registry); + cache = registry.getComponent(Cache.class); + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjLongOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjLongOperation.java new file mode 100644 index 000000000000..742101e03723 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachFlatMapObjLongOperation.java @@ -0,0 +1,49 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.Cache; +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.stream.impl.intops.IntermediateOperation; +import org.infinispan.stream.impl.termop.AbstractForEachOperation; + +import java.util.List; +import java.util.function.ObjLongConsumer; +import java.util.function.Supplier; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where flat map operation is performed on a + * {@link LongStream}. + * @param key type of the supplied stream + */ +public class ForEachFlatMapObjLongOperation extends AbstractForEachOperation { + private final ObjLongConsumer> consumer; + private transient Cache cache; + + public ForEachFlatMapObjLongOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize, ObjLongConsumer> consumer) { + super(intermediateOperations, supplier, batchSize); + this.consumer = consumer; + } + + @Override + protected void handleList(List list) { + list.forEach(d -> consumer.accept(cache, d)); + } + + @Override + protected void handleStreamForEach(LongStream stream, List list) { + stream.forEach(list::add); + } + + public ObjLongConsumer> getConsumer() { + return consumer; + } + + @Override + public void handleInjection(ComponentRegistry registry) { + super.handleInjection(registry); + cache = registry.getComponent(Cache.class); + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachIntOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachIntOperation.java index d1fc35ac0aec..41689e789184 100644 --- a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachIntOperation.java +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachIntOperation.java @@ -2,22 +2,12 @@ import org.infinispan.Cache; import org.infinispan.container.entries.CacheEntry; -import org.infinispan.container.entries.ImmortalCacheEntry; import org.infinispan.factories.ComponentRegistry; -import org.infinispan.stream.impl.KeyTrackingTerminalOperation; -import org.infinispan.stream.impl.intops.IntermediateOperation; -import org.infinispan.stream.impl.termop.BaseTerminalOperation; -import org.infinispan.stream.impl.termop.object.NoMapIteratorOperation; import org.infinispan.stream.CacheAware; +import org.infinispan.stream.impl.intops.IntermediateOperation; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntConsumer; import java.util.function.Supplier; -import java.util.stream.BaseStream; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -26,74 +16,20 @@ * {@link IntStream}. Note this means it is an implied map intermediate operation. * @param key type of the supplied stream */ -public class ForEachIntOperation extends BaseTerminalOperation implements KeyTrackingTerminalOperation { - private final int batchSize; +public class ForEachIntOperation extends AbstractForEachIntOperation { private final IntConsumer consumer; public ForEachIntOperation(Iterable intermediateOperations, Supplier> supplier, int batchSize, IntConsumer consumer) { - super(intermediateOperations, supplier); - this.batchSize = batchSize; + super(intermediateOperations, supplier, batchSize); this.consumer = consumer; } @Override - public boolean lostSegment(boolean stopIfLost) { - // TODO: stop this early - return true; - } - - @Override - public List performOperation(IntermediateCollector> response) { - /** - * This is for rehash only! {@link NoMapIteratorOperation} should always be used for non rehash - */ - throw new UnsupportedOperationException(); - } - - @Override - public Collection> performOperationRehashAware( - IntermediateCollector>> response) { - // We only support sequential streams for iterator rehash aware - BaseStream stream = supplier.get().sequential(); - - List> collectedValues = new ArrayList(batchSize); - - int[] list = new int[batchSize]; - AtomicInteger offset = new AtomicInteger(); - Object[] currentKey = new Object[1]; - stream = ((Stream>) stream).peek(e -> { - if (offset.get() > 0) { - collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); - if (collectedValues.size() >= batchSize) { - for (int i = 0; i < offset.get(); ++i) { - consumer.accept(list[i]); - } - response.sendDataResonse(collectedValues); - collectedValues.clear(); - offset.set(0); - } - } - currentKey[0] = e.getKey(); - }); - for (IntermediateOperation intermediateOperation : intermediateOperations) { - stream = intermediateOperation.perform(stream); + protected void handleArray(int[] array, int size) { + for (int i = 0; i < size; ++i) { + consumer.accept(array[i]); } - - IntStream convertedStream = ((IntStream)stream); - // We rely on the fact that iterator processes 1 entry at a time when sequential - convertedStream.forEach(i -> list[offset.getAndIncrement()] = i); - if (offset.get() > 0) { - for (int i = 0; i < offset.get(); ++i) { - consumer.accept(list[i]); - } - collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); - } - return collectedValues; - } - - public int getBatchSize() { - return batchSize; } public IntConsumer getConsumer() { diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachLongOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachLongOperation.java index 2ddb7d896e91..d068748c1eb1 100644 --- a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachLongOperation.java +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachLongOperation.java @@ -2,22 +2,12 @@ import org.infinispan.Cache; import org.infinispan.container.entries.CacheEntry; -import org.infinispan.container.entries.ImmortalCacheEntry; import org.infinispan.factories.ComponentRegistry; -import org.infinispan.stream.impl.KeyTrackingTerminalOperation; -import org.infinispan.stream.impl.intops.IntermediateOperation; -import org.infinispan.stream.impl.termop.BaseTerminalOperation; -import org.infinispan.stream.impl.termop.object.NoMapIteratorOperation; import org.infinispan.stream.CacheAware; +import org.infinispan.stream.impl.intops.IntermediateOperation; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongConsumer; import java.util.function.Supplier; -import java.util.stream.BaseStream; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -26,74 +16,20 @@ * {@link LongStream}. Note this means it is an implied map intermediate operation. * @param key type of the supplied stream */ -public class ForEachLongOperation extends BaseTerminalOperation implements KeyTrackingTerminalOperation { - private final int batchSize; +public class ForEachLongOperation extends AbstractForEachLongOperation { private final LongConsumer consumer; public ForEachLongOperation(Iterable intermediateOperations, Supplier> supplier, int batchSize, LongConsumer consumer) { - super(intermediateOperations, supplier); - this.batchSize = batchSize; + super(intermediateOperations, supplier, batchSize); this.consumer = consumer; } @Override - public boolean lostSegment(boolean stopIfLost) { - // TODO: stop this early - return true; - } - - @Override - public List performOperation(IntermediateCollector> response) { - /** - * This is for rehash only! {@link NoMapIteratorOperation} should always be used for non rehash - */ - throw new UnsupportedOperationException(); - } - - @Override - public Collection> performOperationRehashAware( - IntermediateCollector>> response) { - // We only support sequential streams for iterator rehash aware - BaseStream stream = supplier.get().sequential(); - - List> collectedValues = new ArrayList(batchSize); - - long[] list = new long[batchSize]; - AtomicInteger offset = new AtomicInteger(); - Object[] currentKey = new Object[1]; - stream = ((Stream>) stream).peek(e -> { - if (offset.get() > 0) { - collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); - if (collectedValues.size() >= batchSize) { - for (int i = 0; i < offset.get(); ++i) { - consumer.accept(list[i]); - } - response.sendDataResonse(collectedValues); - collectedValues.clear(); - offset.set(0); - } - } - currentKey[0] = e.getKey(); - }); - for (IntermediateOperation intermediateOperation : intermediateOperations) { - stream = intermediateOperation.perform(stream); + protected void handleArray(long[] array, int size) { + for (int i = 0; i < size; i++) { + consumer.accept(array[i]); } - - LongStream convertedStream = ((LongStream)stream); - // We rely on the fact that iterator processes 1 entry at a time when sequential - convertedStream.forEach(d -> list[offset.getAndIncrement()] = d); - if (offset.get() > 0) { - for (int i = 0; i < offset.get(); ++i) { - consumer.accept(list[i]); - } - collectedValues.add(new ImmortalCacheEntry(currentKey[0], currentKey[0])); - } - return collectedValues; - } - - public int getBatchSize() { - return batchSize; } public LongConsumer getConsumer() { diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjDoubleOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjDoubleOperation.java new file mode 100644 index 000000000000..86002d6ec5c4 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjDoubleOperation.java @@ -0,0 +1,44 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.Cache; +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.stream.impl.intops.IntermediateOperation; + +import java.util.function.ObjDoubleConsumer; +import java.util.function.Supplier; +import java.util.stream.DoubleStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where no flat map operations are defined on a + * {@link DoubleStream}. Note this means it is an implied map intermediate operation. + * @param key type of the supplied stream + */ +public class ForEachObjDoubleOperation extends AbstractForEachDoubleOperation { + private final ObjDoubleConsumer> consumer; + private transient Cache cache; + + public ForEachObjDoubleOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize, ObjDoubleConsumer> consumer) { + super(intermediateOperations, supplier, batchSize); + this.consumer = consumer; + } + + @Override + protected void handleArray(double[] array, int size) { + for (int i = 0; i < size; ++i) { + consumer.accept(cache, array[i]); + } + } + + public ObjDoubleConsumer> getConsumer() { + return consumer; + } + + @Override + public void handleInjection(ComponentRegistry registry) { + super.handleInjection(registry); + cache = registry.getComponent(Cache.class); + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjIntOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjIntOperation.java new file mode 100644 index 000000000000..bb73ff5f8097 --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjIntOperation.java @@ -0,0 +1,44 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.Cache; +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.stream.impl.intops.IntermediateOperation; + +import java.util.function.ObjIntConsumer; +import java.util.function.Supplier; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where no flat map operations are defined on a + * {@link IntStream}. Note this means it is an implied map intermediate operation. + * @param key type of the supplied stream + */ +public class ForEachObjIntOperation extends AbstractForEachIntOperation { + private final ObjIntConsumer> consumer; + private transient Cache cache; + + public ForEachObjIntOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize, ObjIntConsumer> consumer) { + super(intermediateOperations, supplier, batchSize); + this.consumer = consumer; + } + + @Override + protected void handleArray(int[] array, int size) { + for (int i = 0; i < size; ++i) { + consumer.accept(cache, array[i]); + } + } + + public ObjIntConsumer> getConsumer() { + return consumer; + } + + @Override + public void handleInjection(ComponentRegistry registry) { + super.handleInjection(registry); + cache = registry.getComponent(Cache.class); + } +} diff --git a/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjLongOperation.java b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjLongOperation.java new file mode 100644 index 000000000000..ab90ad52719e --- /dev/null +++ b/core/src/main/java/org/infinispan/stream/impl/termop/primitive/ForEachObjLongOperation.java @@ -0,0 +1,44 @@ +package org.infinispan.stream.impl.termop.primitive; + +import org.infinispan.Cache; +import org.infinispan.container.entries.CacheEntry; +import org.infinispan.factories.ComponentRegistry; +import org.infinispan.stream.impl.intops.IntermediateOperation; + +import java.util.function.ObjLongConsumer; +import java.util.function.Supplier; +import java.util.stream.LongStream; +import java.util.stream.Stream; + +/** + * Terminal rehash aware operation that handles for each where no flat map operations are defined on a + * {@link LongStream}. Note this means it is an implied map intermediate operation. + * @param key type of the supplied stream + */ +public class ForEachObjLongOperation extends AbstractForEachLongOperation { + private final ObjLongConsumer> consumer; + private transient Cache cache; + + public ForEachObjLongOperation(Iterable intermediateOperations, + Supplier> supplier, int batchSize, ObjLongConsumer> consumer) { + super(intermediateOperations, supplier, batchSize); + this.consumer = consumer; + } + + @Override + protected void handleArray(long[] array, int size) { + for (int i = 0; i < size; ++i) { + consumer.accept(cache, array[i]); + } + } + + public ObjLongConsumer> getConsumer() { + return consumer; + } + + @Override + public void handleInjection(ComponentRegistry registry) { + super.handleInjection(registry); + cache = registry.getComponent(Cache.class); + } +} diff --git a/core/src/test/java/org/infinispan/stream/BaseStreamTest.java b/core/src/test/java/org/infinispan/stream/BaseStreamTest.java index 7cd219cd7035..ca06ad189703 100644 --- a/core/src/test/java/org/infinispan/stream/BaseStreamTest.java +++ b/core/src/test/java/org/infinispan/stream/BaseStreamTest.java @@ -317,6 +317,29 @@ public void testObjForEachCacheInjected() { } } + public void testObjForEachBiConsumer() { + Cache cache = getCache(0); + + int cacheOffset = populateNextForEachStructure(cache); + int atomicOffset = populateNextForEachStructure(new AtomicInteger()); + try { + testIntOperation(() -> { + createStream(cache.entrySet()).forEach((c, e) -> { + Cache localCache = getForEachObject(cacheOffset); + if (c != null && localCache != null && c.getName().equals(localCache.getName())) { + ((AtomicInteger) getForEachObject(atomicOffset)).addAndGet(e.getKey()); + } else { + fail("Did not receive correct cache!"); + } + }); + return ((AtomicInteger) getForEachObject(atomicOffset)).get(); + }, cache); + } finally { + clearForEachObject(cacheOffset); + clearForEachObject(atomicOffset); + } + } + public void testObjKeySetForEachCacheInjected() { Cache cache = getCache(0); @@ -808,6 +831,53 @@ public void testIntForEachCacheInjected() { } } + public void testIntForEachBiConsumer() { + Cache cache = getCache(0); + int cacheOffset = populateNextForEachStructure(cache); + int atomicOffset = populateNextForEachStructure(new AtomicInteger()); + + try { + testIntOperation(() -> { + createStream(cache.entrySet()).mapToInt(toInt).forEach((c, i) -> { + Cache localCache = getForEachObject(cacheOffset); + if (c != null && localCache != null && c.getName().equals(localCache.getName())) { + AtomicInteger atomicInteger = getForEachObject(atomicOffset); + atomicInteger.addAndGet(i); + } + }); + return ((AtomicInteger) getForEachObject(atomicOffset)).get(); + }, cache); + } finally { + clearForEachObject(cacheOffset); + clearForEachObject(atomicOffset); + } + } + + public void testIntFlatMapObjConsumerForEach() { + Cache cache = getCache(0); + String cacheName = cache.getName(); + int range = 10; + // First populate the cache with a bunch of values + IntStream.range(0, range).boxed().forEach(i -> cache.put(i, i + "-value")); + + assertEquals(range, cache.size()); + CacheSet> entrySet = cache.entrySet(); + + int offset = populateNextForEachStructure(new AtomicInteger()); + try { + createStream(entrySet).distributedBatchSize(5).mapToInt(toInt).flatMap(i -> IntStream.of(i, 2)) + .forEach((c, e) -> { + assertEquals(cacheName, c.getName()); + AtomicInteger atomic = getForEachObject(offset); + atomic.addAndGet(e); + }); + AtomicInteger atomic = getForEachObject(offset); + assertEquals((range - 1) * (range / 2) + 2 * range, atomic.get()); + } finally { + clearForEachObject(offset); + } + } + public void testIntIterator() { Cache cache = getCache(0); @@ -1186,6 +1256,53 @@ public void testLongForEachCacheInjected() { } } + public void testLongForEachBiConsumer() { + Cache cache = getCache(0); + int cacheOffset = populateNextForEachStructure(cache); + int atomicOffset = populateNextForEachStructure(new AtomicLong()); + + try { + testLongOperation(() -> { + createStream(cache.entrySet()).mapToLong(toLong).forEach((c, i) -> { + Cache localCache = getForEachObject(cacheOffset); + if (c != null && localCache != null && c.getName().equals(localCache.getName())) { + AtomicLong atomicLong = getForEachObject(atomicOffset); + atomicLong.addAndGet(i); + } + }); + return ((AtomicLong) getForEachObject(atomicOffset)).get(); + }, cache); + } finally { + clearForEachObject(cacheOffset); + clearForEachObject(atomicOffset); + } + } + + public void testLongFlatMapObjConsumerForEach() { + Cache cache = getCache(0); + String cacheName = cache.getName(); + int range = 10; + // First populate the cache with a bunch of values + LongStream.range(0, range).boxed().forEach(i -> cache.put(i, i + "-value")); + + assertEquals(range, cache.size()); + CacheSet> entrySet = cache.entrySet(); + + int offset = populateNextForEachStructure(new AtomicLong()); + try { + createStream(entrySet).distributedBatchSize(5).mapToLong(toLong).flatMap(i -> LongStream.of(i, 2)) + .forEach((c, e) -> { + assertEquals(cacheName, c.getName()); + AtomicLong atomic = getForEachObject(offset); + atomic.addAndGet(e); + }); + AtomicLong atomic = getForEachObject(offset); + assertEquals((range - 1) * (range / 2) + 2 * range, atomic.get()); + } finally { + clearForEachObject(offset); + } + } + public void testLongIterator() { Cache cache = getCache(0); @@ -1582,6 +1699,62 @@ public void testDoubleForEachCacheInjected() { } } + public void testDoubleForEachBiConsumer() { + Cache cache = getCache(0); + int cacheOffset = populateNextForEachStructure(cache); + int offset = populateNextForEachStructure(new DoubleSummaryStatistics()); + + try { + testDoubleOperation(() -> { + createStream(cache.entrySet()).mapToDouble(toDouble).forEach((c, d) -> { + Cache localCache = getForEachObject(cacheOffset); + if (c != null && localCache != null && c.getName().equals(localCache.getName())) { + DoubleSummaryStatistics stats = getForEachObject(offset); + synchronized (stats) { + stats.accept(d); + } + } + }); + DoubleSummaryStatistics stats = getForEachObject(offset); + return stats; + }, cache); + } finally { + clearForEachObject(cacheOffset); + clearForEachObject(offset); + } + } + + public void testDoubleFlatMapObjConsumerForEach() { + Cache cache = getCache(0); + String cacheName = cache.getName(); + int range = 10; + // First populate the cache with a bunch of values + DoubleStream.iterate(0.0, d -> d + .5).limit(10).boxed().forEach(i -> cache.put(i, i + "-value")); + + assertEquals(range, cache.size()); + CacheSet> entrySet = cache.entrySet(); + + int offset = populateNextForEachStructure(new DoubleSummaryStatistics()); + try { + createStream(entrySet).distributedBatchSize(5).mapToDouble(toDouble).flatMap(e -> DoubleStream.of(e, 2.25)) + .forEach((c, e) -> { + assertEquals(cacheName, c.getName()); + DoubleSummaryStatistics stats = getForEachObject(offset); + synchronized (stats) { + stats.accept(e); + } + }); + DoubleSummaryStatistics stats = getForEachObject(offset); + assertEquals(2.25, stats.getAverage()); + assertEquals(0.0, stats.getMin()); + assertEquals(4.5, stats.getMax()); + assertEquals(20, stats.getCount()); + assertEquals(45.0, stats.getSum()); + } finally { + clearForEachObject(offset); + } + } + public void testDoubleIterator() { Cache cache = getCache(0); testDoubleOperation(() -> {