Skip to content

Commit

Permalink
RedissonSetReactiveTest added. #210
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 30, 2015
1 parent 5092e81 commit 29a335c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 96 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/RedissonListReactive.java
Expand Up @@ -104,7 +104,7 @@ protected void onRequest(final long n) {

@Override
public void onSubscribe(Subscription s) {
s.request(1);
s.request(Long.MAX_VALUE);
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/redisson/RedissonMapReactiveIterator.java
Expand Up @@ -60,7 +60,7 @@ protected void nextValues() {

@Override
public void onSubscribe(Subscription s) {
s.request(1);
s.request(Long.MAX_VALUE);
}

@Override
Expand All @@ -81,6 +81,7 @@ public void onNext(MapScanResult<Object, V> res) {
currentIndex--;
if (currentIndex == 0) {
m.onComplete();
return;
}
}
}
Expand Down
158 changes: 84 additions & 74 deletions src/main/java/org/redisson/RedissonSetReactive.java
Expand Up @@ -19,19 +19,21 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.core.RSet;
import org.redisson.core.RSetReactive;

import reactor.rx.Stream;
import reactor.rx.subscription.ReactiveSubscription;

/**
* Distributed and concurrent implementation of {@link java.util.Set}
*
Expand Down Expand Up @@ -61,75 +63,9 @@ public Publisher<Boolean> contains(Object o) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.SISMEMBER, getName(), o);
}

// private ListScanResult<V> scanIterator(InetSocketAddress client, long startPos) {
// Publisher<ListScanResult<V>> f = commandExecutor.readObservable(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
// return get(f);
// }
//
// @Override
// public Iterator<V> iterator() {
// return new Iterator<V>() {
//
// private List<V> firstValues;
// private Iterator<V> iter;
// private InetSocketAddress client;
// private long nextIterPos;
//
// private boolean currentElementRemoved;
// private boolean removeExecuted;
// private V value;
//
// @Override
// public boolean hasNext() {
// if (iter == null || !iter.hasNext()) {
// if (nextIterPos == -1) {
// return false;
// }
// long prevIterPos = nextIterPos;
// ListScanResult<V> res = scanIterator(client, nextIterPos);
// client = res.getRedisClient();
// if (nextIterPos == 0 && firstValues == null) {
// firstValues = res.getValues();
// } else if (res.getValues().equals(firstValues)) {
// return false;
// }
// iter = res.getValues().iterator();
// nextIterPos = res.getPos();
// if (prevIterPos == nextIterPos && !removeExecuted) {
// nextIterPos = -1;
// }
// }
// return iter.hasNext();
// }
//
// @Override
// public V next() {
// if (!hasNext()) {
// throw new NoSuchElementException("No such element at index");
// }
//
// value = iter.next();
// currentElementRemoved = false;
// return value;
// }
//
// @Override
// public void remove() {
// if (currentElementRemoved) {
// throw new IllegalStateException("Element been already deleted");
// }
// if (iter == null) {
// throw new IllegalStateException();
// }
//
// iter.remove();
// RedissonSetReactive.this.remove(value);
// currentElementRemoved = true;
// removeExecuted = true;
// }
//
// };
// }
private Publisher<ListScanResult<V>> scanIteratorReactive(InetSocketAddress client, long startPos) {
return commandExecutor.readObservable(client, getName(), codec, RedisCommands.SSCAN, getName(), startPos);
}

@Override
public Publisher<Long> add(V e) {
Expand Down Expand Up @@ -207,8 +143,82 @@ public Publisher<Boolean> removeAll(Collection<?> c) {

@Override
public Publisher<V> iterator() {
// TODO Auto-generated method stub
return null;
return new Stream<V>() {

@Override
public void subscribe(final Subscriber<? super V> t) {
t.onSubscribe(new ReactiveSubscription<V>(this, t) {

private List<V> firstValues;
private long nextIterPos;
private InetSocketAddress client;

private long currentIndex;

@Override
protected void onRequest(final long n) {
currentIndex = n;
nextValues();
}

protected void nextValues() {
final ReactiveSubscription<V> m = this;
scanIteratorReactive(client, nextIterPos).subscribe(new Subscriber<ListScanResult<V>>() {

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(ListScanResult<V> res) {
client = res.getRedisClient();

long prevIterPos = nextIterPos;
if (nextIterPos == 0 && firstValues == null) {
firstValues = res.getValues();
} else if (res.getValues().equals(firstValues)) {
m.onComplete();
currentIndex = 0;
return;
}

nextIterPos = res.getPos();
if (prevIterPos == nextIterPos) {
nextIterPos = -1;
}
for (V val : res.getValues()) {
m.onNext(val);
currentIndex--;
if (currentIndex == 0) {
m.onComplete();
return;
}
}
if (nextIterPos == -1) {
m.onComplete();
currentIndex = 0;
}
}

@Override
public void onError(Throwable error) {
m.onError(error);
}

@Override
public void onComplete() {
if (currentIndex == 0) {
return;
}
nextValues();
}
});
}
});
}

};
}

}
8 changes: 5 additions & 3 deletions src/test/java/org/redisson/BaseReactiveTest.java
Expand Up @@ -2,15 +2,13 @@

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.reactivestreams.Publisher;
import org.redisson.core.RCollectionReactive;

import reactor.fn.Consumer;
import reactor.rx.Streams;

public abstract class BaseReactiveTest {
Expand All @@ -27,6 +25,10 @@ public static void afterClass() {
redisson.shutdown();
}

public <V> Iterable<V> sync(RCollectionReactive<V> list) {
return Streams.create(list.iterator()).toList().poll();
}

public <V> Iterator<V> toIterator(Publisher<V> pub) {
return Streams.create(pub).toList().poll().iterator();
}
Expand Down
17 changes: 0 additions & 17 deletions src/test/java/org/redisson/RedissonListReactiveTest.java
@@ -1,31 +1,18 @@
package org.redisson;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.redisson.client.RedisException;
import org.redisson.core.RListReactive;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.core.queue.CompletableBlockingQueue;
import reactor.fn.Consumer;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;

public class RedissonListReactiveTest extends BaseReactiveTest {

Expand Down Expand Up @@ -101,10 +88,6 @@ public void onError(Throwable error) {
Assert.assertThat(sync(list), Matchers.contains(1L, 2L));
}

private <V> Iterable<V> sync(RListReactive<V> list) {
return Streams.create(list.iterator()).toList().poll();
}

@Test
public void testLong() {
RListReactive<Long> list = redisson.getList("list");
Expand Down

0 comments on commit 29a335c

Please sign in to comment.