diff --git a/src/main/java/org/redisson/RedissonListReactive.java b/src/main/java/org/redisson/RedissonListReactive.java index 30eb39a68e9..c58b0907853 100644 --- a/src/main/java/org/redisson/RedissonListReactive.java +++ b/src/main/java/org/redisson/RedissonListReactive.java @@ -104,7 +104,7 @@ protected void onRequest(final long n) { @Override public void onSubscribe(Subscription s) { - s.request(1); + s.request(Long.MAX_VALUE); } @Override diff --git a/src/main/java/org/redisson/RedissonMapReactiveIterator.java b/src/main/java/org/redisson/RedissonMapReactiveIterator.java index ed4c7eb1726..9b3e60fd1ed 100644 --- a/src/main/java/org/redisson/RedissonMapReactiveIterator.java +++ b/src/main/java/org/redisson/RedissonMapReactiveIterator.java @@ -60,7 +60,7 @@ protected void nextValues() { @Override public void onSubscribe(Subscription s) { - s.request(1); + s.request(Long.MAX_VALUE); } @Override @@ -81,6 +81,7 @@ public void onNext(MapScanResult res) { currentIndex--; if (currentIndex == 0) { m.onComplete(); + return; } } } diff --git a/src/main/java/org/redisson/RedissonSetReactive.java b/src/main/java/org/redisson/RedissonSetReactive.java index 75deaa66333..2cd888c7a93 100644 --- a/src/main/java/org/redisson/RedissonSetReactive.java +++ b/src/main/java/org/redisson/RedissonSetReactive.java @@ -19,19 +19,21 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.redisson.client.codec.Codec; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.convertor.BooleanReplayConvertor; import org.redisson.client.protocol.decoder.ListScanResult; -import org.redisson.core.RSet; import org.redisson.core.RSetReactive; +import reactor.rx.Stream; +import reactor.rx.subscription.ReactiveSubscription; + /** * Distributed and concurrent implementation of {@link java.util.Set} * @@ -61,75 +63,9 @@ public Publisher contains(Object o) { return commandExecutor.readObservable(getName(), codec, RedisCommands.SISMEMBER, getName(), o); } -// private ListScanResult scanIterator(InetSocketAddress client, long startPos) { -// Publisher> f = commandExecutor.readObservable(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); -// return get(f); -// } -// -// @Override -// public Iterator iterator() { -// return new Iterator() { -// -// private List firstValues; -// private Iterator iter; -// private InetSocketAddress client; -// private long nextIterPos; -// -// private boolean currentElementRemoved; -// private boolean removeExecuted; -// private V value; -// -// @Override -// public boolean hasNext() { -// if (iter == null || !iter.hasNext()) { -// if (nextIterPos == -1) { -// return false; -// } -// long prevIterPos = nextIterPos; -// ListScanResult res = scanIterator(client, nextIterPos); -// client = res.getRedisClient(); -// if (nextIterPos == 0 && firstValues == null) { -// firstValues = res.getValues(); -// } else if (res.getValues().equals(firstValues)) { -// return false; -// } -// iter = res.getValues().iterator(); -// nextIterPos = res.getPos(); -// if (prevIterPos == nextIterPos && !removeExecuted) { -// nextIterPos = -1; -// } -// } -// return iter.hasNext(); -// } -// -// @Override -// public V next() { -// if (!hasNext()) { -// throw new NoSuchElementException("No such element at index"); -// } -// -// value = iter.next(); -// currentElementRemoved = false; -// return value; -// } -// -// @Override -// public void remove() { -// if (currentElementRemoved) { -// throw new IllegalStateException("Element been already deleted"); -// } -// if (iter == null) { -// throw new IllegalStateException(); -// } -// -// iter.remove(); -// RedissonSetReactive.this.remove(value); -// currentElementRemoved = true; -// removeExecuted = true; -// } -// -// }; -// } + private Publisher> scanIteratorReactive(InetSocketAddress client, long startPos) { + return commandExecutor.readObservable(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos); + } @Override public Publisher add(V e) { @@ -207,8 +143,82 @@ public Publisher removeAll(Collection c) { @Override public Publisher iterator() { - // TODO Auto-generated method stub - return null; + return new Stream() { + + @Override + public void subscribe(final Subscriber t) { + t.onSubscribe(new ReactiveSubscription(this, t) { + + private List firstValues; + private long nextIterPos; + private InetSocketAddress client; + + private long currentIndex; + + @Override + protected void onRequest(final long n) { + currentIndex = n; + nextValues(); + } + + protected void nextValues() { + final ReactiveSubscription m = this; + scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber>() { + + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(ListScanResult res) { + client = res.getRedisClient(); + + long prevIterPos = nextIterPos; + if (nextIterPos == 0 && firstValues == null) { + firstValues = res.getValues(); + } else if (res.getValues().equals(firstValues)) { + m.onComplete(); + currentIndex = 0; + return; + } + + nextIterPos = res.getPos(); + if (prevIterPos == nextIterPos) { + nextIterPos = -1; + } + for (V val : res.getValues()) { + m.onNext(val); + currentIndex--; + if (currentIndex == 0) { + m.onComplete(); + return; + } + } + if (nextIterPos == -1) { + m.onComplete(); + currentIndex = 0; + } + } + + @Override + public void onError(Throwable error) { + m.onError(error); + } + + @Override + public void onComplete() { + if (currentIndex == 0) { + return; + } + nextValues(); + } + }); + } + }); + } + + }; } } diff --git a/src/test/java/org/redisson/BaseReactiveTest.java b/src/test/java/org/redisson/BaseReactiveTest.java index bfe654d3b46..481b0f58d32 100644 --- a/src/test/java/org/redisson/BaseReactiveTest.java +++ b/src/test/java/org/redisson/BaseReactiveTest.java @@ -2,15 +2,13 @@ import java.util.Iterator; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.reactivestreams.Publisher; +import org.redisson.core.RCollectionReactive; -import reactor.fn.Consumer; import reactor.rx.Streams; public abstract class BaseReactiveTest { @@ -27,6 +25,10 @@ public static void afterClass() { redisson.shutdown(); } + public Iterable sync(RCollectionReactive list) { + return Streams.create(list.iterator()).toList().poll(); + } + public Iterator toIterator(Publisher pub) { return Streams.create(pub).toList().poll().iterator(); } diff --git a/src/test/java/org/redisson/RedissonListReactiveTest.java b/src/test/java/org/redisson/RedissonListReactiveTest.java index 045c79e79e9..84137a42c96 100644 --- a/src/test/java/org/redisson/RedissonListReactiveTest.java +++ b/src/test/java/org/redisson/RedissonListReactiveTest.java @@ -1,31 +1,18 @@ package org.redisson; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.ListIterator; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; -import org.reactivestreams.Subscriber; import org.redisson.client.RedisException; import org.redisson.core.RListReactive; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import reactor.core.queue.CompletableBlockingQueue; -import reactor.fn.Consumer; import reactor.rx.Promise; -import reactor.rx.Stream; -import reactor.rx.Streams; public class RedissonListReactiveTest extends BaseReactiveTest { @@ -101,10 +88,6 @@ public void onError(Throwable error) { Assert.assertThat(sync(list), Matchers.contains(1L, 2L)); } - private Iterable sync(RListReactive list) { - return Streams.create(list.iterator()).toList().poll(); - } - @Test public void testLong() { RListReactive list = redisson.getList("list");