From abca09e5b7a5f71cb0618deb9fe3598bee870699 Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 15 Apr 2016 16:48:16 +0300 Subject: [PATCH] Replaying phase handling in CommandDecoder --- .../client/handler/CommandDecoder.java | 188 +++++++++++------- .../org/redisson/client/handler/State.java | 80 +++++--- .../redisson/client/handler/StateLevel.java | 29 +++ .../client/protocol/decoder/DecoderState.java | 7 + .../decoder/FlatNestedMultiDecoder.java | 2 +- .../protocol/decoder/NestedMultiDecoder.java | 32 ++- .../decoder/CacheGetAllDecoder.java | 4 +- .../java/org/redisson/RedissonGeoTest.java | 56 ++++++ .../org/redisson/RedissonSetCacheTest.java | 17 +- 9 files changed, 306 insertions(+), 109 deletions(-) create mode 100644 src/main/java/org/redisson/client/handler/StateLevel.java create mode 100644 src/main/java/org/redisson/client/protocol/decoder/DecoderState.java diff --git a/src/main/java/org/redisson/client/handler/CommandDecoder.java b/src/main/java/org/redisson/client/handler/CommandDecoder.java index 64113ad5d4f..d9eb1c1d246 100644 --- a/src/main/java/org/redisson/client/handler/CommandDecoder.java +++ b/src/main/java/org/redisson/client/handler/CommandDecoder.java @@ -36,6 +36,7 @@ import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.decoder.MultiDecoder; +import org.redisson.client.protocol.decoder.NestedMultiDecoder; import org.redisson.client.protocol.pubsub.Message; import org.redisson.client.protocol.pubsub.PubSubMessage; import org.redisson.client.protocol.pubsub.PubSubPatternMessage; @@ -79,61 +80,99 @@ public void addPubSubCommand(String channel, CommandData data) { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get(); - Decoder currentDecoder = null; - if (data == null) { - currentDecoder = StringCodec.INSTANCE.getValueDecoder(); + if (log.isTraceEnabled()) { + log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); } - if (state() == null) { - state(new State()); - - if (log.isTraceEnabled()) { - log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8)); + boolean makeCheckpoint = data != null; + if (data != null) { + if (data instanceof CommandsData) { + makeCheckpoint = false; + } else { + CommandData cmd = (CommandData)data; + if (cmd.getCommand().getReplayMultiDecoder() != null && NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())) { + makeCheckpoint = false; + } + } } + state(new State(makeCheckpoint)); } + state().setDecoderState(null); if (data == null) { - decode(in, null, null, ctx.channel(), currentDecoder); + decode(in, null, null, ctx.channel()); } else if (data instanceof CommandData) { CommandData cmd = (CommandData)data; try { -// if (state().getSize() > 0) { -// List respParts = new ArrayList(); -// if (state().getRespParts() != null) { -// respParts = state().getRespParts(); -// } -// decodeMulti(in, cmd, null, ctx.channel(), currentDecoder, state().getSize(), respParts, true); -// } else { - decode(in, cmd, null, ctx.channel(), currentDecoder); -// } + if (state().getLevels().size() > 0) { + decodeFromCheckpoint(ctx, in, data, cmd); + } else { + decode(in, cmd, null, ctx.channel()); + } } catch (IOException e) { cmd.getPromise().tryFailure(e); } } else if (data instanceof CommandsData) { CommandsData commands = (CommandsData)data; - handleCommandsDataResponse(ctx, in, data, currentDecoder, commands); + decodeCommandBatch(ctx, in, data, commands); return; } - + ctx.pipeline().get(CommandsQueue.class).sendNextCommand(ctx.channel()); state(null); } - private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, - Decoder currentDecoder, CommandsData commands) { - int i = state().getIndex(); + private void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, + CommandData cmd) throws IOException { + if (state().getLevels().size() == 2) { + StateLevel secondLevel = state().getLevels().get(1); + + if (secondLevel.getParts().isEmpty()) { + state().getLevels().remove(1); + } + } + + if (state().getLevels().size() == 2) { + StateLevel firstLevel = state().getLevels().get(0); + StateLevel secondLevel = state().getLevels().get(1); + + decodeMulti(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts()); + + Channel channel = ctx.channel(); + MultiDecoder decoder = messageDecoder(cmd, firstLevel.getParts(), channel); + if (decoder != null) { + Object result = decoder.decode(firstLevel.getParts(), state()); + if (data != null) { + handleResult(cmd, null, result, true, channel); + } + } + } + if (state().getLevels().size() == 1) { + StateLevel firstLevel = state().getLevels().get(0); + if (firstLevel.getParts().isEmpty()) { + state().resetLevel(); + decode(in, cmd, null, ctx.channel()); + } else { + decodeMulti(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts()); + } + } + } + + private void decodeCommandBatch(ChannelHandlerContext ctx, ByteBuf in, QueueCommand data, + CommandsData commandBatch) { + int i = state().getBatchIndex(); RedisException error = null; while (in.writerIndex() > in.readerIndex()) { CommandData cmd = null; try { checkpoint(); - state().setIndex(i); - cmd = (CommandData) commands.getCommands().get(i); - decode(in, cmd, null, ctx.channel(), currentDecoder); + state().setBatchIndex(i); + cmd = (CommandData) commandBatch.getCommands().get(i); + decode(in, cmd, null, ctx.channel()); i++; } catch (IOException e) { cmd.getPromise().tryFailure(e); @@ -147,8 +186,8 @@ private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, Q } } - if (i == commands.getCommands().size()) { - Promise promise = commands.getPromise(); + if (i == commandBatch.getCommands().size()) { + Promise promise = commandBatch.getPromise(); if (error != null) { if (!promise.tryFailure(error) && promise.cause() instanceof RedisTimeoutException) { log.warn("response has been skipped due to timeout! channel: {}, command: {}", ctx.channel(), data); @@ -164,11 +203,11 @@ private void handleCommandsDataResponse(ChannelHandlerContext ctx, ByteBuf in, Q state(null); } else { checkpoint(); - state().setIndex(i); + state().setBatchIndex(i); } } - private void decode(ByteBuf in, CommandData data, List parts, Channel channel, Decoder currentDecoder) throws IOException { + private void decode(ByteBuf in, CommandData data, List parts, Channel channel) throws IOException { int code = in.readByte(); if (code == '+') { String result = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8); @@ -212,32 +251,40 @@ private void decode(ByteBuf in, CommandData data, List p ByteBuf buf = readBytes(in); Object result = null; if (buf != null) { - result = decoder(data, parts, currentDecoder).decode(buf, state()); + Decoder decoder = selectDecoder(data, parts); + result = decoder.decode(buf, state()); } handleResult(data, parts, result, false, channel); } else if (code == '*') { + int level = state().incLevel(); + long size = readLong(in); - List respParts = new ArrayList(); - boolean top = false; -// if (state().trySetSize(size)) { -// state().setRespParts(respParts); -// top = true; -// } - - decodeMulti(in, data, parts, channel, currentDecoder, size, respParts, top); + List respParts; + if (state().getLevels().size()-1 >= level) { + StateLevel stateLevel = state().getLevels().get(level); + respParts = stateLevel.getParts(); + size = stateLevel.getSize(); + } else { + respParts = new ArrayList(); + if (state().isMakeCheckpoint()) { + state().addLevel(new StateLevel(size, respParts)); + } + } + + decodeMulti(in, data, parts, channel, size, respParts); } else { throw new IllegalStateException("Can't decode replay " + (char)code); } } private void decodeMulti(ByteBuf in, CommandData data, List parts, - Channel channel, Decoder currentDecoder, long size, List respParts, boolean top) + Channel channel, long size, List respParts) throws IOException { for (int i = respParts.size(); i < size; i++) { - decode(in, data, respParts, channel, currentDecoder); -// if (top) { -// checkpoint(); -// } + decode(in, data, respParts, channel); + if (state().isMakeCheckpoint()) { + checkpoint(); + } } MultiDecoder decoder = messageDecoder(data, respParts, channel); @@ -246,7 +293,10 @@ private void decodeMulti(ByteBuf in, CommandData data, List data, List in.readerIndex()) { - decode(in, data, null, channel, currentDecoder); + decode(in, data, null, channel); } - } else { - handleMultiResult(data, parts, channel, result); } } private void handleMultiResult(CommandData data, List parts, Channel channel, Object result) { - if (data != null) { - handleResult(data, parts, result, true, channel); - } else { - if (result instanceof PubSubStatusMessage) { - String channelName = ((PubSubStatusMessage) result).getChannel(); - CommandData d = pubSubChannels.get(channelName); - if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) { - pubSubChannels.remove(channelName); - pubSubMessageDecoders.put(channelName, d.getMessageDecoder()); - } - if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) { - pubSubChannels.remove(channelName); - pubSubMessageDecoders.remove(channelName); - } + if (result instanceof PubSubStatusMessage) { + String channelName = ((PubSubStatusMessage) result).getChannel(); + CommandData d = pubSubChannels.get(channelName); + if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) { + pubSubChannels.remove(channelName); + pubSubMessageDecoders.put(channelName, d.getMessageDecoder()); } - - RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); - if (result instanceof PubSubStatusMessage) { - pubSubConnection.onMessage((PubSubStatusMessage) result); - } else if (result instanceof PubSubMessage) { - pubSubConnection.onMessage((PubSubMessage) result); - } else { - pubSubConnection.onMessage((PubSubPatternMessage) result); + if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) { + pubSubChannels.remove(channelName); + pubSubMessageDecoders.remove(channelName); } } + + RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel); + if (result instanceof PubSubStatusMessage) { + pubSubConnection.onMessage((PubSubStatusMessage) result); + } else if (result instanceof PubSubMessage) { + pubSubConnection.onMessage((PubSubMessage) result); + } else { + pubSubConnection.onMessage((PubSubPatternMessage) result); + } } private void handleResult(CommandData data, List parts, Object result, boolean multiResult, Channel channel) { @@ -329,7 +373,7 @@ private MultiDecoder messageDecoder(CommandData data, Li return data.getCommand().getReplayMultiDecoder(); } - private Decoder decoder(CommandData data, List parts, Decoder currentDecoder) { + private Decoder selectDecoder(CommandData data, List parts) { if (data == null) { if (parts.size() == 2 && parts.get(0).equals("message")) { String channelName = (String) parts.get(1); @@ -339,7 +383,7 @@ private Decoder decoder(CommandData data, List p String patternName = (String) parts.get(1); return pubSubMessageDecoders.get(patternName); } - return currentDecoder; + return StringCodec.INSTANCE.getValueDecoder(); } Decoder decoder = data.getCommand().getReplayDecoder(); diff --git a/src/main/java/org/redisson/client/handler/State.java b/src/main/java/org/redisson/client/handler/State.java index d9d1ba5791f..0c8f3520625 100644 --- a/src/main/java/org/redisson/client/handler/State.java +++ b/src/main/java/org/redisson/client/handler/State.java @@ -15,50 +15,80 @@ */ package org.redisson.client.handler; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import org.redisson.client.protocol.decoder.DecoderState; + public class State { - private int index; - private Object decoderState; + private int batchIndex; + private DecoderState decoderState; + + private int level = -1; + private List levels; + private DecoderState decoderStateCopy; + private final boolean makeCheckpoint; - private long size; - private List respParts; + public State(boolean makeCheckpoint) { + this.makeCheckpoint = makeCheckpoint; + } - public State() { - super(); + public boolean isMakeCheckpoint() { + return makeCheckpoint; } - public boolean trySetSize(long size) { - if (this.size != 0) { - return false; + public void resetLevel() { + level = -1; + } + public int decLevel() { + return --level; + } + public int incLevel() { + return ++level; + } + + public void addLevel(StateLevel stateLevel) { + if (levels == null) { + levels = new ArrayList(2); } - this.size = size; - return true; + levels.add(stateLevel); } - public long getSize() { - return size; + public List getLevels() { + if (levels == null) { + return Collections.emptyList(); + } + return levels; } - public void setRespParts(List respParts) { - this.respParts = respParts; + public void setBatchIndex(int index) { + this.batchIndex = index; } - public List getRespParts() { - return respParts; + public int getBatchIndex() { + return batchIndex; } - public void setIndex(int index) { - this.index = index; + public T getDecoderState() { + return (T) decoderState; } - public int getIndex() { - return index; + public void setDecoderState(DecoderState decoderState) { + this.decoderState = decoderState; } - public T getDecoderState() { - return (T)decoderState; + public DecoderState getDecoderStateCopy() { + return decoderStateCopy; } - public void setDecoderState(Object decoderState) { - this.decoderState = decoderState; + public void setDecoderStateCopy(DecoderState decoderStateCopy) { + this.decoderStateCopy = decoderStateCopy; + } + + @Override + public String toString() { + return "State [batchIndex=" + batchIndex + ", decoderState=" + decoderState + ", level=" + level + ", levels=" + + levels + ", decoderStateCopy=" + decoderStateCopy + "]"; } + + } diff --git a/src/main/java/org/redisson/client/handler/StateLevel.java b/src/main/java/org/redisson/client/handler/StateLevel.java new file mode 100644 index 00000000000..37f2fbc4a8a --- /dev/null +++ b/src/main/java/org/redisson/client/handler/StateLevel.java @@ -0,0 +1,29 @@ +package org.redisson.client.handler; + +import java.util.List; + +public class StateLevel { + + private long size; + private List parts; + + public StateLevel(long size, List parts) { + super(); + this.size = size; + this.parts = parts; + } + + public long getSize() { + return size; + } + + public List getParts() { + return parts; + } + + @Override + public String toString() { + return "StateLevel [size=" + size + ", parts=" + parts + "]"; + } + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/DecoderState.java b/src/main/java/org/redisson/client/protocol/decoder/DecoderState.java new file mode 100644 index 00000000000..c3abf7502c2 --- /dev/null +++ b/src/main/java/org/redisson/client/protocol/decoder/DecoderState.java @@ -0,0 +1,7 @@ +package org.redisson.client.protocol.decoder; + +public interface DecoderState { + + DecoderState copy(); + +} diff --git a/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java index 4bd04c8eeab..72f5cfc1c0f 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/FlatNestedMultiDecoder.java @@ -38,7 +38,7 @@ public Object decode(ByteBuf buf, State state) throws IOException { @Override public boolean isApplicable(int paramNum, State state) { - DecoderState ds = getDecoder(state); + NestedDecoderState ds = getDecoder(state); if (paramNum == 0) { ds.resetDecoderIndex(); } diff --git a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java index 43b5266ac37..2fcd87c4ee8 100644 --- a/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java +++ b/src/main/java/org/redisson/client/protocol/decoder/NestedMultiDecoder.java @@ -24,13 +24,19 @@ public class NestedMultiDecoder implements MultiDecoder { - public static class DecoderState { + public static class NestedDecoderState implements DecoderState { int decoderIndex; int flipDecoderIndex; - public DecoderState() { + public NestedDecoderState() { + } + + public NestedDecoderState(int decoderIndex, int flipDecoderIndex) { + super(); + this.decoderIndex = decoderIndex; + this.flipDecoderIndex = flipDecoderIndex; } public int getDecoderIndex() { @@ -53,6 +59,16 @@ public void incFlipDecoderIndex() { flipDecoderIndex++; } + @Override + public DecoderState copy() { + return new NestedDecoderState(decoderIndex, flipDecoderIndex); + } + + @Override + public String toString() { + return "NestedDecoderState [decoderIndex=" + decoderIndex + ", flipDecoderIndex=" + flipDecoderIndex + "]"; + } + } protected final MultiDecoder firstDecoder; @@ -81,7 +97,7 @@ public NestedMultiDecoder(MultiDecoder firstDecoder, MultiDecoder decoder = null; if (ds.getFlipDecoderIndex() == 2) { @@ -96,7 +112,7 @@ public Object decode(ByteBuf buf, State state) throws IOException { @Override public boolean isApplicable(int paramNum, State state) { - DecoderState ds = getDecoder(state); + NestedDecoderState ds = getDecoder(state); if (paramNum == 0) { ds.incFlipDecoderIndex(); ds.resetDecoderIndex(); @@ -118,10 +134,10 @@ public boolean isApplicable(int paramNum, State state) { return decoder.isApplicable(paramNum, state); } - protected final DecoderState getDecoder(State state) { - DecoderState ds = state.getDecoderState(); + protected final NestedDecoderState getDecoder(State state) { + NestedDecoderState ds = state.getDecoderState(); if (ds == null) { - ds = new DecoderState(); + ds = new NestedDecoderState(); state.setDecoderState(ds); } return ds; @@ -137,7 +153,7 @@ public Object decode(List parts, State state) { return decoder.decode(parts, state); } - DecoderState ds = getDecoder(state); + NestedDecoderState ds = getDecoder(state); if (parts.isEmpty()) { ds.resetDecoderIndex(); } diff --git a/src/main/java/org/redisson/connection/decoder/CacheGetAllDecoder.java b/src/main/java/org/redisson/connection/decoder/CacheGetAllDecoder.java index 8e5fe597bf7..0d14eff2d17 100644 --- a/src/main/java/org/redisson/connection/decoder/CacheGetAllDecoder.java +++ b/src/main/java/org/redisson/connection/decoder/CacheGetAllDecoder.java @@ -21,11 +21,11 @@ import java.util.List; import java.util.Map; +import org.redisson.client.codec.LongCodec; import org.redisson.client.handler.State; import org.redisson.client.protocol.decoder.MultiDecoder; import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; public class CacheGetAllDecoder implements MultiDecoder> { @@ -37,7 +37,7 @@ public CacheGetAllDecoder(List args) { @Override public Object decode(ByteBuf buf, State state) throws IOException { - return Long.valueOf(buf.toString(CharsetUtil.UTF_8)); + return LongCodec.INSTANCE.getValueDecoder().decode(buf, state); } @Override diff --git a/src/test/java/org/redisson/RedissonGeoTest.java b/src/test/java/org/redisson/RedissonGeoTest.java index fc689e006b7..4b6baf7ee4d 100644 --- a/src/test/java/org/redisson/RedissonGeoTest.java +++ b/src/test/java/org/redisson/RedissonGeoTest.java @@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import org.junit.Test; import org.redisson.core.GeoEntry; @@ -103,6 +104,61 @@ public void testRadiusWithDistance() { expected.put("Catania", 56.4413); assertThat(geo.radiusWithDistance(15, 37, 200, GeoUnit.KILOMETERS)).isEqualTo(expected); } + + @Test + public void testRadiusWithDistanceHugeAmount() { + RGeo geo = redisson.getGeo("test"); + + for (int i = 0; i < 10000; i++) { + geo.add(10 + 0.000001*i, 11 + 0.000001*i, "" + i); + } + + Map res = geo.radiusWithDistance(10, 11, 200, GeoUnit.KILOMETERS); + assertThat(res).hasSize(10000); + } + + @Test + public void testRadiusWithPositionHugeAmount() { + RGeo geo = redisson.getGeo("test"); + + for (int i = 0; i < 10000; i++) { + geo.add(10 + 0.000001*i, 11 + 0.000001*i, "" + i); + } + + Map res = geo.radiusWithPosition(10, 11, 200, GeoUnit.KILOMETERS); + assertThat(res).hasSize(10000); + } + + + @Test + public void testRadiusWithDistanceBigObject() { + RGeo> geo = redisson.getGeo("test"); + + Map map = new HashMap(); + for (int i = 0; i < 150; i++) { + map.put("" + i, "" + i); + } + + geo.add(new GeoEntry(13.361389, 38.115556, map)); + + Map map1 = new HashMap(map); + map1.remove("100"); + geo.add(new GeoEntry(15.087269, 37.502669, map1)); + + Map map2 = new HashMap(map); + map2.remove("0"); + geo.add(new GeoEntry(15.081269, 37.502169, map2)); + + Map, Double> expected = new HashMap, Double>(); + expected.put(map, 190.4424); + expected.put(map1, 56.4413); + expected.put(map2, 56.3159); + + Map, Double> res = geo.radiusWithDistance(15, 37, 200, GeoUnit.KILOMETERS); + assertThat(res.keySet()).containsOnlyElementsOf(expected.keySet()); + assertThat(res.values()).containsOnlyElementsOf(expected.values()); + } + @Test public void testRadiusWithDistanceEmpty() { diff --git a/src/test/java/org/redisson/RedissonSetCacheTest.java b/src/test/java/org/redisson/RedissonSetCacheTest.java index 64e4ddc236e..4698435b424 100644 --- a/src/test/java/org/redisson/RedissonSetCacheTest.java +++ b/src/test/java/org/redisson/RedissonSetCacheTest.java @@ -5,8 +5,10 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -46,7 +48,20 @@ public void testEmptyReadAll() { RSetCache set = redisson.getSetCache("set"); assertThat(set.readAll()).isEmpty(); } - + + @Test + public void testAddBigBean() { + RSetCache> set = redisson.getSetCache("simple"); + Map map = new HashMap(); + for (int i = 0; i < 150; i++) { + map.put(i, i); + } + set.add(map); + map.remove(0); + set.add(map); + set.iterator().next(); + } + @Test public void testAddBean() throws InterruptedException, ExecutionException { SimpleBean sb = new SimpleBean();