Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
8238286: Add new flatMap stream operation that is more amenable to pu…
…shing

This patch adds a new flatmap-like operation called mapMulti to the java.util.Stream class as well as the primitive variations of this operation i.e. mapMultiToInt, IntStream mapMulti, etc.

Reviewed-by: psandoz, smarks
  • Loading branch information
pconcannon committed Aug 31, 2020
1 parent 208b120 commit 83e0ecb953d7b27282460aed767c95bb59471593
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -304,6 +304,31 @@ public boolean cancellationRequested() {
};
}

@Override
public final DoubleStream mapMulti(DoubleMapMultiConsumer mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<>(this, StreamShape.DOUBLE_VALUE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {

@Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble<>(sink) {

@Override
public void begin(long size) {
downstream.begin(-1);
}

@Override
@SuppressWarnings("unchecked")
public void accept(double t) {
mapper.accept(t, (DoubleConsumer) downstream);
}
};
}
};
}

@Override
public DoubleStream unordered() {
if (!isOrdered())
@@ -163,6 +163,43 @@ public interface DoubleStream extends BaseStream<Double, DoubleStream> {
*/
DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper);

/**
* Returns a stream consisting of the results of replacing each element of
* this stream with multiple elements, specifically zero or more elements.
* Replacement is performed by applying the provided mapping function to each
* element in conjunction with a {@linkplain DoubleConsumer consumer} argument
* that accepts replacement elements. The mapping function calls the consumer
* zero or more times to provide the replacement elements.
*
* <p>This is an <a href="package-summary.html#StreamOps">intermediate
* operation</a>.
*
* <p>If the {@linkplain DoubleConsumer consumer} argument is used outside the scope of
* its application to the mapping function, the results are undefined.
*
* @implSpec
* The default implementation invokes {@link #flatMap flatMap} on this stream,
* passing a function that behaves as follows. First, it calls the mapper function
* with a {@code DoubleConsumer} that accumulates replacement elements into a newly created
* internal buffer. When the mapper function returns, it creates a {@code DoubleStream} from the
* internal buffer. Finally, it returns this stream to {@code flatMap}.
*
* @param mapper a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* function that generates replacement elements
* @return the new stream
* @see Stream#mapMulti Stream.mapMulti
* @since 16
*/
default DoubleStream mapMulti(DoubleMapMultiConsumer mapper) {
Objects.requireNonNull(mapper);
return flatMap(e -> {
SpinedBuffer.OfDouble buffer = new SpinedBuffer.OfDouble();
mapper.accept(e, buffer);
return StreamSupport.doubleStream(buffer.spliterator(), false);
});
}

/**
* Returns a stream consisting of the distinct elements of this stream. The
* elements are compared for equality according to
@@ -1180,4 +1217,30 @@ default Builder add(double t) {
*/
DoubleStream build();
}

/**
* Represents an operation that accepts a {@code double}-valued argument
* and a DoubleConsumer, and returns no result. This functional interface is
* used by {@link DoubleStream#mapMulti(DoubleMapMultiConsumer) DoubleStream.mapMulti}
* to replace a double value with zero or more double values.
*
* <p>This is a <a href="../function/package-summary.html">functional interface</a>
* whose functional method is {@link #accept(double, DoubleConsumer)}.
*
* @see DoubleStream#mapMulti(DoubleMapMultiConsumer)
*
* @since 16
*/
@FunctionalInterface
interface DoubleMapMultiConsumer {

/**
* Replaces the given {@code value} with zero or more values by feeding the mapped
* values to the {@code dc} consumer.
*
* @param value the double value coming from upstream
* @param dc a {@code DoubleConsumer} accepting the mapped values
*/
void accept(double value, DoubleConsumer dc);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -338,6 +338,30 @@ public boolean cancellationRequested() {
};
}

@Override
public final IntStream mapMulti(IntMapMultiConsumer mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<>(this, StreamShape.INT_VALUE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt<>(sink) {

@Override
public void begin(long size) {
downstream.begin(-1);
}

@Override
@SuppressWarnings("unchecked")
public void accept(int t) {
mapper.accept(t, (IntConsumer) downstream);
}
};
}
};
}

@Override
public IntStream unordered() {
if (!isOrdered())
@@ -164,6 +164,43 @@ public interface IntStream extends BaseStream<Integer, IntStream> {
*/
IntStream flatMap(IntFunction<? extends IntStream> mapper);

/**
* Returns a stream consisting of the results of replacing each element of
* this stream with multiple elements, specifically zero or more elements.
* Replacement is performed by applying the provided mapping function to each
* element in conjunction with a {@linkplain IntConsumer consumer} argument
* that accepts replacement elements. The mapping function calls the consumer
* zero or more times to provide the replacement elements.
*
* <p>This is an <a href="package-summary.html#StreamOps">intermediate
* operation</a>.
*
* <p>If the {@linkplain IntConsumer consumer} argument is used outside the scope of
* its application to the mapping function, the results are undefined.
*
* @implSpec
* The default implementation invokes {@link #flatMap flatMap} on this stream,
* passing a function that behaves as follows. First, it calls the mapper function
* with an {@code IntConsumer} that accumulates replacement elements into a newly created
* internal buffer. When the mapper function returns, it creates an {@code IntStream} from the
* internal buffer. Finally, it returns this stream to {@code flatMap}.
*
* @param mapper a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* function that generates replacement elements
* @return the new stream
* @see Stream#mapMulti Stream.mapMulti
* @since 16
*/
default IntStream mapMulti(IntMapMultiConsumer mapper) {
Objects.requireNonNull(mapper);
return flatMap(e -> {
SpinedBuffer.OfInt buffer = new SpinedBuffer.OfInt();
mapper.accept(e, buffer);
return StreamSupport.intStream(buffer.spliterator(), false);
});
}

/**
* Returns a stream consisting of the distinct elements of this stream.
*
@@ -1173,4 +1210,30 @@ default Builder add(int t) {
*/
IntStream build();
}

/**
* Represents an operation that accepts an {@code int}-valued argument
* and an IntConsumer, and returns no result. This functional interface is
* used by {@link IntStream#mapMulti(IntMapMultiConsumer) IntStream.mapMulti}
* to replace an int value with zero or more int values.
*
* <p>This is a <a href="../function/package-summary.html">functional interface</a>
* whose functional method is {@link #accept(int, IntConsumer)}.
*
* @see IntStream#mapMulti(IntMapMultiConsumer)
*
* @since 16
*/
@FunctionalInterface
interface IntMapMultiConsumer {

/**
* Replaces the given {@code value} with zero or more values by feeding the mapped
* values to the {@code ic} consumer.
*
* @param value the int value coming from upstream
* @param ic an {@code IntConsumer} accepting the mapped values
*/
void accept(int value, IntConsumer ic);
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -320,6 +320,30 @@ public boolean cancellationRequested() {
};
}

@Override
public final LongStream mapMulti(LongMapMultiConsumer mapper) {
Objects.requireNonNull(mapper);
return new LongPipeline.StatelessOp<>(this, StreamShape.LONG_VALUE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong<>(sink) {

@Override
public void begin(long size) {
downstream.begin(-1);
}

@Override
@SuppressWarnings("unchecked")
public void accept(long t) {
mapper.accept(t, (LongConsumer) downstream);
}
};
}
};
}

@Override
public LongStream unordered() {
if (!isOrdered())
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013, 2016, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2013, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -164,6 +164,43 @@ public interface LongStream extends BaseStream<Long, LongStream> {
*/
LongStream flatMap(LongFunction<? extends LongStream> mapper);

/**
* Returns a stream consisting of the results of replacing each element of
* this stream with multiple elements, specifically zero or more elements.
* Replacement is performed by applying the provided mapping function to each
* element in conjunction with a {@linkplain LongConsumer consumer} argument
* that accepts replacement elements. The mapping function calls the consumer
* zero or more times to provide the replacement elements.
*
* <p>This is an <a href="package-summary.html#StreamOps">intermediate
* operation</a>.
*
* <p>If the {@linkplain LongConsumer consumer} argument is used outside the scope of
* its application to the mapping function, the results are undefined.
*
* @implSpec
* The default implementation invokes {@link #flatMap flatMap} on this stream,
* passing a function that behaves as follows. First, it calls the mapper function
* with a {@code LongConsumer} that accumulates replacement elements into a newly created
* internal buffer. When the mapper function returns, it creates a {@code LongStream} from the
* internal buffer. Finally, it returns this stream to {@code flatMap}.
*
* @param mapper a <a href="package-summary.html#NonInterference">non-interfering</a>,
* <a href="package-summary.html#Statelessness">stateless</a>
* function that generates replacement elements
* @return the new stream
* @see Stream#mapMulti Stream.mapMulti
* @since 16
*/
default LongStream mapMulti(LongMapMultiConsumer mapper) {
Objects.requireNonNull(mapper);
return flatMap(e -> {
SpinedBuffer.OfLong buffer = new SpinedBuffer.OfLong();
mapper.accept(e, buffer);
return StreamSupport.longStream(buffer.spliterator(), false);
});
}

/**
* Returns a stream consisting of the distinct elements of this stream.
*
@@ -1177,4 +1214,30 @@ default Builder add(long t) {
*/
LongStream build();
}

/**
* Represents an operation that accepts a {@code long}-valued argument
* and a LongConsumer, and returns no result. This functional interface is
* used by {@link LongStream#mapMulti(LongStream.LongMapMultiConsumer) LongStream.mapMulti}
* to replace a long value with zero or more long values.
*
* <p>This is a <a href="../function/package-summary.html">functional interface</a>
* whose functional method is {@link #accept(long, LongConsumer)}.
*
* @see LongStream#mapMulti(LongStream.LongMapMultiConsumer)
*
* @since 16
*/
@FunctionalInterface
interface LongMapMultiConsumer {

/**
* Replaces the given {@code value} with zero or more values by feeding the mapped
* values to the {@code lc} consumer.
*
* @param value the long value coming from upstream
* @param lc a {@code LongConsumer} accepting the mapped values
*/
void accept(long value, LongConsumer lc);
}
}

0 comments on commit 83e0ecb

Please sign in to comment.