Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow onComplete before request in PipeliningServerHandler #10017

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ private final class StreamingOutboundHandler extends OutboundHandler implements
private final OutboundAccess outboundAccess;
private HttpResponse initialMessage;
private Subscription subscription;
private boolean earlyComplete = false;
private boolean writtenLast = false;

StreamingOutboundHandler(OutboundAccess outboundAccess, HttpResponse initialMessage) {
Expand All @@ -814,7 +815,13 @@ void writeSome() {
write(initialMessage, false, false);
initialMessage = null;
}
subscription.request(1);
if (earlyComplete) {
// onComplete has been called before the first writeSome. Trigger onComplete
// handling again.
onComplete();
} else {
subscription.request(1);
}
}

@Override
Expand Down Expand Up @@ -881,7 +888,9 @@ public void onComplete() {
}

if (outboundHandler != this) {
throw new IllegalStateException("onComplete before request?");
// onComplete can be called immediately after onSubscribe, before request.
earlyComplete = true;
return;
}

outboundHandler = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpResponse
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.codec.http.LastHttpContent
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import spock.lang.Issue
Expand Down Expand Up @@ -290,6 +291,45 @@ class PipeliningServerHandlerSpec extends Specification {
completeOnCancel << [true, false]
}

def 'empty streaming response while in queue'() {
given:
def resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
resp.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
def sink = Sinks.many().unicast().<HttpContent>onBackpressureBuffer()
def ch = new EmbeddedChannel(new PipeliningServerHandler(new RequestHandler() {
int i = 0

@Override
void accept(ChannelHandlerContext ctx, HttpRequest request, PipeliningServerHandler.OutboundAccess outboundAccess) {
if (i++ == 0) {
outboundAccess.writeStreamed(resp, sink.asFlux())
} else {
outboundAccess.writeStreamed(resp, Flux.empty())
}
}

@Override
void handleUnboundError(Throwable cause) {
cause.printStackTrace()
}
}))

when:
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
ch.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
then:
ch.readOutbound() == null

when:
sink.tryEmitComplete()
then:
ch.readOutbound() == resp
ch.readOutbound() == LastHttpContent.EMPTY_LAST_CONTENT
ch.readOutbound() == resp
ch.readOutbound() == LastHttpContent.EMPTY_LAST_CONTENT
ch.readOutbound() == null
}

static class MonitorHandler extends ChannelOutboundHandlerAdapter {
int flush = 0
int read = 0
Expand Down