Skip to content

Commit

Permalink
Decoder state introduced. #183
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 18, 2015
1 parent 6b76cd8 commit d06b9ba
Show file tree
Hide file tree
Showing 24 changed files with 141 additions and 95 deletions.
25 changes: 11 additions & 14 deletions src/main/java/org/redisson/client/handler/CommandDecoder.java
Expand Up @@ -50,7 +50,7 @@
* @author Nikita Koksharov
*
*/
public class CommandDecoder extends ReplayingDecoder<DecoderState> {
public class CommandDecoder extends ReplayingDecoder<State> {

private final Logger log = LoggerFactory.getLogger(getClass());

Expand All @@ -68,7 +68,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
if (data == null) {
currentDecoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
};
Expand All @@ -78,12 +78,14 @@ public Object decode(ByteBuf buf) {
log.trace("channel: {} message: {}", ctx.channel(), in.toString(0, in.writerIndex(), CharsetUtil.UTF_8));
}

if (state() == null) {
state(new State());
}
state().setDecoderState(null);

if (data == null) {
decode(in, null, null, ctx.channel(), currentDecoder);
} else if (data instanceof CommandData) {
// if (state() == null) {
// state(new DecoderState());
// }
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
try {
// if (state().getSize() > 0) {
Expand All @@ -97,12 +99,7 @@ public Object decode(ByteBuf buf) {
} else if (data instanceof CommandsData) {
CommandsData commands = (CommandsData)data;

int i = 0;
if (state() != null) {
i = state().getIndex();
} else {
state(new DecoderState());
}
int i = state().getIndex();

while (in.writerIndex() > in.readerIndex()) {
CommandData<Object, Object> cmd = null;
Expand Down Expand Up @@ -155,7 +152,7 @@ private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> p
ByteBuf buf = readBytes(in);
Object result = null;
if (buf != null) {
result = decoder(data, parts, currentDecoder).decode(buf);
result = decoder(data, parts, currentDecoder).decode(buf, state());
}
handleResult(data, parts, result, false);
} else if (code == '*') {
Expand All @@ -178,7 +175,7 @@ private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Obje
decode(in, data, respParts, channel, currentDecoder);
}

Object result = messageDecoder(data, respParts).decode(respParts);
Object result = messageDecoder(data, respParts).decode(respParts, state());
if (result instanceof PubSubStatusMessage) {
if (parts == null) {
parts = new ArrayList<Object>();
Expand Down Expand Up @@ -265,7 +262,7 @@ private Decoder<Object> decoder(CommandData<Object, Object> data, List<Object> p
Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (parts != null) {
MultiDecoder<Object> multiDecoder = data.getCommand().getReplayMultiDecoder();
if (multiDecoder.isApplicable(parts.size())) {
if (multiDecoder.isApplicable(parts.size(), state())) {
decoder = multiDecoder;
}
}
Expand Down
Expand Up @@ -2,14 +2,15 @@

import java.util.List;

public class DecoderState {
public class State {

private int index;
private Object decoderState;

private long size;
private List<Object> respParts;

public DecoderState() {
public State() {
super();
}

Expand All @@ -34,5 +35,11 @@ public int getIndex() {
return index;
}

public <T> T getDecoderState() {
return (T)decoderState;
}
public void setDecoderState(Object decoderState) {
this.decoderState = decoderState;
}

}
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/client/protocol/Decoder.java
Expand Up @@ -17,10 +17,12 @@

import java.io.IOException;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;

public interface Decoder<R> {

R decode(ByteBuf buf) throws IOException;
R decode(ByteBuf buf, State state) throws IOException;

}
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/client/protocol/LongCodec.java
Expand Up @@ -15,6 +15,8 @@
*/
package org.redisson.client.protocol;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

Expand All @@ -26,7 +28,7 @@ public class LongCodec extends StringCodec {
public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}
};
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/client/protocol/StringCodec.java
Expand Up @@ -17,6 +17,8 @@

import java.io.IOException;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

Expand All @@ -28,7 +30,7 @@ public class StringCodec implements Codec {
public Decoder<Object> getValueDecoder() {
return new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return buf.toString(CharsetUtil.UTF_8);
}
};
Expand Down
Expand Up @@ -17,28 +17,30 @@

import java.util.List;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

public class KeyValueObjectDecoder implements MultiDecoder<Object> {

@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
String status = buf.toString(CharsetUtil.UTF_8);
buf.skipBytes(2);
return status;
}

@Override
public Object decode(List<Object> parts) {
public Object decode(List<Object> parts, State state) {
if (parts.isEmpty()) {
return null;
}
return new KeyValueMessage(parts.get(0), parts.get(1));
}

@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}

Expand Down
Expand Up @@ -17,23 +17,25 @@

import java.util.List;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

public class ListScanResultReplayDecoder implements MultiDecoder<ListScanResult<Object>> {

@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}

@Override
public ListScanResult<Object> decode(List<Object> parts) {
public ListScanResult<Object> decode(List<Object> parts, State state) {
return new ListScanResult<Object>((Long)parts.get(0), (List<Object>)parts.get(1));
}

@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}

Expand Down
Expand Up @@ -18,23 +18,25 @@
import java.util.List;
import java.util.Map;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

public class MapScanResultReplayDecoder implements MultiDecoder<MapScanResult<Object, Object>> {

@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}

@Override
public MapScanResult<Object, Object> decode(List<Object> parts) {
public MapScanResult<Object, Object> decode(List<Object> parts, State state) {
return new MapScanResult<Object, Object>((Long)parts.get(0), (Map<Object, Object>)parts.get(1));
}

@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}

Expand Down
Expand Up @@ -17,12 +17,13 @@

import java.util.List;

import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;

public interface MultiDecoder<T> extends Decoder<Object> {

boolean isApplicable(int paramNum);
boolean isApplicable(int paramNum, State state);

T decode(List<Object> parts);
T decode(List<Object> parts, State state);

}
Expand Up @@ -21,60 +21,70 @@
import java.util.Deque;
import java.util.List;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;

public class NestedMultiDecoder<T> implements MultiDecoder<Object> {

private final MultiDecoder<Object> firstDecoder;
private final MultiDecoder<Object> secondDecoder;
public static class DecoderState {

Deque<MultiDecoder<?>> decoders;

Deque<MultiDecoder<?>> flipDecoders;

public DecoderState(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
super();
this.decoders = new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder));
this.flipDecoders = new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder));
}

public Deque<MultiDecoder<?>> getDecoders() {
return decoders;
}

private ThreadLocal<Deque<MultiDecoder<?>>> decoders = new ThreadLocal<Deque<MultiDecoder<?>>>() {
protected Deque<MultiDecoder<?>> initialValue() {
return new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder));
};
};
public Deque<MultiDecoder<?>> getFlipDecoders() {
return flipDecoders;
}

}

private ThreadLocal<Deque<MultiDecoder<?>>> flipDecoders = new ThreadLocal<Deque<MultiDecoder<?>>>() {
protected Deque<MultiDecoder<?>> initialValue() {
return new ArrayDeque<MultiDecoder<?>>(Arrays.asList(firstDecoder, secondDecoder, firstDecoder));
};
};
private final MultiDecoder<Object> firstDecoder;
private final MultiDecoder<Object> secondDecoder;

public NestedMultiDecoder(MultiDecoder<Object> firstDecoder, MultiDecoder<Object> secondDecoder) {
this.firstDecoder = firstDecoder;
this.secondDecoder = secondDecoder;
}

@Override
public Object decode(ByteBuf buf) throws IOException {
return flipDecoders.get().peek().decode(buf);
public Object decode(ByteBuf buf, State state) throws IOException {
DecoderState ds = getDecoder(state);
return ds.getFlipDecoders().peek().decode(buf, state);
}

@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
DecoderState ds = getDecoder(state);
if (paramNum == 0) {
flipDecoders.get().poll();
// in case of incoming buffer tail
// state should be reseted
if (flipDecoders.get().isEmpty()) {
flipDecoders.remove();
decoders.remove();

flipDecoders.get().poll();
}
ds.getFlipDecoders().poll();
}
return flipDecoders.get().peek().isApplicable(paramNum);
return ds.getFlipDecoders().peek().isApplicable(paramNum, state);
}

@Override
public Object decode(List<Object> parts) {
Object result = decoders.get().poll().decode(parts);
// clear state on last decoding
if (decoders.get().isEmpty()) {
flipDecoders.remove();
decoders.remove();
private DecoderState getDecoder(State state) {
DecoderState ds = state.getDecoderState();
if (ds == null) {
ds = new DecoderState(firstDecoder, secondDecoder);
state.setDecoderState(ds);
}
return result;
return ds;
}

@Override
public Object decode(List<Object> parts, State state) {
DecoderState ds = getDecoder(state);
return ds.getDecoders().poll().decode(parts, state);
}

}
Expand Up @@ -17,22 +17,24 @@

import java.util.List;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;

public class ObjectListReplayDecoder implements MultiDecoder<List<Object>> {

@Override
public Object decode(ByteBuf buf) {
public Object decode(ByteBuf buf, State state) {
throw new UnsupportedOperationException();
}

@Override
public List<Object> decode(List<Object> parts) {
public List<Object> decode(List<Object> parts, State state) {
return parts;
}

@Override
public boolean isApplicable(int paramNum) {
public boolean isApplicable(int paramNum, State state) {
return false;
}

Expand Down

0 comments on commit d06b9ba

Please sign in to comment.