Skip to content

Commit

Permalink
Replaying phase handling in CommandDecoder
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Apr 15, 2016
1 parent 26f2d0a commit abca09e
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 109 deletions.
188 changes: 116 additions & 72 deletions src/main/java/org/redisson/client/handler/CommandDecoder.java
Expand Up @@ -36,6 +36,7 @@
import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.pubsub.Message; import org.redisson.client.protocol.pubsub.Message;
import org.redisson.client.protocol.pubsub.PubSubMessage; import org.redisson.client.protocol.pubsub.PubSubMessage;
import org.redisson.client.protocol.pubsub.PubSubPatternMessage; import org.redisson.client.protocol.pubsub.PubSubPatternMessage;
Expand Down Expand Up @@ -79,61 +80,99 @@ public void addPubSubCommand(String channel, CommandData<Object, Object> data) {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(); QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();


Decoder<Object> currentDecoder = null; if (log.isTraceEnabled()) {
if (data == null) { log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
currentDecoder = StringCodec.INSTANCE.getValueDecoder();
} }

if (state() == null) { if (state() == null) {
state(new State()); boolean makeCheckpoint = data != null;

if (data != null) {
if (log.isTraceEnabled()) { if (data instanceof CommandsData) {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); makeCheckpoint = false;
} else {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
if (cmd.getCommand().getReplayMultiDecoder() != null && NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())) {
makeCheckpoint = false;
}
}
} }
state(new State(makeCheckpoint));
} }

state().setDecoderState(null); state().setDecoderState(null);


if (data == null) { if (data == null) {
decode(in, null, null, ctx.channel(), currentDecoder); decode(in, null, null, ctx.channel());
} else if (data instanceof CommandData) { } else if (data instanceof CommandData) {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data; CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try { try {
// if (state().getSize() > 0) { if (state().getLevels().size() > 0) {
// List<Object> respParts = new ArrayList<Object>(); decodeFromCheckpoint(ctx, in, data, cmd);
// if (state().getRespParts() != null) { } else {
// respParts = state().getRespParts(); decode(in, cmd, null, ctx.channel());
// } }
// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), respParts, true);
// } else {
decode(in, cmd, null, ctx.channel(), currentDecoder);
// }
} catch (IOException e) { } catch (IOException e) {
cmd.getPromise().tryFailure(e); cmd.getPromise().tryFailure(e);
} }
} else if (data instanceof CommandsData) { } else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data; CommandsData commands = (CommandsData)data;


handleCommandsDataResponse(ctx, in, data, currentDecoder, commands); decodeCommandBatch(ctx, in, data, commands);
return; return;
} }

ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel());


state(null); state(null);
} }


private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, private void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
Decoder<Object> currentDecoder, CommandsData commands) { CommandData<Object, Object> cmd) throws IOException {
int i = state().getIndex(); if (state().getLevels().size() == 2) {
StateLevel secondLevel = state().getLevels().get(1);

if (secondLevel.getParts().isEmpty()) {
state().getLevels().remove(1);
}
}

if (state().getLevels().size() == 2) {
StateLevel firstLevel = state().getLevels().get(0);
StateLevel secondLevel = state().getLevels().get(1);

decodeMulti(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts());

Channel channel = ctx.channel();
MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts(), channel);
if (decoder != null) {
Object result = decoder.decode(firstLevel.getParts(), state());
if (data != null) {
handleResult(cmd, null, result, true, channel);
}
}
}
if (state().getLevels().size() == 1) {
StateLevel firstLevel = state().getLevels().get(0);
if (firstLevel.getParts().isEmpty()) {
state().resetLevel();
decode(in, cmd, null, ctx.channel());
} else {
decodeMulti(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts());
}
}
}

private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data,
CommandsData commandBatch) {
int i = state().getBatchIndex();


RedisException error = null; RedisException error = null;
while (in.writerIndex() > in.readerIndex()) { while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null; CommandData<Object, Object> cmd = null;
try { try {
checkpoint(); checkpoint();
state().setIndex(i); state().setBatchIndex(i);
cmd = (CommandData<Object, Object>) commands.getCommands().get(i); cmd = (CommandData<Object, Object>) commandBatch.getCommands().get(i);
decode(in, cmd, null, ctx.channel(), currentDecoder); decode(in, cmd, null, ctx.channel());
i++; i++;
} catch (IOException e) { } catch (IOException e) {
cmd.getPromise().tryFailure(e); cmd.getPromise().tryFailure(e);
Expand All @@ -147,8 +186,8 @@ private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, Q
} }
} }


if (i == commands.getCommands().size()) { if (i == commandBatch.getCommands().size()) {
Promise<Void> promise = commands.getPromise(); Promise<Void> promise = commandBatch.getPromise();
if (error != null) { if (error != null) {
if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) {
log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data);
Expand All @@ -164,11 +203,11 @@ private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, Q
state(null); state(null);
} else { } else {
checkpoint(); checkpoint();
state().setIndex(i); state().setBatchIndex(i);
} }
} }


private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, Decoder<Object> currentDecoder) throws IOException { private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel) throws IOException {
int code = in.readByte(); int code = in.readByte();
if (code == '+') { if (code == '+') {
String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
Expand Down Expand Up @@ -212,32 +251,40 @@ private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> p
ByteBuf buf = readBytes(in); ByteBuf buf = readBytes(in);
Object result = null; Object result = null;
if (buf != null) { if (buf != null) {
result = decoder(data, parts, currentDecoder).decode(buf, state()); Decoder<Object> decoder = selectDecoder(data, parts);
result = decoder.decode(buf, state());
} }
handleResult(data, parts, result, false, channel); handleResult(data, parts, result, false, channel);
} else if (code == '*') { } else if (code == '*') {
int level = state().incLevel();

long size = readLong(in); long size = readLong(in);
List<Object> respParts = new ArrayList<Object>(); List<Object> respParts;
boolean top = false; if (state().getLevels().size()-1 >= level) {
// if (state().trySetSize(size)) { StateLevel stateLevel = state().getLevels().get(level);
// state().setRespParts(respParts); respParts = stateLevel.getParts();
// top = true; size = stateLevel.getSize();
// } } else {

respParts = new ArrayList<Object>();
decodeMulti(in, data, parts, channel, currentDecoder, size, respParts, top); if (state().isMakeCheckpoint()) {
state().addLevel(new StateLevel(size, respParts));
}
}

decodeMulti(in, data, parts, channel, size, respParts);
} else { } else {
throw new IllegalStateException("Can't decode replay " + (char)code); throw new IllegalStateException("Can't decode replay " + (char)code);
} }
} }


private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, Decoder<Object> currentDecoder, long size, List<Object> respParts, boolean top) Channel channel, long size, List<Object> respParts)
throws IOException { throws IOException {
for (int i = respParts.size(); i < size; i++) { for (int i = respParts.size(); i < size; i++) {
decode(in, data, respParts, channel, currentDecoder); decode(in, data, respParts, channel);
// if (top) { if (state().isMakeCheckpoint()) {
// checkpoint(); checkpoint();
// } }
} }


MultiDecoder<Object> decoder = messageDecoder(data, respParts, channel); MultiDecoder<Object> decoder = messageDecoder(data, respParts, channel);
Expand All @@ -246,7 +293,10 @@ private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Obje
} }


Object result = decoder.decode(respParts, state()); Object result = decoder.decode(respParts, state());

if (data != null) {
handleResult(data, parts, result, true, channel);
return;
}


if (result instanceof Message) { if (result instanceof Message) {
// store current message index // store current message index
Expand All @@ -255,40 +305,34 @@ private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Obje
handleMultiResult(data, null, channel, result); handleMultiResult(data, null, channel, result);
// has next messages? // has next messages?
if (in.writerIndex() > in.readerIndex()) { if (in.writerIndex() > in.readerIndex()) {
decode(in, data, null, channel, currentDecoder); decode(in, data, null, channel);
} }
} else {
handleMultiResult(data, parts, channel, result);
} }
} }


private void handleMultiResult(CommandData<Object, Object> data, List<Object> parts, private void handleMultiResult(CommandData<Object, Object> data, List<Object> parts,
Channel channel, Object result) { Channel channel, Object result) {
if (data != null) { if (result instanceof PubSubStatusMessage) {
handleResult(data, parts, result, true, channel); String channelName = ((PubSubStatusMessage) result).getChannel();
} else { CommandData<Object, Object> d = pubSubChannels.get(channelName);
if (result instanceof PubSubStatusMessage) { if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) {
String channelName = ((PubSubStatusMessage) result).getChannel(); pubSubChannels.remove(channelName);
CommandData<Object, Object> d = pubSubChannels.get(channelName); pubSubMessageDecoders.put(channelName, d.getMessageDecoder());
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) {
pubSubChannels.remove(channelName);
pubSubMessageDecoders.put(channelName, d.getMessageDecoder());
}
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) {
pubSubChannels.remove(channelName);
pubSubMessageDecoders.remove(channelName);
}
} }

if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) {
RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); pubSubChannels.remove(channelName);
if (result instanceof PubSubStatusMessage) { pubSubMessageDecoders.remove(channelName);
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
} }
} }

RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
if (result instanceof PubSubStatusMessage) {
pubSubConnection.onMessage((PubSubStatusMessage) result);
} else if (result instanceof PubSubMessage) {
pubSubConnection.onMessage((PubSubMessage) result);
} else {
pubSubConnection.onMessage((PubSubPatternMessage) result);
}
} }


private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult, Channel channel) { private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean multiResult, Channel channel) {
Expand Down Expand Up @@ -329,7 +373,7 @@ private MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, Li
return data.getCommand().getReplayMultiDecoder(); return data.getCommand().getReplayMultiDecoder();
} }


private Decoder<Object> decoder(CommandData<Object, Object> data, List<Object> parts, Decoder<Object> currentDecoder) { private Decoder<Object> selectDecoder(CommandData<Object, Object> data, List<Object> parts) {
if (data == null) { if (data == null) {
if (parts.size() == 2 && parts.get(0).equals("message")) { if (parts.size() == 2 && parts.get(0).equals("message")) {
String channelName = (String) parts.get(1); String channelName = (String) parts.get(1);
Expand All @@ -339,7 +383,7 @@ private Decoder<Object> decoder(CommandData<Object, Object> data, List<Object> p
String patternName = (String) parts.get(1); String patternName = (String) parts.get(1);
return pubSubMessageDecoders.get(patternName); return pubSubMessageDecoders.get(patternName);
} }
return currentDecoder; return StringCodec.INSTANCE.getValueDecoder();
} }


Decoder<Object> decoder = data.getCommand().getReplayDecoder(); Decoder<Object> decoder = data.getCommand().getReplayDecoder();
Expand Down

0 comments on commit abca09e

Please sign in to comment.