Skip to content

Commit

Permalink
Fix to not log instead bubbling fatal exceptions
Browse files Browse the repository at this point in the history
Wrap possible singleton ClosedChannelException
  • Loading branch information
Stephane Maldini committed May 8, 2019
1 parent a55c667 commit d850804
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 13 deletions.
12 changes: 10 additions & 2 deletions src/main/java/reactor/netty/FutureMono.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package reactor.netty;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -253,8 +255,14 @@ public void cancel() {
@Override
@SuppressWarnings("unchecked")
public void operationComplete(F future) {
if (!future.isSuccess()) {
s.onError(future.cause());
if (!future.isSuccess()) {//Avoid singleton
if (future.cause() instanceof ClosedChannelException) {
//Update with a common aborted exception?
s.onError(new IOException(future.cause()));
}
else {
s.onError(future.cause());
}
}
else {
s.onComplete();
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/reactor/netty/channel/AbortedException.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public AbortedException(String message) {
super(message);
}

public AbortedException(Throwable cause) {
super(cause);
}

/**
* Return true if connection has been simply aborted on a tcp level by verifying if
* the given inbound error.
Expand All @@ -50,4 +54,12 @@ public static boolean isConnectionReset(Throwable err) {
err.getMessage()
.contains("Connection reset by peer"));
}

@Override
public synchronized Throwable fillInStackTrace() {
if (getCause() != null) {
return getCause().fillInStackTrace();
}
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.DecoderResultProvider;
import io.netty.util.ReferenceCountUtil;
import reactor.core.Exceptions;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.NettyOutbound;
Expand Down Expand Up @@ -78,7 +77,6 @@ final public void channelInactive(ChannelHandlerContext ctx) {
}
}
catch (Throwable err) {
Exceptions.throwIfJvmFatal(err);
exceptionCaught(ctx, err);
}
}
Expand Down Expand Up @@ -114,15 +112,13 @@ final public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
}
catch (Throwable err) {
Exceptions.throwIfJvmFatal(err);
exceptionCaught(ctx, err);
ReferenceCountUtil.safeRelease(msg);
exceptionCaught(ctx, err);
}
}

@Override
final public void exceptionCaught(ChannelHandlerContext ctx, Throwable err) {
Exceptions.throwIfJvmFatal(err);
Connection connection = Connection.from(ctx.channel());
ChannelOperations<?, ?> ops = connection.as(ChannelOperations.class);
if (ops != null) {
Expand Down
27 changes: 22 additions & 5 deletions src/main/java/reactor/netty/channel/FluxReceive.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.netty.channel;

import java.nio.channels.ClosedChannelException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
Expand All @@ -34,7 +35,6 @@
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

import static reactor.netty.ReactorNetty.format;

Expand Down Expand Up @@ -339,17 +339,34 @@ final void onInboundComplete() {

final void onInboundError(Throwable err) {
if (isCancelled() || inboundDone) {
Context c = receiver == null ? Context.empty() : receiver.currentContext();
Operators.onErrorDropped(err, c);
if (log.isWarnEnabled()) {
log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.getClass().getSimpleName());
}
else if (log.isDebugEnabled()) {
log.error(format(channel, "An exception has been observed post termination"), err);
}
return;
}
CoreSubscriber<?> receiver = this.receiver;
this.inboundError = err;
this.inboundDone = true;

if(channel.isActive()){
parent.markPersistent(false);
}

if (err instanceof OutOfMemoryError) {
if (log.isWarnEnabled()) {
// log.error(format(channel, "An attempt to allocate memory has failed"), err);
}
this.inboundError = new AbortedException(err);
parent.terminate(); //get rid of the resource
}
else if (err instanceof ClosedChannelException) {
this.inboundError = new AbortedException("Channel has been closed");
}
else {
this.inboundError = err;
}

if (receiverFastpath && receiver != null) {
//parent.listener.onReceiveError(channel, err);
receiver.onError(err);
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/reactor/netty/channel/MonoSendMany.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package reactor.netty.channel;

import java.nio.channels.ClosedChannelException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -168,6 +169,11 @@ public void onError(Throwable t) {
cleanup();
}

//Avoid singleton
if (t instanceof ClosedChannelException) {
t = new AbortedException("Channel has been closed");
}

actual.onError(t);
}

Expand Down Expand Up @@ -239,7 +245,8 @@ public void operationComplete(ChannelFuture future) {
if (WIP.getAndIncrement(this) == 0) {
cleanup();
}
actual.onError(new AbortedException("Closed channel ["+ctx.channel().id().asShortText()+"] while sending operation active"));
//actual.onError(new AbortedException("Closed channel ["+ctx.channel().id().asShortText()+"] while sending operation active"));
actual.onComplete();
}
}

Expand Down

0 comments on commit d850804

Please sign in to comment.