Skip to content

Commit

Permalink
RedisConnection added
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 5, 2015
1 parent cbbac2c commit d50e997
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 41 deletions.
49 changes: 9 additions & 40 deletions src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -18,11 +18,8 @@
import java.net.InetSocketAddress;

import org.redisson.client.handler.RedisCommandsQueue;
import org.redisson.client.handler.RedisData;
import org.redisson.client.handler.RedisDecoder;
import org.redisson.client.handler.RedisEncoder;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.StringCodec;

Expand All @@ -36,7 +33,6 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

public class RedisClient {

Expand All @@ -62,48 +58,21 @@ protected void initChannel(Channel ch) throws Exception {
});
}

public ChannelFuture connect() {
public RedisConnection connect() {
ChannelFuture future = bootstrap.connect();
channel = future.channel();
return future;
}

// public <R> Future<R> execute(Codec encoder, RedisCommand<R> command, Object ... params) {
// Promise<R> promise = bootstrap.group().next().<R>newPromise();
// channel.writeAndFlush(new RedisData<R, R>(promise, encoder, command, params));
// return promise;
// }

public <T, R> Future<R> execute(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = bootstrap.group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
return promise;
future.syncUninterruptibly();
return new RedisConnection(bootstrap, channel);
}

public static void main(String[] args) throws InterruptedException {
final RedisClient rc = new RedisClient("127.0.0.1", 6379);
rc.connect().sync();
final RedisClient c = new RedisClient("127.0.0.1", 6379);
RedisConnection rc = c.connect();
// for (int i = 0; i < 10000; i++) {
Future<String> res1 = rc.execute(new StringCodec(), RedisCommands.CLIENT_SETNAME, "12333");
res1.addListener(new FutureListener<String>() {

@Override
public void operationComplete(Future<String> future) throws Exception {
System.out.println("res 12: " + future.getNow());
}

});

Future<String> res2 = rc.execute(new StringCodec(), RedisCommands.CLIENT_GETNAME);
res2.addListener(new FutureListener<String>() {

@Override
public void operationComplete(Future<String> future) throws Exception {
System.out.println("res name: " + future.getNow());
}

});

String res1 = rc.sync(new StringCodec(), RedisCommands.CLIENT_SETNAME, "12333");
System.out.println("res 12: " + res1);
String res2 = rc.sync(new StringCodec(), RedisCommands.CLIENT_GETNAME);
System.out.println("res name: " + res2);

/* Future<String> res = rc.execute(new StringCodec(), RedisCommands.SET, "test", "" + Math.random());
res.addListener(new FutureListener<String>() {
Expand Down
54 changes: 54 additions & 0 deletions src/main/java/org/redisson/client/RedisConnection.java
@@ -0,0 +1,54 @@
package org.redisson.client;

import org.redisson.client.handler.RedisData;
import org.redisson.client.protocol.Codec;
import org.redisson.client.protocol.RedisCommand;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

public class RedisConnection {

final Bootstrap bootstrap;
final Channel channel;

public RedisConnection(Bootstrap bootstrap, Channel channel) {
super();
this.bootstrap = bootstrap;
this.channel = channel;
}

public <V> V get(Future<V> future) {
future.awaitUninterruptibly();
if (future.isSuccess()) {
return future.getNow();
}

if (future.cause() instanceof RedisException) {
throw (RedisException) future.cause();
}
throw new RedisException("Unexpected exception while processing command", future.cause());
}

public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
Future<R> r = async(encoder, command, params);
return get(r);
}

public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = bootstrap.group().next().<R>newPromise();
channel.writeAndFlush(new RedisData<T, R>(promise, encoder, command, params));
return promise;
}

// public <R> Future<R> execute(Codec encoder, RedisCommand<R> command, Object ... params) {
// Promise<R> promise = bootstrap.group().next().<R>newPromise();
// channel.writeAndFlush(new RedisData<R, R>(promise, encoder, command, params));
// return promise;
//}



}
Expand Up @@ -21,7 +21,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;

public class RedisCommandsQueue extends ChannelDuplexHandler {
Expand Down

0 comments on commit d50e997

Please sign in to comment.