Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jun 26, 2017
1 parent 96a62e6 commit a50c29b
Show file tree
Hide file tree
Showing 21 changed files with 522 additions and 163 deletions.
14 changes: 14 additions & 0 deletions redisson/src/main/java/org/redisson/RedissonScoredSortedSet.java
Expand Up @@ -525,6 +525,20 @@ public RFuture<Collection<ScoredEntry<V>>> entryRangeAsync(double startScore, bo
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGEBYSCORE_ENTRY, getName(), startValue, endValue, "WITHSCORES", "LIMIT", offset, count);
}

@Override
public Collection<ScoredEntry<V>> entryRangeReversed(double startScore, boolean startScoreInclusive,
double endScore, boolean endScoreInclusive) {
return get(entryRangeReversedAsync(startScore, startScoreInclusive, endScore, endScoreInclusive));
}

@Override
public RFuture<Collection<ScoredEntry<V>>> entryRangeReversedAsync(double startScore, boolean startScoreInclusive,
double endScore, boolean endScoreInclusive) {
String startValue = value(startScore, startScoreInclusive);
String endValue = value(endScore, endScoreInclusive);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZREVRANGEBYSCORE_ENTRY, getName(), endValue, startValue, "WITHSCORES");
}

@Override
public RFuture<Collection<ScoredEntry<V>>> entryRangeReversedAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count) {
String startValue = value(startScore, startScoreInclusive);
Expand Down
Expand Up @@ -54,6 +54,6 @@ public interface RBlockingQueueReactive<V> extends RQueueReactive<V> {

Publisher<V> take();

Publisher<Long> put(V e);
Publisher<Integer> put(V e);

}
12 changes: 6 additions & 6 deletions redisson/src/main/java/org/redisson/api/RCollectionReactive.java
Expand Up @@ -33,18 +33,18 @@ public interface RCollectionReactive<V> extends RExpirableReactive {

Publisher<Boolean> removeAll(Collection<?> c);

Publisher<Boolean> contains(Object o);
Publisher<Boolean> contains(V o);

Publisher<Boolean> containsAll(Collection<?> c);

Publisher<Boolean> remove(Object o);
Publisher<Boolean> remove(V o);

Publisher<Long> size();
Publisher<Integer> size();

Publisher<Long> add(V e);
Publisher<Integer> add(V e);

Publisher<Long> addAll(Publisher<? extends V> c);
Publisher<Integer> addAll(Publisher<? extends V> c);

Publisher<Long> addAll(Collection<? extends V> c);
Publisher<Integer> addAll(Collection<? extends V> c);

}
Expand Up @@ -48,7 +48,7 @@ public interface RDequeReactive<V> extends RQueueReactive<V> {

Publisher<V> peekFirst();

Publisher<Long> offerLast(V e);
Publisher<Integer> offerLast(V e);

Publisher<V> getLast();

Expand Down
Expand Up @@ -45,7 +45,7 @@ public interface RLexSortedSetReactive extends RCollectionReactive<String> {

Publisher<Integer> lexCount(String fromElement, boolean fromInclusive, String toElement, boolean toInclusive);

Publisher<Long> rank(String o);
Publisher<Integer> rank(String o);

Publisher<Collection<String>> valueRange(int startIndex, int endIndex);

Expand Down
4 changes: 2 additions & 2 deletions redisson/src/main/java/org/redisson/api/RListReactive.java
Expand Up @@ -39,9 +39,9 @@ public interface RListReactive<V> extends RCollectionReactive<V> {

Publisher<Long> indexOf(Object o);

Publisher<Long> add(long index, V element);
Publisher<Integer> add(long index, V element);

Publisher<Long> addAll(long index, Collection<? extends V> coll);
Publisher<Integer> addAll(long index, Collection<? extends V> coll);

Publisher<Void> fastSet(long index, V element);

Expand Down
Expand Up @@ -30,7 +30,7 @@ public interface RQueueReactive<V> extends RCollectionReactive<V> {

Publisher<V> poll();

Publisher<Long> offer(V e);
Publisher<Integer> offer(V e);

Publisher<V> pollLastAndOfferFirstTo(String queueName);

Expand Down
2 changes: 2 additions & 0 deletions redisson/src/main/java/org/redisson/api/RScoredSortedSet.java
Expand Up @@ -171,6 +171,8 @@ public enum Aggregate {

Collection<ScoredEntry<V>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);

Collection<ScoredEntry<V>> entryRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);

Collection<ScoredEntry<V>> entryRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);

/**
Expand Down
Expand Up @@ -136,6 +136,8 @@ public interface RScoredSortedSetAsync<V> extends RExpirableAsync, RSortableAsyn

RFuture<Collection<ScoredEntry<V>>> entryRangeAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);

RFuture<Collection<ScoredEntry<V>>> entryRangeReversedAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);

RFuture<Collection<ScoredEntry<V>>> entryRangeReversedAsync(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);

/**
Expand Down
132 changes: 129 additions & 3 deletions redisson/src/main/java/org/redisson/api/RScoredSortedSetReactive.java
Expand Up @@ -16,8 +16,10 @@
package org.redisson.api;

import java.util.Collection;
import java.util.Map;

import org.reactivestreams.Publisher;
import org.redisson.api.RScoredSortedSet.Aggregate;
import org.redisson.client.protocol.ScoredEntry;

public interface RScoredSortedSetReactive<V> extends RExpirableReactive {
Expand All @@ -36,17 +38,17 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive {

Publisher<Integer> removeRangeByRank(int startIndex, int endIndex);

Publisher<Long> rank(V o);
Publisher<Integer> rank(V o);

Publisher<Double> getScore(V o);

Publisher<Boolean> add(double score, V object);

Publisher<Boolean> remove(V object);

Publisher<Long> size();
Publisher<Integer> size();

Publisher<Boolean> contains(Object o);
Publisher<Boolean> contains(V o);

Publisher<Boolean> containsAll(Collection<?> c);

Expand All @@ -68,4 +70,128 @@ public interface RScoredSortedSetReactive<V> extends RExpirableReactive {

Publisher<Collection<ScoredEntry<V>>> entryRange(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);

Publisher<Collection<V>> valueRangeReversed(int startIndex, int endIndex);

/**
* Returns all values between <code>startScore</code> and <code>endScore</code> in reversed order.
*
* @param startScore - start score.
* Use <code>Double.POSITIVE_INFINITY</code> or <code>Double.NEGATIVE_INFINITY</code>
* to define infinity numbers
* @param startScoreInclusive - start score inclusive
* @param endScore - end score
* Use <code>Double.POSITIVE_INFINITY</code> or <code>Double.NEGATIVE_INFINITY</code>
* to define infinity numbers
*
* @param endScoreInclusive - end score inclusive
* @return values
*/
Publisher<Collection<V>> valueRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);

Publisher<Collection<V>> valueRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);

Publisher<Collection<ScoredEntry<V>>> entryRangeReversed(int startIndex, int endIndex);

Publisher<Collection<ScoredEntry<V>>> entryRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);

Publisher<Collection<ScoredEntry<V>>> entryRangeReversed(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive, int offset, int count);


/**
* Returns the number of elements with a score between <code>startScore</code> and <code>endScore</code>.
*
* @param startScore - start score
* @param startScoreInclusive - start score inclusive
* @param endScore - end score
* @param endScoreInclusive - end score inclusive
* @return count
*/
Publisher<Long> count(double startScore, boolean startScoreInclusive, double endScore, boolean endScoreInclusive);

/**
* Read all values at once.
*
* @return values
*/
Publisher<Collection<V>> readAll();

/**
* Intersect provided ScoredSortedSets
* and store result to current ScoredSortedSet
*
* @param names - names of ScoredSortedSet
* @return length of intersection
*/
Publisher<Integer> intersection(String... names);

/**
* Intersect provided ScoredSortedSets with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param names - names of ScoredSortedSet
* @return length of intersection
*/
Publisher<Integer> intersection(Aggregate aggregate, String... names);

/**
* Intersect provided ScoredSortedSets mapped to weight multiplier
* and store result to current ScoredSortedSet
*
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of intersection
*/
Publisher<Integer> intersection(Map<String, Double> nameWithWeight);

/**
* Intersect provided ScoredSortedSets mapped to weight multiplier
* with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of intersection
*/
Publisher<Integer> intersection(Aggregate aggregate, Map<String, Double> nameWithWeight);

/**
* Union provided ScoredSortedSets
* and store result to current ScoredSortedSet
*
* @param names - names of ScoredSortedSet
* @return length of union
*/
Publisher<Integer> union(String... names);

/**
* Union provided ScoredSortedSets with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param names - names of ScoredSortedSet
* @return length of union
*/
Publisher<Integer> union(Aggregate aggregate, String... names);

/**
* Union provided ScoredSortedSets mapped to weight multiplier
* and store result to current ScoredSortedSet
*
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of union
*/
Publisher<Integer> union(Map<String, Double> nameWithWeight);

/**
* Union provided ScoredSortedSets mapped to weight multiplier
* with defined aggregation method
* and store result to current ScoredSortedSet
*
* @param aggregate - score aggregation mode
* @param nameWithWeight - name of ScoredSortedSet mapped to weight multiplier
* @return length of union
*/
Publisher<Integer> union(Aggregate aggregate, Map<String, Double> nameWithWeight);


}
Expand Up @@ -37,6 +37,6 @@ public interface RSetCacheReactive<V> extends RCollectionReactive<V> {
*
*/
@Override
Publisher<Long> size();
Publisher<Integer> size();

}
12 changes: 6 additions & 6 deletions redisson/src/main/java/org/redisson/reactive/PublisherAdder.java
Expand Up @@ -39,19 +39,19 @@ public PublisherAdder(RCollectionReactive<V> destination) {
this.destination = destination;
}

public Long sum(Long first, Long second) {
public Integer sum(Integer first, Integer second) {
return first + second;
}

public Publisher<Long> addAll(Publisher<? extends V> c) {
final Promise<Long> promise = Promises.prepare();
public Publisher<Integer> addAll(Publisher<? extends V> c) {
final Promise<Integer> promise = Promises.prepare();

c.subscribe(new DefaultSubscriber<V>() {

volatile boolean completed;
AtomicLong values = new AtomicLong();
Subscription s;
Long lastSize = 0L;
Integer lastSize = 0;

@Override
public void onSubscribe(Subscription s) {
Expand All @@ -62,7 +62,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(V o) {
values.getAndIncrement();
destination.add(o).subscribe(new DefaultSubscriber<Long>() {
destination.add(o).subscribe(new DefaultSubscriber<Integer>() {

@Override
public void onSubscribe(Subscription s) {
Expand All @@ -75,7 +75,7 @@ public void onError(Throwable t) {
}

@Override
public void onNext(Long o) {
public void onNext(Integer o) {
lastSize = sum(lastSize, o);
s.request(1);
if (values.decrementAndGet() == 0 && completed) {
Expand Down
Expand Up @@ -48,7 +48,7 @@ public RedissonBlockingQueueReactive(Codec codec, CommandReactiveExecutor comman
}

@Override
public Publisher<Long> put(V e) {
public Publisher<Integer> put(V e) {
return offer(e);
}

Expand Down
Expand Up @@ -66,7 +66,7 @@ public Publisher<Boolean> offerFirst(V e) {
}

@Override
public Publisher<Long> offerLast(V e) {
public Publisher<Integer> offerLast(V e) {
return offer(e);
}

Expand Down
Expand Up @@ -37,7 +37,7 @@ public RedissonLexSortedSetReactive(CommandReactiveExecutor commandExecutor, Str
}

@Override
public Publisher<Long> addAll(Publisher<? extends String> c) {
public Publisher<Integer> addAll(Publisher<? extends String> c) {
return new PublisherAdder<String>(this).addAll(c);
}

Expand Down Expand Up @@ -135,12 +135,12 @@ private String value(String fromElement, boolean fromInclusive) {
}

@Override
public Publisher<Long> add(String e) {
public Publisher<Integer> add(String e) {
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.ZADD_RAW, getName(), 0, e);
}

@Override
public Publisher<Long> addAll(Collection<? extends String> c) {
public Publisher<Integer> addAll(Collection<? extends String> c) {
List<Object> params = new ArrayList<Object>(2*c.size());
for (Object param : c) {
params.add(0);
Expand Down

0 comments on commit a50c29b

Please sign in to comment.