Skip to content

Commit

Permalink
Improvement - methods of RxJava2 interfaces should use full set of re…
Browse files Browse the repository at this point in the history
…sult objects: Single, Maybe or Completable #1912
  • Loading branch information
Nikita Koksharov committed Mar 15, 2019
1 parent b1f3d34 commit f0c7812
Show file tree
Hide file tree
Showing 55 changed files with 1,377 additions and 609 deletions.
7 changes: 6 additions & 1 deletion redisson/src/main/java/org/redisson/RedissonRx.java
Expand Up @@ -379,9 +379,14 @@ public RScriptRx getScript(Codec codec) {
return RxProxyBuilder.create(commandExecutor, new RedissonScript(commandExecutor, codec), RScriptRx.class);
}

@Override
public RBatchRx createBatch() {
return createBatch(BatchOptions.defaults());
}

@Override
public RBatchRx createBatch(BatchOptions options) {
RedissonBatchRx batch = new RedissonBatchRx(evictionScheduler, connectionManager, options);
RedissonBatchRx batch = new RedissonBatchRx(evictionScheduler, connectionManager, commandExecutor, options);
if (config.isReferenceEnabled()) {
batch.enableRedissonReferenceSupport(this);
}
Expand Down
25 changes: 13 additions & 12 deletions redisson/src/main/java/org/redisson/api/RAtomicDoubleRx.java
Expand Up @@ -15,7 +15,8 @@
*/
package org.redisson.api;

import io.reactivex.Flowable;
import io.reactivex.Completable;
import io.reactivex.Single;

/**
* Reactive interface for AtomicDouble object
Expand All @@ -34,80 +35,80 @@ public interface RAtomicDoubleRx extends RExpirableRx {
* @return true if successful; or false if the actual value
* was not equal to the expected value.
*/
Flowable<Boolean> compareAndSet(double expect, double update);
Single<Boolean> compareAndSet(double expect, double update);

/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
Flowable<Double> addAndGet(double delta);
Single<Double> addAndGet(double delta);

/**
* Atomically decrements the current value by one.
*
* @return the updated value
*/
Flowable<Double> decrementAndGet();
Single<Double> decrementAndGet();

/**
* Returns current value.
*
* @return current value
*/
Flowable<Double> get();
Single<Double> get();

/**
* Returns and deletes object
*
* @return the current value
*/
Flowable<Double> getAndDelete();
Single<Double> getAndDelete();

/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
Flowable<Double> getAndAdd(double delta);
Single<Double> getAndAdd(double delta);

/**
* Atomically sets the given value and returns the old value.
*
* @param newValue the new value
* @return the old value
*/
Flowable<Double> getAndSet(double newValue);
Single<Double> getAndSet(double newValue);

/**
* Atomically increments the current value by one.
*
* @return the updated value
*/
Flowable<Double> incrementAndGet();
Single<Double> incrementAndGet();

/**
* Atomically increments the current value by one.
*
* @return the old value
*/
Flowable<Double> getAndIncrement();
Single<Double> getAndIncrement();

/**
* Atomically decrements by one the current value.
*
* @return the previous value
*/
Flowable<Double> getAndDecrement();
Single<Double> getAndDecrement();

/**
* Atomically sets the given value.
*
* @param newValue the new value
* @return void
*/
Flowable<Void> set(double newValue);
Completable set(double newValue);

}
25 changes: 13 additions & 12 deletions redisson/src/main/java/org/redisson/api/RAtomicLongRx.java
Expand Up @@ -15,7 +15,8 @@
*/
package org.redisson.api;

import io.reactivex.Flowable;
import io.reactivex.Completable;
import io.reactivex.Single;

/**
* RxJava2 interface for AtomicLong object
Expand All @@ -34,80 +35,80 @@ public interface RAtomicLongRx extends RExpirableRx {
* @return true if successful; or false if the actual value
* was not equal to the expected value.
*/
Flowable<Boolean> compareAndSet(long expect, long update);
Single<Boolean> compareAndSet(long expect, long update);

/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
Flowable<Long> addAndGet(long delta);
Single<Long> addAndGet(long delta);

/**
* Atomically decrements the current value by one.
*
* @return the updated value
*/
Flowable<Long> decrementAndGet();
Single<Long> decrementAndGet();

/**
* Returns current value.
*
* @return the current value
*/
Flowable<Long> get();
Single<Long> get();

/**
* Returns and deletes object
*
* @return the current value
*/
Flowable<Long> getAndDelete();
Single<Long> getAndDelete();

/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the old value before the add
*/
Flowable<Long> getAndAdd(long delta);
Single<Long> getAndAdd(long delta);

/**
* Atomically sets the given value and returns the old value.
*
* @param newValue the new value
* @return the old value
*/
Flowable<Long> getAndSet(long newValue);
Single<Long> getAndSet(long newValue);

/**
* Atomically increments the current value by one.
*
* @return the updated value
*/
Flowable<Long> incrementAndGet();
Single<Long> incrementAndGet();

/**
* Atomically increments the current value by one.
*
* @return the old value
*/
Flowable<Long> getAndIncrement();
Single<Long> getAndIncrement();

/**
* Atomically decrements by one the current value.
*
* @return the previous value
*/
Flowable<Long> getAndDecrement();
Single<Long> getAndDecrement();

/**
* Atomically sets the given value.
*
* @param newValue the new value
* @return void
*/
Flowable<Void> set(long newValue);
Completable set(long newValue);

}
4 changes: 2 additions & 2 deletions redisson/src/main/java/org/redisson/api/RBatchRx.java
Expand Up @@ -19,7 +19,7 @@

import org.redisson.client.codec.Codec;

import io.reactivex.Flowable;
import io.reactivex.Maybe;

/**
* RxJava2 interface for Redis pipeline feature.
Expand Down Expand Up @@ -366,7 +366,7 @@ public interface RBatchRx {
*
* @return List with result object for each command
*/
Flowable<BatchResult<?>> execute();
Maybe<BatchResult<?>> execute();

/*
* Use BatchOptions#atomic
Expand Down
37 changes: 19 additions & 18 deletions redisson/src/main/java/org/redisson/api/RBitSetRx.java
Expand Up @@ -17,7 +17,8 @@

import java.util.BitSet;

import io.reactivex.Flowable;
import io.reactivex.Completable;
import io.reactivex.Single;

/**
* RxJava2 interface for BitSet object
Expand All @@ -27,15 +28,15 @@
*/
public interface RBitSetRx extends RExpirableRx {

Flowable<byte[]> toByteArray();
Single<byte[]> toByteArray();

/**
* Returns "logical size" = index of highest set bit plus one.
* Returns zero if there are no any set bit.
*
* @return "logical size" = index of highest set bit plus one
*/
Flowable<Long> length();
Single<Long> length();

/**
* Set all bits to <code>value</code> from <code>fromIndex</code> (inclusive) to <code>toIndex</code> (exclusive)
Expand All @@ -46,7 +47,7 @@ public interface RBitSetRx extends RExpirableRx {
* @return void
*
*/
Flowable<Void> set(long fromIndex, long toIndex, boolean value);
Completable set(long fromIndex, long toIndex, boolean value);

/**
* Set all bits to zero from <code>fromIndex</code> (inclusive) to <code>toIndex</code> (exclusive)
Expand All @@ -56,22 +57,22 @@ public interface RBitSetRx extends RExpirableRx {
* @return void
*
*/
Flowable<Void> clear(long fromIndex, long toIndex);
Completable clear(long fromIndex, long toIndex);

/**
* Copy bits state of source BitSet object to this object
*
* @param bs - BitSet source
* @return void
*/
Flowable<Void> set(BitSet bs);
Completable set(BitSet bs);

/**
* Executes NOT operation over all bits
*
* @return void
*/
Flowable<Void> not();
Completable not();

/**
* Set all bits to one from <code>fromIndex</code> (inclusive) to <code>toIndex</code> (exclusive)
Expand All @@ -80,22 +81,22 @@ public interface RBitSetRx extends RExpirableRx {
* @param toIndex exclusive
* @return void
*/
Flowable<Void> set(long fromIndex, long toIndex);
Completable set(long fromIndex, long toIndex);

/**
* Returns number of set bits.
*
* @return number of set bits.
*/
Flowable<Long> size();
Single<Long> size();

/**
* Returns <code>true</code> if bit set to one and <code>false</code> overwise.
*
* @param bitIndex - index of bit
* @return <code>true</code> if bit set to one and <code>false</code> overwise.
*/
Flowable<Boolean> get(long bitIndex);
Single<Boolean> get(long bitIndex);

/**
* Set bit to one at specified bitIndex
Expand All @@ -104,7 +105,7 @@ public interface RBitSetRx extends RExpirableRx {
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
Flowable<Boolean> set(long bitIndex);
Single<Boolean> set(long bitIndex);

/**
* Set bit to <code>value</code> at specified <code>bitIndex</code>
Expand All @@ -114,14 +115,14 @@ public interface RBitSetRx extends RExpirableRx {
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
Flowable<Boolean> set(long bitIndex, boolean value);
Single<Boolean> set(long bitIndex, boolean value);

/**
* Returns the number of bits set to one.
*
* @return number of bits
*/
Flowable<Long> cardinality();
Single<Long> cardinality();

/**
* Set bit to zero at specified <code>bitIndex</code>
Expand All @@ -130,14 +131,14 @@ public interface RBitSetRx extends RExpirableRx {
* @return <code>true</code> - if previous value was true,
* <code>false</code> - if previous value was false
*/
Flowable<Boolean> clear(long bitIndex);
Single<Boolean> clear(long bitIndex);

/**
* Set all bits to zero
*
* @return void
*/
Flowable<Void> clear();
Completable clear();

/**
* Executes OR operation over this object and specified bitsets.
Expand All @@ -146,7 +147,7 @@ public interface RBitSetRx extends RExpirableRx {
* @param bitSetNames - name of stored bitsets
* @return void
*/
Flowable<Void> or(String... bitSetNames);
Completable or(String... bitSetNames);

/**
* Executes AND operation over this object and specified bitsets.
Expand All @@ -155,7 +156,7 @@ public interface RBitSetRx extends RExpirableRx {
* @param bitSetNames - name of stored bitsets
* @return void
*/
Flowable<Void> and(String... bitSetNames);
Completable and(String... bitSetNames);

/**
* Executes XOR operation over this object and specified bitsets.
Expand All @@ -164,6 +165,6 @@ public interface RBitSetRx extends RExpirableRx {
* @param bitSetNames - name of stored bitsets
* @return void
*/
Flowable<Void> xor(String... bitSetNames);
Completable xor(String... bitSetNames);

}

0 comments on commit f0c7812

Please sign in to comment.