netty - corrupted messages with netty multi-client #5371

Closed
bojke2000 opened this issue Jun 8, 2016 · 3 comments


@bojke2000

bojke2000 commented Jun 8, 2016

A fairly typical Netty client running in a multithreaded environment randomly receives corrupted messages: roughly 20-30 corrupted messages out of every 30-40k transferred.

The communication protocol is simple: each packet contains a message in the following format:
`| length | string |`
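For reference, here is a minimal plain-Java sketch of that frame format. It assumes what the client pipeline below implies: a 4-byte big-endian length (the default for `LengthFieldPrepender(4)` and `LengthFieldBasedFrameDecoder`) that counts only the string bytes, not the length field itself. UTF-8 is an assumption, and the class name `FrameCodec` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameCodec {
    // Encodes one | length | string | frame: 4-byte big-endian length, then the
    // string bytes (UTF-8 is an assumption; the issue does not state the charset).
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length); // big-endian by default
        buf.putInt(payload.length); // length excludes the 4-byte length field
        buf.put(payload);
        return buf.array();
    }

    // Reads one frame from buf, or returns null if the full frame has not arrived yet.
    static String decode(ByteBuffer buf) {
        if (buf.remaining() < 4) {
            return null;
        }
        int length = buf.getInt(buf.position()); // peek at the length without consuming it
        if (buf.remaining() < 4 + length) {
            return null;
        }
        buf.position(buf.position() + 4); // skip (strip) the length field
        byte[] payload = new byte[length];
        buf.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] frame = encode("{\"C\":\"SS\"}");
        System.out.println(frame.length);                   // 14 (4 + 10)
        System.out.println(decode(ByteBuffer.wrap(frame))); // {"C":"SS"}
    }
}
```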

We create ten Netty connections (the FeedCommunicatorImpl class given below) to the same server. Concretely, we instantiate ten Fetcher objects, each of which creates and holds its own FeedCommunicatorImpl connection to that server. Each Fetcher sends the server a command that says "I want to receive all messages with the given ID", after which that Fetcher receives only the messages with its ID.

Everything works well, except that we randomly get corrupted messages; examples of corrupted messages are given below.
I can confirm that the data the server sends is correct: it is a paid service, and it offers a control application that shows each specific message sent to us, so we can verify that the server sends correct messages.

FeedCommunicatorImpl puts each received message into a queue; on the other side of the queue, MessageHandler converts the received String messages to JSON.
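MessageHandler's code is not shown in the issue, so the following is only a hypothetical sketch of the queue hand-off: a producer (standing in for `channelRead0()`, which calls `queue.offer(msg)`) and a consumer that flags any message not starting with `{`, as the issue says every correct message does. It also logs the first character's code point, which helps tell a literal `?` (U+003F) apart from U+FFFD, the replacement character. `QueueHandoffSketch`, `looksCorrupt`, `firstCharCode` and `drainAndCheck` are invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandoffSketch {
    // Per the issue, every correct message is a JSON object starting with '{'.
    static boolean looksCorrupt(String msg) {
        return msg.isEmpty() || msg.charAt(0) != '{';
    }

    // First code point as "U+XXXX": a literal '?' prints U+003F, while the
    // replacement character prints U+FFFD.
    static String firstCharCode(String msg) {
        return msg.isEmpty() ? "<empty>" : String.format("U+%04X", msg.codePointAt(0));
    }

    // Hypothetical consumer: drains until the queue stays empty for timeoutMs
    // and returns the suspicious messages (valid ones would be parsed as JSON).
    static List<String> drainAndCheck(BlockingQueue<String> queue, long timeoutMs)
            throws InterruptedException {
        List<String> corrupt = new ArrayList<>();
        String msg;
        while ((msg = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)) != null) {
            if (looksCorrupt(msg)) {
                corrupt.add(msg);
            }
        }
        return corrupt;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Producer thread standing in for the Netty event loop.
        Thread producer = new Thread(() -> {
            queue.offer("{\"C\":\"SS\"}");
            queue.offer("?{\"C\":\"SS\"}");
        });
        producer.start();
        producer.join();
        for (String bad : drainAndCheck(queue, 100)) {
            System.out.println(firstCharCode(bad) + " " + bad); // U+003F ?{"C":"SS"}
        }
    }
}
```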

The strange thing is that a corrupted message does not affect the following messages in any way; they arrive intact.
With a length-based protocol in an async environment, I would expect the first corrupted message to completely derail the data exchange, but that does not happen. The next message is usually correct.

Here is the code; below it are examples of corrupted and correct messages. A correct message is a JSON string and should start with the "{" character.
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.concurrent.BlockingQueue;

// Logger (logging library not stated), FeedCommunicator and SessionExceptionListener
// come from the project and are not shown in this issue.
public class FeedCommunicatorImpl implements FeedCommunicator, SessionExceptionListener {
    private NioEventLoopGroup workerGroup;
    private Channel channel;
    BlockingQueue<String> queue;
    boolean daemonMode = true;
    String command;
    String ip;
    int port;
    static Logger logger; // initialized elsewhere (not shown)
    private Date timestamp;
    private String name = "";
    // Not declared in the original snippet; types inferred from how they are used.
    boolean isSessionExceptionOccured;
    long timeoutTime = 10_000; // ms; inferred from the "10 seconds" log message below

    public void connect() {
        workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();

            b.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Inbound: read a 4-byte length field at offset 0, then strip it.
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    // No charset given: StringDecoder uses the platform default charset.
                    pipeline.addLast("decoder", new StringDecoder());

                    // Outbound: prepend a 4-byte length field to each encoded string.
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast("encoder", new StringEncoder());

                    // And then the business logic.
                    pipeline.addLast("handler", new SimpleChannelInboundHandler<String>() {

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            // The exception is created only so that its stack trace gets logged.
                            logger.info("Communicator " + name + " connected local:(" + ctx.channel().localAddress()
                                    + ") -> remote:(" + ctx.channel().remoteAddress() + ") ", new Exception());
                        }

                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                            try {
                                if (!msg.startsWith("{\"dt\":")) {
                                    // offer() silently drops the message if a bounded queue is full.
                                    queue.offer(msg);
                                    if (logger.isTraceEnabled())
                                        logger.trace("message " + msg + " added in queue");
                                } else {
                                    // Messages starting with {"dt": are dropped, not queued.
                                    if (logger.isTraceEnabled())
                                        logger.trace("message dt received added in queue");
                                }
                                timestamp = new Date();
                            } catch (Exception e) {
                                logger.error(e.getMessage());
                            }
                        }

                        @Override
                        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                            if (ctx.channel().localAddress() != null) {
                                logger.info("Communicator " + name + " disconnected local:(" + ctx.channel().localAddress()
                                        + ") -> remote:(" + ctx.channel().remoteAddress() + ") ");
                            }
                            // Runs unconditionally (the original had no braces around the if).
                            // It acts on the current this.channel/this.workerGroup fields, which a
                            // reconnect in sendCommand() may already have replaced.
                            disconnect();
                            super.channelUnregistered(ctx);
                        }
                    });
                }
            }).option(ChannelOption.TCP_NODELAY, true).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);

            ChannelFuture ret = b.connect(new InetSocketAddress(ip, port));
            this.channel = ret.sync().channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public boolean sendCommand(String command) {
        boolean isReconnected = false;

        if (isSessionExceptionOccured || isNoMessageInGivenInterval()) {

            if (isSessionExceptionOccured) {
                isSessionExceptionOccured = false;
                logger.warn("Feed Session Exception message - RECONNECTING");
            } else {
                logger.warn("FeedCommunicatorImpl did not get message in 10 seconds - RECONNECTING");
            }

            try {
                disconnect();
                connect();
                logger.info("FeedCommunicatorImpl - RECONNECTED");
            } catch (Exception e) {
                logger.error(e.getMessage());
            }

            isReconnected = true;
        }

        this.command = command;
        channel.writeAndFlush(command);
        return isReconnected;
    }

    public synchronized boolean isNoMessageInGivenInterval() {
        return this.timestamp != null &&
                (new Date().getTime() - this.timestamp.getTime() > timeoutTime);
    }

    public void disconnect() {
        try {
            if (this.channel != null) {
                this.channel.close();
            }
        } catch (Exception e) {
            logger.error(e.getMessage());
        } finally {
            if (this.workerGroup != null) {
                this.workerGroup.shutdownGracefully();
            }
            logger.info("--- CLIENT - Stopped.");
        }
    }

    public boolean isConnected() {
        if (this.channel == null)
            return false;
        return this.channel.isActive();
    }

    public void setDaemonMode(boolean dm) {
        this.daemonMode = dm;
    }

    public void notifySessionException() {
        this.isSessionExceptionOccured = true;
    }
}
```

Corrupted messages:

```
"n":"id: 29539990; name: Odd III vs Kongsberg; ss: 1"}}_______________MESSAGE ERROR DETAILS:
2016-06-07 20:56:17 ERROR MessageHandler:144 -[THREAD ID= MessageHandler-7]- CORRUPT MESSAGE: >{"C":"SS","bmId":43,"tok_______________MESSAGE ERROR DETAILS:
2016-06-07 20:58:26 ERROR MessageHandler:144 -[THREAD ID= MessageHandler-5]- CORRUPT MESSAGE: ?{"C":"SS","bmId":43,"tok_______________MESSAGE ERROR DETAILS:
2016-06-07 23:01:11 ERROR MessageHandler:144 -[THREAD ID= MessageHandler-7]- CORRUPT MESSAGE: �{"C":"SS","bmId":43,"tok_______________MESSAGE ERROR DETAILS:
2016-06-08 03:03:51 ER
```

Correct message:

```
2016-06-08 09:35:34 INFO MessageHandler:82 -[THREAD ID= MessageHandler-1]- Response is : {"C":"SS","bmId":43,"tok":"987987987516","inst":"exefeed","eId":"29554494","dt":"06-08-2016 07:30:48","cmd":"U","flg":23,"obj":[{"num":283,"sts":"Aces=0:0|Double Faults=4:2|Win % 1st Serve=56:57|Break Point Conversions=50:60|INFO=30:0|","st":"11114","stn":"Valeriya Zeleva PointScore","sc":{"POINTS":[30,0]},"bt":[{"n":"HA","o":[{"n":"1","v":1.166},{"n":"2","v":4.5}]},{"n":"HA_S2","o":[{"n":"1","v":1.666},{"n":"2","v":2.1}]},{"n":"RG_S2","h":3,"o":[{"n":"1","v":1.8},{"n":"2","v":1.909}]},{"n":"RG_S2","h":4,"o":[{"n":"1","v":1.727},{"n":"2","v":2}]},{"n":"RG_S2","h":5,"o":[{"n":"1","v":1.666},{"n":"2","v":2.1}]}],"u":1465371049,"tm":0,"tss":"00:00:00"}]}
```

bojke2000 changed the title from "netty - corrupted messages in multithreaded netty client" to "netty - corrupted messages with netty multi-client" on Jun 8, 2016.
@Mr00Anderson
Contributor

Mr00Anderson commented Aug 3, 2016

Just some comments and a possible issue. It makes sense that the following messages read fine with a length-based encoder/decoder: as long as the length field itself is intact, corrupting some bytes of one payload does not matter to the framing of the next message. If you used a variable-length framer instead, you would probably see complete corruption after the first corrupted message.
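To illustrate that point, here is a small plain-Java sketch (not Netty code; `FramingResilience` and its methods are hypothetical names) of a 4-byte length-prefixed stream with two frames. Corrupting a payload byte only damages that one message, while corrupting the length field desynchronizes everything after it. A real streaming decoder would not report the loss; it would simply wait for 99 bytes and swallow the following frames.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FramingResilience {
    // Builds one | length | payload | frame with a 4-byte big-endian length.
    static byte[] frame(String s) {
        byte[] payload = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    // Splits a complete byte stream into frames; stops if a length is impossible.
    static List<String> readFrames(byte[] stream) {
        ByteBuffer buf = ByteBuffer.wrap(stream);
        List<String> out = new ArrayList<>();
        while (buf.remaining() >= 4) {
            int len = buf.getInt();
            if (len < 0 || len > buf.remaining()) {
                out.add("<framing lost>");
                break;
            }
            byte[] payload = new byte[len];
            buf.get(payload);
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] stream = concat(frame("{\"m\":1}"), frame("{\"m\":2}"));

        byte[] payloadHit = stream.clone();
        payloadHit[4] = (byte) '?'; // first payload byte of frame 1
        System.out.println(readFrames(payloadHit)); // [?"m":1}, {"m":2}]

        byte[] lengthHit = stream.clone();
        lengthHit[3] = 99; // low byte of frame 1's length field: 7 becomes 99
        System.out.println(readFrames(lengthHit)); // [<framing lost>]
    }
}
```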

So the corruption, combined with the fact that the following messages arrive intact, leads me to think that it occurs before or after the length prepending, i.e. in the string encoding/decoding.

  1. Are the server and client on separate machines? `StringEncoder()` and `StringDecoder()` are system dependent if you do not provide a character set:
     - `StringDecoder()`: Creates a new instance with the current system character set.
     - `StringDecoder(Charset charset)`: Creates a new instance with the specified character set.
  2. If you are sending text messages, why not use `LineBasedFrameDecoder`?
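A minimal plain-Java sketch of the charset point above (`CharsetMismatch` and `roundTrip` are hypothetical names): bytes encoded with one charset and decoded with another. ISO-8859-1 stands in for a platform default that differs from the sender's; UTF-8 for the sender is an assumption. Pure-ASCII JSON survives the mismatch unchanged, but non-ASCII text does not. In the Netty pipeline, the fix would be to pass the same explicit charset to both sides, e.g. `new StringDecoder(CharsetUtil.UTF_8)` and `new StringEncoder(CharsetUtil.UTF_8)`.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharsetMismatch {
    // Encodes with one charset and decodes with another, like a sender and a
    // charset-less StringDecoder whose platform defaults differ.
    static String roundTrip(String text, Charset encodeWith, Charset decodeWith) {
        return new String(text.getBytes(encodeWith), decodeWith);
    }

    public static void main(String[] args) {
        String ascii = "{\"C\":\"SS\"}";
        String name = "\u0160i\u0107"; // 3 non-ASCII-containing chars, 5 bytes in UTF-8

        System.out.println(roundTrip(ascii, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1).equals(ascii)); // true
        System.out.println(roundTrip(name, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1).equals(name));   // false
        System.out.println(roundTrip(name, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1).length());       // 5
        System.out.println(roundTrip(name, StandardCharsets.UTF_8, StandardCharsets.UTF_8).equals(name));        // true
    }
}
```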

@johnou
Contributor

johnou commented Aug 12, 2016

The channel read looks pretty broken. If you are dealing with JSON-only payloads, consider `JsonObjectDecoder`.
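Netty's `JsonObjectDecoder` splits a byte stream into individual JSON objects by matching braces while tracking string literals. The sketch below is only a simplified plain-Java illustration of that brace-matching idea on a `String`, not Netty's implementation (which works on bytes and also handles arrays and size limits). It assumes the payloads are bare JSON objects; `JsonObjectSplitter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

public class JsonObjectSplitter {
    // Splits concatenated top-level JSON objects by tracking brace depth,
    // ignoring braces inside string literals and escaped quotes in them.
    static List<String> split(String stream) {
        List<String> objects = new ArrayList<>();
        int depth = 0;
        int start = -1;
        boolean inString = false;
        boolean escaped = false;
        for (int i = 0; i < stream.length(); i++) {
            char c = stream.charAt(i);
            if (inString) {
                if (escaped) {
                    escaped = false;       // this char was escaped, skip it
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == '{') {
                if (depth == 0) {
                    start = i;             // beginning of a top-level object
                }
                depth++;
            } else if (depth > 0) {        // characters outside objects are ignored
                if (c == '"') {
                    inString = true;
                } else if (c == '}') {
                    depth--;
                    if (depth == 0) {
                        objects.add(stream.substring(start, i + 1));
                    }
                }
            }
        }
        return objects;
    }

    public static void main(String[] args) {
        System.out.println(split("{\"a\":1}{\"b\":\"}\"}")); // [{"a":1}, {"b":"}"}]
    }
}
```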

@normanmaurer
Member

Let me close this... If you still think it's an issue, please reopen.
