Skip to content

Commit

Permalink
Iterator with batch size param for all collections. #1519
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jun 26, 2018
1 parent 7086b16 commit d090994
Show file tree
Hide file tree
Showing 28 changed files with 671 additions and 104 deletions.
84 changes: 64 additions & 20 deletions redisson/src/main/java/org/redisson/RedissonMap.java
Expand Up @@ -346,28 +346,61 @@ public Set<K> keySet() {
return keySet(null); return keySet(null);
} }


@Override
public Set<K> keySet(String pattern) { public Set<K> keySet(String pattern) {
return new KeySet(pattern); return keySet(pattern, 10);
}

@Override
public Set<K> keySet(String pattern, int count) {
return new KeySet(pattern, count);
}

@Override
public Set<K> keySet(int count) {
return keySet(null, count);
} }


@Override @Override
public Collection<V> values() { public Collection<V> values() {
return values(null); return values(null);
} }


@Override
public Collection<V> values(String keyPattern, int count) {
return new Values(keyPattern, count);
}

@Override
public Collection<V> values(String keyPattern) { public Collection<V> values(String keyPattern) {
return new Values(keyPattern); return values(keyPattern, 10);
}

@Override
public Collection<V> values(int count) {
return values(null, count);
} }


@Override @Override
public Set<java.util.Map.Entry<K, V>> entrySet() { public Set<java.util.Map.Entry<K, V>> entrySet() {
return entrySet(null); return entrySet(null);
} }


@Override
public Set<java.util.Map.Entry<K, V>> entrySet(String keyPattern) { public Set<java.util.Map.Entry<K, V>> entrySet(String keyPattern) {
return new EntrySet(keyPattern); return entrySet(keyPattern, 10);
}

@Override
public Set<java.util.Map.Entry<K, V>> entrySet(String keyPattern, int count) {
return new EntrySet(keyPattern, count);
} }


@Override
public Set<java.util.Map.Entry<K, V>> entrySet(int count) {
return entrySet(null, count);
}

@Override @Override
public Set<K> readAllKeySet() { public Set<K> readAllKeySet() {
return get(readAllKeySetAsync()); return get(readAllKeySetAsync());
Expand Down Expand Up @@ -1012,15 +1045,20 @@ public long fastRemove(K ... keys) {
return get(fastRemoveAsync(keys)); return get(fastRemoveAsync(keys));
} }


public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern) { public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<MapScanResult<Object, Object>> f = scanIteratorAsync(name, client, startPos, pattern, count);
return get(f);
}

public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) { if (pattern == null) {
RFuture<MapScanResult<Object, Object>> f RFuture<MapScanResult<Object, Object>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos); = commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "COUNT", count);
return get(f); return f;
} }
RFuture<MapScanResult<Object, Object>> f RFuture<MapScanResult<Object, Object>> f
= commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "MATCH", pattern); = commandExecutor.readAsync(client, name, codec, RedisCommands.HSCAN, name, startPos, "MATCH", pattern, "COUNT", count);
return get(f); return f;
} }


@Override @Override
Expand Down Expand Up @@ -1099,8 +1137,8 @@ public int hashCode() {
return h; return h;
} }


protected Iterator<K> keyIterator(String pattern) { protected Iterator<K> keyIterator(String pattern, int count) {
return new RedissonMapIterator<K>(RedissonMap.this, pattern) { return new RedissonMapIterator<K>(RedissonMap.this, pattern, count) {
@Override @Override
protected K getValue(java.util.Map.Entry<Object, Object> entry) { protected K getValue(java.util.Map.Entry<Object, Object> entry) {
return (K) entry.getKey(); return (K) entry.getKey();
Expand All @@ -1111,14 +1149,16 @@ protected K getValue(java.util.Map.Entry<Object, Object> entry) {
class KeySet extends AbstractSet<K> { class KeySet extends AbstractSet<K> {


private final String pattern; private final String pattern;
private final int count;


public KeySet(String pattern) { public KeySet(String pattern, int count) {
this.pattern = pattern; this.pattern = pattern;
this.count = count;
} }


@Override @Override
public Iterator<K> iterator() { public Iterator<K> iterator() {
return keyIterator(pattern); return keyIterator(pattern, count);
} }


@Override @Override
Expand Down Expand Up @@ -1150,8 +1190,8 @@ public void clear() {


} }


protected Iterator<V> valueIterator(String pattern) { protected Iterator<V> valueIterator(String pattern, int count) {
return new RedissonMapIterator<V>(RedissonMap.this, pattern) { return new RedissonMapIterator<V>(RedissonMap.this, pattern, count) {
@Override @Override
protected V getValue(java.util.Map.Entry<Object, Object> entry) { protected V getValue(java.util.Map.Entry<Object, Object> entry) {
return (V) entry.getValue(); return (V) entry.getValue();
Expand All @@ -1162,14 +1202,16 @@ protected V getValue(java.util.Map.Entry<Object, Object> entry) {
final class Values extends AbstractCollection<V> { final class Values extends AbstractCollection<V> {


private final String keyPattern; private final String keyPattern;
private final int count;


public Values(String keyPattern) { public Values(String keyPattern, int count) {
this.keyPattern = keyPattern; this.keyPattern = keyPattern;
this.count = count;
} }


@Override @Override
public Iterator<V> iterator() { public Iterator<V> iterator() {
return valueIterator(keyPattern); return valueIterator(keyPattern, count);
} }


@Override @Override
Expand Down Expand Up @@ -1197,8 +1239,8 @@ public void clear() {


} }


protected Iterator<Map.Entry<K,V>> entryIterator(String pattern) { protected Iterator<Map.Entry<K,V>> entryIterator(String pattern, int count) {
return new RedissonMapIterator<Map.Entry<K, V>>(RedissonMap.this, pattern); return new RedissonMapIterator<Map.Entry<K, V>>(RedissonMap.this, pattern, count);
} }


private void loadValue(final K key, final RPromise<V> result, final boolean replaceValue) { private void loadValue(final K key, final RPromise<V> result, final boolean replaceValue) {
Expand Down Expand Up @@ -1286,13 +1328,15 @@ public void operationComplete(Future<Void> future) throws Exception {
final class EntrySet extends AbstractSet<Map.Entry<K,V>> { final class EntrySet extends AbstractSet<Map.Entry<K,V>> {


private final String keyPattern; private final String keyPattern;
private final int count;


public EntrySet(String keyPattern) { public EntrySet(String keyPattern, int count) {
this.keyPattern = keyPattern; this.keyPattern = keyPattern;
this.count = count;
} }


public final Iterator<Map.Entry<K,V>> iterator() { public final Iterator<Map.Entry<K,V>> iterator() {
return entryIterator(keyPattern); return entryIterator(keyPattern, count);
} }


public final boolean contains(Object o) { public final boolean contains(Object o) {
Expand Down
13 changes: 7 additions & 6 deletions redisson/src/main/java/org/redisson/RedissonMapCache.java
Expand Up @@ -1208,28 +1208,29 @@ protected RFuture<Long> fastRemoveOperationAsync(K ... keys) {
} }


@Override @Override
public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern) { public MapScanResult<Object, Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
return get(scanIteratorAsync(name, client, startPos, pattern)); return get(scanIteratorAsync(name, client, startPos, pattern, count));
} }


public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern) { public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(final String name, RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>(); List<Object> params = new ArrayList<Object>();
params.add(System.currentTimeMillis()); params.add(System.currentTimeMillis());
params.add(startPos); params.add(startPos);
if (pattern != null) { if (pattern != null) {
params.add(pattern); params.add(pattern);
} }
params.add(count);


RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL", RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(codec), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP); new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(codec), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, EVAL_HSCAN, RFuture<MapCacheScanResult<Object, Object>> f = commandExecutor.evalReadAsync(client, name, codec, EVAL_HSCAN,
"local result = {}; " "local result = {}; "
+ "local idleKeys = {}; " + "local idleKeys = {}; "
+ "local res; " + "local res; "
+ "if (#ARGV == 3) then " + "if (#ARGV == 4) then "
+ " res = redis.call('hscan', KEYS[1], ARGV[2], 'match', ARGV[3]); " + " res = redis.call('hscan', KEYS[1], ARGV[2], 'match', ARGV[3], 'count', ARGV[4]); "
+ "else " + "else "
+ " res = redis.call('hscan', KEYS[1], ARGV[2]); " + " res = redis.call('hscan', KEYS[1], ARGV[2], 'count', ARGV[3]); "
+ "end;" + "end;"
+ "local currentTime = tonumber(ARGV[1]); " + "local currentTime = tonumber(ARGV[1]); "
+ "for i, value in ipairs(res[2]) do " + "for i, value in ipairs(res[2]) do "
Expand Down
6 changes: 4 additions & 2 deletions redisson/src/main/java/org/redisson/RedissonMapIterator.java
Expand Up @@ -29,10 +29,12 @@ public class RedissonMapIterator<M> extends RedissonBaseMapIterator<M> {


private final RedissonMap map; private final RedissonMap map;
private final String pattern; private final String pattern;
private final int count;


public RedissonMapIterator(RedissonMap map, String pattern) { public RedissonMapIterator(RedissonMap map, String pattern, int count) {
this.map = map; this.map = map;
this.pattern = pattern; this.pattern = pattern;
this.count = count;
} }


@Override @Override
Expand All @@ -42,7 +44,7 @@ protected Object put(Entry<Object, Object> entry, Object value) {


@Override @Override
protected ScanResult<Entry<Object, Object>> iterator(RedisClient client, long nextIterPos) { protected ScanResult<Entry<Object, Object>> iterator(RedisClient client, long nextIterPos) {
return map.scanIterator(map.getName(), client, nextIterPos, pattern); return map.scanIterator(map.getName(), client, nextIterPos, pattern, count);
} }


@Override @Override
Expand Down
27 changes: 23 additions & 4 deletions redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java
Expand Up @@ -392,18 +392,37 @@ public RFuture<Integer> rankAsync(V o) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o)); return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANK_INT, getName(), encode(o));
} }


private ListScanResult<Object> scanIterator(RedisClient client, long startPos) { private ListScanResult<Object> scanIterator(RedisClient client, long startPos, String pattern, int count) {
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos); if (pattern == null) {
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos, "COUNT", count);
return get(f);
}
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.ZSCAN, getName(), startPos, "MATCH", pattern, "COUNT", count);
return get(f); return get(f);
} }


@Override @Override
public Iterator<V> iterator() { public Iterator<V> iterator() {
return iterator(null, 10);
}

@Override
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}

@Override
public Iterator<V> iterator(int count) {
return iterator(null, count);
}

@Override
public Iterator<V> iterator(final String pattern, final int count) {
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {


@Override @Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(client, nextIterPos); return scanIterator(client, nextIterPos, pattern, count);
} }


@Override @Override
Expand All @@ -413,7 +432,7 @@ protected void remove(Object value) {


}; };
} }

@Override @Override
public Object[] toArray() { public Object[] toArray() {
List<Object> res = (List<Object>) get(valueRangeAsync(0, -1)); List<Object> res = (List<Object>) get(valueRangeAsync(0, -1));
Expand Down
22 changes: 16 additions & 6 deletions redisson/src/main/java/org/redisson/RedissonSet.java
Expand Up @@ -92,23 +92,33 @@ public RFuture<Boolean> containsAsync(Object o) {
} }


@Override @Override
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern) { public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
if (pattern == null) { if (pattern == null) {
RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos); RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "COUNT", count);
return get(f); return get(f);
} }


RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern); RFuture<ListScanResult<Object>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos, "MATCH", pattern, "COUNT", count);
return get(f); return get(f);
} }


@Override @Override
public Iterator<V> iterator(final String pattern) { public Iterator<V> iterator(int count) {
return iterator(null, count);
}

@Override
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}

@Override
public Iterator<V> iterator(final String pattern, final int count) {
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {


@Override @Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern); return scanIterator(getName(), client, nextIterPos, pattern, count);
} }


@Override @Override
Expand Down Expand Up @@ -566,7 +576,7 @@ public RLock getLock(V value) {


@Override @Override
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos,
String pattern) { String pattern, int count) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }


Expand Down
27 changes: 19 additions & 8 deletions redisson/src/main/java/org/redisson/RedissonSetCache.java
Expand Up @@ -121,27 +121,28 @@ public RFuture<Boolean> containsAsync(Object o) {
} }


@Override @Override
public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern) { public ListScanResult<Object> scanIterator(String name, RedisClient client, long startPos, String pattern, int count) {
RFuture<ListScanResult<Object>> f = scanIteratorAsync(name, client, startPos, pattern); RFuture<ListScanResult<Object>> f = scanIteratorAsync(name, client, startPos, pattern, count);
return get(f); return get(f);
} }


@Override @Override
public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern) { public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClient client, long startPos, String pattern, int count) {
List<Object> params = new ArrayList<Object>(); List<Object> params = new ArrayList<Object>();
params.add(startPos); params.add(startPos);
params.add(System.currentTimeMillis()); params.add(System.currentTimeMillis());
if (pattern != null) { if (pattern != null) {
params.add(pattern); params.add(pattern);
} }
params.add(count);


return commandExecutor.evalReadAsync(client, name, codec, RedisCommands.EVAL_ZSCAN, return commandExecutor.evalReadAsync(client, name, codec, RedisCommands.EVAL_ZSCAN,
"local result = {}; " "local result = {}; "
+ "local res; " + "local res; "
+ "if (#ARGV == 3) then " + "if (#ARGV == 4) then "
+ " res = redis.call('zscan', KEYS[1], ARGV[1], 'match', ARGV[3]); " + " res = redis.call('zscan', KEYS[1], ARGV[1], 'match', ARGV[3], 'count', ARGV[4]); "
+ "else " + "else "
+ " res = redis.call('zscan', KEYS[1], ARGV[1]); " + " res = redis.call('zscan', KEYS[1], ARGV[1], 'count', ARGV[3]); "
+ "end;" + "end;"
+ "for i, value in ipairs(res[2]) do " + "for i, value in ipairs(res[2]) do "
+ "if i % 2 == 0 then " + "if i % 2 == 0 then "
Expand All @@ -155,12 +156,22 @@ public RFuture<ListScanResult<Object>> scanIteratorAsync(String name, RedisClien
} }


@Override @Override
public Iterator<V> iterator(final String pattern) { public Iterator<V> iterator(int count) {
return iterator(null, count);
}

@Override
public Iterator<V> iterator(String pattern) {
return iterator(pattern, 10);
}

@Override
public Iterator<V> iterator(final String pattern, final int count) {
return new RedissonBaseIterator<V>() { return new RedissonBaseIterator<V>() {


@Override @Override
protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) { protected ListScanResult<Object> iterator(RedisClient client, long nextIterPos) {
return scanIterator(getName(), client, nextIterPos, pattern); return scanIterator(getName(), client, nextIterPos, pattern, count);
} }


@Override @Override
Expand Down

0 comments on commit d090994

Please sign in to comment.