Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChannelInitializer may be invoked multiple times when used with custo… #8620

Merged
merged 1 commit into from Dec 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 25 additions & 7 deletions transport/src/main/java/io/netty/channel/ChannelInitializer.java
Expand Up @@ -18,11 +18,12 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.util.concurrent.ConcurrentMap;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* A special {@link ChannelInboundHandler} which offers an easy way to initialize a {@link Channel} once it was
Expand Down Expand Up @@ -53,9 +54,10 @@
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
// We use a ConcurrentMap as a ChannelInitializer is usually shared between all Channels in a Bootstrap /
// We use a Set as a ChannelInitializer is usually shared between all Channels in a Bootstrap /
// ServerBootstrap. This way we can reduce the memory usage compared to use Attributes.
private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();
private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

/**
* This method will be called once the {@link Channel} was registered. After the method returns this instance
Expand Down Expand Up @@ -108,9 +110,14 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
initMap.remove(ctx);
}

@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
Expand All @@ -125,14 +132,25 @@ private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
return false;
}

private void remove(ChannelHandlerContext ctx) {
private void remove(final ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
// The removal may happen in an async fashion if the EventExecutor we use does something funky.
if (ctx.isRemoved()) {
initMap.remove(ctx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point of this line? Wouldn't we only get here if handlerRemoved() was called? Is the concern that someone could override handlerRemoved()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes :/

} else {
// Ensure we always remove from the Map in all cases to not produce a memory leak.
ctx.channel().closeFuture().addListener(new ChannelFutureListener() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will retain ctx until the channel is closed, even if handlerRemoved() is called. If this is a one-off initializer (not reused) then that could greatly increase the lifetime of some objects used during handshakes and the like.

Is this seriously the only way we can clean up properly? Is the problem that the methods can be overridden? If so, then maybe we should fix that in Netty 5? Or maybe we could avoid the check-for-double-register problem by having an alternative approach to 26aa348 in Netty 5?

It is possible to cancel the future to clean up, but the amount of effort necessary to do something simple makes me question if this is more of systemic problem that should be addressed in Netty 5.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ejona86 yeah I think its the best we can do for netty 4 to ensure we not leak if the user overrides handlerRemoved(...).Thats also why I check isRemoved() first.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could check handlerRemoved() was overridden at instantiation time using reflection and take the fast path?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trustin we could ... that said I think it does not really worth it as ctx.isRemoved() should be true if the EventExecutor is not some funky custom implemention that is is used with ChannelInitializer. So I would prefer to keep things simple for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

@Override
public void operationComplete(ChannelFuture future) {
initMap.remove(ctx);
}
});
}
}
}
}
126 changes: 126 additions & 0 deletions transport/src/test/java/io/netty/channel/ChannelInitializerTest.java
Expand Up @@ -21,12 +21,16 @@
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -35,6 +39,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;

public class ChannelInitializerTest {
Expand Down Expand Up @@ -249,6 +254,127 @@ private void testChannelRegisteredEventPropagation(ChannelInitializer<LocalChann
}
}

@SuppressWarnings("deprecation")
@Test(timeout = 10000)
public void testChannelInitializerEventExecutor() throws Throwable {
final AtomicInteger invokeCount = new AtomicInteger();
final AtomicInteger completeCount = new AtomicInteger();
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
LocalAddress addr = new LocalAddress("test");

final EventExecutor executor = new DefaultEventLoop() {
private final ScheduledExecutorService execService = Executors.newSingleThreadScheduledExecutor();

@Override
public void shutdown() {
execService.shutdown();
}

@Override
public boolean inEventLoop(Thread thread) {
// Always return false which will ensure we always call execute(...)
return false;
}

@Override
public boolean isShuttingDown() {
return false;
}

@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
throw new IllegalStateException();
}

@Override
public Future<?> terminationFuture() {
throw new IllegalStateException();
}

@Override
public boolean isShutdown() {
return execService.isShutdown();
}

@Override
public boolean isTerminated() {
return execService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return execService.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
execService.execute(command);
}
};

ServerBootstrap serverBootstrap = new ServerBootstrap()
.channel(LocalServerChannel.class)
.group(group)
.localAddress(addr)
.childHandler(new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) {
ch.pipeline().addLast(executor, new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
invokeCount.incrementAndGet();
ChannelHandlerContext ctx = ch.pipeline().context(this);
assertNotNull(ctx);
ch.pipeline().addAfter(ctx.executor(),
ctx.name(), null, new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// just drop on the floor.
}
});
completeCount.incrementAndGet();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
errorRef.set(cause);
}
});
}
});

Channel server = serverBootstrap.bind().sync().channel();

Bootstrap clientBootstrap = new Bootstrap()
.channel(LocalChannel.class)
.group(group)
.remoteAddress(addr)
.handler(new ChannelInboundHandlerAdapter());

Channel client = clientBootstrap.connect().sync().channel();
client.writeAndFlush("Hello World").sync();

client.close().sync();
server.close().sync();

client.closeFuture().sync();
server.closeFuture().sync();

// Give some time to execute everything that was submitted before.
Thread.sleep(1000);

executor.shutdown();
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));

assertEquals(invokeCount.get(), 1);
assertEquals(invokeCount.get(), completeCount.get());

Throwable cause = errorRef.get();
if (cause != null) {
throw cause;
}
}

private static void closeChannel(Channel c) {
if (c != null) {
c.close().syncUninterruptibly();
Expand Down