Skip to content

Commit

Permalink
Few iterator improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Apr 25, 2016
1 parent d9f2efb commit c9ac67e
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 30 deletions.
87 changes: 67 additions & 20 deletions src/main/java/org/redisson/RedissonBaseIterator.java
Expand Up @@ -16,6 +16,7 @@
package org.redisson; package org.redisson;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
Expand All @@ -25,42 +26,87 @@
abstract class RedissonBaseIterator<V> implements Iterator<V> { abstract class RedissonBaseIterator<V> implements Iterator<V> {


private List<V> firstValues; private List<V> firstValues;
private Iterator<V> iter; private List<V> lastValues;
private InetSocketAddress client; private Iterator<V> lastIter;
private long nextIterPos; protected long nextIterPos;
private long startPos = -1; protected InetSocketAddress client;


private boolean finished;
private boolean currentElementRemoved; private boolean currentElementRemoved;
private boolean removeExecuted; private boolean removeExecuted;
private V value; private V value;


@Override @Override
public boolean hasNext() { public boolean hasNext() {
if (iter == null || !iter.hasNext()) { if (lastIter == null || !lastIter.hasNext()) {
if (nextIterPos == -1) { if (finished) {
return false;
currentElementRemoved = false;
removeExecuted = false;
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;

if (!tryAgain()) {
return false;
}
finished = false;
} }
long prevIterPos; long prevIterPos;
do { do {
prevIterPos = nextIterPos; prevIterPos = nextIterPos;
ListScanResult<V> res = iterator(client, nextIterPos); ListScanResult<V> res = iterator(client, nextIterPos);
lastValues = new ArrayList<V>(res.getValues());
client = res.getRedisClient(); client = res.getRedisClient();
if (startPos == -1) {
startPos = res.getPos();
}
if (nextIterPos == 0 && firstValues == null) { if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues(); firstValues = lastValues;
} else if (res.getValues().equals(firstValues) && res.getPos() == startPos) { lastValues = null;
return false; if (firstValues.isEmpty() && tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
}
} else {
if (firstValues.isEmpty()) {
firstValues = lastValues;
lastValues = null;
if (firstValues.isEmpty() && tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
continue;
}
} else if (lastValues.removeAll(firstValues)) {
currentElementRemoved = false;
removeExecuted = false;
client = null;
firstValues = null;
lastValues = null;
nextIterPos = 0;
prevIterPos = -1;
if (tryAgain()) {
continue;
}
finished = true;
return false;
}
} }
iter = res.getValues().iterator(); lastIter = res.getValues().iterator();
nextIterPos = res.getPos(); nextIterPos = res.getPos();
} while (!iter.hasNext() && nextIterPos != prevIterPos); } while (!lastIter.hasNext() && nextIterPos != prevIterPos);
if (prevIterPos == nextIterPos && !removeExecuted) { if (prevIterPos == nextIterPos && !removeExecuted) {
nextIterPos = -1; finished = true;
} }
} }
return iter.hasNext(); return lastIter.hasNext();
}

protected boolean tryAgain() {
return false;
} }


abstract ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos); abstract ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos);
Expand All @@ -71,7 +117,7 @@ public V next() {
throw new NoSuchElementException("No such element"); throw new NoSuchElementException("No such element");
} }


value = iter.next(); value = lastIter.next();
currentElementRemoved = false; currentElementRemoved = false;
return value; return value;
} }
Expand All @@ -81,11 +127,12 @@ public void remove() {
if (currentElementRemoved) { if (currentElementRemoved) {
throw new IllegalStateException("Element been already deleted"); throw new IllegalStateException("Element been already deleted");
} }
if (iter == null) { if (lastIter == null) {
throw new IllegalStateException(); throw new IllegalStateException();
} }


iter.remove(); firstValues.remove(value);
lastIter.remove();
remove(value); remove(value);
currentElementRemoved = true; currentElementRemoved = true;
removeExecuted = true; removeExecuted = true;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/redisson/RedissonBaseMapIterator.java
Expand Up @@ -75,13 +75,18 @@ public boolean hasNext() {
if (firstValues.isEmpty() && tryAgain()) { if (firstValues.isEmpty() && tryAgain()) {
client = null; client = null;
firstValues = null; firstValues = null;
nextIterPos = 0;
prevIterPos = -1; prevIterPos = -1;
} }
} else { } else {
if (firstValues.isEmpty()) { if (firstValues.isEmpty()) {
firstValues = lastValues; firstValues = lastValues;
lastValues = null; lastValues = null;
if (firstValues.isEmpty() && tryAgain()) { if (firstValues.isEmpty() && tryAgain()) {
client = null;
firstValues = null;
nextIterPos = 0;
prevIterPos = -1;
continue; continue;
} }
} else if (lastValues.keySet().removeAll(firstValues.keySet())) { } else if (lastValues.keySet().removeAll(firstValues.keySet())) {
Expand Down
33 changes: 25 additions & 8 deletions src/main/java/org/redisson/RedissonSet.java
Expand Up @@ -80,8 +80,8 @@ protected String getName(Object o) {
return getName(); return getName();
} }


private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) { ListScanResult<V> scanIterator(String name, InetSocketAddress client, long startPos) {
Future<ListScanResult<V>> f = commandExecutor.readAsync(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); Future<ListScanResult<V>> f = commandExecutor.readAsync(client, name, codec, RedisCommands.SSCAN, name, startPos);
return get(f); return get(f);
} }


Expand All @@ -91,7 +91,7 @@ public Iterator<V> iterator() {


@Override @Override
ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) { ListScanResult<V> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos); return scanIterator(getName(), client, nextIterPos);
} }


@Override @Override
Expand Down Expand Up @@ -185,10 +185,6 @@ public Future<Boolean> containsAllAsync(Collection<?> c) {


@Override @Override
public boolean addAll(Collection<? extends V> c) { public boolean addAll(Collection<? extends V> c) {
if (c.isEmpty()) {
return false;
}

return get(addAllAsync(c)); return get(addAllAsync(c));
} }


Expand Down Expand Up @@ -229,7 +225,11 @@ public Future<Boolean> removeAllAsync(Collection<?> c) {
return newSucceededFuture(false); return newSucceededFuture(false);
} }


return commandExecutor.writeAsync(getName(), codec, RedisCommands.SREM_SINGLE, getName(), c.toArray()); List<Object> args = new ArrayList<Object>(c.size() + 1);
args.add(getName());
args.addAll(c);

return commandExecutor.writeAsync(getName(), codec, RedisCommands.SREM_SINGLE, c.toArray());
} }


@Override @Override
Expand Down Expand Up @@ -268,4 +268,21 @@ public void clear() {
delete(); delete();
} }


@Override
public String toString() {
Iterator<V> it = iterator();
if (! it.hasNext())
return "[]";

StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
V e = it.next();
sb.append(e == this ? "(this Collection)" : e);
if (! it.hasNext())
return sb.append(']').toString();
sb.append(',').append(' ');
}
}

} }
Expand Up @@ -99,7 +99,7 @@ public boolean cancel() {
} }
}; };


protected static final int MAX_SLOT = 16384; public static final int MAX_SLOT = 16384;


protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT); protected final ClusterSlotRange singleSlotRange = new ClusterSlotRange(0, MAX_SLOT);


Expand All @@ -121,7 +121,7 @@ public boolean cancel() {


protected boolean isClusterMode; protected boolean isClusterMode;


protected final Map<ClusterSlotRange, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap(); private final Map<ClusterSlotRange, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();


private final Promise<Boolean> shutdownPromise; private final Promise<Boolean> shutdownPromise;


Expand Down

0 comments on commit c9ac67e

Please sign in to comment.