Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 26, 2016
1 parent d9081f2 commit 9d1fd5d
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 67 deletions.
5 changes: 3 additions & 2 deletions redisson/src/main/java/org/redisson/RedissonNode.java
Expand Up @@ -23,6 +23,7 @@
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.config.RedissonNodeConfig; import org.redisson.config.RedissonNodeConfig;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
Expand Down Expand Up @@ -146,7 +147,7 @@ public void start() {
private void retrieveAdresses() { private void retrieveAdresses() {
ConnectionManager connectionManager = ((Redisson)redisson).getConnectionManager(); ConnectionManager connectionManager = ((Redisson)redisson).getConnectionManager();
for (MasterSlaveEntry entry : connectionManager.getEntrySet()) { for (MasterSlaveEntry entry : connectionManager.getEntrySet()) {
RFuture<RedisConnection> readFuture = entry.connectionReadOp(); RFuture<RedisConnection> readFuture = entry.connectionReadOp(RedisCommands.PUBLISH);
if (readFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout()) if (readFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout())
&& readFuture.isSuccess()) { && readFuture.isSuccess()) {
RedisConnection connection = readFuture.getNow(); RedisConnection connection = readFuture.getNow();
Expand All @@ -155,7 +156,7 @@ private void retrieveAdresses() {
localAddress = (InetSocketAddress) connection.getChannel().localAddress(); localAddress = (InetSocketAddress) connection.getChannel().localAddress();
return; return;
} }
RFuture<RedisConnection> writeFuture = entry.connectionWriteOp(); RFuture<RedisConnection> writeFuture = entry.connectionWriteOp(RedisCommands.PUBLISH);
if (writeFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout()) if (writeFuture.awaitUninterruptibly((long)connectionManager.getConfig().getConnectTimeout())
&& writeFuture.isSuccess()) { && writeFuture.isSuccess()) {
RedisConnection connection = writeFuture.getNow(); RedisConnection connection = writeFuture.getNow();
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.Decoder; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand; import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.ListMultiDecoder; import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;
Expand Down Expand Up @@ -345,11 +346,11 @@ private void handlePublishSubscribe(CommandData<Object, Object> data, List<Objec
String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase(); String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
PubSubKey key = new PubSubKey(channelName, operation); PubSubKey key = new PubSubKey(channelName, operation);
CommandData<Object, Object> d = pubSubChannels.get(key); CommandData<Object, Object> d = pubSubChannels.get(key);
if (Arrays.asList("PSUBSCRIBE", "SUBSCRIBE").contains(d.getCommand().getName())) { if (Arrays.asList(RedisCommands.PSUBSCRIBE.getName(), RedisCommands.SUBSCRIBE.getName()).contains(d.getCommand().getName())) {
pubSubChannels.remove(key); pubSubChannels.remove(key);
pubSubMessageDecoders.put(channelName, d.getMessageDecoder()); pubSubMessageDecoders.put(channelName, d.getMessageDecoder());
} }
if (Arrays.asList("PUNSUBSCRIBE", "UNSUBSCRIBE").contains(d.getCommand().getName())) { if (Arrays.asList(RedisCommands.PUNSUBSCRIBE.getName(), RedisCommands.UNSUBSCRIBE.getName()).contains(d.getCommand().getName())) {
pubSubChannels.remove(key); pubSubChannels.remove(key);
pubSubMessageDecoders.remove(channelName); pubSubMessageDecoders.remove(channelName);
} }
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/ */
package org.redisson.client.protocol; package org.redisson.client.protocol;


import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;


Expand Down Expand Up @@ -91,14 +90,14 @@ public String toString() {


@Override @Override
public List<CommandData<Object, Object>> getPubSubOperations() { public List<CommandData<Object, Object>> getPubSubOperations() {
if (PUBSUB_COMMANDS.contains(getCommand().getName())) { if (RedisCommands.PUBSUB_COMMANDS.contains(getCommand().getName())) {
return Collections.singletonList((CommandData<Object, Object>)this); return Collections.singletonList((CommandData<Object, Object>)this);
} }
return Collections.emptyList(); return Collections.emptyList();
} }


public boolean isBlockingCommand() { public boolean isBlockingCommand() {
return QueueCommand.TIMEOUTLESS_COMMANDS.contains(command.getName()) && !promise.isDone(); return RedisCommands.BLOCKING_COMMANDS.contains(command.getName()) && !promise.isDone();
} }


} }
Expand Up @@ -58,7 +58,7 @@ public boolean isNoResult() {
public List<CommandData<Object, Object>> getPubSubOperations() { public List<CommandData<Object, Object>> getPubSubOperations() {
List<CommandData<Object, Object>> result = new ArrayList<CommandData<Object, Object>>(); List<CommandData<Object, Object>> result = new ArrayList<CommandData<Object, Object>>();
for (CommandData<?, ?> commandData : commands) { for (CommandData<?, ?> commandData : commands) {
if (PUBSUB_COMMANDS.equals(commandData.getCommand().getName())) { if (RedisCommands.PUBSUB_COMMANDS.equals(commandData.getCommand().getName())) {
result.add((CommandData<Object, Object>)commandData); result.add((CommandData<Object, Object>)commandData);
} }
} }
Expand Down
Expand Up @@ -15,23 +15,15 @@
*/ */
package org.redisson.client.protocol; package org.redisson.client.protocol;


import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;


/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
* *
*/ */
public interface QueueCommand { public interface QueueCommand {

Set<String> PUBSUB_COMMANDS = new HashSet<String>(Arrays.asList("PSUBSCRIBE", "SUBSCRIBE", "PUNSUBSCRIBE", "UNSUBSCRIBE"));


Set<String> TIMEOUTLESS_COMMANDS = new HashSet<String>(Arrays.asList(RedisCommands.BLPOP_VALUE.getName(),
RedisCommands.BRPOP_VALUE.getName(), RedisCommands.BRPOPLPUSH.getName()));

List<CommandData<Object, Object>> getPubSubOperations(); List<CommandData<Object, Object>> getPubSubOperations();


boolean tryFailure(Throwable cause); boolean tryFailure(Throwable cause);
Expand Down
Expand Up @@ -22,6 +22,12 @@
import org.redisson.client.protocol.convertor.EmptyConvertor; import org.redisson.client.protocol.convertor.EmptyConvertor;
import org.redisson.client.protocol.decoder.MultiDecoder; import org.redisson.client.protocol.decoder.MultiDecoder;


/**
*
* @author Nikita Koksharov
*
* @param <R> return type
*/
public class RedisCommand<R> { public class RedisCommand<R> {


public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY, STRING} public enum ValueType {OBJECT, OBJECTS, MAP_VALUE, MAP_KEY, MAP, BINARY, STRING}
Expand Down
Expand Up @@ -16,6 +16,7 @@
package org.redisson.client.protocol; package org.redisson.client.protocol;


import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
Expand All @@ -41,7 +42,6 @@
import org.redisson.client.protocol.convertor.VoidReplayConvertor; import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ClusterNodesDecoder; import org.redisson.client.protocol.decoder.ClusterNodesDecoder;
import org.redisson.client.protocol.decoder.KeyValueObjectDecoder; import org.redisson.client.protocol.decoder.KeyValueObjectDecoder;
import org.redisson.client.protocol.decoder.ListFirstObjectDecoder;
import org.redisson.client.protocol.decoder.ListResultReplayDecoder; import org.redisson.client.protocol.decoder.ListResultReplayDecoder;
import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
Expand Down Expand Up @@ -183,6 +183,9 @@ public interface RedisCommands {
RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand<Object> BLPOP_VALUE = new RedisCommand<Object>("BLPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());
RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor()); RedisCommand<Object> BRPOP_VALUE = new RedisCommand<Object>("BRPOP", new KeyValueObjectDecoder(), new KeyValueConvertor());


Set<String> BLOCKING_COMMANDS = new HashSet<String>(
Arrays.asList(BLPOP_VALUE.getName(), BRPOP_VALUE.getName(), BRPOPLPUSH.getName()));

RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor(), 2); RedisCommand<Boolean> PFADD = new RedisCommand<Boolean>("PFADD", new BooleanReplayConvertor(), 2);
RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT"); RedisStrictCommand<Long> PFCOUNT = new RedisStrictCommand<Long>("PFCOUNT");
RedisStrictCommand<Void> PFMERGE = new RedisStrictCommand<Void>("PFMERGE", new VoidReplayConvertor()); RedisStrictCommand<Void> PFMERGE = new RedisStrictCommand<Void>("PFMERGE", new VoidReplayConvertor());
Expand Down Expand Up @@ -292,6 +295,9 @@ public interface RedisCommands {
RedisCommand<Object> PSUBSCRIBE = new RedisCommand<Object>("PSUBSCRIBE", new PubSubStatusDecoder()); RedisCommand<Object> PSUBSCRIBE = new RedisCommand<Object>("PSUBSCRIBE", new PubSubStatusDecoder());
RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<Object>("PUNSUBSCRIBE", new PubSubStatusDecoder()); RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<Object>("PUNSUBSCRIBE", new PubSubStatusDecoder());


Set<String> PUBSUB_COMMANDS = new HashSet<String>(
Arrays.asList(PSUBSCRIBE.getName(), SUBSCRIBE.getName(), PUNSUBSCRIBE.getName(), UNSUBSCRIBE.getName()));

RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder()); RedisStrictCommand<List<ClusterNodeInfo>> CLUSTER_NODES = new RedisStrictCommand<List<ClusterNodeInfo>>("CLUSTER", "NODES", new ClusterNodesDecoder());
RedisCommand<Object> TIME = new RedisCommand<Object>("TIME", new LongListObjectDecoder()); RedisCommand<Object> TIME = new RedisCommand<Object>("TIME", new LongListObjectDecoder());
RedisStrictCommand<Map<String, String>> CLUSTER_INFO = new RedisStrictCommand<Map<String, String>>("CLUSTER", "INFO", new StringMapDataDecoder()); RedisStrictCommand<Map<String, String>> CLUSTER_INFO = new RedisStrictCommand<Map<String, String>>("CLUSTER", "INFO", new StringMapDataDecoder());
Expand Down
Expand Up @@ -20,17 +20,22 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.redisson.RedisClientResult; import org.redisson.RedisClientResult;
import org.redisson.RedissonReference;
import org.redisson.RedissonShutdownException; import org.redisson.RedissonShutdownException;
import org.redisson.SlotCallback; import org.redisson.SlotCallback;
import org.redisson.api.RFuture; import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.RedisAskException; import org.redisson.client.RedisAskException;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
Expand All @@ -42,16 +47,20 @@
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.NodeSource; import org.redisson.connection.NodeSource;
import org.redisson.connection.PubSubConnectionEntry;
import org.redisson.connection.NodeSource.Redirect; import org.redisson.connection.NodeSource.Redirect;
import org.redisson.misc.LogHelper; import org.redisson.misc.LogHelper;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonObjectFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -62,17 +71,6 @@
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import java.util.HashMap;
import java.util.Map;
import org.redisson.RedissonReference;
import org.redisson.api.RedissonClient;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RedissonObjectFactory;


/** /**
* *
Expand Down Expand Up @@ -629,7 +627,7 @@ private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final Red
details.getTimeout().cancel(); details.getTimeout().cancel();


long timeoutTime = connectionManager.getConfig().getTimeout(); long timeoutTime = connectionManager.getConfig().getTimeout();
if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) { if (RedisCommands.BLOCKING_COMMANDS.contains(details.getCommand().getName())) {
Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString()); Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
handleBlockingOperations(details, connection, popTimeout); handleBlockingOperations(details, connection, popTimeout);
if (popTimeout == 0) { if (popTimeout == 0) {
Expand Down
Expand Up @@ -680,7 +680,7 @@ public RFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisComman
if (entry == null) { if (entry == null) {
entry = getEntry(source); entry = getEntry(source);
} }
return entry.connectionWriteOp(); return entry.connectionWriteOp(command);
} }


private MasterSlaveEntry getEntry(NodeSource source) { private MasterSlaveEntry getEntry(NodeSource source) {
Expand All @@ -707,9 +707,9 @@ public RFuture<RedisConnection> connectionReadOp(NodeSource source, RedisCommand
entry = getEntry(source.getSlot()); entry = getEntry(source.getSlot());
} }
if (source.getAddr() != null) { if (source.getAddr() != null) {
return entry.connectionReadOp(source.getAddr()); return entry.connectionReadOp(command, source.getAddr());
} }
return entry.connectionReadOp(); return entry.connectionReadOp(command);
} }


RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) { RFuture<RedisPubSubConnection> nextPubSubConnection(int slot) {
Expand Down
Expand Up @@ -32,6 +32,8 @@
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.config.ReadMode; import org.redisson.config.ReadMode;
Expand Down Expand Up @@ -255,7 +257,7 @@ private void reattachBlockingQueue(RedisConnection connection) {
return; return;
} }


RFuture<RedisConnection> newConnection = connectionReadOp(); RFuture<RedisConnection> newConnection = connectionReadOp(RedisCommands.BLPOP_VALUE);
newConnection.addListener(new FutureListener<RedisConnection>() { newConnection.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
Expand Down Expand Up @@ -387,16 +389,16 @@ public void shutdownMasterAsync() {
slaveBalancer.shutdownAsync(); slaveBalancer.shutdownAsync();
} }


public RFuture<RedisConnection> connectionWriteOp() { public RFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
return writeConnectionHolder.get(); return writeConnectionHolder.get(command);
} }


public RFuture<RedisConnection> connectionReadOp() { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
return slaveBalancer.nextConnection(); return slaveBalancer.nextConnection(command);
} }


public RFuture<RedisConnection> connectionReadOp(InetSocketAddress addr) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) {
return slaveBalancer.getConnection(addr); return slaveBalancer.getConnection(command, addr);
} }


RFuture<RedisPubSubConnection> nextPubSubConnection() { RFuture<RedisPubSubConnection> nextPubSubConnection() {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.pool.PubSubConnectionPool; import org.redisson.connection.pool.PubSubConnectionPool;
Expand Down Expand Up @@ -82,13 +83,13 @@ public void returnPubSubConnection(PubSubConnectionEntry entry) {
} }


@Override @Override
public RFuture<RedisConnection> connectionReadOp(InetSocketAddress addr) { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command, InetSocketAddress addr) {
return super.connectionWriteOp(); return super.connectionWriteOp(command);
} }


@Override @Override
public RFuture<RedisConnection> connectionReadOp() { public RFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
return super.connectionWriteOp(); return super.connectionWriteOp(command);
} }


@Override @Override
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
Expand Down Expand Up @@ -141,17 +142,17 @@ public RFuture<RedisPubSubConnection> nextPubSubConnection() {
return pubSubConnectionPool.get(); return pubSubConnectionPool.get();
} }


public RFuture<RedisConnection> getConnection(InetSocketAddress addr) { public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr); ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry != null) { if (entry != null) {
return slaveConnectionPool.get(entry); return slaveConnectionPool.get(command, entry);
} }
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr); RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return connectionManager.newFailedFuture(exception); return connectionManager.newFailedFuture(exception);
} }


public RFuture<RedisConnection> nextConnection() { public RFuture<RedisConnection> nextConnection(RedisCommand<?> command) {
return slaveConnectionPool.get(); return slaveConnectionPool.get(command);
} }


public void returnPubSubConnection(RedisPubSubConnection connection) { public void returnPubSubConnection(RedisPubSubConnection connection) {
Expand Down

0 comments on commit 9d1fd5d

Please sign in to comment.