Skip to content

Commit

Permalink
8319123: Implement JEP 461: Stream Gatherers (Preview)
Browse files Browse the repository at this point in the history
Reviewed-by: tvaleev, alanb, psandoz
  • Loading branch information
Viktor Klang authored and Alan Bateman committed Nov 30, 2023
1 parent 04ad98e commit 33b26f7
Show file tree
Hide file tree
Showing 24 changed files with 4,988 additions and 7 deletions.
45 changes: 42 additions & 3 deletions src/java.base/share/classes/java/util/stream/AbstractPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
* The "upstream" pipeline, or null if this is the source stage.
*/
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
protected final AbstractPipeline previousStage;

/**
* The operation flags for the intermediate operation represented by this
Expand Down Expand Up @@ -188,9 +188,13 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
* Constructor for appending an intermediate operation stage onto an
* existing pipeline.
*
* The previous stage must be unlinked and unconsumed.
*
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
* @throws IllegalStateException if previousStage is already linked or
* consumed
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
Expand All @@ -205,6 +209,41 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
this.depth = previousStage.depth + 1;
}

/**
* Constructor for replacing an intermediate operation stage onto an
* existing pipeline.
*
* @param previousPreviousStage the upstream pipeline stage of the upstream pipeline stage
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
* @throws IllegalStateException if previousStage is already linked or
* consumed
*/
protected AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousPreviousStage, AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed || !previousPreviousStage.linkedOrConsumed || previousPreviousStage.nextStage != previousStage || previousStage.previousStage != previousPreviousStage)
throw new IllegalStateException(MSG_STREAM_LINKED);

previousStage.linkedOrConsumed = true;

previousPreviousStage.nextStage = this;

this.previousStage = previousPreviousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousPreviousStage.combinedFlags);
this.sourceStage = previousPreviousStage.sourceStage;
this.depth = previousPreviousStage.depth + 1;
}

/**
* Checks that the current stage has not been already linked or consumed,
* and then sets this stage as being linked or consumed.
*/
protected void linkOrConsume() {
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
}

// Terminal evaluation methods

Expand Down Expand Up @@ -402,7 +441,7 @@ protected final boolean hasAnyStateful() {
* operation.
*/
@SuppressWarnings("unchecked")
private Spliterator<?> sourceSpliterator(int terminalFlags) {
protected Spliterator<?> sourceSpliterator(int terminalFlags) {
// Get the source spliterator of the pipeline
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
Expand Down Expand Up @@ -740,6 +779,6 @@ <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
@SuppressWarnings("unchecked")
<P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator) {
return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
return opEvaluateParallel(helper, spliterator, Nodes.castingArray()).spliterator();
}
}
Loading

1 comment on commit 33b26f7

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.