Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Oct 5, 2018
1 parent 3de7f75 commit 0e78d16
Show file tree
Hide file tree
Showing 21 changed files with 186 additions and 400 deletions.
22 changes: 13 additions & 9 deletions redisson/src/main/java/org/redisson/RedissonList.java
Expand Up @@ -148,11 +148,13 @@ public RFuture<Boolean> removeAsync(Object o) {
return removeAsync(o, 1);
}

protected RFuture<Boolean> removeAsync(Object o, int count) {
@Override
public RFuture<Boolean> removeAsync(Object o, int count) {
return commandExecutor.writeAsync(getName(), codec, LREM_SINGLE, getName(), count, encode(o));
}

protected boolean remove(Object o, int count) {
@Override
public boolean remove(Object o, int count) {
return get(removeAsync(o, count));
}

Expand Down Expand Up @@ -391,17 +393,19 @@ public RFuture<Void> fastSetAsync(int index, V element) {
public void add(int index, V element) {
addAll(index, Collections.singleton(element));
}

@Override
public V remove(int index) {
return remove((long) index);
public RFuture<Boolean> addAsync(int index, V element) {
return addAllAsync(index, Collections.singleton(element));
}

public V remove(long index) {

@Override
public V remove(int index) {
return get(removeAsync(index));
}

public RFuture<V> removeAsync(long index) {
@Override
public RFuture<V> removeAsync(int index) {
if (index == 0) {
return commandExecutor.writeAsync(getName(), codec, LPOP, getName());
}
Expand All @@ -421,7 +425,7 @@ public void fastRemove(int index) {
}

@Override
public RFuture<Void> fastRemoveAsync(long index) {
public RFuture<Void> fastRemoveAsync(int index) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('lset', KEYS[1], ARGV[1], 'DELETED_BY_REDISSON');" +
"redis.call('lrem', KEYS[1], 1, 'DELETED_BY_REDISSON');",
Expand Down
Expand Up @@ -187,6 +187,11 @@ public boolean add(V e) {
public RFuture<Boolean> addAsync(V e) {
return list.addAsync(e);
}

@Override
public RFuture<Boolean> addAsync(int index, V element) {
return list.addAsync(index, element);
}

@Override
public boolean remove(Object o) {
Expand All @@ -198,7 +203,8 @@ public RFuture<Boolean> removeAsync(Object o) {
return removeAsync(o, 1);
}

protected RFuture<Boolean> removeAsync(Object o, int count) {
@Override
public RFuture<Boolean> removeAsync(Object o, int count) {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); "
Expand All @@ -213,7 +219,8 @@ protected RFuture<Boolean> removeAsync(Object o, int count) {
System.currentTimeMillis(), count, encodeMapKey(key), encodeMapValue(o));
}

protected boolean remove(Object o, int count) {
@Override
public boolean remove(Object o, int count) {
return get(removeAsync(o, count));
}

Expand Down Expand Up @@ -438,7 +445,7 @@ public V remove(int index) {
}

@Override
public RFuture<V> removeAsync(long index) {
public RFuture<V> removeAsync(int index) {
return list.removeAsync(index);
}

Expand All @@ -448,7 +455,7 @@ public void fastRemove(int index) {
}

@Override
public RFuture<Void> fastRemoveAsync(long index) {
public RFuture<Void> fastRemoveAsync(int index) {
return list.fastRemoveAsync(index);
}

Expand Down
14 changes: 7 additions & 7 deletions redisson/src/main/java/org/redisson/RedissonReactive.java
Expand Up @@ -60,7 +60,6 @@
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.TransactionOptions;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.command.CommandReactiveService;
import org.redisson.config.Config;
Expand Down Expand Up @@ -266,14 +265,14 @@ public <K, V> RMapReactive<K, V> getMap(String name, Codec codec) {
public <V> RSetReactive<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(commandExecutor, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}

@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetReactive<V>(commandExecutor, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}

@Override
Expand All @@ -290,8 +289,9 @@ public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec cod

@Override
public RLexSortedSetReactive getLexSortedSet(String name) {
return ReactiveProxyBuilder.create(commandExecutor, new RedissonLexSortedSet(commandExecutor, name, null),
new RedissonLexSortedSetReactive(commandExecutor, new RedissonScoredSortedSetReactive<String>(StringCodec.INSTANCE, commandExecutor, name)),
RedissonLexSortedSet set = new RedissonLexSortedSet(commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonLexSortedSetReactive(set),
RLexSortedSetReactive.class);
}

Expand Down Expand Up @@ -355,14 +355,14 @@ public <V> RDequeReactive<V> getDeque(String name, Codec codec) {
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(commandExecutor, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}

@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, commandExecutor, name, null);
return ReactiveProxyBuilder.create(commandExecutor, set,
new RedissonSetCacheReactive<V>(commandExecutor, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}

@Override
Expand Down
Expand Up @@ -96,7 +96,7 @@ public interface RCollectionReactive<V> extends RExpirableReactive {
* @return <code>true</code> if an element was added
* and <code>false</code> if it is already present
*/
Publisher<Integer> add(V e);
Publisher<Boolean> add(V e);

/**
* Adds all elements contained in the specified collection
Expand All @@ -105,7 +105,7 @@ public interface RCollectionReactive<V> extends RExpirableReactive {
* @return <code>true</code> if at least one element was added
* and <code>false</code> if all elements are already present
*/
Publisher<Integer> addAll(Publisher<? extends V> c);
Publisher<Boolean> addAll(Publisher<? extends V> c);

/**
* Adds all elements contained in the specified collection
Expand All @@ -114,6 +114,6 @@ public interface RCollectionReactive<V> extends RExpirableReactive {
* @return <code>true</code> if at least one element was added
* and <code>false</code> if all elements are already present
*/
Publisher<Integer> addAll(Collection<? extends V> c);
Publisher<Boolean> addAll(Collection<? extends V> c);

}
2 changes: 2 additions & 0 deletions redisson/src/main/java/org/redisson/api/RList.java
Expand Up @@ -99,4 +99,6 @@ public interface RList<V> extends List<V>, RExpirable, RListAsync<V>, RSortable<
*/
void fastRemove(int index);

boolean remove(Object o, int count);

}
8 changes: 6 additions & 2 deletions redisson/src/main/java/org/redisson/api/RListAsync.java
Expand Up @@ -54,6 +54,8 @@ public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<
*/
RFuture<Integer> addBeforeAsync(V elementToFind, V element);

RFuture<Boolean> addAsync(int index, V element);

RFuture<Boolean> addAllAsync(int index, Collection<? extends V> coll);

RFuture<Integer> lastIndexOfAsync(Object o);
Expand Down Expand Up @@ -92,8 +94,10 @@ public interface RListAsync<V> extends RCollectionAsync<V>, RSortableAsync<List<
*/
RFuture<Void> trimAsync(int fromIndex, int toIndex);

RFuture<Void> fastRemoveAsync(long index);
RFuture<Void> fastRemoveAsync(int index);

RFuture<V> removeAsync(long index);
RFuture<V> removeAsync(int index);

RFuture<Boolean> removeAsync(Object o, int count);

}
18 changes: 9 additions & 9 deletions redisson/src/main/java/org/redisson/api/RListReactive.java
Expand Up @@ -62,21 +62,21 @@ public interface RListReactive<V> extends RCollectionReactive<V>, RSortableReact

Publisher<V> iterator(int startIndex);

Publisher<Long> lastIndexOf(Object o);
Publisher<Integer> lastIndexOf(Object o);

Publisher<Long> indexOf(Object o);
Publisher<Integer> indexOf(Object o);

Publisher<Integer> add(long index, V element);
Publisher<Void> add(int index, V element);

Publisher<Integer> addAll(long index, Collection<? extends V> coll);
Publisher<Boolean> addAll(int index, Collection<? extends V> coll);

Publisher<Void> fastSet(long index, V element);
Publisher<Void> fastSet(int index, V element);

Publisher<V> set(long index, V element);
Publisher<V> set(int index, V element);

Publisher<V> get(long index);
Publisher<V> get(int index);

Publisher<V> remove(long index);
Publisher<V> remove(int index);

/**
* Read all elements at once
Expand All @@ -101,6 +101,6 @@ public interface RListReactive<V> extends RCollectionReactive<V>, RSortableReact
* @param index - index of object
* @return void
*/
Publisher<Void> fastRemove(long index);
Publisher<Void> fastRemove(int index);

}
39 changes: 17 additions & 22 deletions redisson/src/main/java/org/redisson/reactive/PublisherAdder.java
Expand Up @@ -19,7 +19,10 @@

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.redisson.api.RFuture;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.action.support.DefaultSubscriber;
Expand All @@ -32,21 +35,17 @@
*/
public abstract class PublisherAdder<V> {

public abstract Publisher<Integer> add(Object o);
public abstract RFuture<Boolean> add(Object o);

public Integer sum(Integer first, Integer second) {
return first + second;
}

public Publisher<Integer> addAll(Publisher<? extends V> c) {
final Promise<Integer> promise = Promises.prepare();
public Publisher<Boolean> addAll(Publisher<? extends V> c) {
final Promise<Boolean> promise = Promises.prepare();

c.subscribe(new DefaultSubscriber<V>() {

volatile boolean completed;
AtomicLong values = new AtomicLong();
Subscription s;
Integer lastSize = 0;
Boolean lastSize = false;

@Override
public void onSubscribe(Subscription s) {
Expand All @@ -57,21 +56,17 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(V o) {
values.getAndIncrement();
add(o).subscribe(new DefaultSubscriber<Integer>() {

@Override
public void onSubscribe(Subscription s) {
s.request(1);
}

@Override
public void onError(Throwable t) {
promise.onError(t);
}

add(o).addListener(new FutureListener<Boolean>() {
@Override
public void onNext(Integer o) {
lastSize = sum(lastSize, o);
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
promise.onError(future.cause());
return;
}

if (future.getNow()) {
lastSize = true;
}
s.request(1);
if (values.decrementAndGet() == 0 && completed) {
promise.onNext(lastSize);
Expand Down
Expand Up @@ -69,6 +69,7 @@ public Object invoke(Object proxy, Method method, final Object[] args) throws Th
final Method mm = instanceMethod;
if (instanceMethod.getName().endsWith("Async")) {
return commandExecutor.reactive(new Supplier<RFuture<Object>>() {
@SuppressWarnings("unchecked")
@Override
public RFuture<Object> get() {
try {
Expand Down
Expand Up @@ -169,14 +169,14 @@ public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
public <V> RSetReactive<V> getSet(String name) {
RedissonSet<V> set = new RedissonSet<V>(executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(executorService, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}

@Override
public <V> RSetReactive<V> getSet(String name, Codec codec) {
RedissonSet<V> set = new RedissonSet<V>(codec, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetReactive<V>(executorService, set), RSetReactive.class);
new RedissonSetReactive<V>(set), RSetReactive.class);
}

@Override
Expand Down Expand Up @@ -234,14 +234,14 @@ public RAtomicLongReactive getAtomicLongReactive(String name) {
public <V> RSetCacheReactive<V> getSetCache(String name) {
RSetCache<V> set = new RedissonSetCache<V>(evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}

@Override
public <V> RSetCacheReactive<V> getSetCache(String name, Codec codec) {
RSetCache<V> set = new RedissonSetCache<V>(codec, evictionScheduler, executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonSetCacheReactive<V>(executorService, set), RSetCacheReactive.class);
new RedissonSetCacheReactive<V>(set), RSetCacheReactive.class);
}

@Override
Expand All @@ -258,8 +258,9 @@ public <V> RScoredSortedSetReactive<V> getScoredSortedSet(String name, Codec cod

@Override
public RLexSortedSetReactive getLexSortedSet(String name) {
return ReactiveProxyBuilder.create(executorService, new RedissonLexSortedSet(executorService, name, null),
new RedissonLexSortedSetReactive(executorService, new RedissonScoredSortedSetReactive<String>(StringCodec.INSTANCE, executorService, name)),
RedissonLexSortedSet set = new RedissonLexSortedSet(executorService, name, null);
return ReactiveProxyBuilder.create(executorService, set,
new RedissonLexSortedSetReactive(set),
RLexSortedSetReactive.class);
}

Expand Down

0 comments on commit 0e78d16

Please sign in to comment.