Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
8265029: Preserve SIZED characteristics on slice operations (skip, li…
…mit)

Reviewed-by: psandoz
  • Loading branch information
amaembo committed May 28, 2021
1 parent 95b1fa7 commit 0c9daa7
Show file tree
Hide file tree
Showing 12 changed files with 711 additions and 96 deletions.
29 changes: 27 additions & 2 deletions src/java.base/share/classes/java/util/stream/AbstractPipeline.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -466,7 +466,32 @@ final StreamShape getSourceShape() {

@Override
final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
int flags = getStreamAndOpFlags();
long size = StreamOpFlag.SIZED.isKnown(flags) ? spliterator.getExactSizeIfKnown() : -1;
// Currently, we have no stateless SIZE_ADJUSTING intermediate operations,
// so we can simply ignore SIZE_ADJUSTING in parallel streams, since adjustments
// are already accounted in the input spliterator.
//
// If we ever have a stateless SIZE_ADJUSTING intermediate operation,
// we would need step back until depth == 0, then call exactOutputSize() for
// the subsequent stages.
if (size != -1 && StreamOpFlag.SIZE_ADJUSTING.isKnown(flags) && !isParallel()) {
// Skip the source stage as it's never SIZE_ADJUSTING
for (AbstractPipeline<?, ?, ?> stage = sourceStage.nextStage; stage != null; stage = stage.nextStage) {
size = stage.exactOutputSize(size);
}
}
return size;
}

/**
* Returns the exact output size of the pipeline given the exact size reported by the previous stage.
*
* @param previousSize the exact size reported by the previous stage
* @return the output size of this stage
*/
long exactOutputSize(long previousSize) {
return previousSize;
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -82,7 +82,8 @@ abstract class PipelineHelper<P_OUT> {
* The exact output size is known if the {@code Spliterator} has the
* {@code SIZED} characteristic, and the operation flags
* {@link StreamOpFlag#SIZED} is known on the combined stream and operation
* flags.
* flags. The exact output size may differ from spliterator size,
* if pipeline contains a slice operation.
*
* @param spliterator the spliterator describing the relevant portion of the
* source data
Expand Down
42 changes: 25 additions & 17 deletions src/java.base/share/classes/java/util/stream/ReduceOps.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -252,16 +252,18 @@ public ReducingSink makeSink() {
@Override
public <P_IN> Long evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
return spliterator.getExactSizeIfKnown();
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size != -1)
return size;
return super.evaluateSequential(helper, spliterator);
}

@Override
public <P_IN> Long evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
return spliterator.getExactSizeIfKnown();
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size != -1)
return size;
return super.evaluateParallel(helper, spliterator);
}

Expand Down Expand Up @@ -426,16 +428,18 @@ public ReducingSink makeSink() {
@Override
public <P_IN> Long evaluateSequential(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
return spliterator.getExactSizeIfKnown();
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size != -1)
return size;
return super.evaluateSequential(helper, spliterator);
}

@Override
public <P_IN> Long evaluateParallel(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
return spliterator.getExactSizeIfKnown();
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size != -1)
return size;
return super.evaluateParallel(helper, spliterator);
}

Expand Down Expand Up @@ -600,16 +604,18 @@ public ReducingSink makeSink() {
@Override
public <P_IN> Long evaluateSequential(PipelineHelper<Long> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
return spliterator.getExactSizeIfKnown();
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size != -1)
return size;
return super.evaluateSequential(helper, spliterator);
}

@Override
public <P_IN> Long evaluateParallel(PipelineHelper<Long> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
return spliterator.getExactSizeIfKnown();
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size != -1)
return size;
return super.evaluateParallel(helper, spliterator);
}

Expand Down Expand Up @@ -774,16 +780,18 @@ public ReducingSink makeSink() {
@Override
public <P_IN> Long evaluateSequential(PipelineHelper<Double> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
return spliterator.getExactSizeIfKnown();
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size != -1)
return size;
return super.evaluateSequential(helper, spliterator);
}

@Override
public <P_IN> Long evaluateParallel(PipelineHelper<Double> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.SIZED.isKnown(helper.getStreamAndOpFlags()))
return spliterator.getExactSizeIfKnown();
long size = helper.exactOutputSizeIfKnown(spliterator);
if (size != -1)
return size;
return super.evaluateParallel(helper, spliterator);
}

Expand Down
75 changes: 47 additions & 28 deletions src/java.base/share/classes/java/util/stream/SliceOps.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -50,7 +50,7 @@ private SliceOps() { }
* @return the sliced size
*/
private static long calcSize(long size, long skip, long limit) {
return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
return size >= 0 ? Math.max(0, Math.min(size - skip, limit)) : -1;
}

/**
Expand All @@ -72,28 +72,23 @@ private static long calcSliceFence(long skip, long limit) {
* spliterator type. Requires that the underlying Spliterator
* be SUBSIZED.
*/
@SuppressWarnings("unchecked")
private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
Spliterator<P_IN> s,
long skip, long limit) {
assert s.hasCharacteristics(Spliterator.SUBSIZED);
long sliceFence = calcSliceFence(skip, limit);
switch (shape) {
case REFERENCE:
return new StreamSpliterators
.SliceSpliterator.OfRef<>(s, skip, sliceFence);
case INT_VALUE:
return (Spliterator<P_IN>) new StreamSpliterators
.SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
case LONG_VALUE:
return (Spliterator<P_IN>) new StreamSpliterators
.SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
case DOUBLE_VALUE:
return (Spliterator<P_IN>) new StreamSpliterators
.SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
default:
throw new IllegalStateException("Unknown shape " + shape);
}
@SuppressWarnings("unchecked")
Spliterator<P_IN> sliceSpliterator = (Spliterator<P_IN>) switch (shape) {
case REFERENCE
-> new StreamSpliterators.SliceSpliterator.OfRef<>(s, skip, sliceFence);
case INT_VALUE
-> new StreamSpliterators.SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
case LONG_VALUE
-> new StreamSpliterators.SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
case DOUBLE_VALUE
-> new StreamSpliterators.SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
};
return sliceSpliterator;
}

/**
Expand All @@ -110,9 +105,15 @@ public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
long skip, long limit) {
if (skip < 0)
throw new IllegalArgumentException("Skip must be non-negative: " + skip);
long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE;

return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
flags(limit)) {
@Override
long exactOutputSize(long previousSize) {
return calcSize(previousSize, skip, normalizedLimit);
}

Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
long skip, long limit, long sizeIfKnown) {
if (skip <= sizeIfKnown) {
Expand Down Expand Up @@ -182,9 +183,9 @@ <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,

@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<T, T>(sink) {
return new Sink.ChainedReference<>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
long m = normalizedLimit;

@Override
public void begin(long size) {
Expand Down Expand Up @@ -226,9 +227,15 @@ public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
long skip, long limit) {
if (skip < 0)
throw new IllegalArgumentException("Skip must be non-negative: " + skip);
long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE;

return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
flags(limit)) {
@Override
long exactOutputSize(long previousSize) {
return calcSize(previousSize, skip, normalizedLimit);
}

Spliterator.OfInt unorderedSkipLimitSpliterator(
Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {
if (skip <= sizeIfKnown) {
Expand Down Expand Up @@ -291,9 +298,9 @@ <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,

@Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt<Integer>(sink) {
return new Sink.ChainedInt<>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
long m = normalizedLimit;

@Override
public void begin(long size) {
Expand Down Expand Up @@ -335,9 +342,15 @@ public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
long skip, long limit) {
if (skip < 0)
throw new IllegalArgumentException("Skip must be non-negative: " + skip);
long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE;

return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
flags(limit)) {
@Override
long exactOutputSize(long previousSize) {
return calcSize(previousSize, skip, normalizedLimit);
}

Spliterator.OfLong unorderedSkipLimitSpliterator(
Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {
if (skip <= sizeIfKnown) {
Expand Down Expand Up @@ -400,9 +413,9 @@ <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,

@Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong<Long>(sink) {
return new Sink.ChainedLong<>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
long m = normalizedLimit;

@Override
public void begin(long size) {
Expand Down Expand Up @@ -444,9 +457,15 @@ public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
long skip, long limit) {
if (skip < 0)
throw new IllegalArgumentException("Skip must be non-negative: " + skip);
long normalizedLimit = limit >= 0 ? limit : Long.MAX_VALUE;

return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
flags(limit)) {
@Override
long exactOutputSize(long previousSize) {
return calcSize(previousSize, skip, normalizedLimit);
}

Spliterator.OfDouble unorderedSkipLimitSpliterator(
Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {
if (skip <= sizeIfKnown) {
Expand Down Expand Up @@ -509,9 +528,9 @@ <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,

@Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble<Double>(sink) {
return new Sink.ChainedDouble<>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
long m = normalizedLimit;

@Override
public void begin(long size) {
Expand Down Expand Up @@ -541,7 +560,7 @@ public boolean cancellationRequested() {
}

private static int flags(long limit) {
return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
return StreamOpFlag.IS_SIZE_ADJUSTING | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
}

/**
Expand Down
21 changes: 19 additions & 2 deletions src/java.base/share/classes/java/util/stream/StreamOpFlag.java
Expand Up @@ -325,12 +325,24 @@ enum StreamOpFlag {
*/
// 12, 0x01000000
SHORT_CIRCUIT(12,
set(Type.OP).set(Type.TERMINAL_OP));
set(Type.OP).set(Type.TERMINAL_OP)),

/**
* Characteristic value signifying that an operation may adjust the
* total size of the stream.
* <p>
* The flag, if present, is only valid when SIZED is present;
* and is only valid for sequential streams.
* <p>
* An intermediate operation can preserve or inject this value.
*/
// 13, 0x04000000
SIZE_ADJUSTING(13,
set(Type.OP));

// The following 2 flags are currently undefined and a free for any further
// stream flags if/when required
//
// 13, 0x04000000
// 14, 0x10000000
// 15, 0x40000000

Expand Down Expand Up @@ -629,6 +641,11 @@ private static int createFlagMask() {
*/
static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;

/**
* The bit value to inject {@link #SIZE_ADJUSTING}.
*/
static final int IS_SIZE_ADJUSTING = SIZE_ADJUSTING.set;

private static int getMask(int flags) {
return (flags == 0)
? FLAG_MASK
Expand Down

1 comment on commit 0c9daa7

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.