diff --git a/http-client/src/main/java/io/micronaut/http/client/netty/ReactiveClientReader.java b/http-client/src/main/java/io/micronaut/http/client/netty/ReactiveClientReader.java index fe36a0c442d..d59cc65cf60 100644 --- a/http-client/src/main/java/io/micronaut/http/client/netty/ReactiveClientReader.java +++ b/http-client/src/main/java/io/micronaut/http/client/netty/ReactiveClientReader.java @@ -16,6 +16,7 @@ package io.micronaut.http.client.netty; import io.micronaut.core.annotation.Internal; +import io.micronaut.core.annotation.Nullable; import io.micronaut.http.client.exceptions.ResponseClosedException; import io.micronaut.http.netty.reactive.HotObservable; import io.netty.channel.ChannelHandlerContext; @@ -38,7 +39,9 @@ abstract class ReactiveClientReader extends ChannelInboundHandlerAdapter implements HotObservable, Subscription { private EventLoop eventLoop; private ChannelHandlerContext ctx; + @Nullable private Subscriber subscriber; + private Throwable heldBackException; private long demand; private boolean cancelled = false; @@ -48,12 +51,21 @@ public final void handlerAdded(ChannelHandlerContext ctx) throws Exception { eventLoop = ctx.channel().eventLoop(); } + private void forwardException(Throwable t) { + if (subscriber == null) { + // no subscriber yet + heldBackException = t; + } else { + subscriber.onError(t); + } + } + @Override public final void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); if (!cancelled) { cancelled = true; - subscriber.onError(new ResponseClosedException("Connection closed before full response body was transferred")); + forwardException(new ResponseClosedException("Connection closed before full response body was transferred")); } } @@ -69,6 +81,11 @@ public final void subscribe(Subscriber s) { subscriber = s; s.onSubscribe(this); + if (heldBackException != null) { + // already got an error + s.onError(heldBackException); + heldBackException = null; + } } @Override @@ -118,6 +135,7 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exce } } else { assert demand > 0 : "should be ensured by FlowControlHandler"; + // demand > 0 => subscriber != null, so this is safe subscriber.onNext((HttpContent) msg); if (last) { cancelled = true; @@ -136,7 +154,7 @@ public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) th } else { cancelled = true; remove(ctx); - subscriber.onError(cause); + forwardException(cause); } } diff --git a/http-client/src/test/groovy/io/micronaut/http/client/netty/ReactiveClientReaderSpec.groovy b/http-client/src/test/groovy/io/micronaut/http/client/netty/ReactiveClientReaderSpec.groovy index 6ba5200d16d..e67ad7cb8d0 100644 --- a/http-client/src/test/groovy/io/micronaut/http/client/netty/ReactiveClientReaderSpec.groovy +++ b/http-client/src/test/groovy/io/micronaut/http/client/netty/ReactiveClientReaderSpec.groovy @@ -1,5 +1,6 @@ package io.micronaut.http.client.netty +import io.micronaut.http.client.exceptions.ResponseClosedException import io.netty.buffer.Unpooled import io.netty.channel.ChannelHandlerContext import io.netty.channel.embedded.EmbeddedChannel @@ -8,6 +9,7 @@ import io.netty.handler.codec.http.HttpContent import io.netty.handler.flow.FlowControlHandler import org.reactivestreams.Subscriber import org.reactivestreams.Subscription +import reactor.core.publisher.Flux import spock.lang.Specification class ReactiveClientReaderSpec extends Specification { @@ -79,4 +81,48 @@ class ReactiveClientReaderSpec extends Specification { item == c3 !nested } + + def 'error before subscribe'() { + given: + def reader = new ReactiveClientReader() { + @Override + protected void remove(ChannelHandlerContext ctx) { + } + } + def channel = new EmbeddedChannel(reader) + def err = new RuntimeException() + + when: + channel.pipeline().fireExceptionCaught(err) + channel.checkException() + then: + noExceptionThrown() + + when: + Flux.from(reader).blockLast() + then: + def e = thrown RuntimeException + e == err + } + + def 'inactive before subscribe'() { + given: + def reader = new ReactiveClientReader() { + @Override + protected void remove(ChannelHandlerContext ctx) { + } + } + def channel = new EmbeddedChannel(reader) + + when: + channel.pipeline().fireChannelInactive() + channel.checkException() + then: + noExceptionThrown() + + when: + Flux.from(reader).blockLast() + then: + thrown ResponseClosedException + } }