Skip to content

Commit

Permalink
Fix error handling for delayed subscription in ReactiveClientReader (#…
Browse files Browse the repository at this point in the history
…9785)

When an error arrives before the ReactiveClientReader was subscribed to, we would get an exception.

Fixes #9776
  • Loading branch information
yawkat committed Aug 29, 2023
1 parent 47c5fb7 commit 65b6334
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.micronaut.http.client.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.http.client.exceptions.ResponseClosedException;
import io.micronaut.http.netty.reactive.HotObservable;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -38,7 +39,9 @@
abstract class ReactiveClientReader extends ChannelInboundHandlerAdapter implements HotObservable<HttpContent>, Subscription {
private EventLoop eventLoop;
private ChannelHandlerContext ctx;
@Nullable
private Subscriber<? super HttpContent> subscriber;
private Throwable heldBackException;
private long demand;
private boolean cancelled = false;

Expand All @@ -48,12 +51,21 @@ public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
eventLoop = ctx.channel().eventLoop();
}

private void forwardException(Throwable t) {
if (subscriber == null) {
// no subscriber yet
heldBackException = t;
} else {
subscriber.onError(t);
}
}

@Override
public final void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (!cancelled) {
cancelled = true;
subscriber.onError(new ResponseClosedException("Connection closed before full response body was transferred"));
forwardException(new ResponseClosedException("Connection closed before full response body was transferred"));
}
}

Expand All @@ -69,6 +81,11 @@ public final void subscribe(Subscriber<? super HttpContent> s) {

subscriber = s;
s.onSubscribe(this);
if (heldBackException != null) {
// already got an error
s.onError(heldBackException);
heldBackException = null;
}
}

@Override
Expand Down Expand Up @@ -118,6 +135,7 @@ public final void channelRead(ChannelHandlerContext ctx, Object msg) throws Exce
}
} else {
assert demand > 0 : "should be ensured by FlowControlHandler";
// demand > 0 => subscriber != null, so this is safe
subscriber.onNext((HttpContent) msg);
if (last) {
cancelled = true;
Expand All @@ -136,7 +154,7 @@ public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) th
} else {
cancelled = true;
remove(ctx);
subscriber.onError(cause);
forwardException(cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.micronaut.http.client.netty

import io.micronaut.http.client.exceptions.ResponseClosedException
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.embedded.EmbeddedChannel
Expand All @@ -8,6 +9,7 @@ import io.netty.handler.codec.http.HttpContent
import io.netty.handler.flow.FlowControlHandler
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
import spock.lang.Specification

class ReactiveClientReaderSpec extends Specification {
Expand Down Expand Up @@ -79,4 +81,48 @@ class ReactiveClientReaderSpec extends Specification {
item == c3
!nested
}

def 'error before subscribe'() {
given:
def reader = new ReactiveClientReader() {
@Override
protected void remove(ChannelHandlerContext ctx) {
}
}
def channel = new EmbeddedChannel(reader)
def err = new RuntimeException()

when:
channel.pipeline().fireExceptionCaught(err)
channel.checkException()
then:
noExceptionThrown()

when:
Flux.from(reader).blockLast()
then:
def e = thrown RuntimeException
e == err
}

def 'inactive before subscribe'() {
given:
def reader = new ReactiveClientReader() {
@Override
protected void remove(ChannelHandlerContext ctx) {
}
}
def channel = new EmbeddedChannel(reader)

when:
channel.pipeline().fireChannelInactive()
channel.checkException()
then:
noExceptionThrown()

when:
Flux.from(reader).blockLast()
then:
thrown ResponseClosedException
}
}

0 comments on commit 65b6334

Please sign in to comment.