Skip to content

Commit

Permalink
8196106: Support nested infinite or recursive flat mapped streams
Browse files Browse the repository at this point in the history
Reviewed-by: psandoz
  • Loading branch information
Viktor Klang committed Apr 16, 2024
1 parent 58911cc commit 8a5b86c
Show file tree
Hide file tree
Showing 9 changed files with 541 additions and 188 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -433,6 +433,20 @@ protected final boolean hasAnyStateful() {
return result;
}

/**
* Returns whether any of the stages in the (entire) pipeline is short-circuiting
* or not.
* @return {@code true} if any stage in this pipeline is short-circuiting,
* {@code false} if not.
*/
protected final boolean isShortCircuitingPipeline() {
for (var u = sourceStage.nextStage; u != null; u = u.nextStage) {
if (StreamOpFlag.SHORT_CIRCUIT.isKnown(u.combinedFlags))
return true;
}
return false;
}

/**
* Get the source spliterator for this pipeline stage. For a sequential or
* stateless parallel pipeline, this is the source spliterator. For a
Expand Down
60 changes: 32 additions & 28 deletions src/java.base/share/classes/java/util/stream/DoublePipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -39,6 +39,7 @@
import java.util.function.DoubleToIntFunction;
import java.util.function.DoubleToLongFunction;
import java.util.function.DoubleUnaryOperator;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.ObjDoubleConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -263,43 +264,46 @@ public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper)
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble<>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;

// cache the consumer to avoid creation on every accepted element
DoubleConsumer downstreamAsDouble = downstream::accept;

@Override
public void begin(long size) {
downstream.begin(-1);
}
final DoubleConsumer fastPath =
isShortCircuitingPipeline()
? null
: (sink instanceof DoubleConsumer dc)
? dc
: sink::accept;
final class FlatMap implements Sink.OfDouble, DoublePredicate {
boolean cancel;

@Override public void begin(long size) { sink.begin(-1); }
@Override public void end() { sink.end(); }

@Override
public void accept(double t) {
try (DoubleStream result = mapper.apply(t)) {
public void accept(double e) {
try (DoubleStream result = mapper.apply(e)) {
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsDouble);
} else {
var s = result.sequential().spliterator();
do {
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
}
if (fastPath == null)
result.sequential().allMatch(this);
else
result.sequential().forEach(fastPath);
}
}
}

@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
return cancel || (cancel |= sink.cancellationRequested());
}
};

@Override
public boolean test(double output) {
if (!cancel) {
sink.accept(output);
return !(cancel |= sink.cancellationRequested());
} else {
return false;
}
}
}
return new FlatMap();
}
};
}
Expand Down
8 changes: 0 additions & 8 deletions src/java.base/share/classes/java/util/stream/GathererOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,15 @@

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Gatherer.Integrator;

/**
Expand Down
59 changes: 31 additions & 28 deletions src/java.base/share/classes/java/util/stream/IntPipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -297,43 +297,46 @@ public final IntStream flatMap(IntFunction<? extends IntStream> mapper) {
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt<>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;

// cache the consumer to avoid creation on every accepted element
IntConsumer downstreamAsInt = downstream::accept;

@Override
public void begin(long size) {
downstream.begin(-1);
}
final IntConsumer fastPath =
isShortCircuitingPipeline()
? null
: (sink instanceof IntConsumer ic)
? ic
: sink::accept;
final class FlatMap implements Sink.OfInt, IntPredicate {
boolean cancel;

@Override public void begin(long size) { sink.begin(-1); }
@Override public void end() { sink.end(); }

@Override
public void accept(int t) {
try (IntStream result = mapper.apply(t)) {
public void accept(int e) {
try (IntStream result = mapper.apply(e)) {
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsInt);
} else {
var s = result.sequential().spliterator();
do {
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
}
if (fastPath == null)
result.sequential().allMatch(this);
else
result.sequential().forEach(fastPath);
}
}
}

@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
return cancel || (cancel |= sink.cancellationRequested());
}
};

@Override
public boolean test(int output) {
if (!cancel) {
sink.accept(output);
return !(cancel |= sink.cancellationRequested());
} else {
return false;
}
}
}
return new FlatMap();
}
};
}
Expand Down
60 changes: 32 additions & 28 deletions src/java.base/share/classes/java/util/stream/LongPipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -33,6 +33,7 @@
import java.util.Spliterators;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.LongBinaryOperator;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -279,43 +280,46 @@ public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong<>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;

// cache the consumer to avoid creation on every accepted element
LongConsumer downstreamAsLong = downstream::accept;

@Override
public void begin(long size) {
downstream.begin(-1);
}
final LongConsumer fastPath =
isShortCircuitingPipeline()
? null
: (sink instanceof LongConsumer lc)
? lc
: sink::accept;
final class FlatMap implements Sink.OfLong, LongPredicate {
boolean cancel;

@Override public void begin(long size) { sink.begin(-1); }
@Override public void end() { sink.end(); }

@Override
public void accept(long t) {
try (LongStream result = mapper.apply(t)) {
public void accept(long e) {
try (LongStream result = mapper.apply(e)) {
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsLong);
} else {
var s = result.sequential().spliterator();
do {
} while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
}
if (fastPath == null)
result.sequential().allMatch(this);
else
result.sequential().forEach(fastPath);
}
}
}

@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
return cancel || (cancel |= sink.cancellationRequested());
}
};

@Override
public boolean test(long output) {
if (!cancel) {
sink.accept(output);
return !(cancel |= sink.cancellationRequested());
} else {
return false;
}
}
}
return new FlatMap();
}
};
}
Expand Down
Loading

1 comment on commit 8a5b86c

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.