Add flatMap method for collapsing streams into one #4227
Codecov Report
Attention: Patch coverage is
@@              Coverage Diff              @@
##             main     #4227       +/-   ##
=============================================
+ Coverage        0    74.09%    +74.09%
- Complexity      0     20908     +20908
=============================================
  Files           0      1810      +1810
  Lines           0     77148     +77148
  Branches        0      9867      +9867
=============================================
+ Hits            0     57161     +57161
- Misses          0     15328     +15328
- Partials        0      4659      +4659
@Override
public CompletableFuture<Void> whenComplete() {
    return source.whenComplete();
Should we complete this stream when all nested streams have completed?
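Here is a minimal sketch of that behavior. It assumes all callbacks run on one executor, so no synchronization is needed. The stream completes only after the upstream has completed and no nested stream is still active. CompletionTracker and its methods are hypothetical names for illustration, not Armeria API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: completes only after the upstream AND every nested stream have completed.
// Assumes all methods are called from a single executor, so plain fields are sufficient.
final class CompletionTracker {
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
    private boolean upstreamComplete;
    private int activeChildren;

    void onChildSubscribed() {
        activeChildren++;
    }

    void onChildComplete() {
        activeChildren--;
        maybeComplete();
    }

    void onUpstreamComplete() {
        upstreamComplete = true;
        maybeComplete();
    }

    private void maybeComplete() {
        // No new children can arrive once upstream is done, so zero active children means we are finished.
        if (upstreamComplete && activeChildren == 0) {
            completionFuture.complete(null);
        }
    }

    CompletableFuture<Void> whenComplete() {
        return completionFuture;
    }
}
```

A full implementation would presumably also deliver any buffered elements before completing the future.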
@Override
public long demand() {
    return source.demand();
I don't think we can provide the current demand with source.demand(). How about tracking the demand manually rather than delegating it?
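Tracking demand manually could look roughly like the following sketch, which assumes single-threaded access. DemandTracker, request and tryConsume are hypothetical names. The saturating add is reimplemented inline instead of using Guava's LongMath.saturatedAdd, so the example has no dependencies.

```java
// Hypothetical sketch: keep a local demand counter instead of reading source.demand().
// Assumes every call happens on the same executor.
final class DemandTracker {
    private long demand;

    // Saturating add for non-negative operands: caps at Long.MAX_VALUE instead of wrapping.
    private static long saturatedAdd(long a, long b) {
        return a > Long.MAX_VALUE - b ? Long.MAX_VALUE : a + b;
    }

    void request(long n) {
        if (n <= 0) {
            // Simplification: a real Subscription signals onError(IllegalArgumentException) (rule 3.9).
            throw new IllegalArgumentException("n: " + n + " (expected: > 0)");
        }
        demand = saturatedAdd(demand, n);
    }

    // Consumes one unit of demand if an element may be emitted now.
    boolean tryConsume() {
        if (demand == 0) {
            return false;
        }
        // Long.MAX_VALUE is treated as unbounded (Reactive Streams rule 3.17), so it is never decremented.
        if (demand != Long.MAX_VALUE) {
            demand--;
        }
        return true;
    }

    long demand() {
        return demand;
    }
}
```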
@Override
public void abort() {
    source.abort();
Should we also abort the nested StreamMessages in progress?
That sounds like a good idea. If I call source.abort() here, will that propagate to the Subscription so that I can just cancel my nested subscriptions in onError?
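The following is a rough sketch of cancelling nested subscriptions when the outer stream is aborted or fails. It uses java.util.concurrent.Flow.Subscription as a stand-in for org.reactivestreams.Subscription, since both have the same request/cancel shape. ChildCanceller and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Flow;

// Hypothetical sketch: cancel every nested subscription when the outer stream is aborted or fails.
// Assumes all calls happen on a single executor.
final class ChildCanceller {
    private final Set<Flow.Subscription> children = new LinkedHashSet<>();
    private boolean canceled;

    void addChild(Flow.Subscription child) {
        if (canceled) {
            // A child that shows up after cancellation is cancelled right away instead of leaking.
            child.cancel();
            return;
        }
        children.add(child);
    }

    void removeChild(Flow.Subscription child) {
        children.remove(child);
    }

    // Would be called from abort() and from onError(cause) before notifying the downstream.
    void cancelAll() {
        if (canceled) {
            return;
        }
        canceled = true;
        // Copy first, because a child's cancel() may call back into removeChild().
        final List<Flow.Subscription> snapshot = new ArrayList<>(children);
        children.clear();
        snapshot.forEach(Flow.Subscription::cancel);
    }
}
```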
private void handleRequest(long n) {
    requestedByDownstream = LongMath.saturatedAdd(requestedByDownstream, n);
    upstream.request(maxConcurrency - sourceSubscriptions.size());
Are there any cases where sourceSubscriptions.size() is greater than maxConcurrency?
It should never happen.
Let's add:
if (maxConcurrency - sourceSubscriptions.size() != 0) {...}
so that we don't request with 0.
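Here is a sketch of the suggested guard, assuming a simplified model where each upstream.request(n) call is recorded in a list. pendingUpstream is an extra assumption that is not in the original code. It counts outer elements that were requested but have not arrived yet, so two downstream requests in a row do not request more than maxConcurrency in total.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the guard; upstreamRequests records what would be passed to upstream.request(n).
final class ConcurrencyGate {
    private final int maxConcurrency;
    private int activeChildren;      // nested streams currently subscribed
    private long pendingUpstream;    // outer elements requested but not yet received (assumption)
    private final List<Long> upstreamRequests = new ArrayList<>();

    ConcurrencyGate(int maxConcurrency) {
        this.maxConcurrency = maxConcurrency;
    }

    void onDownstreamRequest() {
        final long freeSlots = maxConcurrency - activeChildren - pendingUpstream;
        // request(n) with n <= 0 makes the upstream signal onError (Reactive Streams rule 3.9),
        // so the call is skipped entirely when no slot is free.
        if (freeSlots > 0) {
            pendingUpstream += freeSlots;
            upstreamRequests.add(freeSlots);
        }
    }

    // An outer element arrived and a nested stream was subscribed for it.
    void onUpstreamNext() {
        pendingUpstream--;
        activeChildren++;
    }

    void onChildComplete() {
        activeChildren--;
    }

    List<Long> upstreamRequests() {
        return upstreamRequests;
    }
}
```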
Shouldn't we call flush before request, so that we don't call upstream.request(...) if we already have enough elements to hand to the downstream?
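Here is a sketch of "flush first, then request" under simplifying assumptions. The emitted list stands in for downstream.onNext(...), and request returns the amount that would be passed upstream. FlushFirst, outstanding and onUpstreamNext are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: serve new demand from the buffer first, and only ask upstream for the remainder.
final class FlushFirst<T> {
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>(); // stands in for downstream.onNext(...)
    private long demand;       // requested by downstream, not yet delivered
    private long outstanding;  // requested from upstream, not yet received

    void bufferItem(T item) {
        buffer.add(item);
    }

    // Returns how many elements would be requested from upstream (0 means no request is made).
    long request(long n) {
        demand += n; // overflow handling omitted for brevity
        flush();
        final long missing = demand - outstanding;
        if (missing <= 0) {
            return 0;
        }
        outstanding += missing;
        return missing;
    }

    void onUpstreamNext(T item) {
        outstanding--;
        buffer.add(item);
        flush();
    }

    private void flush() {
        while (demand > 0 && !buffer.isEmpty()) {
            emitted.add(buffer.poll());
            demand--;
        }
    }

    List<T> emitted() {
        return emitted;
    }
}
```

For example, with two elements already buffered, request(1) returns 0 because the buffer alone satisfies the demand.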
.limit(available).forEach(sub -> sub.request(1));
}

private void dump() {
How about flush()? I found it hard to understand this behavior from the name dump().
this.maxConcurrency = maxConcurrency;

sourceSubscriptions = new HashSet<>();
buffer = new LinkedList<>();
How about using ArrayDeque for a smaller memory footprint?
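Here is a sketch of the swap, assuming the field is declared as a Queue so that only the constructor call changes. ArrayDeque is backed by a resizable array, so it avoids allocating a node object per element as LinkedList does. It also rejects null, which should be harmless here because Reactive Streams does not allow null elements (rule 2.13). BufferHolder is a hypothetical wrapper for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical wrapper showing the field declaration; only the constructor call differs from LinkedList.
final class BufferHolder<U> {
    // Array-backed FIFO: no per-element node objects. Rejects null with a NullPointerException.
    private final Queue<U> buffer = new ArrayDeque<>();

    void add(U item) {
        buffer.add(item);
    }

    U poll() {
        return buffer.poll();
    }

    int size() {
        return buffer.size();
    }
}
```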
}

final StreamMessage<U> newStreamMessage = function.apply(item);
newStreamMessage.subscribe(new FlatMapSubscriber<>(this));
Suggested change:
- newStreamMessage.subscribe(new FlatMapSubscriber<>(this));
+ newStreamMessage.subscribe(new FlatMapSubscriber<>(this), executor, options);
Is it OK to just pass the executor here? I don't think there are any options that need to be applied.
@ikhoon Gentle ping
I tried passing the executor here and I'm running into an issue I'm not sure how to address.
This code calls subscribe, which calls onSubscribe on FlatMapSubscriber. That will call subscribeChild on FlatMapAggregatingSubscriber, which will add the subscriber to sourceSubscriptions and then call request on FlatMapSubscriber. Then, if newStreamMessage is already completed, it will call onComplete, which leads to the subscription being removed from sourceSubscriptions.
Essentially, in one call chain we go from onSubscribe to sub.request(1) all the way to onComplete, but since that removes the subscription from sourceSubscriptions, I get a ConcurrentModificationException, because the call to sub.request(1) is done in a forEach.
I guess I should just collect the subs into a new collection before calling forEach to avoid that? I'm not sure if there's a better approach.
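The failure described above can be reproduced without any streams. In this hypothetical sketch, requestOne stands in for sub.request(1) synchronously reaching onComplete and removing the child. Iterating over a copy of the set, as proposed, avoids the exception.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reproduction of removing from a set while it is being iterated.
final class SnapshotIteration {
    private final Set<String> children = new HashSet<>();

    SnapshotIteration(List<String> initial) {
        children.addAll(initial);
    }

    // Simulates sub.request(1) synchronously reaching onComplete, which removes the child.
    private void requestOne(String child) {
        children.remove(child);
    }

    void requestAllUnsafe() {
        // Mutating 'children' inside its own forEach throws ConcurrentModificationException.
        children.forEach(this::requestOne);
    }

    void requestAllWithSnapshot() {
        // Iterate over a copy so removals only touch the original set.
        new ArrayList<>(children).forEach(this::requestOne);
    }

    int remaining() {
        return children.size();
    }
}
```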
I see. How about always using the executor to remove the child?
executor.execute(() -> handleCompleteChild(child));
If that works, we can remove all eventLoop.inEventLoop() checks because we only use the executor.
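The alternative of always deferring the removal to the executor can be sketched with a tiny single-threaded executor. It queues tasks submitted while another task is running, which roughly mimics submitting to an event loop from its own thread. TrampolineExecutor and DeferredRemoval are hypothetical names. A real implementation would use the executor passed to subscribe.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;

// Hypothetical single-threaded executor: a task submitted while another task is running is queued
// and runs after the current task returns, similar to re-submitting to an event loop from its own thread.
final class TrampolineExecutor implements Executor {
    private final ArrayDeque<Runnable> tasks = new ArrayDeque<>();
    private boolean running;

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
        if (running) {
            return; // the outer drain loop below will run it
        }
        running = true;
        try {
            Runnable next;
            while ((next = tasks.poll()) != null) {
                next.run();
            }
        } finally {
            running = false;
        }
    }
}

final class DeferredRemoval {
    private final Executor executor;
    private final Set<String> children = new HashSet<>();

    DeferredRemoval(Executor executor, List<String> initial) {
        this.executor = executor;
        children.addAll(initial);
    }

    // Called synchronously from inside the iteration below when a child is already complete.
    void completeChild(String child) {
        // Deferring the removal means the set is never mutated while forEach is running.
        executor.execute(() -> children.remove(child));
    }

    void requestAll() {
        executor.execute(() -> children.forEach(this::completeChild));
    }

    int remaining() {
        return children.size();
    }
}
```

Because every state change then happens on that one executor, the inEventLoop() checks mentioned above are no longer needed.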
}
canceled = true;

downstream.onError(cause);
Ditto: cancel all inner StreamMessages?
final Optional<Long> requested = sourceSubscriptions.stream().map(FlatMapSubscriber::getRequested)
        .reduce(LongMath::saturatedAdd);
Suggested change:
- final Optional<Long> requested = sourceSubscriptions.stream().map(FlatMapSubscriber::getRequested)
-         .reduce(LongMath::saturatedAdd);
+ final long requested = sourceSubscriptions.stream().mapToLong(FlatMapSubscriber::getRequested).sum();
Will that handle overflow in case getRequested returns Long.MAX_VALUE?
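It will not. LongStream.sum() uses ordinary long arithmetic, so a request of Long.MAX_VALUE plus any positive request wraps around to a negative value. The sketch below compares the two approaches, with a saturating add reimplemented for non-negative inputs (the same idea as Guava's LongMath.saturatedAdd). RequestedSum is a hypothetical name.

```java
import java.util.List;

final class RequestedSum {
    // Plain summation: wraps around to a negative number once the total exceeds Long.MAX_VALUE.
    static long plainSum(List<Long> requested) {
        return requested.stream().mapToLong(Long::longValue).sum();
    }

    // Saturating summation: caps the total at Long.MAX_VALUE and still returns a plain long.
    static long saturatedSum(List<Long> requested) {
        return requested.stream().mapToLong(Long::longValue).reduce(0L, RequestedSum::saturatedAdd);
    }

    // Request counts are assumed to be non-negative.
    private static long saturatedAdd(long a, long b) {
        return a > Long.MAX_VALUE - b ? Long.MAX_VALUE : a + b;
    }
}
```

Using LongStream.reduce with an identity also yields a plain long, so it keeps the convenience of the suggested change without the Optional.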
    return;
}

final long available = getAvailableBufferSpace();
If available is Long.MAX_VALUE, we don't need backpressure with buffering.
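Here is a sketch of that fast path. It assumes Long.MAX_VALUE is treated as unbounded demand, as Reactive Streams rule 3.17 permits. In that case elements are forwarded directly instead of being buffered, and the demand counter is left untouched. UnboundedFastPath and onNextChild are hypothetical names, and the emitted list stands in for downstream.onNext(...).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: skip buffering entirely when downstream demand is unbounded.
final class UnboundedFastPath<T> {
    private final ArrayDeque<T> buffer = new ArrayDeque<>();
    private final List<T> emitted = new ArrayList<>(); // stands in for downstream.onNext(...)
    private long demand;

    UnboundedFastPath(long initialDemand) {
        demand = initialDemand;
    }

    void onNextChild(T item) {
        if (demand == Long.MAX_VALUE) {
            // Unbounded demand: no backpressure needed, emit directly and never decrement.
            emitted.add(item);
            return;
        }
        // Emit directly only if nothing older is waiting, to preserve arrival order.
        if (demand > 0 && buffer.isEmpty()) {
            demand--;
            emitted.add(item);
        } else {
            buffer.add(item); // wait for more demand
        }
    }

    int buffered() {
        return buffer.size();
    }

    List<T> emitted() {
        return emitted;
    }
}
```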
Gentle ping, @sleeplesslord 🙇
Hey, sorry for letting this PR go stale for so long. I was addressing the comments but then lost track of things 😅 I think I've covered most of the feedback now.
* As per
* <a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">
* Reactive Streams Specification 2.13</a>, the specified {@link Function} should not return
* a {@code null} value
Suggested change:
- * a {@code null} value
+ * a {@code null} value.
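As an illustration of what the Javadoc requires, a hypothetical wrapper could reject a null result eagerly with a descriptive NullPointerException. This is a sketch, not necessarily how FlatMapStreamMessage enforces the rule.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical guard: fail fast if the mapping function returns null (Reactive Streams rule 2.13).
final class NonNullMapper<T, R> implements Function<T, R> {
    private final Function<T, R> delegate;

    NonNullMapper(Function<T, R> delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    @Override
    public R apply(T item) {
        // The Supplier overload builds the message only when the check fails.
        return Objects.requireNonNull(delegate.apply(item), () -> "function.apply() returned null for: " + item);
    }
}
```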
core/src/main/java/com/linecorp/armeria/common/stream/StreamMessage.java
this.source = (StreamMessage<T>) source;
this.function = (Function<T, StreamMessage<U>>) function;
this.maxConcurrency = maxConcurrency;
completionFuture = new CompletableFuture<>();
nit: We usually use EventLoopCheckingFuture for this.
.filter(sub -> sub.getRequested() == 0)
.limit(available)
.collect(Collectors.toList());
nit:
Suggested change:
- .collect(Collectors.toList());
+ .collect(toImmutableList());
Yeah, creating a new list also works. 😄
So, could you remove if (executor.inEventLoop()) {..} else {...} from this class where possible? 😄
For example, completeChild and onNextChild.
Is that because the FlatMapSubscriber will run with the same executor, so we know that completeChild etc. will always run in the same executor?
Yes, exactly. 😄
All onX methods in FlatMapAggregatingSubscriber and FlatMapSubscriber are now called by the executor, so we don't have to check it. 😄
Thanks! Left some more comments. 😉
core/src/main/java/com/linecorp/armeria/common/stream/FlatMapStreamMessage.java
Almost there 🙏
I've added a small commit which:
- Addresses a ConcurrentModificationException thrown when the forEach call inside #requestAllAvailable calls childSubscribers#remove in #completeChild. childSubscribers is copied to mitigate this issue.
- FlatMapStreamMessage could be closed, whereas the inner FlatMapSubscriber may not be closed. This is possible if abort is called before FlatMapSubscriber#onSubscribe is invoked. I've changed it so that FlatMapSubscriber is added to childSubscribers immediately.
- Miscellaneous possible NPEs were dealt with
- Flaky tests were dealt with
Let me know if any changes don't make sense.
Lastly, sorry this PR took so long 😅 Looks good to me for merging 👍
Thanks! I also added some miscellaneous changes.
Thanks a lot, @sleeplesslord and @jrhee17!
Looks great! 🚀 💯
Thanks for the fixes and the merge! Happy to see this make it in 🙏
Motivation:
StreamMessage has map, but it is inconvenient to use when the mapping function itself returns a StreamMessage. This PR adds flatMap, which allows applying a function that returns a StreamMessage without ending up with nested StreamMessages.
Modifications:
- FlatMapStreamMessage, which uses a function that returns a StreamMessage to modify the stream.
- StreamMessage.flatMap(), to allow easily creating a FlatMapStreamMessage from an existing stream.
Result:
- StreamMessage.flatMap() #3966
- StreamMessage.flatMap can be used to modify a stream using a function that returns a StreamMessage.
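The motivation can be illustrated by analogy with java.util.stream.Stream, the JDK's synchronous Stream API rather than StreamMessage. Using map with a function that returns a stream produces nested streams, while flatMap collapses them into one. FlatMapAnalogy is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlatMapAnalogy {
    // With map, a function that returns a stream yields a stream of streams.
    static List<Stream<Integer>> nested(List<Integer> input) {
        return input.stream().map(n -> Stream.of(n, n * 10)).collect(Collectors.toList());
    }

    // With flatMap, the inner streams are collapsed into a single flat stream.
    static List<Integer> flattened(List<Integer> input) {
        return input.stream().flatMap(n -> Stream.of(n, n * 10)).collect(Collectors.toList());
    }
}
```

Unlike this synchronous analogy, the reactive version discussed above subscribes to up to maxConcurrency inner streams at once, so elements from different inner streams may interleave.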