Skip to content

Commit

Permalink
Fixed - Redis response isn't fully consumed after decoding error. #2098
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed May 20, 2019
1 parent 09c8374 commit 18a6cea
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 15 deletions.
Expand Up @@ -103,16 +103,29 @@ protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>

if (data == null) {
while (in.writerIndex() > in.readerIndex()) {
skipCommand(in);
int endIndex = skipCommand(in);

decode(ctx, in, data);
try {
decode(ctx, in, data);
} catch (Exception e) {
in.readerIndex(endIndex);
throw e;
}
}
} else {
int endIndex = 0;
if (!(data instanceof CommandsData)) {
skipCommand(in);
endIndex = skipCommand(in);
}

decode(ctx, in, data);
try {
decode(ctx, in, data);
} catch (Exception e) {
if (!(data instanceof CommandsData)) {
in.readerIndex(endIndex);
}
throw e;
}
}
}

Expand Down Expand Up @@ -151,10 +164,12 @@ protected void sendNext(Channel channel, QueueCommand data) {
}
}

protected void skipCommand(ByteBuf in) throws Exception {
protected int skipCommand(ByteBuf in) throws Exception {
in.markReaderIndex();
skipDecode(in);
int res = in.readerIndex();
in.resetReaderIndex();
return res;
}

protected void skipDecode(ByteBuf in) throws IOException{
Expand Down Expand Up @@ -240,11 +255,12 @@ private void decodeCommandBatch(Channel channel, ByteBuf in, QueueCommand data,
Throwable error = null;
while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> commandData = null;

checkpoint();
state.get().setBatchIndex(i);

int endIndex = skipCommand(in);
try {
checkpoint();
state.get().setBatchIndex(i);

skipCommand(in);

RedisCommand<?> cmd = commandBatch.getCommands().get(i).getCommand();
boolean skipConvertor = commandBatch.isQueued();
Expand Down Expand Up @@ -286,10 +302,10 @@ private void decodeCommandBatch(Channel channel, ByteBuf in, QueueCommand data,
}
}
} catch (Exception e) {
in.readerIndex(endIndex);
if (commandData != null) {
commandData.tryFailure(e);
}
throw e;
}
i++;
if (commandData != null && !commandData.isSuccess()) {
Expand Down
Expand Up @@ -38,10 +38,6 @@ public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, bo
this(promise, commands, null, false, false, queued);
}

public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, List<CommandData<?, ?>> attachedCommands) {
this(promise, commands, attachedCommands, false, false, true);
}

public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean skipResult, boolean atomic, boolean queued) {
this(promise, commands, null, skipResult, atomic, queued);
}
Expand Down
Expand Up @@ -304,7 +304,7 @@ protected <R, V> void sendCommand(AsyncDetails<V, R> details, RedisConnection co
}

RPromise<Void> main = new RedissonPromise<Void>();
ChannelFuture future = connection.send(new CommandsData(main, list, new ArrayList(entry.getCommands())));
ChannelFuture future = connection.send(new CommandsData(main, list, new ArrayList(entry.getCommands()), options.isSkipResult(), false, true));
details.setWriteFuture(future);
} else {
RPromise<Void> main = new RedissonPromise<Void>();
Expand Down
42 changes: 42 additions & 0 deletions redisson/src/test/java/org/redisson/RedissonTest.java
Expand Up @@ -36,6 +36,8 @@
import org.redisson.api.Node.InfoSection;
import org.redisson.api.NodeType;
import org.redisson.api.NodesGroup;
import org.redisson.api.RBucket;
import org.redisson.api.RBuckets;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
Expand Down Expand Up @@ -190,6 +192,46 @@ public void after() throws InterruptedException {
public static class Dummy {
private String field;
}

@Test
public void testNextResponseAfterDecoderError() throws Exception {
Config config = new Config();
config.useSingleServer()
.setConnectionMinimumIdleSize(1)
.setConnectionPoolSize(1)
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());

RedissonClient redisson = Redisson.create(config);

setJSONValue(redisson, "test1", "test1");
setStringValue(redisson, "test2", "test2");
setJSONValue(redisson, "test3", "test3");
try {
RBuckets buckets = redisson.getBuckets(new JsonJacksonCodec());
buckets.get("test2", "test1");
} catch (Exception e) {
e.printStackTrace();
}
assertThat(getStringValue(redisson, "test3")).isEqualTo("\"test3\"");

redisson.shutdown();
}

public void setJSONValue(RedissonClient redisson, String key, Object t) {
RBucket<Object> test1 = redisson.getBucket(key, new JsonJacksonCodec());
test1.set(t);
}

public void setStringValue(RedissonClient redisson, String key, Object t) {
RBucket<Object> test1 = redisson.getBucket(key, new StringCodec());
test1.set(t);
}


public Object getStringValue(RedissonClient redisson, String key) {
RBucket<Object> test1 = redisson.getBucket(key, new StringCodec());
return test1.get();
}

@Test(expected = IllegalArgumentException.class)
public void testSer() {
Expand Down

0 comments on commit 18a6cea

Please sign in to comment.