Skip to content

Commit

Permalink
Allow onComplete before request in PipeliningServerHandler (#10017)
Browse files Browse the repository at this point in the history
It is legal for a reactive publisher to call onComplete immediately after subscribe, before data is requested. This could lead to a failure when such a publisher was queued for write.

Addresses #9677
  • Loading branch information
yawkat committed Oct 24, 2023
1 parent 5504b49 commit f7bc0ad
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ private final class StreamingOutboundHandler extends OutboundHandler implements
private final OutboundAccess outboundAccess;
private HttpResponse initialMessage;
private Subscription subscription;
private boolean earlyComplete = false;
private boolean writtenLast = false;

StreamingOutboundHandler(OutboundAccess outboundAccess, HttpResponse initialMessage) {
Expand All @@ -814,7 +815,13 @@ void writeSome() {
write(initialMessage, false, false);
initialMessage = null;
}
subscription.request(1);
if (earlyComplete) {
// onComplete has been called before the first writeSome. Trigger onComplete
// handling again.
onComplete();
} else {
subscription.request(1);
}
}

@Override
Expand Down Expand Up @@ -881,7 +888,9 @@ public void onComplete() {
}

if (outboundHandler != this) {
throw new IllegalStateException("onComplete before request?");
// onComplete can be called immediately after onSubscribe, before request.
earlyComplete = true;
return;
}

outboundHandler = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponse
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import spock.lang.Issue
Expand Down Expand Up @@ -290,6 +291,45 @@ class PipeliningServerHandlerSpec extends Specification {
completeOnCancel << [true, false]
}

def 'empty streaming response while in queue'() {
given:
def resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
def sink = Sinks.many().unicast().<HttpContent>onBackpressureBuffer()
def ch = new EmbeddedChannel(new PipeliningServerHandler(new RequestHandler() {
int i = 0

@Override
void accept(ChannelHandlerContext ctx, HttpRequest request, PipeliningServerHandler.OutboundAccess outboundAccess) {
if (i++ == 0) {
outboundAccess.writeStreamed(resp, sink.asFlux())
} else {
outboundAccess.writeStreamed(resp, Flux.empty())
}
}

@Override
void handleUnboundError(Throwable cause) {
cause.printStackTrace()
}
}))

when:
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
then:
ch.readOutbound() == null

when:
sink.tryEmitComplete()
then:
ch.readOutbound() == resp
ch.readOutbound() == LastHttpContent.EMPTY_LAST_CONTENT
ch.readOutbound() == resp
ch.readOutbound() == LastHttpContent.EMPTY_LAST_CONTENT
ch.readOutbound() == null
}

static class MonitorHandler extends ChannelOutboundHandlerAdapter {
int flush = 0
int read = 0
Expand Down

0 comments on commit f7bc0ad

Please sign in to comment.