Skip to content

Commit

Permalink
ISPN-6383 Add forEach method to CacheStream that takes BiConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns authored and tristantarrant committed Mar 18, 2016
1 parent 0798c57 commit e455a8c
Show file tree
Hide file tree
Showing 33 changed files with 1,309 additions and 288 deletions.
21 changes: 21 additions & 0 deletions core/src/main/java/org/infinispan/CacheStream.java
Expand Up @@ -217,6 +217,27 @@ interface SegmentCompletionListener {
*/
void forEach(SerializableConsumer<? super R> action);

/**
* Same as {@link CacheStream#forEach(Consumer)} except that it takes a {@link BiConsumer} that provides access
* to the underlying {@link Cache} that is backing this stream.
* <p>
* Note that the <code>CacheAware</code> interface is not supported for injection using this method as the cache
* is provided in the consumer directly.
* @param action consumer to be ran for each element in the stream
* @param <K> key type of the cache
* @param <V> value type of the cache
*/
<K, V> void forEach(BiConsumer<Cache<K, V>, ? super R> action);

/**
* Same as {@link CacheStream#forEach(BiConsumer)} except that the <code>BiConsumer</code> must also implement
* <code>Serializable</code>
* @param action consumer to be ran for each element in the stream
* @param <K> key type of the cache
* @param <V> value type of the cache
*/
<K, V> void forEach(SerializableBiConsumer<Cache<K, V>, ? super R> action);

/**
* {@inheritDoc}
* <p>Usage of this operator requires closing this stream after you are done with the iterator. The preferred
Expand Down
23 changes: 22 additions & 1 deletion core/src/main/java/org/infinispan/DoubleCacheStream.java
@@ -1,5 +1,6 @@
package org.infinispan;

import org.infinispan.util.function.SerializableBiConsumer;
import org.infinispan.util.function.SerializableDoubleBinaryOperator;
import org.infinispan.util.function.SerializableDoubleConsumer;
import org.infinispan.util.function.SerializableDoubleFunction;
Expand All @@ -8,7 +9,6 @@
import org.infinispan.util.function.SerializableDoubleToLongFunction;
import org.infinispan.util.function.SerializableDoubleUnaryOperator;
import org.infinispan.util.function.SerializableObjDoubleConsumer;
import org.infinispan.util.function.SerializableBiConsumer;
import org.infinispan.util.function.SerializableSupplier;

import java.util.OptionalDouble;
Expand Down Expand Up @@ -198,6 +198,27 @@ public interface DoubleCacheStream extends DoubleStream {
*/
void forEach(SerializableDoubleConsumer action);

/**
* Same as {@link DoubleCacheStream#forEach(DoubleConsumer)} except that it takes an {@link ObjDoubleConsumer} that
* provides access to the underlying {@link Cache} that is backing this stream.
* <p>
* Note that the <code>CacheAware</code> interface is not supported for injection using this method as the cache
* is provided in the consumer directly.
* @param action consumer to be ran for each element in the stream
* @param <K> key type of the cache
* @param <V> value type of the cache
*/
<K, V> void forEach(ObjDoubleConsumer<Cache<K, V>> action);

/**
* Same as {@link DoubleCacheStream#forEach(ObjDoubleConsumer)} except that the <code>BiConsumer</code> must also implement
* <code>Serializable</code>
* @param action consumer to be ran for each element in the stream
* @param <K> key type of the cache
* @param <V> value type of the cache
*/
<K, V> void forEach(SerializableObjDoubleConsumer<Cache<K, V>> action);

/**
* Same as {@link DoubleCacheStream#reduce(double, DoubleBinaryOperator)} except that the DoubleBinaryOperator must
* also implement <code>Serializable</code>
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/org/infinispan/IntCacheStream.java
Expand Up @@ -198,6 +198,27 @@ public interface IntCacheStream extends IntStream {
*/
void forEach(SerializableIntConsumer action);

/**
* Same as {@link IntCacheStream#forEach(IntConsumer)} except that it takes an {@link ObjIntConsumer} that
* provides access to the underlying {@link Cache} that is backing this stream.
* <p>
* Note that the <code>CacheAware</code> interface is not supported for injection using this method as the cache
* is provided in the consumer directly.
* @param action consumer to be ran for each element in the stream
* @param <K> key type of the cache
* @param <V> value type of the cache
*/
<K, V> void forEach(ObjIntConsumer<Cache<K, V>> action);

/**
* Same as {@link IntCacheStream#forEach(ObjIntConsumer)} except that the <code>BiConsumer</code> must also implement
* <code>Serializable</code>
* @param action consumer to be ran for each element in the stream
* @param <K> key type of the cache
* @param <V> value type of the cache
*/
<K, V> void forEach(SerializableObjIntConsumer<Cache<K, V>> action);

/**
* Same as {@link IntCacheStream#reduce(int, IntBinaryOperator)} except that the IntBinaryOperator
* must also implement <code>Serializable</code>
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/org/infinispan/LongCacheStream.java
Expand Up @@ -206,6 +206,27 @@ public interface LongCacheStream extends LongStream {
*/
void forEach(SerializableLongConsumer action);

/**
* Same as {@link LongCacheStream#forEach(LongConsumer)} except that it takes an {@link ObjLongConsumer} that
* provides access to the underlying {@link Cache} that is backing this stream.
* <p>
* Note that the <code>CacheAware</code> interface is not supported for injection using this method as the cache
* is provided in the consumer directly.
* @param action consumer to be ran for each element in the stream
* @param <K> key type of the cache
* @param <V> value type of the cache
*/
<K, V> void forEach(ObjLongConsumer<Cache<K, V>> action);

/**
* Same as {@link LongCacheStream#forEach(ObjLongConsumer)} except that the <code>BiConsumer</code> must also implement
* <code>Serializable</code>
* @param action consumer to be ran for each element in the stream
* @param <K> key type of the cache
* @param <V> value type of the cache
*/
<K, V> void forEach(SerializableObjLongConsumer<Cache<K, V>> action);

/**
* Same as {@link LongCacheStream#reduce(long, LongBinaryOperator)} except that the LongBinaryOperator must
* also implement Serializable.
Expand Down
Expand Up @@ -43,9 +43,8 @@
* primitive types.
* @param <T> The type returned by the stream
* @param <S> The stream interface
* @param <T_CONS> The consumer for this stream
*/
public abstract class AbstractCacheStream<T, S extends BaseStream<T, S>, S2 extends S, T_CONS> implements BaseStream<T, S> {
public abstract class AbstractCacheStream<T, S extends BaseStream<T, S>, S2 extends S> implements BaseStream<T, S> {
protected final Log log = LogFactory.getLog(getClass());

protected final Queue<IntermediateOperation> intermediateOperations;
Expand Down Expand Up @@ -100,7 +99,7 @@ protected AbstractCacheStream(Address localAddress, boolean parallel, Distributi
intermediateOperations = new ArrayDeque<>();
}

protected AbstractCacheStream(AbstractCacheStream<T, S, S2, T_CONS> other) {
protected AbstractCacheStream(AbstractCacheStream<T, S, S2> other) {
this.intermediateOperations = other.intermediateOperations;
this.localIntermediateOperations = other.localIntermediateOperations;
this.localAddress = other.localAddress;
Expand Down Expand Up @@ -234,12 +233,12 @@ public void close() {
}
}

<R> R performOperation(Function<S, ? extends R> function, boolean retryOnRehash, BinaryOperator<R> accumulator,
<R> R performOperation(Function<? super S2, ? extends R> function, boolean retryOnRehash, BinaryOperator<R> accumulator,
Predicate<? super R> earlyTerminatePredicate) {
return performOperation(function, retryOnRehash, accumulator, earlyTerminatePredicate, true);
}

<R> R performOperation(Function<S, ? extends R> function, boolean retryOnRehash, BinaryOperator<R> accumulator,
<R> R performOperation(Function<? super S2, ? extends R> function, boolean retryOnRehash, BinaryOperator<R> accumulator,
Predicate<? super R> earlyTerminatePredicate, boolean ignoreSorting) {
// These operations are not affected by sorting, only by distinct
if (intermediateType.shouldUseIntermediate(!ignoreSorting && sorted, distinct)) {
Expand All @@ -254,7 +253,7 @@ <R> R performOperation(Function<S, ? extends R> function, boolean retryOnRehash,
}
}

<R> R performOperation(Function<S, ? extends R> function, ResultsAccumulator<R> remoteResults,
<R> R performOperation(Function<? super S2, ? extends R> function, ResultsAccumulator<R> remoteResults,
Predicate<? super R> earlyTerminatePredicate) {
ConsistentHash ch = dm.getConsistentHash();
TerminalOperation<R> op = new SingleRunOperation<>(intermediateOperations,
Expand Down Expand Up @@ -283,7 +282,7 @@ <R> R performOperation(Function<S, ? extends R> function, ResultsAccumulator<R>
}
}

<R> R performOperationRehashAware(Function<S, ? extends R> function, boolean retryOnRehash,
<R> R performOperationRehashAware(Function<? super S2, ? extends R> function, boolean retryOnRehash,
ResultsAccumulator<R> remoteResults, Predicate<? super R> earlyTerminatePredicate) {
Set<Integer> segmentsToProcess = segmentsToFilter;
TerminalOperation<R> op;
Expand Down Expand Up @@ -353,10 +352,8 @@ <R> R performOperationRehashAware(Function<S, ? extends R> function, boolean ret
return remoteResults.currentValue;
}

abstract KeyTrackingTerminalOperation<Object, T, Object> getForEach(T_CONS consumer,
Supplier<Stream<CacheEntry>> supplier);

void performRehashForEach(T_CONS consumer) {
void performRehashKeyTrackingOperation(
Function<Supplier<Stream<CacheEntry>>, KeyTrackingTerminalOperation<Object, ? extends T, Object>> function) {
final AtomicBoolean complete = new AtomicBoolean();

ConsistentHash segmentInfoCH = dm.getReadConsistentHash();
Expand All @@ -380,7 +377,7 @@ void performRehashForEach(T_CONS consumer) {
segments = null;
excludedKeys = Collections.emptySet();
}
KeyTrackingTerminalOperation<Object, T, Object> op = getForEach(consumer, supplierForSegments(ch,
KeyTrackingTerminalOperation<Object, ? extends T, Object> op = function.apply(supplierForSegments(ch,
segmentsToProcess, excludedKeys));
op.handleInjection(registry);
Object id = csm.remoteStreamOperationRehashAware(getParallelDistribution(), parallel, ch, segmentsToProcess,
Expand Down Expand Up @@ -782,7 +779,7 @@ public boolean shouldUseIntermediate(boolean sorted, boolean distinct) {
}
}

<R> R performIntermediateRemoteOperation(Function<S, ? extends R> function) {
<R> R performIntermediateRemoteOperation(Function<? super S2, ? extends R> function) {
switch (intermediateType) {
case OBJ:
return performObjIntermediateRemoteOperation(function);
Expand All @@ -797,39 +794,39 @@ <R> R performIntermediateRemoteOperation(Function<S, ? extends R> function) {
}
}

<R> R performIntegerIntermediateRemoteOperation(Function<S, ? extends R> function) {
<R> R performIntegerIntermediateRemoteOperation(Function<? super S2, ? extends R> function) {
// TODO: once we don't have to box for primitive iterators we can remove this copy
Queue<IntermediateOperation> copyOperations = new ArrayDeque<>(localIntermediateOperations);
PrimitiveIterator.OfInt iterator = new DistributedIntCacheStream(this).remoteIterator();
SingleRunOperation<R, T, S> op = new SingleRunOperation<>(copyOperations,
SingleRunOperation<R, T, S, S2> op = new SingleRunOperation<>(copyOperations,
() -> StreamSupport.intStream(Spliterators.spliteratorUnknownSize(
iterator, Spliterator.CONCURRENT), parallel), function);
return op.performOperation();
}

<R> R performDoubleIntermediateRemoteOperation(Function<S, ? extends R> function) {
<R> R performDoubleIntermediateRemoteOperation(Function<? super S2, ? extends R> function) {
// TODO: once we don't have to box for primitive iterators we can remove this copy
Queue<IntermediateOperation> copyOperations = new ArrayDeque<>(localIntermediateOperations);
PrimitiveIterator.OfDouble iterator = new DistributedDoubleCacheStream(this).remoteIterator();
SingleRunOperation<R, T, S> op = new SingleRunOperation<>(copyOperations,
SingleRunOperation<R, T, S, S2> op = new SingleRunOperation<>(copyOperations,
() -> StreamSupport.doubleStream(Spliterators.spliteratorUnknownSize(
iterator, Spliterator.CONCURRENT), parallel), function);
return op.performOperation();
}

<R> R performLongIntermediateRemoteOperation(Function<S, ? extends R> function) {
<R> R performLongIntermediateRemoteOperation(Function<? super S2, ? extends R> function) {
// TODO: once we don't have to box for primitive iterators we can remove this copy
Queue<IntermediateOperation> copyOperations = new ArrayDeque<>(localIntermediateOperations);
PrimitiveIterator.OfLong iterator = new DistributedLongCacheStream(this).remoteIterator();
SingleRunOperation<R, T, S> op = new SingleRunOperation<>(copyOperations,
SingleRunOperation<R, T, S, S2> op = new SingleRunOperation<>(copyOperations,
() -> StreamSupport.longStream(Spliterators.spliteratorUnknownSize(
iterator, Spliterator.CONCURRENT), parallel), function);
return op.performOperation();
}

<R> R performObjIntermediateRemoteOperation(Function<S, ? extends R> function) {
<R> R performObjIntermediateRemoteOperation(Function<? super S2, ? extends R> function) {
Iterator<Object> iterator = new DistributedCacheStream<>(this).remoteIterator();
SingleRunOperation<R, T, S> op = new SingleRunOperation<>(localIntermediateOperations,
SingleRunOperation<R, T, S, S2> op = new SingleRunOperation<>(localIntermediateOperations,
() -> StreamSupport.stream(Spliterators.spliteratorUnknownSize(
iterator, Spliterator.CONCURRENT), parallel), function);
return op.performOperation();
Expand Down

0 comments on commit e455a8c

Please sign in to comment.