Skip to content

Commit 9334a56

Browse files
committedNov 28, 2017
Don't automatically start requesting on subscribe
Motivation: HandlerSubscriber automatically starts requests when calling onSubscribe. But when a publisher subscribes, RxJava automatically requests the Subscription if there are some pending requests. When such race condition happen, we end up with 2 threads (calling and event loop) competing for requesting the Subscription. This results is out of order published messages. Modification: Make HandlerSubscriber#onSubscribe noop and actually delay it to after Publisher has subscribed. Result: No more out of order messages.
1 parent 8c27b67 commit 9334a56

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed
 

‎client/src/main/java/org/asynchttpclient/netty/request/body/NettyReactiveStreamsBody.java

+12
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
5858
NettySubscriber subscriber = new NettySubscriber(channel, future);
5959
channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber);
6060
publisher.subscribe(new SubscriberAdapter(subscriber));
61+
subscriber.delayedStart();
6162
}
6263
}
6364

@@ -108,6 +109,17 @@ protected void complete() {
108109
.addListener(future -> removeFromPipeline()));
109110
}
110111

112+
private volatile Subscription deferredSubscription;
113+
114+
@Override
115+
public void onSubscribe(Subscription subscription) {
116+
deferredSubscription = subscription;
117+
}
118+
119+
public void delayedStart() {
120+
super.onSubscribe(deferredSubscription);
121+
}
122+
111123
@Override
112124
protected void error(Throwable error) {
113125
if (error == null)

0 commit comments

Comments
 (0)
Failed to load comments.