Skip to content

Commit

Permalink
Only call handlerRemoved(...) if handlerAdded(...) was called during …
Browse files Browse the repository at this point in the history
…adding the handler to the pipeline. (#8684)

Motivation:

Due a race in DefaultChannelPipeline / AbstractChannelHandlerContext it was possible to have only handlerRemoved(...) called during tearing down the pipeline, even when handlerAdded(...) was never called. We need to ensure we either call both of none to guarantee a proper lifecycle of the handler.

Modifications:

- Enforce handlerAdded(...) / handlerRemoved(...) semantics / ordering
- Add unit test.

Result:

Fixes #8676 / #6536 .
  • Loading branch information
normanmaurer committed Jan 14, 2019
1 parent 6ed296b commit d0891d0
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -962,14 +962,17 @@ final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}

final void setAddComplete() {
final boolean setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE) {
return false;
}
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}
Expand All @@ -979,6 +982,26 @@ final void setAddPending() {
assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
}

final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
if (setAddComplete()) {
handler().handlerAdded(this);
}
}

final void callHandlerRemoved() throws Exception {
try {
// Only call handlerRemoved(...) if we called handlerAdded(...) before.
if (handlerState == ADD_COMPLETE) {
handler().handlerRemoved(this);
}
} finally {
// Mark the handler as removed in any case.
setRemoved();
}
}

/**
* Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
* yet. If not return {@code false} and if called or could not detect return {@code true}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,19 +631,12 @@ private static void checkMultiplicity(ChannelHandler handler) {

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
ctx.setAddComplete();
ctx.handler().handlerAdded(ctx);
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
remove0(ctx);
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
Expand All @@ -666,11 +659,7 @@ private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
ctx.callHandlerRemoved();
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand Down Expand Up @@ -1149,6 +1150,72 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}

// Test for https://github.com/netty/netty/issues/8676.
@Test
public void testHandlerRemovedOnlyCalledWhenHandlerAddedCalled() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup(1);
try {
final AtomicReference<Error> errorRef = new AtomicReference<Error>();

// As this only happens via a race we will verify 500 times. This was good enough to have it failed most of
// the time.
for (int i = 0; i < 500; i++) {

ChannelPipeline pipeline = new LocalChannel().pipeline();
group.register(pipeline.channel()).sync();

final CountDownLatch latch = new CountDownLatch(1);

pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// Block just for a bit so we have a chance to trigger the race mentioned in the issue.
latch.await(50, TimeUnit.MILLISECONDS);
}
});

// Close the pipeline which will call destroy0(). This will remove each handler in the pipeline and
// should call handlerRemoved(...) if and only if handlerAdded(...) was called for the handler before.
pipeline.close();

pipeline.addLast(new ChannelInboundHandlerAdapter() {
private boolean handerAddedCalled;

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
handerAddedCalled = true;
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
if (!handerAddedCalled) {
errorRef.set(new AssertionError(
"handlerRemoved(...) called without handlerAdded(...) before"));
}
}
});

latch.countDown();

pipeline.channel().closeFuture().syncUninterruptibly();

// Schedule something on the EventLoop to ensure all other scheduled tasks had a chance to complete.
pipeline.channel().eventLoop().submit(new Runnable() {
@Override
public void run() {
// NOOP
}
}).syncUninterruptibly();
Error error = errorRef.get();
if (error != null) {
throw error;
}
}
} finally {
group.shutdownGracefully();
}
}

@Test(timeout = 5000)
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);
Expand Down

0 comments on commit d0891d0

Please sign in to comment.