
Write/Flush fails when encoder is NOT last handler and runs in UnorderedThreadPoolEventExecutor with size > 1 #13814

Open
andilem opened this issue Jan 30, 2024 · 3 comments

Comments


andilem commented Jan 30, 2024

When there is an encoder in a channel pipeline which is NOT the last handler AND runs in an UnorderedThreadPoolEventExecutor with size > 1, channel.writeAndFlush(...) fails to flush (from a quick investigation, probably because the encoded bytes are not yet written: there is a FLUSH event, but no data). When flushing manually after a small delay, it works.

  • When setting the executor size to 1, it works.
  • When the encoder is the last handler, it works.

Minimal yet complete reproducer code

import java.nio.charset.StandardCharsets;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;

public class NettyBug {
    static {
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "trace");
    }
    
    private static class Codec extends MessageToByteEncoder<String> {
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
            out.writeCharSequence(msg, StandardCharsets.UTF_8);
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // ===== discard + close server =====
        var serverGroup = new NioEventLoopGroup(1);
        var serverBootstrap = new ServerBootstrap().group(serverGroup);
        serverBootstrap.channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, Integer.valueOf(128))
            .handler(new LoggingHandler("ServerAcceptor"));
        serverBootstrap.childHandler(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                // Discard the received data silently.
                System.err.println("OK! Received " + msg);
                ((ByteBuf) msg).release();
                System.err.println("Close");
                ctx.close();
            }
            
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                // Close the connection when an exception is raised.
                cause.printStackTrace();
                ctx.close();
            }
        });
        serverBootstrap.bind(50005).sync();
        
        // ===== CLIENT =====
        var clientIOGroup = new NioEventLoopGroup(1);
        var clientWorkerGroup = new UnorderedThreadPoolEventExecutor(2); // works with size = 1 !!!
        var bootstrap = new Bootstrap().group(clientIOGroup).channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                var pipeline = ch.pipeline();
                pipeline.addLast("frameLogger", new LoggingHandler("first", LogLevel.DEBUG)); // no impact, just for debugging
                pipeline.addLast(clientWorkerGroup, new Codec());
                pipeline.addLast("finalLogger", new LoggingHandler("second", LogLevel.DEBUG)); // works WITHOUT this handler !!!
            }
        });
        bootstrap.connect("localhost", 50005).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                f.channel().writeAndFlush("hello").addListener(ChannelFutureListener.CLOSE);
            }
        });
    }
}

Netty version

4.1.106

JVM version (e.g. java -version)

17

OS version (e.g. uname -a)

Windows

normanmaurer (Member) commented

let me check

normanmaurer (Member) commented

So this is kind of expected, in the sense that each handler method will be called in an "unordered" fashion. As writeAndFlush(...) is just a shortcut for calling write(...) and flush() separately, you can end up in such a situation.
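A minimal sketch of the race being described, using plain java.util.concurrent rather than Netty's internals (all class and method names here are hypothetical): if "write" and "flush" are submitted as two separate tasks to a pool with more than one thread, the flush can run while the encoder is still producing the bytes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FlushReordering {
    // Returns what the "flush" task observes in the outbound buffer.
    static String simulate() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2); // size > 1, as in the report
        StringBuilder outbound = new StringBuilder();           // stands in for the outbound buffer
        CountDownLatch writeStarted = new CountDownLatch(1);

        // "write": the encoder is slow, so the bytes land late.
        pool.submit(() -> {
            writeStarted.countDown();
            try { Thread.sleep(200); } catch (InterruptedException ignored) { }
            synchronized (outbound) { outbound.append("hello"); }
        });

        // "flush": submitted right after, but it runs on the second pool
        // thread while the write is still encoding, so it sees no data.
        writeStarted.await();
        Future<String> flushed = pool.submit(() -> {
            synchronized (outbound) { return outbound.toString(); }
        });
        String seen = flushed.get();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("flush saw: '" + simulate() + "'"); // prints: flush saw: ''
    }
}
```

This is not Netty's actual code path, only an illustration of why an unordered executor with more than one thread can let the flush overtake the write.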


andilem commented Feb 12, 2024

That's what I thought. That's a big problem, because it prevents running the decoder/encoder in a separate thread pool whenever anything follows it in the pipeline.
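For comparison, the same sketch with an ordered (single-thread) executor shows why size = 1 works: tasks run in submission order, so the flush always sees the written bytes. In Netty terms, ordered per-channel execution with multiple threads is what DefaultEventExecutorGroup provides, as opposed to UnorderedThreadPoolEventExecutor — but the snippet below is just a plain-Java sketch, not tested against this reproducer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderedFlush {
    // With a single-threaded (ordered) executor, the "write" task always
    // completes before the "flush" task starts, so the flush observes the bytes.
    static String simulate() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        StringBuilder outbound = new StringBuilder(); // stands in for the outbound buffer

        // "write": slow encode, then append the bytes.
        pool.submit(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            outbound.append("hello");
        });

        // "flush": queued behind the write, so it runs only after the bytes exist.
        Future<String> flushed = pool.submit(outbound::toString);
        String seen = flushed.get();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("flush saw: '" + simulate() + "'"); // prints: flush saw: 'hello'
    }
}
```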
