Incorrect result when a decoding exception occurred #247

Open
zhanggl opened this Issue Feb 28, 2014 · 3 comments

Contributor
zhanggl commented Feb 28, 2014

When decoding unexpected response data from a Redis server (illegal data, or legal data that we just can't recognize yet), an exception will probably be thrown from Reply.scala.

When this happens, I believe the result of this service(cmd) call should be a Throw(ex) instead of a Return(Reply). But this is not what we actually get.

Here is an example:
Before this empty string bug is fixed, given an MBULK input like this:

*4
$3
bar
$0

$3
foo
$3
moo

An IndexOutOfBoundsException will be thrown by val header = line(0) when decoding line 5 (empty string). But we end up with a BULK_REPLY("foo") result instead of a failed Throw.
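To make the failure mode concrete, here is a minimal Java sketch of one way an empty bulk value can trip a header read. It is not the Reply.scala code: the class NaiveMBulkDecoder and the assumed bug (a zero-length bulk is treated as having no payload line, so the empty line is read as the next header) are hypothetical and only illustrate the kind of indexing error described above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified decoder; NOT the finagle-redis implementation.
final class NaiveMBulkDecoder {
    // Decodes "*<count>" followed by <count> pairs of "$<len>" and a payload line.
    static List<String> decode(List<String> lines) {
        Iterator<String> it = lines.iterator();
        String first = it.next();
        if (first.charAt(0) != '*') {
            throw new IllegalArgumentException("expected a multi-bulk header");
        }
        int count = Integer.parseInt(first.substring(1));
        List<String> values = new ArrayList<>();
        while (values.size() < count) {
            String line = it.next();
            // Equivalent of `val header = line(0)`: throws on an empty line.
            char header = line.charAt(0);
            if (header != '$') {
                throw new IllegalStateException("unexpected header: " + header);
            }
            int len = Integer.parseInt(line.substring(1));
            // Assumed bug: a "$0" bulk is treated as having no payload line,
            // so the empty payload line is later read as a header.
            values.add(len == 0 ? "" : it.next());
        }
        return values;
    }
}
```

Feeding this sketch the nine lines from the example above raises a StringIndexOutOfBoundsException (a subclass of IndexOutOfBoundsException) on the empty line, while an input without the empty value decodes normally.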

Here is what I found, please correct me if I'm wrong:

By running this test:

      "get multiple values including one empty string" in {
        Await.result(client.hSet(roo, bar, StringToChannelBuffer("")))
        Await.result(client.hSet(roo, foo, moo))
        CBToString.fromTuples(
          Await.result(client.hGetAll(roo))) mustEqual Seq(("bar", ""), ("foo", "moo"))
      }

I got the following stack trace.

[error]     x get multiple values including one empty string
[error]       null (Client.scala:103)
[error]       com.twitter.finagle.redis.BaseClient$$anonfun$doRequest$1.apply(Client.scala:103)
[error]       com.twitter.finagle.redis.BaseClient$$anonfun$doRequest$1.apply(Client.scala:101)
[error]       scala.PartialFunction$$anon$1.apply(PartialFunction.scala:76)
[error]       com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:777)
[error]       com.twitter.util.Future$$anonfun$flatMap$1.apply(Future.scala:776)
[error]       com.twitter.util.Promise$Transformer.liftedTree1$1(Promise.scala:93)
[error]       com.twitter.util.Promise$Transformer.k(Promise.scala:93)
[error]       com.twitter.util.Promise$Transformer.apply(Promise.scala:102)
[error]       com.twitter.util.Promise$Transformer.apply(Promise.scala:84)
[error]       com.twitter.util.Promise$$anon$2.run(Promise.scala:321)
[error]       com.twitter.concurrent.LocalScheduler$Activation.run(Scheduler.scala:124)
[error]       com.twitter.concurrent.LocalScheduler$Activation.submit(Scheduler.scala:102)
[error]       com.twitter.concurrent.LocalScheduler.submit(Scheduler.scala:143)
[error]       com.twitter.concurrent.Scheduler$.submit(Scheduler.scala:55)
[error]       com.twitter.util.Promise.runq(Promise.scala:307)
[error]       com.twitter.util.Promise.updateIfEmpty(Promise.scala:599)
[error]       com.twitter.util.Promise.update(Promise.scala:577)
[error]       com.twitter.util.Promise.setValue(Promise.scala:553)
[error]       com.twitter.concurrent.AsyncQueue.offer(AsyncQueue.scala:71)
[error]       com.twitter.finagle.transport.ChannelTransport.handleUpstream(ChannelTransport.scala:41)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error]       org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
[error]       org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
[error]       org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
[error]       org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:482)
[error]       org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:365)
[error]       org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:102)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error]       org.jboss.netty.channel.SimpleChannelHandler.channelDisconnected(SimpleChannelHandler.java:199)
[error]       org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:120)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error]       org.jboss.netty.channel.SimpleChannelHandler.channelDisconnected(SimpleChannelHandler.java:199)
[error]       org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:120)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
[error]       org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:396)
[error]       org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:360)
[error]       org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:58)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
[error]       org.jboss.netty.channel.SimpleChannelHandler.closeRequested(SimpleChannelHandler.java:334)
[error]       org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:260)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)
[error]       org.jboss.netty.channel.SimpleChannelHandler.closeRequested(SimpleChannelHandler.java:334)
[error]       com.twitter.finagle.channel.ChannelStatsHandler.closeRequested(ChannelStatsHandler.scala:90)
[error]       org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:260)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)
[error]       com.twitter.finagle.redis.naggati.Codec.handleDownstream(Codec.scala:139)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
[error]       org.jboss.netty.channel.Channels.close(Channels.java:812)
[error]       com.twitter.finagle.transport.ChannelTransport.close(ChannelTransport.scala:112)
[error]       com.twitter.util.Closable$class.close(Closable.scala:17)
[error]       com.twitter.finagle.transport.ChannelTransport.close(ChannelTransport.scala:11)
[error]       com.twitter.finagle.transport.ChannelTransport.com$twitter$finagle$transport$ChannelTransport$$fail(ChannelTransport.scala:33)
[error]       com.twitter.finagle.transport.ChannelTransport.handleUpstream(ChannelTransport.scala:70)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error]       org.jboss.netty.handler.codec.frame.FrameDecoder.exceptionCaught(FrameDecoder.java:377)
[error]       org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:112)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error]       org.jboss.netty.channel.SimpleChannelHandler.exceptionCaught(SimpleChannelHandler.java:156)
[error]       com.twitter.finagle.channel.ChannelStatsHandler.exceptionCaught(ChannelStatsHandler.scala:101)
[error]       org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:130)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error]       org.jboss.netty.channel.SimpleChannelHandler.exceptionCaught(SimpleChannelHandler.java:156)
[error]       org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:130)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
[error]       org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)
[error]       org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.notifyHandlerException(DefaultChannelPipeline.java:658)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:566)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error]       org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142)
[error]       com.twitter.finagle.channel.ChannelStatsHandler.messageReceived(ChannelStatsHandler.scala:81)
[error]       org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[error]       org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142)
[error]       com.twitter.finagle.channel.ChannelRequestStatsHandler.messageReceived(ChannelRequestStatsHandler.scala:35)
[error]       org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[error]       org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
[error]       org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
[error]       org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
[error]       org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
[error]       org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
[error]       org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
[error]       org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
[error]       org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
[error]       org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
[error]       org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
[error]       java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[error]       java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[error]       java.lang.Thread.run(Thread.java:744)

It looks like what happened here was:

  1. An IndexOutOfBoundsException was raised.
  2. It was caught by DefaultChannelPipeline.java, wrapped as a ChannelPipelineException, and sent upstream.
  3. com.twitter.finagle.transport.ChannelTransport took control and decided to close the channel and fail this invocation by sending a close request downstream.
  4. After the channel was closed by AbstractNioWorker, a ChannelDisconnected event was sent upstream.
  5. When org.jboss.netty.handler.codec.frame.FrameDecoder received this event, it did its cleanup work by decoding the remaining dirty data in the ChannelBuffer, which happened to be foo, and that is what we finally got.

I believe the cleanup behavior of FrameDecoder should be overridden/modified so that the remaining dirty data is discarded if an exception occurred earlier.
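The sequence described here can be modelled in a few lines of Java. This is a toy model only: LeakyFrameDecoder and its method names are hypothetical and merely mimic the shape of the events (message received, then channel disconnected), not Netty's actual FrameDecoder API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the event sequence; names are hypothetical, not Netty's API.
final class LeakyFrameDecoder {
    private final Deque<String> buffered = new ArrayDeque<>(); // undecoded lines
    final List<String> delivered = new ArrayList<>();          // replies handed upstream

    // Decodes one "$<len>" header line plus its payload line.
    private String decodeOne() {
        String headerLine = buffered.poll();
        char header = headerLine.charAt(0); // step 1: throws on an empty line
        if (header != '$') {
            throw new IllegalStateException("unexpected header: " + header);
        }
        return buffered.poll();
    }

    // Steps 1-3: a decoding failure propagates to the caller, which closes the channel.
    void messageReceived(List<String> lines) {
        buffered.addAll(lines);
        while (!buffered.isEmpty()) {
            delivered.add(decodeOne());
        }
    }

    // Steps 4-5: on disconnect, cleanup decodes whatever is still buffered.
    void channelDisconnected() {
        while (!buffered.isEmpty()) {
            delivered.add(decodeOne());
        }
    }
}
```

Calling messageReceived with the lines "", "$3", "foo" throws on the empty line but leaves "$3" and "foo" buffered; a later channelDisconnected() then delivers "foo" as if it were a valid reply, which matches the symptom reported above.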

Contributor
mosesn commented Feb 28, 2014

Good sleuthing! Let me make sure we're on the same page.

Ok, from a high level view:

  1. If a redis client receives bad data, it closes the connection
  2. After it closes the connection, it tries to interpret the rest of the bad data, but it should treat it as unrecoverable.
  3. Redis can be pipelined, so this means that closing the connection when multiple requests are in flight should fail those requests too.
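As a rough illustration of points 2 and 3, the following Java sketch fails every in-flight request when the connection is closed after bad data and ignores anything decoded afterwards. PipelinedConnection and its methods are hypothetical names, the actual socket write is omitted, and the class is not thread-safe; it is a sketch of the desired behavior, not of how finagle dispatches requests.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of a pipelined client connection; not finagle's dispatcher.
final class PipelinedConnection {
    private final Queue<CompletableFuture<String>> inFlight = new ArrayDeque<>();
    private boolean closed;

    // Registers a request; writing the command to the socket is omitted here.
    CompletableFuture<String> send(String command) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (closed) {
            reply.completeExceptionally(new IllegalStateException("connection closed"));
        } else {
            inFlight.add(reply);
        }
        return reply;
    }

    // Replies arrive in request order; anything seen after a failure is ignored.
    void onReply(String reply) {
        if (closed) {
            return;
        }
        CompletableFuture<String> next = inFlight.poll();
        if (next != null) {
            next.complete(reply);
        }
    }

    // Bad data is unrecoverable: fail every pending request, not just the first.
    void failAndClose(Throwable cause) {
        closed = true;
        CompletableFuture<String> pending;
        while ((pending = inFlight.poll()) != null) {
            pending.completeExceptionally(cause);
        }
    }
}
```

With this shape, a decoding error surfaces as a failed result for every request that was pipelined on the connection, and leftover bytes can never be matched to a waiting caller.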

I'm not certain that overriding the cleanup behavior is the easiest way to fix this. I'll pull in @trustin and get some expert advice.

Contributor
trustin commented Mar 20, 2014

The problem is that the Redis decoder implementation's decode() method does not discard the unrecoverable data from the buffer when it fails, leaving the window open for this bug. I would modify the decode() method like the following:

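// Set once a decode attempt has failed; the stream is unrecoverable from then on.
private boolean badStream;

void decode(...) {
    if (badStream) {
        // Discard whatever is left so that cleanup cannot decode it into a bogus reply.
        buf.skipBytes(buf.readableBytes());
        return;
    }
    boolean success = false;
    try {
        ... decode ...
        success = true;
    } finally {
        // success is still false here only when decoding threw.
        if (!success) {
            badStream = true;
        }
    }
}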
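For anyone who wants to try the idea outside of Netty, here is a self-contained sketch of the same guard. It is an assumption-laden stand-in: GuardedLineDecoder is a hypothetical class, it uses java.nio.ByteBuffer instead of Netty's buffer type, and it only understands a single "$<len>" bulk frame per call.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder over java.nio.ByteBuffer; real code would use Netty's buffer.
final class GuardedLineDecoder {
    private boolean badStream;                       // set once any decode attempt fails
    final List<String> decoded = new ArrayList<>();  // successfully decoded payloads

    void decode(ByteBuffer buf) {
        if (badStream) {
            buf.position(buf.limit());               // discard everything still readable
            return;
        }
        boolean success = false;
        try {
            decoded.add(readBulk(buf));
            success = true;
        } finally {
            if (!success) {
                badStream = true;                    // remember that the stream is unrecoverable
            }
        }
    }

    // Reads one "$<len>\r\n<payload>\r\n" frame.
    private static String readBulk(ByteBuffer buf) {
        String header = readLine(buf);
        if (header.charAt(0) != '$') {               // throws on an empty header line
            throw new IllegalStateException("unexpected header: " + header);
        }
        byte[] payload = new byte[Integer.parseInt(header.substring(1))];
        buf.get(payload);
        readLine(buf);                               // consume the trailing CRLF
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Reads up to and including the next '\n', dropping '\r'.
    private static String readLine(ByteBuffer buf) {
        StringBuilder sb = new StringBuilder();
        while (buf.hasRemaining()) {
            char c = (char) buf.get();
            if (c == '\n') break;
            if (c != '\r') sb.append(c);
        }
        return sb.toString();
    }
}
```

If the first decode call fails on an empty header line, a second call on the same buffer (standing in for the cleanup pass) skips the remaining bytes instead of producing "foo".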
Contributor
mosesn commented Feb 4, 2015

@zhanggl sorry this fell through the cracks! Is this still a problem for you? Do you want to tackle it?

mosesn added the Starter label May 29, 2016