Skip to content

Commit

Permalink
PubSub support
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 8, 2015
1 parent 6c182ed commit e5da696
Show file tree
Hide file tree
Showing 16 changed files with 371 additions and 89 deletions.
14 changes: 7 additions & 7 deletions src/main/java/org/redisson/RedissonDeque.java
Expand Up @@ -43,7 +43,7 @@ protected RedissonDeque(ConnectionManager connectionManager, String name) {

@Override
public void addFirst(final V e) {
connectionManager.write(new VoidOperation<V, Long>() {
connectionManager.write(getName(), new VoidOperation<V, Long>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {
return async.lpush(getName(), e);
Expand All @@ -53,7 +53,7 @@ protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {

@Override
public void addLast(final V e) {
connectionManager.write(new VoidOperation<V, Long>() {
connectionManager.write(getName(), new VoidOperation<V, Long>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, V> async) {
return async.rpush(getName(), e);
Expand Down Expand Up @@ -99,7 +99,7 @@ public void remove() {

@Override
public V getLast() {
List<V> list = connectionManager.read(new ResultOperation<List<V>, V>() {
List<V> list = connectionManager.read(getName(), new ResultOperation<List<V>, V>() {
@Override
protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.lrange(getName(), -1, -1);
Expand All @@ -113,7 +113,7 @@ protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {

@Override
public boolean offerFirst(final V e) {
connectionManager.write(new ResultOperation<Long, Object>() {
connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.lpush(getName(), e);
Expand All @@ -134,7 +134,7 @@ public V peekFirst() {

@Override
public V peekLast() {
List<V> list = connectionManager.read(new ResultOperation<List<V>, V>() {
List<V> list = connectionManager.read(getName(), new ResultOperation<List<V>, V>() {
@Override
protected Future<List<V>> execute(RedisAsyncConnection<Object, V> async) {
return async.lrange(getName(), -1, -1);
Expand All @@ -153,7 +153,7 @@ public V pollFirst() {

@Override
public V pollLast() {
return connectionManager.write(new ResultOperation<V, V>() {
return connectionManager.write(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.rpop(getName());
Expand All @@ -178,7 +178,7 @@ public boolean removeFirstOccurrence(Object o) {

@Override
public V removeLast() {
V value = connectionManager.write(new ResultOperation<V, V>() {
V value = connectionManager.write(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.rpop(getName());
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonExpirable.java
Expand Up @@ -74,7 +74,7 @@ protected Future<Boolean> execute(RedisAsyncConnection<Object, Object> async) {

@Override
public long remainTimeToLive() {
return connectionManager.write(new ResultOperation<Long, Object>() {
return connectionManager.write(getName(), new ResultOperation<Long, Object>() {
@Override
protected Future<Long> execute(RedisAsyncConnection<Object, Object> async) {
return async.ttl(getName());
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonLock.java
Expand Up @@ -393,7 +393,7 @@ private Future<Boolean> forceUnlockAsync() {

@Override
public boolean isLocked() {
return connectionManager.read(new SyncOperation<Boolean, Boolean>() {
return connectionManager.read(getName(), new SyncOperation<Boolean, Boolean>() {
@Override
public Boolean execute(RedisConnection<Object, Boolean> conn) {
return conn.exists(getName());
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonQueue.java
Expand Up @@ -94,7 +94,7 @@ public V peek() {

@Override
public V pollLastAndOfferFirstTo(final String queueName) {
return connectionManager.write(new ResultOperation<V, V>() {
return connectionManager.write(getName(), new ResultOperation<V, V>() {
@Override
protected Future<V> execute(RedisAsyncConnection<Object, V> async) {
return async.rpoplpush(getName(), queueName);
Expand Down
42 changes: 28 additions & 14 deletions src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -16,12 +16,12 @@
package org.redisson.client;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;

import org.redisson.client.handler.RedisCommandsQueue;
import org.redisson.client.handler.RedisDecoder;
import org.redisson.client.handler.RedisEncoder;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;
import org.redisson.client.protocol.PubSubMessage;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
Expand All @@ -35,6 +35,7 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor;

public class RedisClient {
Expand All @@ -56,10 +57,9 @@ public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketCh

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addFirst(
new RedisEncoder(),
new RedisCommandsQueue(),
new RedisDecoder());
ch.pipeline().addFirst(new RedisEncoder(),
new RedisCommandsQueue(),
new RedisDecoder());
}

});
Expand Down Expand Up @@ -87,20 +87,34 @@ public RedisConnection connect() {
return new RedisConnection(this, future.channel());
}

public RedisPubSubConnection connectPubSub() {
ChannelFuture future = bootstrap.connect();
future.syncUninterruptibly();
channels.add(future.channel());
return new RedisPubSubConnection(this, future.channel());
}

public ChannelGroupFuture shutdownAsync() {
return channels.close();
}

public static void main(String[] args) throws InterruptedException {
public static void main(String[] args) throws InterruptedException, ExecutionException {
final RedisClient c = new RedisClient("127.0.0.1", 6379);
RedisConnection rc = c.connect();
// for (int i = 0; i < 10000; i++) {
String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
System.out.println("res 12: " + res1);
String res2 = rc.sync(RedisCommands.CLIENT_GETNAME);
System.out.println("res name: " + res2);
Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33");
System.out.println("res name 2: " + res3);
RedisPubSubConnection rpsc = c.connectPubSub();

// String res1 = rc.sync(RedisCommands.CLIENT_SETNAME, "12333");
// System.out.println("res 12: " + res1);
// String res2 = rc.sync(RedisCommands.CLIENT_GETNAME);
// System.out.println("res name: " + res2);
// Boolean res3 = rc.sync(new StringCodec(), RedisCommands.EXISTS, "33");
// System.out.println("res name 2: " + res3);

Future<Long> m = rpsc.publish("sss", "123");
System.out.println("out: " + m.get());
Future<PubSubMessage> m1 = rpsc.subscribe("sss");
System.out.println("out: " + m1.get());



/* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random());
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/org/redisson/client/RedisPubSubConnection.java
@@ -0,0 +1,44 @@
package org.redisson.client;

import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.PubSubMessage;
import org.redisson.client.protocol.PubSubMessageDecoder;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

public class RedisPubSubConnection {

final Channel channel;
final RedisClient redisClient;

public RedisPubSubConnection(RedisClient redisClient, Channel channel) {
this.redisClient = redisClient;
this.channel = channel;
}

public Future<PubSubMessage> subscribe(String ... channel) {
return async(new PubSubMessageDecoder(), RedisCommands.SUBSCRIBE, channel);
}

public Future<Long> publish(String channel, String msg) {
return async(new StringCodec(), RedisCommands.PUBLISH, channel, msg);
}

public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
return promise;
}

public ChannelFuture closeAsync() {
return channel.close();
}

}
47 changes: 40 additions & 7 deletions src/main/java/org/redisson/client/handler/RedisDecoder.java
Expand Up @@ -16,6 +16,7 @@
package org.redisson.client.handler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.redisson.client.RedisException;
Expand All @@ -25,39 +26,71 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.util.CharsetUtil;

public class RedisDecoder extends ReplayingDecoder<Void> {

private static final char CR = '\r';
private static final char LF = '\n';
public static final char CR = '\r';
public static final char LF = '\n';
private static final char ZERO = '0';

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
RedisData<Object, Object> data = ctx.channel().attr(RedisCommandsQueue.REPLAY_PROMISE).getAndRemove();

decode(in, data, null);

ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
}

private void decode(ByteBuf in, RedisData<Object, Object> data, List<Object> parts) throws IOException {
int code = in.readByte();
if (code == '+') {
Object result = data.getCommand().getReponseDecoder().decode(in);
data.getPromise().setSuccess(result);
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
} else if (code == '-') {
Object result = data.getCommand().getReponseDecoder().decode(in);
data.getPromise().setFailure(new RedisException(result.toString()));
} else if (code == ':') {
Object result = data.getCommand().getReponseDecoder().decode(in);
data.getPromise().setSuccess(result);
String status = in.readBytes(in.bytesBefore((byte) '\r')).toString(CharsetUtil.UTF_8);
in.skipBytes(2);
Long result = Long.valueOf(status);
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
} else if (code == '$') {
Decoder<Object> decoder = data.getCommand().getReponseDecoder();
if (decoder == null) {
decoder = data.getCodec();
}
Object result = decoder.decode(readBytes(in));
if (parts != null) {
parts.add(result);
} else {
data.getPromise().setSuccess(result);
}
} else if (code == '*') {
long size = readLong(in);
List<Object> respParts = new ArrayList<Object>();
for (int i = 0; i < size; i++) {
decode(in, data, respParts);
}

Decoder<Object> decoder = data.getCommand().getReponseDecoder();
if (decoder == null) {
decoder = data.getCodec();
}
Object result = decoder.decode(respParts);
data.getPromise().setSuccess(result);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}

ctx.pipeline().fireUserEventTriggered(QueueCommands.NEXT_COMMAND);
}

public ByteBuf readBytes(ByteBuf is) throws IOException {
Expand Down
Expand Up @@ -15,6 +15,8 @@
*/
package org.redisson.client.protocol;

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

Expand All @@ -27,4 +29,9 @@ public Boolean decode(ByteBuf buf) {
return Boolean.valueOf(status);
}

@Override
public Boolean decode(List<Object> parts) {
throw new IllegalStateException();
}

}
4 changes: 4 additions & 0 deletions src/main/java/org/redisson/client/protocol/Decoder.java
Expand Up @@ -15,10 +15,14 @@
*/
package org.redisson.client.protocol;

import java.util.List;

import io.netty.buffer.ByteBuf;

public interface Decoder<R> {

R decode(ByteBuf buf);

R decode(List<Object> parts);

}
29 changes: 29 additions & 0 deletions src/main/java/org/redisson/client/protocol/PubSubMessage.java
@@ -0,0 +1,29 @@
package org.redisson.client.protocol;

public class PubSubMessage {

public enum Type {SUBSCRIBE, MESSAGE}

private Type type;
private String channel;

public PubSubMessage(Type type, String channel) {
super();
this.type = type;
this.channel = channel;
}

public String getChannel() {
return channel;
}

public Type getType() {
return type;
}

@Override
public String toString() {
return "PubSubReplay [type=" + type + ", channel=" + channel + "]";
}

}

0 comments on commit e5da696

Please sign in to comment.