Skip to content

Commit

Permalink
ISPN-6691 Simplify Distributed Stream local intermediate operations
Browse files Browse the repository at this point in the history
* Added Intermediate*CacheStream
** Holds original DistributedCacheStream and LocalCacheStream
  • Loading branch information
wburns authored and danberindei committed May 25, 2016
1 parent e8a562e commit acdd7f3
Show file tree
Hide file tree
Showing 25 changed files with 2,617 additions and 535 deletions.
123 changes: 123 additions & 0 deletions core/src/main/java/org/infinispan/BaseCacheStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package org.infinispan;

import java.util.Comparator;
import java.util.Iterator;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.BaseStream;

/**
* Interface that defines the base methods of all streams returned from a {@link Cache}. This interface
* is useful to hold a reference to any of the types while still being able to invoke some methods.
* @author wburns
* @since 9.0
*/
public interface BaseCacheStream<T, S extends BaseStream<T, S>> extends BaseStream<T, S> {
/**
* This would disable sending requests to all other remote nodes compared to one at a time. This can reduce memory
* pressure on the originator node at the cost of performance.
* <p>Parallel distribution is enabled by default except for {@link CacheStream#iterator()} &
* {@link CacheStream#spliterator()}</p>
* @return a stream with parallel distribution disabled
*/
BaseCacheStream sequentialDistribution();

/**
* This would enable sending requests to all other remote nodes when a terminal operator is performed. This
* requires additional overhead as it must process results concurrently from various nodes, but should perform
* faster in the majority of cases.
* <p>Parallel distribution is enabled by default except for {@link CacheStream#iterator()} &
* {@link CacheStream#spliterator()}</p>
* @return a stream with parallel distribution enabled.
*/
BaseCacheStream parallelDistribution();

/**
* Filters which entries are returned by what segment they are present in. This method can be substantially more
* efficient than using a regular {@link CacheStream#filter(Predicate)} method as this can control what nodes are
* asked for data and what entries are read from the underlying CacheStore if present.
* @param segments The segments to use for this stream operation. Any segments not in this set will be ignored.
* @return a stream with the segments filtered.
*/
BaseCacheStream filterKeySegments(Set<Integer> segments);

/**
* Filters which entries are returned by only returning ones that map to the given key. This method will
* be faster than a regular {@link CacheStream#filter(Predicate)} if the filter is holding references to the same
* keys.
* @param keys The keys that this stream will only operate on.
* @return a stream with the keys filtered.
*/
BaseCacheStream filterKeys(Set<?> keys);

/**
* Controls how many keys are returned from a remote node when using a stream terminal operation with a distributed
* cache to back this stream. This value is ignored when terminal operators that don't track keys are used. Key
* tracking terminal operators are {@link CacheStream#iterator()}, {@link CacheStream#spliterator()},
* {@link CacheStream#forEach(Consumer)}. Please see those methods for additional information on how this value
* may affect them.
* <p>This value may be used in the case of a a terminal operator that doesn't track keys if an intermediate
* operation is performed that requires bringing keys locally to do computations. Examples of such intermediate
* operations are {@link CacheStream#sorted()}, {@link CacheStream#sorted(Comparator)},
* {@link CacheStream#distinct()}, {@link CacheStream#limit(long)}, {@link CacheStream#skip(long)}</p>
* <p>This value is <b>always</b> ignored when this stream is backed by a cache that is not distributed as all
* values are already local.</p>
* @param batchSize The size of each batch. This defaults to the state transfer chunk size.
* @return a stream with the batch size updated
*/
BaseCacheStream distributedBatchSize(int batchSize);

/**
* Allows registration of a segment completion listener that is notified when a segment has completed
* processing. If the terminal operator has a short circuit this listener may never be called.
* <p>This method is designed for the sole purpose of use with the {@link CacheStream#iterator()} to allow for
* a user to track completion of segments as they are returned from the iterator. Behavior of other methods
* is not specified. Please see {@link CacheStream#iterator()} for more information.</p>
* <p>Multiple listeners may be registered upon multiple invocations of this method. The ordering of notified
* listeners is not specified.</p>
* @param listener The listener that will be called back as segments are completed.
* @return a stream with the listener registered.
*/
BaseCacheStream segmentCompletionListener(SegmentCompletionListener listener);

/**
* Disables tracking of rehash events that could occur to the underlying cache. If a rehash event occurs while
* a terminal operation is being performed it is possible for some values that are in the cache to not be found.
* Note that you will never have an entry duplicated when rehash awareness is disabled, only lost values.
* <p>Most terminal operations will run faster with rehash awareness disabled even without a rehash occuring.
* However if a rehash occurs with this disabled be prepared to possibly receive only a subset of values.</p>
* @return a stream with rehash awareness disabled.
*/
BaseCacheStream disableRehashAware();

/**
* Sets a given time to wait for a remote operation to respond by. This timeout does nothing if the terminal
* operation does not go remote.
* <p>If a timeout does occur then a {@link java.util.concurrent.TimeoutException} is thrown from the terminal
* operation invoking thread or on the next call to the {@link Iterator} or {@link Spliterator}.</p>
* <p>Note that if a rehash occurs this timeout value is reset for the subsequent retry if rehash aware is
* enabled.</p>
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return a stream with the timeout set
*/
BaseCacheStream timeout(long timeout, TimeUnit unit);

/**
* Functional interface that is used as a callback when segments are completed. Please see
* {@link BaseCacheStream#segmentCompletionListener(SegmentCompletionListener)} for more details.
* @author wburns
* @since 9.0
*/
@FunctionalInterface
interface SegmentCompletionListener {
/**
* Method invoked when the segment has been found to be consumed properly by the terminal operation.
* @param segments The segments that were completed
*/
void segmentCompleted(Set<Integer> segments);
}
}
76 changes: 10 additions & 66 deletions core/src/main/java/org/infinispan/CacheStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,111 +79,55 @@
* @param <R> The type of the stream
* @since 8.0
*/
public interface CacheStream<R> extends Stream<R> {
public interface CacheStream<R> extends Stream<R>, BaseCacheStream<R, Stream<R>> {
/**
* This would disable sending requests to all other remote nodes compared to one at a time. This can reduce memory
* pressure on the originator node at the cost of performance.
* <p>Parallel distribution is enabled by default except for {@link CacheStream#iterator()} &
* {@link CacheStream#spliterator()}</p>
* @return a stream with parallel distribution disabled
* {@inheritDoc}
* @return a stream with parallel distribution disabled.
*/
CacheStream<R> sequentialDistribution();

/**
* This would enable sending requests to all other remote nodes when a terminal operator is performed. This
* requires additional overhead as it must process results concurrently from various nodes, but should perform
* faster in the majority of cases.
* <p>Parallel distribution is enabled by default except for {@link CacheStream#iterator()} &
* {@link CacheStream#spliterator()}</p>
* @inheritDoc
* @return a stream with parallel distribution enabled.
*/
CacheStream<R> parallelDistribution();

/**
* Filters which entries are returned by what segment they are present in. This method can be substantially more
* efficient than using a regular {@link CacheStream#filter(Predicate)} method as this can control what nodes are
* asked for data and what entries are read from the underlying CacheStore if present.
* @param segments The segments to use for this stream operation. Any segments not in this set will be ignored.
* {@inheritDoc}
* @return a stream with the segments filtered.
*/
CacheStream<R> filterKeySegments(Set<Integer> segments);

/**
* Filters which entries are returned by only returning ones that map to the given key. This method will <b>always</b>
* be faster than a regular {@link CacheStream#filter(Predicate)} if any keys must be retrieved remotely or if a
* cache store is in use.
* @param keys The keys that this stream will only operate on.
* {@inheritDoc}
* @return a stream with the keys filtered.
*/
CacheStream<R> filterKeys(Set<?> keys);

/**
* Controls how many keys are returned from a remote node when using a stream terminal operation with a distributed
* cache to back this stream. This value is ignored when terminal operators that don't track keys are used. Key
* tracking terminal operators are {@link CacheStream#iterator()}, {@link CacheStream#spliterator()},
* {@link CacheStream#forEach(Consumer)}. Please see those methods for additional information on how this value
* may affect them.
* <p>This value may be used in the case of a a terminal operator that doesn't track keys if an intermediate
* operation is performed that requires bringing keys locally to do computations. Examples of such intermediate
* operations are {@link CacheStream#sorted()}, {@link CacheStream#sorted(Comparator)},
* {@link CacheStream#distinct()}, {@link CacheStream#limit(long)}, {@link CacheStream#skip(long)}</p>
* <p>This value is <b>always</b> ignored when this stream is backed by a cache that is not distributed as all
* values are already local.</p>
* @param batchSize The size of each batch. This defaults to the state transfer chunk size.
* {@inheritDoc}
* @return a stream with the batch size updated
*/
CacheStream<R> distributedBatchSize(int batchSize);

/**
* Allows registration of a segment completion listener that is notified when a segment has completed
* processing. If the terminal operator has a short circuit this listener may never be called.
* <p>This method is designed for the sole purpose of use with the {@link CacheStream#iterator()} to allow for
* a user to track completion of segments as they are returned from the iterator. Behavior of other methods
* is not specified. Please see {@link CacheStream#iterator()} for more information.</p>
* <p>Multiple listeners may be registered upon multiple invocations of this method. The ordering of notified
* listeners is not specified.</p>
* @param listener The listener that will be called back as segments are completed.
* {@inheritDoc}
* @return a stream with the listener registered.
*/
CacheStream<R> segmentCompletionListener(SegmentCompletionListener listener);

/**
* Disables tracking of rehash events that could occur to the underlying cache. If a rehash event occurs while
* a terminal operation is being performed it is possible for some values that are in the cache to not be found.
* Note that you will never have an entry duplicated when rehash awareness is disabled, only lost values.
* <p>Most terminal operations will run faster with rehash awareness disabled even without a rehash occuring.
* However if a rehash occurs with this disabled be prepared to possibly receive only a subset of values.</p>
* {@inheritDoc}
* @return a stream with rehash awareness disabled.
*/
CacheStream<R> disableRehashAware();

/**
* Sets a given time to wait for a remote operation to respond by. This timeout does nothing if the terminal
* operation does not go remote.
* <p>If a timeout does occur then a {@link java.util.concurrent.TimeoutException} is thrown from the terminal
* operation invoking thread or on the next call to the {@link Iterator} or {@link Spliterator}.</p>
* <p>Note that if a rehash occurs this timeout value is reset for the subsequent retry if rehash aware is
* enabled.</p>
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* {@inheritDoc}
* @return a stream with the timeout set
*/
CacheStream<R> timeout(long timeout, TimeUnit unit);

/**
* Functional interface that is used as a callback when segments are completed. Please see
* {@link CacheStream#segmentCompletionListener(SegmentCompletionListener)} for more details.
* @since 8.0
*/
@FunctionalInterface
interface SegmentCompletionListener {
/**
* Method invoked when the segment has been found to be consumed properly by the terminal operation.
* @param segments The segments that were completed
*/
void segmentCompleted(Set<Integer> segments);
}

/**
* {@inheritDoc}
* <p>This operation is performed remotely on the node that is the primary owner for the key tied to the entry(s)
Expand Down
53 changes: 52 additions & 1 deletion core/src/main/java/org/infinispan/DoubleCacheStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.infinispan.util.function.SerializableSupplier;

import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.DoubleBinaryOperator;
import java.util.function.DoubleConsumer;
Expand All @@ -31,7 +33,56 @@
* @author wburns
* @since 9.0
*/
public interface DoubleCacheStream extends DoubleStream {
public interface DoubleCacheStream extends DoubleStream, BaseCacheStream<Double, DoubleStream> {

/**
* {@inheritDoc}
* @return a stream with parallel distribution disabled.
*/
DoubleCacheStream sequentialDistribution();

/**
* @inheritDoc
* @return a stream with parallel distribution enabled.
*/
DoubleCacheStream parallelDistribution();

/**
* {@inheritDoc}
* @return a stream with the segments filtered.
*/
DoubleCacheStream filterKeySegments(Set<Integer> segments);

/**
* {@inheritDoc}
* @return a stream with the keys filtered.
*/
DoubleCacheStream filterKeys(Set<?> keys);

/**
* {@inheritDoc}
* @return a stream with the batch size updated
*/
DoubleCacheStream distributedBatchSize(int batchSize);

/**
* {@inheritDoc}
* @return a stream with the listener registered.
*/
DoubleCacheStream segmentCompletionListener(SegmentCompletionListener listener);

/**
* {@inheritDoc}
* @return a stream with rehash awareness disabled.
*/
DoubleCacheStream disableRehashAware();

/**
* {@inheritDoc}
* @return a stream with the timeout set
*/
DoubleCacheStream timeout(long timeout, TimeUnit unit);

/**
* {@inheritDoc}
* @return the new cache double stream
Expand Down
53 changes: 52 additions & 1 deletion core/src/main/java/org/infinispan/IntCacheStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.infinispan.util.function.SerializableSupplier;

import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.IntBinaryOperator;
import java.util.function.IntConsumer;
Expand All @@ -31,7 +33,56 @@
* @author wburns
* @since 9.0
*/
public interface IntCacheStream extends IntStream {
public interface IntCacheStream extends IntStream, BaseCacheStream<Integer, IntStream> {

/**
* {@inheritDoc}
* @return a stream with parallel distribution disabled.
*/
IntCacheStream sequentialDistribution();

/**
* @inheritDoc
* @return a stream with parallel distribution enabled.
*/
IntCacheStream parallelDistribution();

/**
* {@inheritDoc}
* @return a stream with the segments filtered.
*/
IntCacheStream filterKeySegments(Set<Integer> segments);

/**
* {@inheritDoc}
* @return a stream with the keys filtered.
*/
IntCacheStream filterKeys(Set<?> keys);

/**
* {@inheritDoc}
* @return a stream with the batch size updated
*/
IntCacheStream distributedBatchSize(int batchSize);

/**
* {@inheritDoc}
* @return a stream with the listener registered.
*/
IntCacheStream segmentCompletionListener(SegmentCompletionListener listener);

/**
* {@inheritDoc}
* @return a stream with rehash awareness disabled.
*/
IntCacheStream disableRehashAware();

/**
* {@inheritDoc}
* @return a stream with the timeout set
*/
IntCacheStream timeout(long timeout, TimeUnit unit);

/**
* {@inheritDoc}
* @return the new cache int stream
Expand Down
Loading

0 comments on commit acdd7f3

Please sign in to comment.