Improperly decoding command responses #1512

Closed
johnny-costanzo opened this issue Nov 9, 2020 · 8 comments

@johnny-costanzo

Bug Report

Current Behavior

I'm upgrading our version of Lettuce from 5.2.0 to 6.0.0 and immediately hit an issue that makes the new version unusable for us. Our application only uses HMGET (for 10 keys) and LRANGE (for lists of exactly 100 values). With the new version of the library I get exceptions from Lettuce indicating that it is either trying to parse more data than was actually requested for a particular command, or waiting for data that never arrives (presumably because the response parsing gets out of alignment, so the command eventually times out and is canceled). Although each of my lists has exactly 100 values, debug/trace logging occasionally shows Lettuce parsing a list of 150 items (or more, or fewer than 100). I've also seen it attempt to parse more than 10 keys for my HMGET and then throw this exception:

java.util.NoSuchElementException
        at java.base/java.util.Arrays$ArrayItr.next(Arrays.java:4431)
        at io.lettuce.core.output.KeyValueListOutput.set(KeyValueListOutput.java:80)
        at io.lettuce.core.protocol.RedisStateMachine.safeSet(RedisStateMachine.java:810)
        at io.lettuce.core.protocol.RedisStateMachine.handleBytes(RedisStateMachine.java:572)
        at io.lettuce.core.protocol.RedisStateMachine$State$Type.handle(RedisStateMachine.java:206)
        at io.lettuce.core.protocol.RedisStateMachine.doDecode(RedisStateMachine.java:334)
        at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:295)
        at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:799)
        at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:750)
        at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:733)
        at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:618)
        at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:560)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

Debugging shows that it has in fact read all 10 keys for the HMGET but then attempts to read an 11th, even though the keys iterator has nothing left to match it to. With trace logging enabled I can see the responses coming back from Redis, and they do contain the proper number of elements, so whatever code processes the response seems to be getting garbled input. In other cases I'll see an LRANGE complete with 10 values, as if it consumed an HMGET response and skipped over the actual LRANGE response, or some combination of the two.
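
For what it's worth, the NoSuchElementException itself is easy to explain once an extra value shows up: the output pairs each decoded value with the next expected key from an iterator, so an 11th value exhausts it. A toy version of the same pattern (hypothetical sketch, not the actual lettuce source) fails the same way:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    Iterator<String> keys = Arrays.asList("k1", "k2").iterator();
    List<String> values = Arrays.asList("v1", "v2", "v3"); // one value too many

    for (String value : values) {
        String key = keys.next(); // throws NoSuchElementException on the 3rd value
        System.out.println(key + "=" + value);
    }

So the extra element is just a symptom; the real question is why the decoder hands the output more values than were requested.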

I'm somewhat at a loss as to where to go next with debugging this, since everything works fine on 5.2.0. The application is big and complicated, so I don't have a sample app that reproduces it. I'm not seeing exceptions from any other parts of the code (in particular, none coming from the codec).

Is it possible that when Lettuce receives data in CommandHandler.channelRead it is not saving it to its internal buffer correctly (and so throws away earlier data that did not yet represent a full command response)? I do see lines like:

# writing first LRANGE
DEBUG [DefaultEndpoint]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, epid=0x51] write() writeAndFlush command AsyncCommand [type=LRANGE, output=ValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command] <PrewarmerImpl.run-12-0> (channelWriteAndFlush.419)
DEBUG [DefaultEndpoint]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, epid=0x51] write() done <PrewarmerImpl.run-12-0> (write.196)
DEBUG [CommandHandler]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] write(ctx, AsyncCommand [type=LRANGE, output=ValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command], promise) <lettuce-epollEventLoop-4-1> (write.365)
DEBUG [CommandEncoder]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379] writing command AsyncCommand [type=LRANGE, output=ValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command] <lettuce-epollEventLoop-4-1> (encode.101)

# receive response in two channelRead invocations
DEBUG [CommandHandler]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Received: 512 bytes, 1 commands in the stack <lettuce-epollEventLoop-4-1> (channelRead.539)
DEBUG [CommandHandler]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Stack contains: 1 commands <lettuce-epollEventLoop-4-1> (decode.612)
DEBUG [CommandHandler]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Received: 494 bytes, 1 commands in the stack <lettuce-epollEventLoop-4-1> (channelRead.539)

# second LRANGE writing
DEBUG [DefaultEndpoint]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, epid=0x51] write() writeAndFlush command AsyncCommand [type=LRANGE, output=ValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command] <PrewarmerImpl.run-12-0> (channelWriteAndFlush.419)
DEBUG [DefaultEndpoint]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, epid=0x51] write() done <PrewarmerImpl.run-12-0> (write.196)
DEBUG [CommandHandler]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] write(ctx, AsyncCommand [type=LRANGE, output=ValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command], promise) <lettuce-epollEventLoop-4-1> (write.365)
DEBUG [CommandEncoder]  <11:41:42> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379] writing command AsyncCommand [type=LRANGE, output=ValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command] <lettuce-epollEventLoop-4-1> (encode.101)

# receive response in a single channelRead
DEBUG [CommandHandler]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Received: 1006 bytes, 2 commands in the stack <lettuce-epollEventLoop-4-1> (channelRead.539)
DEBUG [CommandHandler]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Stack contains: 2 commands <lettuce-epollEventLoop-4-1> (decode.612)

# complete the first LRANGE with 150 values (should be 100)
DEBUG [CommandHandler]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Completing command AsyncCommand [type=LRANGE, output=ValueListOutput [output=[[B@56dfd39a, [B@5fe0ecd1, [B@4e672bc6, [B@526b67b2, [B@7a802d66, [B@225ed083, [B@5510e615, [B@5c4f5df2, [B@275fb000, [B@73d1a6a0, [B@21a3c681, [B@7a5af8a, [B@5da0d0ce, [B@23ba997c, [B@1c12d444, [B@3661e1b3, [B@6d1c6111, [B@1a1a64b7, [B@10a02344, [B@29f5349a, [B@276fcc5c, [B@6e993bda, [B@7f328f4e, [B@7fe335d6, [B@26370b5a, [B@1d7a3ad, [B@6a740426, [B@cdfc13b, [B@122c6621, [B@2eda6502, [B@644501cb, [B@16e1d261, [B@678db3f2, [B@1a822874, [B@560539b6, [B@4eef8bcd, [B@4477b3c1, [B@4fbde2b3, [B@308aba7b, [B@209ee44, [B@7b99bc7f, [B@5b1161c5, [B@3e2d51be, [B@731be3f1, [B@45d7d3a9, [B@15189d9f, [B@41ec9e23, [B@6d44f984, [B@1675351e, [B@4750b62a, [B@62682565, [B@282a7875, [B@17c875ab, [B@55eb609e, [B@768b244b, [B@d99133f, [B@59284a0a, [B@57c93e21, [B@784fe1ef, [B@3d0a0532, [B@26696083, [B@384f6a59, [B@3f08b426, [B@52460560, [B@3f461ad3, [B@7c18a2c0, [B@6cb80e1, [B@64c4e85d, [B@6b5bf694, [B@52ec579d, [B@87a09a9, [B@4bbf9efb, [B@7fe43e4e, [B@4f04de90, [B@176bb198, [B@2014adc8, [B@7e5ce1ca, [B@174550cf, [B@5d68c4ce, [B@58174846, [B@415cb03f, [B@7ca243f3, [B@6777a61d, [B@54087cd, [B@21b91212, [B@4fea700d, [B@157760a3, [B@1408f32f, [B@7dc85a3e, [B@613e3792, [B@4fe2316e, [B@1c203236, [B@23869226, [B@3394b363, [B@6d238d3e, [B@700fb32e, [B@670f18ab, [B@7116a082, [B@2d89e3, [B@d36cc0b, [B@12634746, [B@5635f738, [B@3abb1e8c, [B@156db44, [B@987e55f, [B@70725f60, [B@12df609c, [B@25640578, [B@13ffa196, [B@719465d6, [B@182630f8, [B@1decb941, [B@7371487b, [B@21193916, [B@8916c3d, [B@704ffadf, [B@904463d, [B@613d0f82, [B@624e771c, [B@6d58f088, [B@46c666b0, [B@4f0e20f5, [B@5e69489f, [B@741f3f6a, [B@11924bf, [B@3f2eb5e1, [B@2b973f2b, [B@26feb15c, [B@6e8a45e0, [B@24c1646d, [B@363d69b9, [B@651a5a73, [B@5903a675, [B@76ce4334, [B@2be39cf5, [B@1c1d9238, [B@5869afc0, [B@5cba901a, [B@6b89ab4c, [B@1bf57857, [B@ef7d346, [B@645b8c2, [B@20b0a08d, [B@10b7530c, [B@20eadfbf, [B@b42f0c8, [B@2e538de5, [B@4d68410, [B@72b7b412, [B@6e6fce3c], error='null'], commandType=io.lettuce.core.protocol.Command] <lettuce-epollEventLoop-4-1> (decode.637)

# an HMGET is issued for 10 keys
DEBUG [DefaultEndpoint]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, epid=0x51] write() writeAndFlush command AsyncCommand [type=HMGET, output=KeyValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command] <PrewarmerImpl.run-12-0> (channelWriteAndFlush.419)
DEBUG [DefaultEndpoint]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, epid=0x51] write() done <PrewarmerImpl.run-12-0> (write.196)
DEBUG [CommandHandler]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] write(ctx, AsyncCommand [type=HMGET, output=KeyValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command], promise) <lettuce-epollEventLoop-4-1> (write.365)
DEBUG [CommandEncoder]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379] writing command AsyncCommand [type=HMGET, output=KeyValueListOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command] <lettuce-epollEventLoop-4-1> (encode.101)

# the HMGET response is received
DEBUG [CommandHandler]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Received: 100 bytes, 2 commands in the stack <lettuce-epollEventLoop-4-1> (channelRead.539)
DEBUG [CommandHandler]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Stack contains: 2 commands <lettuce-epollEventLoop-4-1> (decode.612)

# the LRANGE is completed with the HMGET response
DEBUG [CommandHandler]  <11:41:43> [channel=0xa002ce56, /10.96.145.66:33250 -> 10.43.1.232/10.43.1.232:6379, chid=0x51] Completing command AsyncCommand [type=LRANGE, output=ValueListOutput [output=[null, [B@5c615ab1, [B@6c27a329, [B@23ff8d9b, [B@33f1b9f6, [B@6513d9eb, [B@34fc008b, [B@6bb0d0da, [B@5530ea53, [B@2e17ad9f], error='null'], commandType=io.lettuce.core.protocol.Command] <lettuce-epollEventLoop-4-1> (decode.637)

In the above, the stack began with 2 LRANGE requests. Trace logging (not included) shows that both responses were received, each with 100 items. The first response was split across two channel reads (one of 512 bytes and one of 494), while the second response arrived whole in a single 1006-byte read. At that point, having a complete response, it processed the first command on the stack, the first LRANGE, but associated 150 values with its output (even though there were only 100). An HMGET request was then made, and its response was used to complete the second LRANGE with 10 values.

I've noticed that in the first LRANGE response of 512 bytes, the last line (one of the values in the list) coincidentally begins with '>', the same marker that indicates a PUSH-type message in the state machine. Maybe that causes it to treat the buffer as isPushDecode(buffer) == true, so it does not log "{} Stack contains: {} commands" for the second channelRead and instead processes those bytes into a pushOutput command that is then thrown away rather than applied to the actual LRANGE command.
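
To make that concrete, here is roughly what I believe the split looks like (illustrative fragment only, not the actual production payload):

    // First channelRead ends mid-bulk-string: the $4 header announces a
    // 4-byte value, but only 3 bytes (">TW") have arrived so far.
    String firstRead  = "*4\r\n$3\r\nONE\r\n$4\r\n>TW";
    // Second channelRead starts with the remaining byte of ">TWO".
    String secondRead = "O\r\n$5\r\nTHREE\r\n$4\r\nFOUR\r\n";
    // After the first read, the buffer's read position is left on '>',
    // so a naive peek at the next byte misclassifies the continuation
    // as a RESP3 push message instead of the rest of the LRANGE reply.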

Environment

  • Lettuce version(s): 6.0.0.RELEASE
  • Redis version: 5.0.4
@johnny-costanzo johnny-costanzo added the type: bug label Nov 9, 2020
@johnny-costanzo
Author

For good measure I added a listener to the connection that logs when it receives a push message (which I now found out is a thing). Confirming my suspicion above, I'm seeing a number of "push" messages appear in my logs with garbage data. I wouldn't expect to see any, since I'm still using Redis 5.
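
The listener was along these lines (trimmed sketch using the Lettuce 6 PushListener API; the connection URI and logging are illustrative, not our production code):

    import io.lettuce.core.RedisClient;
    import io.lettuce.core.api.StatefulRedisConnection;
    import io.lettuce.core.api.push.PushMessage;

    RedisClient client = RedisClient.create("redis://localhost:6379"); // URI illustrative
    StatefulRedisConnection<String, String> connection = client.connect();

    // Log every push message; against Redis 5 (which has no RESP3) none should arrive.
    connection.addListener((PushMessage message) ->
            System.out.println("push type=" + message.getType()
                    + " content=" + message.getContent()));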

@mp911de
Collaborator

mp911de commented Nov 10, 2020

Thanks for the report. Can you provide a minimal reproducer for the issue? Since our tests are green, we're probably missing a specific arrangement that leads to the failure.

@johnny-costanzo
Author

johnny-costanzo commented Nov 10, 2020

Try adding this test to CommandHandlerUnitTests.java. It passes on the 5.3.x branch but fails on main (the assertion after the second channelRead fails, because that read was thrown away and the LRANGE was not completed as expected, as it was on previous versions).

So the more specific trigger is a buffer whose last line is incomplete and also starts with '>'. The line is not consumed, but the read index is pushed ahead, causing the next channelRead to be treated as a PUSH message. In the example below the LRANGE should return 4 values, but I split the Redis response across two channel reads so that the final line of the first channelRead is incomplete (a value of 4 bytes is expected but only 3 have arrived at that point; the remaining byte comes in at the start of the next channelRead).

    @Test
    void shouldHandleIncompleteResponses() throws Exception {

        Command<String, String, List<String>> lrangeCommand = new Command<>(
            CommandType.LRANGE, new ValueListOutput<>(StringCodec.UTF8),
            new CommandArgs<>(StringCodec.UTF8).addKey("lrangeKey").add(0).add(1));

        Command<String, String, List<KeyValue<String, String>>> hmgetCommand = new Command<>(
            CommandType.HMGET, new KeyValueListOutput<>(StringCodec.UTF8, Arrays.asList("KEY1", "KEY2", "KEY3")),
            new CommandArgs<>(StringCodec.UTF8).addKeys("hmgetKey", "KEY1", "KEY2", "KEY3"));

        ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
        channelPromise.setSuccess();

        sut.channelRegistered(context);
        sut.channelActive(context);

        List<Command<String, String, ?>> commands = Arrays.asList(lrangeCommand, hmgetCommand);
        sut.write(context, commands, channelPromise);
        assertThat(stack.size()).isEqualTo(2);

        // the LRANGE response comes back across two channelReads
        sut.channelRead(context, Unpooled.wrappedBuffer("*4\r\n$3\r\nONE\r\n$4\r\n>TW".getBytes()));
        assertThat(stack.size()).isEqualTo(2);

        sut.channelRead(context, Unpooled.wrappedBuffer("O\r\n$5\r\nTHREE\r\n$4\r\nFOUR\r\n".getBytes()));
        assertThat(stack.size()).isEqualTo(1);
        assertThat(lrangeCommand.get()).isNotNull();
        assertThat(lrangeCommand.get().size()).isEqualTo(4);

        // the HMGET response comes back in another read
        sut.channelRead(context, Unpooled.wrappedBuffer("*3\r\n$4\r\nVAL1\r\n$4\r\nVAL2\r\n$4\r\nVAL3\r\n".getBytes()));

        assertThat(stack.isEmpty()).isTrue();
        assertThat(hmgetCommand.get()).isNotNull();
        assertThat(hmgetCommand.get().size()).isEqualTo(3);
    }

@johnny-costanzo
Author

I'm also wondering if this issue was the cause of #1443, since it manifests itself as things getting out of sync.

@mp911de
Collaborator

mp911de commented Nov 14, 2020

Thanks a lot. Let me investigate why this happens.

@mp911de mp911de added the type: regression label and removed the type: bug label Nov 14, 2020
@mp911de mp911de added this to the 6.0.2 milestone Nov 14, 2020
@mp911de mp911de added the type: blocker label Nov 14, 2020
@mp911de
Collaborator

mp911de commented Nov 14, 2020

With the introduction of push message handling, we introduced this regression. The decode loop doesn't consider the partial decode progress and falls into the push decode code path.
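
In essence the fix adds a guard along these lines (sketch with hypothetical names, not the literal diff):

    // Only enter the push decode path when the state machine is not in the
    // middle of decoding a previous reply; otherwise a '>' at the read
    // position is just the continuation of a partially decoded bulk string.
    if (!stateMachine.isDecoding() && isPushMarker(buffer)) {
        decodePushMessage(buffer);          // genuine RESP3 out-of-band message
    } else {
        decodeCommandResponse(buffer);      // continue/complete the command reply
    }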

@mp911de mp911de self-assigned this Nov 14, 2020
mp911de added a commit that referenced this issue Nov 14, 2020
We now verify that no partial decode is in progress when entering the code path to decode push messages.

Previously, a partial read could leave the decode buffer at a read position that started with >, which is the byte marker for push messages. Without considering the partial decode progress, subsequent decodes took the push decode path, leaving commands in the stack without properly decoding the actual command.
@mp911de
Collaborator

mp911de commented Nov 14, 2020

That's fixed now. Care to retest against the latest snapshot?

@mp911de mp911de closed this as completed Nov 14, 2020
@johnny-costanzo
Author

Looks fixed in the snapshot build. Thanks!
