Skip to content

Commit

Permalink
use same node for SCAN/SSCAN/HSCAN during iteration. #230
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Sep 4, 2015
1 parent 0f663f0 commit 8bdd6cf
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 77 deletions.
6 changes: 3 additions & 3 deletions src/main/java/org/redisson/CommandBatchExecutorService.java
Expand Up @@ -24,11 +24,11 @@
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


import org.redisson.client.WriteRedisConnectionException; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException; import org.redisson.client.RedisTimeoutException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData; import org.redisson.client.protocol.CommandsData;
Expand Down Expand Up @@ -102,7 +102,7 @@ public CommandBatchExecutorService(ConnectionManager connectionManager) {


@Override @Override
protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder, protected <V, R> void async(boolean readOnlyMode, int slot, MultiDecoder<Object> messageDecoder,
Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, int attempt) { Codec codec, RedisCommand<V> command, Object[] params, Promise<R> mainPromise, RedisClient client, int attempt) {
if (executed) { if (executed) {
throw new IllegalStateException("Batch already executed!"); throw new IllegalStateException("Batch already executed!");
} }
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/redisson/CommandExecutor.java
Expand Up @@ -18,6 +18,7 @@
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;


import org.redisson.client.RedisClient;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
Expand All @@ -32,6 +33,8 @@
//TODO ping support //TODO ping support
public interface CommandExecutor { public interface CommandExecutor {


<T, R> R read(RedisClient client, String key, RedisCommand<T> command, Object ... params);

<T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params); <T, R> Future<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, String script, List<Object> keys, Object ... params);


<R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params); <R, T> Future<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);
Expand Down
54 changes: 37 additions & 17 deletions src/main/java/org/redisson/CommandExecutorService.java
Expand Up @@ -26,12 +26,12 @@
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;


import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException; import org.redisson.client.RedisMovedException;
import org.redisson.client.RedisTimeoutException; import org.redisson.client.RedisTimeoutException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.CommandData; import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
Expand Down Expand Up @@ -98,7 +98,7 @@ public Promise<R> setFailure(Throwable cause) {
}; };


for (Integer slot : connectionManager.getEntries().keySet()) { for (Integer slot : connectionManager.getEntries().keySet()) {
async(true, slot, null, connectionManager.getCodec(), command, params, promise, 0); async(true, slot, null, connectionManager.getCodec(), command, params, promise, null, 0);
} }
return mainPromise; return mainPromise;
} }
Expand Down Expand Up @@ -135,7 +135,7 @@ public void operationComplete(Future<R> future) throws Exception {
}); });


Integer slot = slots.remove(0); Integer slot = slots.remove(0);
async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, 0); async(true, slot, null, connectionManager.getCodec(), command, params, attemptPromise, null, 0);
} }


public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) { public <T> Future<Void> writeAllAsync(RedisCommand<T> command, Object ... params) {
Expand Down Expand Up @@ -172,7 +172,7 @@ public Promise<T> setFailure(Throwable cause) {
} }
}; };
for (Integer slot : connectionManager.getEntries().keySet()) { for (Integer slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, 0); async(readOnlyMode, slot, null, connectionManager.getCodec(), command, params, promise, null, 0);
} }
return mainPromise; return mainPromise;
} }
Expand All @@ -196,10 +196,22 @@ public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object ..
return get(res); return get(res);
} }


public <T, R> R read(RedisClient client, String key, RedisCommand<T> command, Object ... params) {
Future<R> res = readAsync(client, key, connectionManager.getCodec(), command, params);
return get(res);
}

public <T, R> Future<R> readAsync(RedisClient client, String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, client, 0);
return mainPromise;
}

public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(true, slot, null, codec, command, params, mainPromise, 0); async(true, slot, null, codec, command, params, mainPromise, null, 0);
return mainPromise; return mainPromise;
} }


Expand All @@ -210,7 +222,7 @@ public <T, R> R write(Integer slot, Codec codec, RedisCommand<T> command, Object


public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(Integer slot, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
async(false, slot, null, codec, command, params, mainPromise, 0); async(false, slot, null, codec, command, params, mainPromise, null, 0);
return mainPromise; return mainPromise;
} }


Expand Down Expand Up @@ -328,7 +340,7 @@ public Promise<T> setFailure(Throwable cause) {
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
for (Integer slot : connectionManager.getEntries().keySet()) { for (Integer slot : connectionManager.getEntries().keySet()) {
async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, 0); async(readOnlyMode, slot, null, connectionManager.getCodec(), command, args.toArray(), promise, null, 0);
} }
return mainPromise; return mainPromise;
} }
Expand All @@ -341,7 +353,7 @@ private <T, R> Future<R> evalAsync(boolean readOnlyMode, String key, Codec codec
args.addAll(keys); args.addAll(keys);
args.addAll(Arrays.asList(params)); args.addAll(Arrays.asList(params));
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, 0); async(readOnlyMode, slot, null, codec, evalCommandType, args.toArray(), mainPromise, null, 0);
return mainPromise; return mainPromise;
} }


Expand Down Expand Up @@ -371,12 +383,12 @@ public <T, R> R write(String key, Codec codec, RedisCommand<T> command, Object .
public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise(); Promise<R> mainPromise = connectionManager.newPromise();
int slot = connectionManager.calcSlot(key); int slot = connectionManager.calcSlot(key);
async(false, slot, null, codec, command, params, mainPromise, 0); async(false, slot, null, codec, command, params, mainPromise, null, 0);
return mainPromise; return mainPromise;
} }


protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command, protected <V, R> void async(final boolean readOnlyMode, final int slot, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final int attempt) { final Object[] params, final Promise<R> mainPromise, final RedisClient client, final int attempt) {
if (!connectionManager.getShutdownLatch().acquire()) { if (!connectionManager.getShutdownLatch().acquire()) {
mainPromise.setFailure(new IllegalStateException("Redisson is shutdown")); mainPromise.setFailure(new IllegalStateException("Redisson is shutdown"));
return; return;
Expand All @@ -400,18 +412,22 @@ public void run(Timeout timeout) throws Exception {
} }


int count = attempt + 1; int count = attempt + 1;
async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, count); async(readOnlyMode, slot, messageDecoder, codec, command, params, mainPromise, client, count);
} }
}; };


try { try {
org.redisson.client.RedisConnection connection; RedisConnection connection;
if (readOnlyMode) { if (readOnlyMode) {
connection = connectionManager.connectionReadOp(slot); if (client != null) {
connection = connectionManager.connectionReadOp(slot, client);
} else {
connection = connectionManager.connectionReadOp(slot);
}
} else { } else {
connection = connectionManager.connectionWriteOp(slot); connection = connectionManager.connectionWriteOp(slot);
} }
log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr()); log.debug("getting connection for command {} from slot {} using node {}", command, slot, connection.getRedisClient().getAddr());
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params)); ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));


ex.set(new RedisTimeoutException()); ex.set(new RedisTimeoutException());
Expand Down Expand Up @@ -449,12 +465,16 @@ public void operationComplete(Future<R> future) throws Exception {
if (future.cause() instanceof RedisMovedException) { if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause(); RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS); connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt); async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, client, attempt);
return; return;
} }


if (future.isSuccess()) { if (future.isSuccess()) {
mainPromise.setSuccess(future.getNow()); R res = future.getNow();
if (res instanceof RedisClientResult) {
((RedisClientResult)res).setRedisClient(client);
}
mainPromise.setSuccess(res);
} else { } else {
mainPromise.setFailure(future.cause()); mainPromise.setFailure(future.cause());
} }
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/redisson/RedisClientResult.java
@@ -0,0 +1,11 @@
package org.redisson;

import org.redisson.client.RedisClient;

public interface RedisClientResult {

void setRedisClient(RedisClient client);

RedisClient getRedisClient();

}
11 changes: 8 additions & 3 deletions src/main/java/org/redisson/RedissonMap.java
Expand Up @@ -27,6 +27,7 @@
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;


import org.redisson.client.RedisClient;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType; import org.redisson.client.protocol.RedisCommand.ValueType;
Expand Down Expand Up @@ -277,15 +278,16 @@ public long fastRemove(K ... keys) {
return get(fastRemoveAsync(keys)); return get(fastRemoveAsync(keys));
} }


private MapScanResult<Object, V> scanIterator(long startPos) { private MapScanResult<Object, V> scanIterator(RedisClient client, long startPos) {
return commandExecutor.read(getName(), RedisCommands.HSCAN, getName(), startPos); return commandExecutor.read(client, getName(), RedisCommands.HSCAN, getName(), startPos);
} }


private Iterator<Map.Entry<K, V>> iterator() { private Iterator<Map.Entry<K, V>> iterator() {
return new Iterator<Map.Entry<K, V>>() { return new Iterator<Map.Entry<K, V>>() {


private Iterator<Map.Entry<K, V>> iter; private Iterator<Map.Entry<K, V>> iter;
private long iterPos = 0; private long iterPos = 0;
private RedisClient client;


private boolean removeExecuted; private boolean removeExecuted;
private Map.Entry<K,V> value; private Map.Entry<K,V> value;
Expand All @@ -294,7 +296,10 @@ private Iterator<Map.Entry<K, V>> iterator() {
public boolean hasNext() { public boolean hasNext() {
if (iter == null if (iter == null
|| (!iter.hasNext() && iterPos != 0)) { || (!iter.hasNext() && iterPos != 0)) {
MapScanResult<Object, V> res = scanIterator(iterPos); MapScanResult<Object, V> res = scanIterator(client, iterPos);
if (iter == null) {
client = res.getRedisClient();
}
iter = ((Map<K, V>)res.getMap()).entrySet().iterator(); iter = ((Map<K, V>)res.getMap()).entrySet().iterator();
iterPos = res.getPos(); iterPos = res.getPos();
} }
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/org/redisson/RedissonSet.java
Expand Up @@ -22,6 +22,7 @@
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;


import org.redisson.client.RedisClient;
import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
Expand Down Expand Up @@ -68,15 +69,16 @@ public Future<Boolean> containsAsync(Object o) {
return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o); return commandExecutor.readAsync(getName(), RedisCommands.SISMEMBER, getName(), o);
} }


private ListScanResult<V> scanIterator(long startPos) { private ListScanResult<V> scanIterator(RedisClient client, long startPos) {
return commandExecutor.read(getName(), RedisCommands.SSCAN, getName(), startPos); return commandExecutor.read(client, getName(), RedisCommands.SSCAN, getName(), startPos);
} }


@Override @Override
public Iterator<V> iterator() { public Iterator<V> iterator() {
return new Iterator<V>() { return new Iterator<V>() {


private Iterator<V> iter; private Iterator<V> iter;
private RedisClient client;
private Long iterPos; private Long iterPos;


private boolean removeExecuted; private boolean removeExecuted;
Expand All @@ -85,11 +87,12 @@ public Iterator<V> iterator() {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (iter == null) { if (iter == null) {
ListScanResult<V> res = scanIterator(0); ListScanResult<V> res = scanIterator(null, 0);
client = res.getRedisClient();
iter = res.getValues().iterator(); iter = res.getValues().iterator();
iterPos = res.getPos(); iterPos = res.getPos();
} else if (!iter.hasNext() && iterPos != 0) { } else if (!iter.hasNext() && iterPos != 0) {
ListScanResult<V> res = scanIterator(iterPos); ListScanResult<V> res = scanIterator(client, iterPos);
iter = res.getValues().iterator(); iter = res.getValues().iterator();
iterPos = res.getPos(); iterPos = res.getPos();
} }
Expand Down
Expand Up @@ -17,10 +17,14 @@


import java.util.List; import java.util.List;


public class ListScanResult<V> { import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;

public class ListScanResult<V> implements RedisClientResult {


private final Long pos; private final Long pos;
private final List<V> values; private final List<V> values;
private RedisClient client;


public ListScanResult(Long pos, List<V> values) { public ListScanResult(Long pos, List<V> values) {
this.pos = pos; this.pos = pos;
Expand All @@ -35,4 +39,13 @@ public List<V> getValues() {
return values; return values;
} }


@Override
public void setRedisClient(RedisClient client) {
this.client = client;
}

public RedisClient getRedisClient() {
return client;
}

} }
Expand Up @@ -17,10 +17,14 @@


import java.util.Map; import java.util.Map;


public class MapScanResult<K, V> { import org.redisson.RedisClientResult;
import org.redisson.client.RedisClient;

public class MapScanResult<K, V> implements RedisClientResult {


private final Long pos; private final Long pos;
private final Map<K, V> values; private final Map<K, V> values;
private RedisClient client;


public MapScanResult(Long pos, Map<K, V> values) { public MapScanResult(Long pos, Map<K, V> values) {
super(); super();
Expand All @@ -36,4 +40,13 @@ public Map<K, V> getMap() {
return values; return values;
} }


@Override
public void setRedisClient(RedisClient client) {
this.client = client;
}

public RedisClient getRedisClient() {
return client;
}

} }

0 comments on commit 8bdd6cf

Please sign in to comment.