Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flatMap method for collapsing streams into one #4227

Merged
merged 46 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
d95055c
Add flatMap method for collapsing streams into one
sleeplesslord Apr 16, 2022
d3fba09
Manually handle completion tracking for flatMap
sleeplesslord May 5, 2022
49fbc2e
Cancel inner subscriptions when FlatMap is cancelled
sleeplesslord May 5, 2022
761da4c
Get demand from inner subscription
sleeplesslord May 5, 2022
74bf0b7
Rename method for clarity
sleeplesslord May 5, 2022
80141c3
Use ArrayDeque for smaller memory footprint
sleeplesslord May 5, 2022
0082442
Request Long.MAX_VALUE from source when downstream requests it
sleeplesslord May 5, 2022
3219a78
Improve testing and add associativity test
sleeplesslord Nov 26, 2022
589296b
Improve documentation for flatMap
sleeplesslord Dec 24, 2022
8d0df8e
Add check for maxConcurrency > 0 in flatMap
sleeplesslord Dec 24, 2022
a9f6ab4
Use EventLoopCheckingFuture
sleeplesslord Dec 24, 2022
9a6d5fc
Use same executor for FlatMapSubscribers
sleeplesslord Dec 24, 2022
9bfcb8a
Remove unnecessary executor checks
sleeplesslord Dec 25, 2022
87ec0fb
Use toImmutableList
sleeplesslord Dec 25, 2022
dc7c776
Pass SubscriptionOptions into FlatMapSubscriber
sleeplesslord Dec 29, 2022
400d092
Cancel properly when getting invalid request
sleeplesslord Dec 29, 2022
84c4556
Prevent requesting 0 from upstream
sleeplesslord Dec 29, 2022
08cf0ff
Flush before requesting to upstream to avoid unnecessary requests
sleeplesslord Dec 29, 2022
2f5cb04
Handle Long.MAX_VALUE in FlatMapSubscriber
sleeplesslord Dec 29, 2022
0155dcc
Prevent pendingSubscriptions from becoming negative
sleeplesslord Dec 29, 2022
d1c4319
Rename field and methods for clarity
sleeplesslord Dec 29, 2022
d2f053d
Avoid making unnecessary additional requests when available is MAX
sleeplesslord Dec 29, 2022
06f80eb
Fix lint
sleeplesslord Dec 31, 2022
0ebe59a
Merge branch 'master' of github.com:line/armeria into flatmap
jrhee17 Feb 16, 2023
28a1ea2
Simplify upstream request logic and fix missing upstream requests
sleeplesslord Feb 23, 2023
d7ac1c3
Fix stream being completed despite buffer having values
sleeplesslord Feb 23, 2023
44f3920
Complete future exceptionally on error
sleeplesslord Feb 23, 2023
ff2b023
Close or abort elements in the buffer when flatMap is cancelled
sleeplesslord Feb 23, 2023
6da1473
Prevent multiple calls to subscription.cancel in FlatMapSubscriber
sleeplesslord Feb 24, 2023
fc5948b
Improve formatting
sleeplesslord Mar 3, 2023
eff4ccc
Fix flatMap not completing after last elements are requested
sleeplesslord Mar 3, 2023
8aed68f
Clean up values received after canceling
sleeplesslord Mar 3, 2023
5a2b058
Clean up buffer to prevent mistakes
sleeplesslord Mar 3, 2023
a72d567
Handle multiple calls to cancel as NOP in flat map
sleeplesslord Mar 3, 2023
f3fa4a8
Run cancel through EventExecutor in flat map
sleeplesslord Mar 3, 2023
ca99d0c
Fix lint
sleeplesslord Mar 3, 2023
a69185c
Fix implementations of isOpen and isEmpty
sleeplesslord Apr 2, 2023
bc960cd
Remove unnecessary subscription cancel
sleeplesslord Apr 2, 2023
54a77e2
Handle future and downstream notification on cancel
sleeplesslord Apr 2, 2023
1cc7db7
Rename variable to a more fitting name
sleeplesslord Apr 2, 2023
80cebda
Move check into event loop to remove volatile
sleeplesslord Apr 2, 2023
8b2b98e
Merge branch 'main' into flatmap
jrhee17 Mar 21, 2024
eeb2085
handle leaks and concurrentmodificationexception
jrhee17 Mar 22, 2024
e7b3d7f
Return early and decrease the demand before calling onNext
minwoox Apr 1, 2024
b7ac1c4
Call completionFuture.completeExceptionally after subscriber.onError
minwoox Apr 1, 2024
3b35003
minor clean up
ikhoon Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,368 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.common.stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import com.google.common.math.LongMath;

import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.EventLoopCheckingFuture;
import com.linecorp.armeria.internal.common.stream.StreamMessageUtil;

import io.netty.util.concurrent.EventExecutor;

final class FlatMapStreamMessage<T, U> implements StreamMessage<U> {
private final StreamMessage<T> source;
private final Function<T, StreamMessage<U>> function;
private final int maxConcurrency;

private final CompletableFuture<Void> completionFuture;
private FlatMapAggregatingSubscriber<T, U> innerSubscriber;

@SuppressWarnings("unchecked")
FlatMapStreamMessage(StreamMessage<? extends T> source,
Function<? super T, ? extends StreamMessage<? extends U>> function,
int maxConcurrency) {
requireNonNull(source, "source");
requireNonNull(function, "function");

this.source = (StreamMessage<T>) source;
this.function = (Function<T, StreamMessage<U>>) function;
this.maxConcurrency = maxConcurrency;
completionFuture = new EventLoopCheckingFuture<>();
}

@Override
public boolean isOpen() {
return source.isOpen();
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean isEmpty() {
return source.isEmpty();
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public long demand() {
return innerSubscriber.requestedByDownstream;
}

@Override
public CompletableFuture<Void> whenComplete() {
return completionFuture;
}

@Override
public void subscribe(Subscriber<? super U> subscriber, EventExecutor executor,
SubscriptionOption... options) {
requireNonNull(subscriber, "subscriber");
requireNonNull(executor, "executor");
requireNonNull(options, "options");

innerSubscriber = new FlatMapAggregatingSubscriber<>(subscriber, function, executor, maxConcurrency,
completionFuture);

source.subscribe(innerSubscriber, executor, options);
}

@Override
public void abort() {
source.abort();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also abort the nested StreamMessages in progress?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like a good idea. If I call source.abort() here, will that propagate to the Subscription so that I can just cancel my nested subscriptions in onError?

}

@Override
public void abort(Throwable cause) {
requireNonNull(cause, "cause");
source.abort(cause);
}

private static final class FlatMapAggregatingSubscriber<T, U> implements Subscriber<T>, Subscription {
private final int maxConcurrency;

private final Subscriber<? super U> downstream;
private final Function<T, StreamMessage<U>> function;
private final EventExecutor executor;
private final Set<FlatMapSubscriber<T, U>> sourceSubscriptions;
private final Queue<U> buffer;
private final CompletableFuture<Void> completionFuture;

@Nullable
private volatile Subscription upstream;
private volatile boolean canceled;
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved

private long requestedByDownstream;
private int pendingSubscriptions;
private boolean completing;

FlatMapAggregatingSubscriber(Subscriber<? super U> downstream,
Function<T, StreamMessage<U>> function,
EventExecutor executor,
int maxConcurrency,
CompletableFuture<Void> completionFuture) {
requireNonNull(downstream, "downstream");
requireNonNull(function, "function");
requireNonNull(executor, "executor");
requireNonNull(completionFuture, "completionFuture");

this.downstream = downstream;
this.function = function;
this.executor = executor;
this.maxConcurrency = maxConcurrency;
this.completionFuture = completionFuture;

sourceSubscriptions = new HashSet<>();
buffer = new ArrayDeque<>();
}

@Override
public void onSubscribe(Subscription subscription) {
requireNonNull(subscription, "subscription");

upstream = subscription;
downstream.onSubscribe(this);
}

@Override
public void onNext(T item) {
requireNonNull(item, "item");

if (canceled) {
StreamMessageUtil.closeOrAbort(item);
return;
}

final StreamMessage<U> newStreamMessage = function.apply(item);
newStreamMessage.subscribe(new FlatMapSubscriber<>(this), executor);
minwoox marked this conversation as resolved.
Show resolved Hide resolved
pendingSubscriptions++;
minwoox marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void onError(Throwable cause) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also call completionFuture#completeExceptionally(cause) in this block

StreamMessage#whenComplete is an often used callback in this codebase, so it's important that this future is completed

requireNonNull(cause, "cause");

if (canceled) {
return;
}
canceled = true;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess whatever is in the buffer needs to be cleaned up/closed using StreamMessageUtil.closeOrAbort and also whenComplete needs to be completed exceptionally.

Same goes for when FlatMapAggregatingSubscriber#cancel is called.

cancelSourceSubscriptions();
downstream.onError(cause);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto: Cancel all inner StreamMessages?

}

@Override
public void onComplete() {
if (canceled) {
return;
}

if (sourceSubscriptions.isEmpty() && pendingSubscriptions == 0) {
downstream.onComplete();
completionFuture.complete(null);
} else {
completing = true;
}
}

@Override
public void request(long n) {
if (n <= 0) {
onError(new IllegalArgumentException(
"n: " + n + " (expected: > 0, see Reactive Streams specification rule 3.9)"));
upstream.cancel();
minwoox marked this conversation as resolved.
Show resolved Hide resolved
return;
}

if (canceled) {
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

if (executor.inEventLoop()) {
handleRequest(n);
} else {
executor.execute(() -> handleRequest(n));
}
}

private void handleRequest(long n) {
requestedByDownstream = LongMath.saturatedAdd(requestedByDownstream, n);
upstream.request(maxConcurrency - sourceSubscriptions.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any cases where sourceSubscriptions.size() is greater than maxConcurrency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should never happen

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add:

if (maxConcurrency - sourceSubscriptions.size() != 0) {...}

so that we don't request with 0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we call flush before request? so that we don't call upstream.request(...) if we have enough elements to hand on to the downstream?


flush();
requestAllAvailable();
}

@Override
public void cancel() {
upstream.cancel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we cancel all StreamMessages in progress?

jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
cancelSourceSubscriptions();
}

private void cancelSourceSubscriptions() {
sourceSubscriptions.forEach(FlatMapSubscriber::cancel);
minwoox marked this conversation as resolved.
Show resolved Hide resolved
}

void subscribeChild(FlatMapSubscriber<T, U> child) {
minwoox marked this conversation as resolved.
Show resolved Hide resolved
pendingSubscriptions--;
sourceSubscriptions.add(child);

requestAllAvailable();
}

private long getAvailableBufferSpace() {
if (requestedByDownstream == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}

final Optional<Long> requested = sourceSubscriptions.stream().map(FlatMapSubscriber::getRequested)
.reduce(LongMath::saturatedAdd);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final Optional<Long> requested = sourceSubscriptions.stream().map(FlatMapSubscriber::getRequested)
.reduce(LongMath::saturatedAdd);
final long requested = sourceSubscriptions.stream().mapToLong(FlatMapSubscriber::getRequested).sum();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will that handle overflowing in case getRequested returns Long.MAX_VALUE?


return maxConcurrency - requested.orElse(0L) - buffer.size();
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}

private void requestAllAvailable() {
if (sourceSubscriptions.isEmpty()) {
return;
}

final long available = getAvailableBufferSpace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If available is Long.MAX_VALUE, we don't need backpressure with buffering.


if (available == Long.MAX_VALUE) {
sourceSubscriptions.forEach(sub -> sub.request(Long.MAX_VALUE));
}
minwoox marked this conversation as resolved.
Show resolved Hide resolved

final List<FlatMapSubscriber<T, U>> toRequest = sourceSubscriptions.stream()
.filter(sub -> sub.getRequested()
== 0)
.limit(available)
.collect(toImmutableList());
toRequest.forEach(sub -> sub.request(1));
}

private void flush() {
while (requestedByDownstream > 0 && !buffer.isEmpty()) {
final U value = buffer.remove();

publishDownstream(value);
}
}

void completeChild(FlatMapSubscriber<T, U> child) {
sourceSubscriptions.remove(child);

if (sourceSubscriptions.isEmpty() && pendingSubscriptions == 0 && completing) {
flush();
downstream.onComplete();
completionFuture.complete(null);
}
}

void onNextChild(U value) {
requireNonNull(value, "value");
if (requestedByDownstream > 0) {
publishDownstream(value);
} else {
buffer.add(value);
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}

requestAllAvailable();
}

private void publishDownstream(U item) {
if (canceled) {
StreamMessageUtil.closeOrAbort(item);
return;
}

downstream.onNext(item);

if (requestedByDownstream != Long.MAX_VALUE) {
requestedByDownstream--;
}
}
}

private static final class FlatMapSubscriber<T, U> implements Subscriber<U> {
private final FlatMapAggregatingSubscriber<T, U> parent;

private long requested;

@Nullable
private Subscription subscription;

FlatMapSubscriber(FlatMapAggregatingSubscriber<T, U> parent) {
requireNonNull(parent, "parent");

this.parent = parent;
requested = 0;
}

@Override
public void onSubscribe(Subscription subscription) {
requireNonNull(subscription, "subscription");

this.subscription = subscription;
parent.subscribeChild(this);
}

@Override
public void onNext(U value) {
requested--;
minwoox marked this conversation as resolved.
Show resolved Hide resolved
parent.onNextChild(value);
}

@Override
public void onError(Throwable cause) {
requireNonNull(cause, "cause");

subscription.cancel();
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved

parent.onError(cause);
}

@Override
public void onComplete() {
parent.completeChild(this);
}

public void request(long n) {
requested += n;
minwoox marked this conversation as resolved.
Show resolved Hide resolved
subscription.request(n);
}

public void cancel() {
subscription.cancel();
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}

public long getRequested() {
return requested;
}
}
}