Skip to content

Commit

Permalink
PubSub handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 10, 2015
1 parent 6bb464e commit 4783371
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 16 deletions.
8 changes: 8 additions & 0 deletions src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -116,6 +116,8 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
System.out.println("out: " + m); System.out.println("out: " + m);
Future<PubSubStatusMessage> m1 = rpsc.psubscribe("ss*"); Future<PubSubStatusMessage> m1 = rpsc.psubscribe("ss*");
System.out.println("out: " + m1.get()); System.out.println("out: " + m1.get());
Future<PubSubStatusMessage> m2 = rpsc.psubscribe("ss*");
System.out.println("out: " + m2.get());
rpsc.addListener(new RedisPubSubListener<String>() { rpsc.addListener(new RedisPubSubListener<String>() {
@Override @Override
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {
Expand All @@ -135,6 +137,12 @@ public void onPatternMessage(String pattern, String channel, String message) {
Long res = c2.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444"); Long res = c2.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
System.out.println("published: " + res); System.out.println("published: " + res);


Future<PubSubStatusMessage> m3 = rpsc.punsubscribe("ss*");
System.out.println("punsubscribe out: " + m3.get());

final RedisClient c3 = new RedisClient("127.0.0.1", 6379);
Long res3 = c3.connect().sync(new StringCodec(), RedisCommands.PUBLISH, "sss", "4444");
System.out.println("published: " + res3);




/* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random()); /* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random());
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/redisson/client/RedisPubSubConnection.java
Expand Up @@ -81,6 +81,10 @@ public Future<PubSubStatusMessage> unsubscribe(String ... channel) {
return async(null, RedisCommands.UNSUBSCRIBE, channel); return async(null, RedisCommands.UNSUBSCRIBE, channel);
} }


public Future<PubSubStatusMessage> punsubscribe(String ... channel) {
return async(null, RedisCommands.PUNSUBSCRIBE, channel);
}

// public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) { // public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
// Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise(); // Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
// channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params)); // channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/org/redisson/client/handler/RedisData.java
Expand Up @@ -18,7 +18,6 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import org.redisson.client.protocol.Codec; import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.pubsub.MultiDecoder; import org.redisson.client.protocol.pubsub.MultiDecoder;


Expand All @@ -31,18 +30,18 @@ public class RedisData<T, R> {
final Object[] params; final Object[] params;
final Codec codec; final Codec codec;
final AtomicBoolean sended = new AtomicBoolean(); final AtomicBoolean sended = new AtomicBoolean();
final MultiDecoder<Object> nextDecoder; final MultiDecoder<Object> messageDecoder;


public RedisData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) { public RedisData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
this(promise, null, codec, command, params); this(promise, null, codec, command, params);
} }


public RedisData(Promise<R> promise, MultiDecoder<Object> nextDecoder, Codec codec, RedisCommand<T> command, Object[] params) { public RedisData(Promise<R> promise, MultiDecoder<Object> messageDecoder, Codec codec, RedisCommand<T> command, Object[] params) {
this.promise = promise; this.promise = promise;
this.command = command; this.command = command;
this.params = params; this.params = params;
this.codec = codec; this.codec = codec;
this.nextDecoder = nextDecoder; this.messageDecoder = messageDecoder;
} }


public RedisCommand<T> getCommand() { public RedisCommand<T> getCommand() {
Expand All @@ -53,8 +52,8 @@ public Object[] getParams() {
return params; return params;
} }


public MultiDecoder<Object> getNextDecoder() { public MultiDecoder<Object> getMessageDecoder() {
return nextDecoder; return messageDecoder;
} }


public Promise<R> getPromise() { public Promise<R> getPromise() {
Expand Down
62 changes: 52 additions & 10 deletions src/main/java/org/redisson/client/handler/RedisDecoder.java
Expand Up @@ -18,7 +18,9 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;


import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
Expand All @@ -39,19 +41,29 @@ public class RedisDecoder extends ReplayingDecoder<Void> {
public static final char LF = '\n'; public static final char LF = '\n';
private static final char ZERO = '0'; private static final char ZERO = '0';


private MultiDecoder<Object> nextDecoder; private final Map<String, MultiDecoder<Object>> messageDecoders = new HashMap<String, MultiDecoder<Object>>();


@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove(); RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove();
RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get(); RedisPubSubConnection pubSubConnection = ctx.channel().attr(RedisPubSubConnection.CONNECTION).get();


decode(in, data, null, pubSubConnection); Decoder<Object> currentDecoder = null;
if (data == null) {
currentDecoder = new Decoder<Object>() {
@Override
public Object decode(ByteBuf buf) {
return buf.toString(CharsetUtil.UTF_8);
}
};
}

decode(in, data, null, pubSubConnection, currentDecoder);


ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND); ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
} }


private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts, RedisPubSubConnection pubSubConnection) throws IOException { private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts, RedisPubSubConnection pubSubConnection, Decoder<Object> currentDecoder) throws IOException {
int code = in.readByte(); int code = in.readByte();
// System.out.println("trying decode -- " + (char)code); // System.out.println("trying decode -- " + (char)code);
if (code == '+') { if (code == '+') {
Expand All @@ -66,19 +78,26 @@ private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> par
Object result = Long.valueOf(status); Object result = Long.valueOf(status);
handleResult(data, parts, result); handleResult(data, parts, result);
} else if (code == '$') { } else if (code == '$') {
Object result = decoder(data, parts != null).decode(readBytes(in)); Object result = decoder(data, parts, currentDecoder).decode(readBytes(in));
handleResult(data, parts, result); handleResult(data, parts, result);
} else if (code == '*') { } else if (code == '*') {
long size = readLong(in); long size = readLong(in);
List<Object> respParts = new ArrayList<Object>(); List<Object> respParts = new ArrayList<Object>();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
decode(in, data, respParts, pubSubConnection); decode(in, data, respParts, pubSubConnection, currentDecoder);
} }


Object result = ((MultiDecoder<Object>)decoder(data, true)).decode(respParts); Object result = messageDecoder(data, respParts).decode(respParts);
if (data != null) { if (data != null) {
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) { if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(data.getCommand().getName())) {
nextDecoder = data.getNextDecoder(); for (Object param : data.getParams()) {
messageDecoders.put(param.toString(), data.getMessageDecoder());
}
}
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(data.getCommand().getName())) {
for (Object param : data.getParams()) {
messageDecoders.remove(param.toString());
}
} }
data.getPromise().setSuccess(result); data.getPromise().setSuccess(result);
} else { } else {
Expand All @@ -104,12 +123,35 @@ private void handleResult(RedisData<Object, Object> data, List<Object> parts, Ob
} }
} }


private Decoder<Object> decoder(RedisData<Object, Object> data, boolean isMulti) { private MultiDecoder<Object> messageDecoder(RedisData<Object, Object> data, List<Object> parts) {
if (data == null) { if (data == null) {
return nextDecoder; if (parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
return messageDecoders.get(channelName);
}
if (parts.get(0).equals("pmessage")) {
String patternName = (String) parts.get(1);
return messageDecoders.get(patternName);
}
} }
return data.getCommand().getReplayMultiDecoder();
}

private Decoder<Object> decoder(RedisData<Object, Object> data, List<Object> parts, Decoder<Object> currentDecoder) {
if (data == null) {
if (parts.size() == 2 && parts.get(0).equals("message")) {
String channelName = (String) parts.get(1);
return messageDecoders.get(channelName);
}
if (parts.size() == 3 && parts.get(0).equals("pmessage")) {
String patternName = (String) parts.get(1);
return messageDecoders.get(patternName);
}
return currentDecoder;
}

Decoder<Object> decoder = data.getCommand().getReplayDecoder(); Decoder<Object> decoder = data.getCommand().getReplayDecoder();
if (isMulti) { if (parts != null) {
decoder = data.getCommand().getReplayMultiDecoder(); decoder = data.getCommand().getReplayMultiDecoder();
} }
if (decoder == null) { if (decoder == null) {
Expand Down

0 comments on commit 4783371

Please sign in to comment.