Skip to content

Commit

Permalink
[#5639] Ensure fireChannelActive() is also called if Channel is close…
Browse files Browse the repository at this point in the history
…d in connect promise.

Motivation:

We need to ensure we also call fireChannelActive() if the Channel is directly closed in a ChannelFutureListener that is belongs to the promise for the connect. Otherwise we will see missing active events.

Modifications:

Ensure we always call fireChannelActive() if the Channel was active.

Result:

No missing events.
  • Loading branch information
normanmaurer committed Aug 24, 2016
1 parent 2c1f17f commit 5e148d5
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 3 deletions.
Expand Up @@ -18,13 +18,16 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -67,6 +70,46 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
}

@Test(timeout = 3000)
public void testChannelEventsFiredWhenClosedDirectly() throws Throwable {
run();
}

public void testChannelEventsFiredWhenClosedDirectly(ServerBootstrap sb, Bootstrap cb) throws Throwable {
final BlockingQueue<Integer> events = new LinkedBlockingQueue<Integer>();

Channel sc = null;
Channel cc = null;
try {
sb.childHandler(new ChannelInboundHandlerAdapter());
sc = sb.bind(0).syncUninterruptibly().channel();

cb.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
events.add(0);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
events.add(1);
}
});
// Connect and directly close again.
cc = cb.connect(sc.localAddress()).addListener(ChannelFutureListener.CLOSE).
syncUninterruptibly().channel();
assertEquals(0, events.take().intValue());
assertEquals(1, events.take().intValue());
} finally {
if (cc != null) {
cc.close();
}
if (sc != null) {
sc.close();
}
}
}

private static void assertLocalAddress(InetSocketAddress address) {
assertTrue(address.getPort() > 0);
assertFalse(address.getAddress().isAnyLocalAddress());
Expand Down
Expand Up @@ -834,12 +834,16 @@ private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
}
active = true;

// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();

// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();

// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
if (!wasActive && active) {
pipeline().fireChannelActive();
}

Expand Down
Expand Up @@ -296,12 +296,16 @@ private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
return;
}

// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();

// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();

// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
if (!wasActive && active) {
pipeline().fireChannelActive();
}

Expand Down
Expand Up @@ -68,8 +68,13 @@ public void connect(
try {
boolean wasActive = isActive();
doConnect(remoteAddress, localAddress);

// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
boolean active = isActive();

safeSetSuccess(promise);
if (!wasActive && isActive()) {
if (!wasActive && active) {
pipeline().fireChannelActive();
}
} catch (Throwable t) {
Expand Down

0 comments on commit 5e148d5

Please sign in to comment.