From 6f512058b8cebaf560ac36c2f38b1f9642bb87e9 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Wed, 22 Jun 2016 11:03:52 -0700 Subject: [PATCH 1/2] logging of netty messages --- .../tcp/client/ClientTcpDuplexConnection.java | 13 ++++++++++++- .../tcp/client/ReactiveSocketClientHandler.java | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java index 64db55df5..c41978fac 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java @@ -22,6 +22,9 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.internal.logging.InternalLogger; import io.reactivesocket.DuplexConnection; import io.reactivesocket.Frame; import io.reactivesocket.exceptions.TransportException; @@ -38,8 +41,11 @@ import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.concurrent.CopyOnWriteArrayList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClientTcpDuplexConnection implements DuplexConnection { + private static Logger framesLogger = LoggerFactory.getLogger("frames"); private final Channel channel; private final CopyOnWriteArrayList> subjects; @@ -51,7 +57,8 @@ private ClientTcpDuplexConnection(Channel channel, CopyOnWriteArrayList create(SocketAddress address, EventLoopGroup eventLoopGroup) { return s -> { CopyOnWriteArrayList> subjects = new CopyOnWriteArrayList<>(); - ReactiveSocketClientHandler clientHandler = new ReactiveSocketClientHandler(subjects); + ReactiveSocketClientHandler clientHandler = new ReactiveSocketClientHandler(subjects, + framesLogger); Bootstrap bootstrap = new Bootstrap(); ChannelFuture connect = bootstrap .group(eventLoopGroup) @@ -66,6 +73,7 @@ protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast( new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE >> 1, 0, BitUtil.SIZE_OF_INT, -1 * BitUtil.SIZE_OF_INT, 0), + new LoggingHandler("frameBytes", LogLevel.DEBUG), clientHandler ); } @@ -107,6 +115,9 @@ public void onSubscribe(Subscription s) { @Override public void onNext(Frame frame) { try { + if (framesLogger.isDebugEnabled()) { + framesLogger.debug(channel.toString() + " WRITE: " + frame); + } ByteBuf byteBuf = Unpooled.wrappedBuffer(frame.getByteBuffer()); ChannelFuture channelFuture = channel.writeAndFlush(byteBuf); channelFuture.addListener(future -> { diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java index ac773d782..f2c1bcdde 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java @@ -19,27 +19,37 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.logging.LoggingHandler; import io.reactivesocket.Frame; import io.reactivesocket.transport.tcp.MutableDirectByteBuf; import io.reactivesocket.rx.Observer; +import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; +import org.slf4j.Logger; @ChannelHandler.Sharable public class ReactiveSocketClientHandler extends ChannelInboundHandlerAdapter { private final CopyOnWriteArrayList> subjects; + private final Logger logger; - public ReactiveSocketClientHandler(CopyOnWriteArrayList> subjects) { + public ReactiveSocketClientHandler(CopyOnWriteArrayList> subjects, + Logger logger) { this.subjects = subjects; + this.logger = logger; } @Override - public void channelRead(ChannelHandlerContext ctx, Object content) { + public void channelRead(ChannelHandlerContext ctx, Object content) throws Exception { ByteBuf byteBuf = (ByteBuf) content; try { MutableDirectByteBuf mutableDirectByteBuf = new MutableDirectByteBuf(byteBuf); final Frame from = Frame.from(mutableDirectByteBuf, 0, mutableDirectByteBuf.capacity()); + + if (logger.isDebugEnabled()) { + logger.debug(ctx.channel().toString() + " RECEIVED: " + from); + } subjects.forEach(o -> o.onNext(from)); } finally { byteBuf.release(); From c045ec1a786f9803895cdf6ea20c47ea5d5a8b37 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Wed, 22 Jun 2016 11:08:15 -0700 Subject: [PATCH 2/2] clean imports --- .../transport/tcp/client/ClientTcpDuplexConnection.java | 1 - .../transport/tcp/client/ReactiveSocketClientHandler.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java index c41978fac..af03abcf0 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ClientTcpDuplexConnection.java @@ -24,7 +24,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.util.internal.logging.InternalLogger; import io.reactivesocket.DuplexConnection; import io.reactivesocket.Frame; import io.reactivesocket.exceptions.TransportException; diff --git a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java index f2c1bcdde..b034977ee 100644 --- a/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java +++ b/reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/ReactiveSocketClientHandler.java @@ -19,12 +19,10 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.logging.LoggingHandler; import io.reactivesocket.Frame; import io.reactivesocket.transport.tcp.MutableDirectByteBuf; import io.reactivesocket.rx.Observer; -import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import org.slf4j.Logger;