Skip to content

Commit

Permalink
ISPN-5632 Address rework comments for ISPN-5293
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns authored and galderz committed Aug 20, 2015
1 parent 2b1a260 commit fe38df2
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 134 deletions.
110 changes: 67 additions & 43 deletions core/src/main/java/org/infinispan/CacheStream.java
Expand Up @@ -8,6 +8,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Stream;

/**
Expand All @@ -31,20 +32,23 @@
* {@link CacheStream#parallelDistribution()}. With this disabled only a single node will process the operation
* at a time (includes locally).</p>
*
* <p>Rehash aware is enabled by default for all operations which will provide guaranteed consistency for all operations
* except for {@link CacheStream#forEach(Consumer)}. Please the method above for details about its consistency
* guarantees. If you wish to disable rehash aware operations you can disable them by calling
* <p>Rehash aware is enabled by default for all operations. Any intermediate or terminal operation may be invoked
* multiple times during a rehash and thus you should ensure the are idempotent. This can be problematic for
* {@link CacheStream#forEach(Consumer)} as it may be difficult to implement with such requirements, please see it for
* more information. If you wish to disable rehash aware operations you can disable them by calling
* {@link CacheStream#disableRehashAware()} which should provide better performance for some operations. The
* performance is most affected for the key aware operations {@link CacheStream#iterator()},
* {@link CacheStream#spliterator()}, {@link CacheStream#forEach(Consumer)}</p>
* {@link CacheStream#spliterator()}, {@link CacheStream#forEach(Consumer)}. Disabling rehash can cause
* incorrect results if the terminal operation is invoked and a rehash occurs before the operation completes. If
* incorrect results do occur it is guaranteed that it will only be that entries were missed and no entries are
* duplicated.</p>
*
* <p>Some terminal operators are special in that they act like an intermediate iterator operation. That is that
* it is an intermediate operation, but it requires processing the results using an interator intermediately before
* the stream can complete.</p>
* <p>Any stateful intermediate operation requires pulling all information up to that point local to operate properly.
* Each of these methods may have slightly different behavior, so make sure you check the method you are utilizing.</p>
*
* <p>A good example of an intermediate iterator operation is using distinct intermediate operation. What will happen
* is upon calling the terminal operation an iterator operation will be ran using all of
* the intermediate operations up to the distinct operation remotely. This iterator is then used to fuel a local
* <p>An example of such an operation is using distinct intermediate operation. What will happen
* is upon calling the terminal operation a remote retrieval operation will be ran using all of
* the intermediate operations up to the distinct operation remotely. This retrieval is then used to fuel a local
* stream where all of the remaining intermediate operations are performed and then finally the terminal operation is
* applied as normal. Note in this case the intermediate iterator still obeys the
* {@link CacheStream#distributedBatchSize(int)} setting irrespective of the terminal operator.</p>
Expand All @@ -58,7 +62,7 @@ public interface CacheStream<R> extends Stream<R> {
* pressure on the originator node at the cost of performance.
* <p>Parallel distribution is enabled by default except for {@link CacheStream#iterator()} &
* {@link CacheStream#spliterator()}</p>
* @return This stream again with parallel distribution disabled
* @return a stream with parallel distribution disabled
*/
CacheStream<R> sequentialDistribution();

Expand All @@ -68,16 +72,16 @@ public interface CacheStream<R> extends Stream<R> {
* faster in the majority of cases.
* <p>Parallel distribution is enabled by default except for {@link CacheStream#iterator()} &
* {@link CacheStream#spliterator()}</p>
* @return This stream again with parallel distribution enabled.
* @return a stream with parallel distribution enabled.
*/
CacheStream<R> parallelDistribution();

/**
* Filters which entries are returned by what segment they are present in. This method can be substantially more
* efficient then using a regular {@link CacheStream#filter(Predicate)} method as this can control what nodes are
* efficient than using a regular {@link CacheStream#filter(Predicate)} method as this can control what nodes are
* asked for data and what entries are read from the underlying CacheStore if present.
* @param segments The segments to use for this stream operation. Any segments not in this set will be ignored.
* @return This stream again with the segments filtered.
* @return a stream with the segments filtered.
*/
CacheStream<R> filterKeySegments(Set<Integer> segments);

Expand All @@ -86,7 +90,7 @@ public interface CacheStream<R> extends Stream<R> {
* be faster than a regular {@link CacheStream#filter(Predicate)} if any keys must be retrieved remotely or if a
* cache store is in use.
* @param keys The keys that this stream will only operate on.
* @return This stream again with the keys filtered.
* @return a stream with the keys filtered.
*/
CacheStream<R> filterKeys(Set<?> keys);

Expand All @@ -103,7 +107,7 @@ public interface CacheStream<R> extends Stream<R> {
* <p>This value is <b>always</b> ignored when this stream is backed by a cache that is not distributed as all
* values are already local.</p>
* @param batchSize The size of each batch. This defaults to the state transfer chunk size.
* @return This stream again with the batch size updated
* @return a stream with the batch size updated
*/
CacheStream<R> distributedBatchSize(int batchSize);

Expand All @@ -116,7 +120,7 @@ public interface CacheStream<R> extends Stream<R> {
* <p>Multiple listeners may be registered upon multiple invocations of this method. The ordering of notified
* listeners is not specified.</p>
* @param listener The listener that will be called back as segments are completed.
* @return This stream again with the listener registered.
* @return a stream with the listener registered.
*/
CacheStream<R> segmentCompletionListener(SegmentCompletionListener listener);

Expand All @@ -126,7 +130,7 @@ public interface CacheStream<R> extends Stream<R> {
* Note that you will never have an entry duplicated when rehash awareness is disabled, only lost values.
* <p>Most terminal operations will run faster with rehash awareness disabled even without a rehash occuring.
* However if a rehash occurs with this disabled be prepared to possibly receive only a subset of values.</p>
* @return This stream again with rehash awareness disabled.
* @return a stream with rehash awareness disabled.
*/
CacheStream<R> disableRehashAware();

Expand Down Expand Up @@ -168,8 +172,8 @@ interface SegmentCompletionListener {
* when rehash is enabled. After those are complete the keys are sent to the originator to confirm that those were
* processed. If that node goes down during/before the response those keys will be processed a second time.</p>
* <p>This method is ran distributed by default with a distributed backing cache. However if you wish for this
* operation to run locally you can use the {@link CacheStream#iterator()} method to return all of the results
* locally and then use {@link Iterator#forEachRemaining(Consumer)} method for a single threaded variant. If you
* operation to run locally you can use the {@code stream().iterator().forEachRemaining(action)} for a single
* threaded variant. If you
* wish to have a parallel variant you can use {@link java.util.stream.StreamSupport#stream(Spliterator, boolean)}
* passing in the spliterator from the stream. In either case remember you <b>must</b> close the stream after
* you are done processing the iterator or spliterator..</p>
Expand All @@ -184,8 +188,7 @@ interface SegmentCompletionListener {
* usage is to use a try with resource block on the stream.</p>
* <p>This method has special usage with the {@link org.infinispan.CacheStream.SegmentCompletionListener} in
* that as entries are retrieved from the next method it will complete segments.</p>
* <p>This method obeys the {@link CacheStream#distributedBatchSize(int)} setting by only ever returning the
* elements that mapped to that many keys. Note that when using methods such as
* <p>This method obeys the {@link CacheStream#distributedBatchSize(int)}. Note that when using methods such as
* {@link CacheStream#flatMap(Function)} that you will have possibly more than 1 element mapped to a given key
* so this doesn't guarantee that many number of entries are returned per batch.</p>
* <p>Note that the {@link Iterator#remove()} method is only supported if no intermediate operations have been
Expand All @@ -206,21 +209,23 @@ interface SegmentCompletionListener {

/**
* {@inheritDoc}
* <p>This method has special usage when used with a distributed cache backing this set. This operation will act
* as an intermediate iterator operation requiring data be brought locally for proper behavior. This is
* described in more detail in the {@link CacheStream} documentation</p>
* <p>This intermediate iterator operation will be performed locally only requiring all elements to be in memory</p>
* <p>This operation is performed entirely on the local node irrespective of the backing cache. This
* operation will act as an intermediate iterator operation requiring data be brought locally for proper behavior.
* Beware this means it will require having all entries of this cache into memory at one time. This is described in
* more detail at {@link CacheStream}</p>
* <p>Any subsequent intermediate operations and the terminal operation are also performed locally.</p>
* @return the new stream
*/
@Override
Stream<R> sorted();

/**
* {@inheritDoc}
* <p>This method has special usage when used with a distributed cache backing this set. This operation will act
* as an intermediate iterator operation requiring data be brought locally for proper behavior. This is
* described in more detail in the {@link CacheStream} documentation</p>
* <p>This intermediate iterator operation will be performed locally only requiring all elements to be in memory</p>
* <p>This operation is performed entirely on the local node irrespective of the backing cache. This
* operation will act as an intermediate iterator operation requiring data be brought locally for proper behavior.
* Beware this means it will require having all entries of this cache into memory at one time. This is described in
* more detail at {@link CacheStream}</p>
* <p>Any subsequent intermediate operations and the terminal operation are then performed locally.</p>
* @param comparator the comparator to be used for sorting the elements
* @return the new stream
*/
Expand All @@ -229,11 +234,12 @@ interface SegmentCompletionListener {

/**
* {@inheritDoc}
* <p>This method has special usage when used with a distributed cache backing this set. This operation will act
* as an intermediate iterator operation requiring data be brought locally for proper behavior. This is
* described in more detail in the {@link CacheStream} documentation</p>
* <p>This intermediate iterator operation will be performed both remotely and locally to reduce how many elements
* are sent back from each node.</p>
* <p>This intermediate operation will be performed both remotely and locally to reduce how many elements
* are sent back from each node. More specifically this operation is applied remotely on each node to only return
* up to the <b>maxSize</b> value and then the aggregated results are limited once again on the local node.</p>
* <p>This operation will act as an intermediate iterator operation requiring data be brought locally for proper
* behavior. This is described in more detail in the {@link CacheStream} documentation</p>
* <p>Any subsequent intermediate operations and the terminal operation are then performed locally.</p>
* @param maxSize how many elements to limit this stream to.
* @return the new stream
*/
Expand All @@ -242,11 +248,12 @@ interface SegmentCompletionListener {

/**
* {@inheritDoc}
* <p>This method has special usage when used with a distributed cache backing this set. This operation will act
* as an intermediate iterator operation requiring data be brought locally for proper behavior. This is
* described in more detail in the {@link CacheStream} documentation</p>
* <p>This intermediate iterator operation will only be performed locally, however it will only have elements in
* memory controlled by the {@link CacheStream#distributedBatchSize(int)} unless the terminal operator holds them.</p>
* <p>This operation is performed entirely on the local node irrespective of the backing cache. This
* operation will act as an intermediate iterator operation requiring data be brought locally for proper behavior.
* This is described in more detail in the {@link CacheStream} documentation</p>
* <p>Depending on the terminal operator this may or may not require all entries or a subset after skip is applied
* to be in memory all at once.</p>
* <p>Any subsequent intermediate operations and the terminal operation are then performed locally.</p>
* @param n how many elements to skip from this stream
* @return the new stream
*/
Expand All @@ -255,13 +262,30 @@ interface SegmentCompletionListener {

/**
* {@inheritDoc}
* <p>This method has special usage when used with a distributed cache backing this set. This operation will act
* as an intermediate iterator operation requiring data be brought locally for proper behavior. This is
* described in more detail in the {@link CacheStream} documentation</p>
* <p>This operation will be invoked both remotely and locally when used with a distributed cache backing this stream.
* This operation will act as an intermediate iterator operation requiring data be brought locally for proper
* behavior. This is described in more detail in the {@link CacheStream} documentation</p>
* <p>This intermediate iterator operation will be performed locally and remotely requiring possibly a subset of
* all elements to be in memory</p>
* <p>Any subsequent intermediate operations and the terminal operation are then performed locally.</p>
* @return the new stream
*/
@Override
Stream<R> distinct();

/**
* {@inheritDoc}
* <p>Note when using a distributed backing cache for this stream the collector must be marshallable. This
* prevents the usage of {@link java.util.stream.Collectors} class. However you can use the
* {@link org.infinispan.stream.CacheCollectors} static factory methods to create a serializable wrapper, which then
* creates the actual collector lazily after being deserialized. This is useful to use any method from the
* {@link java.util.stream.Collectors} class as you would normally.</p>
* @param collector
* @param <R1> collected type
* @param <A> intermediate collected type if applicable
* @return the collected value
* @see org.infinispan.stream.CacheCollectors
*/
@Override
<R1, A> R1 collect(Collector<? super R, A, R1> collector);
}
Expand Up @@ -23,7 +23,7 @@
import java.util.concurrent.locks.ReentrantLock;

/**
* A closeable supplier that provides a wait to supply cache entries from a given persistence manager. On the first
* A closeable supplier that provides a way to supply cache entries from a given persistence manager. On the first
* call to get this class will submit a task to collect all of the entries from the loader (or optionally a subset
* provided a given {@link org.infinispan.filter.KeyFilter}). A timeout value is required so that if a get blocks
* for the given timeout it will throw a {@link TimeoutException}.
Expand Down
@@ -1,5 +1,6 @@
package org.infinispan.stream.impl;

import org.infinispan.CacheStream;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.remoting.transport.Address;

Expand All @@ -8,6 +9,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
Expand All @@ -23,7 +25,9 @@ public interface ClusterStreamManager<K> {
*/
interface ResultsCallback<R> {
/**
* Called back for intermediate data returned from an operation. This is useful
* Called back for intermediate data returned from an operation. This is useful for operations that utilized
* batch fetching such as {@link CacheStream#iterator()}, {@link CacheStream#spliterator()},
* {@link CacheStream#forEach(Consumer)} and {@link CacheStream#toArray()}.
* @param address Which node this data came from
* @param results The results obtained so far.
* @return the segments that completed with some value
Expand Down

0 comments on commit fe38df2

Please sign in to comment.