Skip to content

Commit

Permalink
ASK handling. #264
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Oct 30, 2015
1 parent f8d3b56 commit 8279398
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 87 deletions.
30 changes: 19 additions & 11 deletions src/main/java/org/redisson/CommandBatchExecutorService.java
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
Expand All @@ -36,6 +37,7 @@
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -102,15 +104,15 @@ public CommandBatchExecutorService(ConnectionManager connectionManager) {
}

@Override
protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder,
protected <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, MultiDecoder<Object> messageDecoder,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, RedisClient client, int attempt) {
if (executed) {
throw new IllegalStateException("Batch already executed!");
}
Entry entry = commands.get(slot);
Entry entry = commands.get(nodeSource.getSlot());
if (entry == null) {
entry = new Entry();
Entry oldEntry = commands.putIfAbsent(slot, entry);
Entry oldEntry = commands.putIfAbsent(nodeSource.getSlot(), entry);
if (oldEntry != null) {
entry = oldEntry;
}
Expand Down Expand Up @@ -143,6 +145,7 @@ public Future<List<?>> executeAsync() {
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
promise.setFailure(future.cause());
commands = null;
return;
}

Expand All @@ -162,12 +165,12 @@ public void operationComplete(Future<Void> future) throws Exception {

AtomicInteger slots = new AtomicInteger(commands.size());
for (java.util.Map.Entry<Integer, Entry> e : commands.entrySet()) {
execute(e.getValue(), e.getKey(), voidPromise, slots, 0);
execute(e.getValue(), new NodeSource(e.getKey()), voidPromise, slots, 0);
}
return promise;
}

public void execute(final Entry entry, final int slot, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
public void execute(final Entry entry, final NodeSource source, final Promise<Void> mainPromise, final AtomicInteger slots, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return;
Expand All @@ -189,15 +192,15 @@ public void run(Timeout timeout) throws Exception {
attemptPromise.cancel(true);

int count = attempt + 1;
execute(entry, slot, mainPromise, slots, count);
execute(entry, source, mainPromise, slots, count);
}
};

Future<RedisConnection> connectionFuture;
if (entry.isReadOnlyMode()) {
connectionFuture = connectionManager.connectionReadOp(slot, null);
connectionFuture = connectionManager.connectionReadOp(source, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(slot, null);
connectionFuture = connectionManager.connectionWriteOp(source, null);
}

connectionFuture.addListener(new FutureListener<RedisConnection>() {
Expand Down Expand Up @@ -235,9 +238,9 @@ public void operationComplete(ChannelFuture future) throws Exception {
});

if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout));
}
}
});
Expand All @@ -251,7 +254,12 @@ public void operationComplete(Future<Void> future) throws Exception {

if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
execute(entry, ex.getSlot(), mainPromise, slots, attempt);
execute(entry, new NodeSource(ex.getSlot()), mainPromise, slots, attempt);
return;
}
if (future.cause() instanceof RedisAskException) {
RedisAskException ex = (RedisAskException)future.cause();
execute(entry, new NodeSource(ex.getAddr()), mainPromise, slots, attempt);
return;
}

Expand Down
88 changes: 59 additions & 29 deletions src/main/java/org/redisson/CommandExecutorService.java
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.CommandBatchExecutorService.CommandEntry;
import org.redisson.client.RedisAskException;
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
Expand All @@ -34,10 +36,13 @@
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -99,7 +104,7 @@ public Promise<R> setFailure(Throwable cause) {
};

for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(true, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, promise, null, 0);
async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0);
}
return mainPromise;
}
Expand Down Expand Up @@ -136,7 +141,7 @@ public void operationComplete(Future<R> future) throws Exception {
});

ClusterSlotRange slot = slots.remove(0);
async(true, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0);
async(true, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, attemptPromise, null, 0);
}

public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
Expand Down Expand Up @@ -173,7 +178,7 @@ public Promise<T> setFailure(Throwable cause) {
}
};
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot.getStartSlot(), null, connectionManager.getCodec(), command, params, promise, null, 0);
async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, params, promise, null, 0);
}
return mainPromise;
}
Expand Down Expand Up @@ -215,14 +220,14 @@ public <T, R> R read(RedisClient client, String key, Codec codec, RedisCommand<T
public <T, R> Future<R> readAsync(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, client, 0);
async(true, new NodeSource(slot), null, codec, command, params, mainPromise, client, 0);
return mainPromise;
}

public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, null, 0);
async(true, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0);
return mainPromise;
}

Expand All @@ -233,7 +238,7 @@ public <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object

public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
async(false, slot, null, codec, command, params, mainPromise, null, 0);
async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0);
return mainPromise;
}

Expand All @@ -243,25 +248,25 @@ public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ..

public <R> R write(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return async(false, codec, slot, operation, 0);
return async(false, codec, new NodeSource(slot), operation, 0);
}

public <R> R read(String key, Codec codec, SyncOperation<R> operation) {
int slot = connectionManager.calcSlot(key);
return async(true, codec, slot, operation, 0);
return async(true, codec, new NodeSource(slot), operation, 0);
}

private <R> R async(boolean readOnlyMode, Codec codec, int slot, SyncOperation<R> operation, int attempt) {
private <R> R async(boolean readOnlyMode, Codec codec, NodeSource source, SyncOperation<R> operation, int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
return null;
}

try {
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
connectionFuture = connectionManager.connectionReadOp(slot, null);
connectionFuture = connectionManager.connectionReadOp(source, null);
} else {
connectionFuture = connectionManager.connectionWriteOp(slot, null);
connectionFuture = connectionManager.connectionWriteOp(source, null);
}
connectionFuture.syncUninterruptibly();

Expand All @@ -270,19 +275,21 @@ private <R> R async(boolean readOnlyMode, Codec codec, int slot, SyncOperation<R
try {
return operation.execute(codec, connection);
} catch (RedisMovedException e) {
return async(readOnlyMode, codec, e.getSlot(), operation, attempt);
return async(readOnlyMode, codec, new NodeSource(e.getSlot()), operation, attempt);
} catch (RedisAskException e) {
return async(readOnlyMode, codec, new NodeSource(e.getAddr()), operation, attempt);
} catch (RedisTimeoutException e) {
if (attempt == connectionManager.getConfig().getRetryAttempts()) {
throw e;
}
attempt++;
return async(readOnlyMode, codec, slot, operation, attempt);
return async(readOnlyMode, codec, source, operation, attempt);
} finally {
connectionManager.getShutdownLatch().release();
if (readOnlyMode) {
connectionManager.releaseRead(slot, connection);
connectionManager.releaseRead(source, connection);
} else {
connectionManager.releaseWrite(slot, connection);
connectionManager.releaseWrite(source, connection);
}
}
} catch (RedisException e) {
Expand All @@ -295,7 +302,7 @@ private <R> R async(boolean readOnlyMode, Codec codec, int slot, SyncOperation<R
Thread.currentThread().interrupt();
}
attempt++;
return async(readOnlyMode, codec, slot, operation, attempt);
return async(readOnlyMode, codec, source, operation, attempt);
}
}

Expand Down Expand Up @@ -355,7 +362,7 @@ public Promise<T> setFailure(Throwable cause) {
args.addAll(keys);
args.addAll(Arrays.asList(params));
for (ClusterSlotRange slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot.getStartSlot(), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0);
async(readOnlyMode, new NodeSource(slot.getStartSlot()), null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0);
}
return mainPromise;
}
Expand All @@ -368,7 +375,7 @@ private <T, R> Future<R> evalAsync(boolean readOnlyMode, String key, Codec codec
args.addAll(keys);
args.addAll(Arrays.asList(params));
int slot = connectionManager.calcSlot(key);
async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, null, 0);
async(readOnlyMode, new NodeSource(slot), null, codec, evalCommandType, args.toArray(), mainPromise, null, 0);
return mainPromise;
}

Expand Down Expand Up @@ -398,11 +405,11 @@ public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object .
public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(false, slot, null, codec, command, params, mainPromise, null, 0);
async(false, new NodeSource(slot), null, codec, command, params, mainPromise, null, 0);
return mainPromise;
}

protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final RedisClient client, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
Expand All @@ -427,7 +434,7 @@ public void run(Timeout timeout) throws Exception {
}

int count = attempt + 1;
async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, client, count);
async(readOnlyMode, source, messageDecoder, codec, command, params, mainPromise, client, count);
}
};

Expand All @@ -437,12 +444,12 @@ public void run(Timeout timeout) throws Exception {
Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
if (client != null) {
connectionFuture = connectionManager.connectionReadOp(slot, command, client);
connectionFuture = connectionManager.connectionReadOp(source, command, client);
} else {
connectionFuture = connectionManager.connectionReadOp(slot, command);
connectionFuture = connectionManager.connectionReadOp(source, command);
}
} else {
connectionFuture = connectionManager.connectionWriteOp(slot, command);
connectionFuture = connectionManager.connectionWriteOp(source, command);
}

connectionFuture.addListener(new FutureListener<RedisConnection>() {
Expand All @@ -463,8 +470,20 @@ public void operationComplete(Future<RedisConnection> connFuture) throws Excepti

RedisConnection connection = connFuture.getNow();

log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr());
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ChannelFuture future = null;
if (source.getAddr() != null) {
// ASK handling
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(2);
Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, codec, RedisCommands.ASKING, new Object[] {}));
list.add(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
Promise<Void> main = connectionManager.newPromise();
future = connection.send(new CommandsData(main, list));
} else {
log.debug("getting connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr());
future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
}

future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Expand All @@ -481,9 +500,9 @@ public void operationComplete(ChannelFuture future) throws Exception {
});

if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeout));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout));
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeout));
}
}
});
Expand All @@ -503,7 +522,18 @@ public void operationComplete(Future<R> future) throws Exception {

RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, client, attempt);
async(readOnlyMode, new NodeSource(ex.getSlot()), messageDecoder, codec, command, params, mainPromise, client, attempt);
return;
}

if (future.cause() instanceof RedisAskException) {
if (!connectionManager.getShutdownLatch().acquire()) {
return;
}

RedisAskException ex = (RedisAskException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, new NodeSource(ex.getAddr()), messageDecoder, codec, command, params, mainPromise, client, attempt);
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonPatternTopic.java
Expand Up @@ -77,7 +77,7 @@ private int addListener(RedisPubSubListener<M> pubSubListener) {

@Override
public void removeListener(int listenerId) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonTopic.java
Expand Up @@ -92,7 +92,7 @@ private int addListener(RedisPubSubListener<M> pubSubListener) {

@Override
public void removeListener(int listenerId) {
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getEntry(name);
PubSubConnectionEntry entry = commandExecutor.getConnectionManager().getPubSubEntry(name);
if (entry == null) {
return;
}
Expand Down

0 comments on commit 8279398

Please sign in to comment.