Skip to content

Commit

Permalink
Future interface replaced to RFuture for async methods. #577
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 11, 2016
1 parent 2874e75 commit cbe9ae6
Show file tree
Hide file tree
Showing 92 changed files with 1,269 additions and 1,062 deletions.
23 changes: 11 additions & 12 deletions redisson/src/main/java/org/redisson/RedissonAtomicDouble.java
Expand Up @@ -19,15 +19,14 @@
import java.util.Collections; import java.util.Collections;


import org.redisson.api.RAtomicDouble; import org.redisson.api.RAtomicDouble;
import org.redisson.api.RFuture;
import org.redisson.client.codec.DoubleCodec; import org.redisson.client.codec.DoubleCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.SingleConvertor; import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;


import io.netty.util.concurrent.Future;

/** /**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
* *
Expand All @@ -46,7 +45,7 @@ public double addAndGet(double delta) {
} }


@Override @Override
public Future<Double> addAndGetAsync(double delta) { public RFuture<Double> addAndGetAsync(double delta) {
if (delta == 0) { if (delta == 0) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCRBYFLOAT, getName(), 0); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCRBYFLOAT, getName(), 0);
} }
Expand All @@ -59,7 +58,7 @@ public boolean compareAndSet(double expect, double update) {
} }


@Override @Override
public Future<Boolean> compareAndSetAsync(double expect, double update) { public RFuture<Boolean> compareAndSetAsync(double expect, double update) {
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if tonumber(redis.call('get', KEYS[1])) == tonumber(ARGV[1]) then " "if tonumber(redis.call('get', KEYS[1])) == tonumber(ARGV[1]) then "
+ "redis.call('set', KEYS[1], ARGV[2]); " + "redis.call('set', KEYS[1], ARGV[2]); "
Expand All @@ -75,7 +74,7 @@ public double decrementAndGet() {
} }


@Override @Override
public Future<Double> decrementAndGetAsync() { public RFuture<Double> decrementAndGetAsync() {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName()); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName());
} }


Expand All @@ -85,7 +84,7 @@ public double get() {
} }


@Override @Override
public Future<Double> getAsync() { public RFuture<Double> getAsync() {
return addAndGetAsync(0); return addAndGetAsync(0);
} }


Expand All @@ -95,7 +94,7 @@ public double getAndAdd(double delta) {
} }


@Override @Override
public Future<Double> getAndAddAsync(final double delta) { public RFuture<Double> getAndAddAsync(final double delta) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Double>("INCRBYFLOAT", new SingleConvertor<Double>() { return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Double>("INCRBYFLOAT", new SingleConvertor<Double>() {
@Override @Override
public Double convert(Object obj) { public Double convert(Object obj) {
Expand All @@ -111,7 +110,7 @@ public double getAndSet(double newValue) {
} }


@Override @Override
public Future<Double> getAndSetAsync(double newValue) { public RFuture<Double> getAndSetAsync(double newValue) {
return commandExecutor.writeAsync(getName(), DoubleCodec.INSTANCE, RedisCommands.GETSET, getName(), BigDecimal.valueOf(newValue).toPlainString()); return commandExecutor.writeAsync(getName(), DoubleCodec.INSTANCE, RedisCommands.GETSET, getName(), BigDecimal.valueOf(newValue).toPlainString());
} }


Expand All @@ -121,7 +120,7 @@ public double incrementAndGet() {
} }


@Override @Override
public Future<Double> incrementAndGetAsync() { public RFuture<Double> incrementAndGetAsync() {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCRBYFLOAT, getName(), 1); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCRBYFLOAT, getName(), 1);
} }


Expand All @@ -131,7 +130,7 @@ public double getAndIncrement() {
} }


@Override @Override
public Future<Double> getAndIncrementAsync() { public RFuture<Double> getAndIncrementAsync() {
return getAndAddAsync(1); return getAndAddAsync(1);
} }


Expand All @@ -141,7 +140,7 @@ public double getAndDecrement() {
} }


@Override @Override
public Future<Double> getAndDecrementAsync() { public RFuture<Double> getAndDecrementAsync() {
return getAndAddAsync(-1); return getAndAddAsync(-1);
} }


Expand All @@ -151,7 +150,7 @@ public void set(double newValue) {
} }


@Override @Override
public Future<Void> setAsync(double newValue) { public RFuture<Void> setAsync(double newValue) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), BigDecimal.valueOf(newValue)); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), BigDecimal.valueOf(newValue));
} }


Expand Down
23 changes: 11 additions & 12 deletions redisson/src/main/java/org/redisson/RedissonAtomicLong.java
Expand Up @@ -18,15 +18,14 @@
import java.util.Collections; import java.util.Collections;


import org.redisson.api.RAtomicLong; import org.redisson.api.RAtomicLong;
import org.redisson.api.RFuture;
import org.redisson.client.codec.LongCodec; import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand; import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.SingleConvertor; import org.redisson.client.protocol.convertor.SingleConvertor;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;


import io.netty.util.concurrent.Future;

/** /**
* Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong} * Distributed alternative to the {@link java.util.concurrent.atomic.AtomicLong}
* *
Expand All @@ -45,7 +44,7 @@ public long addAndGet(long delta) {
} }


@Override @Override
public Future<Long> addAndGetAsync(long delta) { public RFuture<Long> addAndGetAsync(long delta) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCRBY, getName(), delta); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCRBY, getName(), delta);
} }


Expand All @@ -55,7 +54,7 @@ public boolean compareAndSet(long expect, long update) {
} }


@Override @Override
public Future<Boolean> compareAndSetAsync(long expect, long update) { public RFuture<Boolean> compareAndSetAsync(long expect, long update) {
return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local currValue = redis.call('get', KEYS[1]); " "local currValue = redis.call('get', KEYS[1]); "
+ "if currValue == ARGV[1] " + "if currValue == ARGV[1] "
Expand All @@ -74,7 +73,7 @@ public long decrementAndGet() {
} }


@Override @Override
public Future<Long> decrementAndGetAsync() { public RFuture<Long> decrementAndGetAsync() {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName()); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName());
} }


Expand All @@ -84,7 +83,7 @@ public long get() {
} }


@Override @Override
public Future<Long> getAsync() { public RFuture<Long> getAsync() {
return addAndGetAsync(0); return addAndGetAsync(0);
} }


Expand All @@ -94,7 +93,7 @@ public long getAndAdd(long delta) {
} }


@Override @Override
public Future<Long> getAndAddAsync(final long delta) { public RFuture<Long> getAndAddAsync(final long delta) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Long>("INCRBY", new SingleConvertor<Long>() { return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, new RedisStrictCommand<Long>("INCRBY", new SingleConvertor<Long>() {
@Override @Override
public Long convert(Object obj) { public Long convert(Object obj) {
Expand All @@ -110,7 +109,7 @@ public long getAndSet(long newValue) {
} }


@Override @Override
public Future<Long> getAndSetAsync(long newValue) { public RFuture<Long> getAndSetAsync(long newValue) {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GETSET, getName(), newValue); return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.GETSET, getName(), newValue);
} }


Expand All @@ -120,7 +119,7 @@ public long incrementAndGet() {
} }


@Override @Override
public Future<Long> incrementAndGetAsync() { public RFuture<Long> incrementAndGetAsync() {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCR, getName()); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.INCR, getName());
} }


Expand All @@ -130,7 +129,7 @@ public long getAndIncrement() {
} }


@Override @Override
public Future<Long> getAndIncrementAsync() { public RFuture<Long> getAndIncrementAsync() {
return getAndAddAsync(1); return getAndAddAsync(1);
} }


Expand All @@ -140,7 +139,7 @@ public long getAndDecrement() {
} }


@Override @Override
public Future<Long> getAndDecrementAsync() { public RFuture<Long> getAndDecrementAsync() {
return getAndAddAsync(-1); return getAndAddAsync(-1);
} }


Expand All @@ -150,7 +149,7 @@ public void set(long newValue) {
} }


@Override @Override
public Future<Void> setAsync(long newValue) { public RFuture<Void> setAsync(long newValue) {
return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), newValue); return commandExecutor.writeAsync(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), newValue);
} }


Expand Down
3 changes: 2 additions & 1 deletion redisson/src/main/java/org/redisson/RedissonBatch.java
Expand Up @@ -25,6 +25,7 @@
import org.redisson.api.RBlockingQueueAsync; import org.redisson.api.RBlockingQueueAsync;
import org.redisson.api.RBucketAsync; import org.redisson.api.RBucketAsync;
import org.redisson.api.RDequeAsync; import org.redisson.api.RDequeAsync;
import org.redisson.api.RFuture;
import org.redisson.api.RGeoAsync; import org.redisson.api.RGeoAsync;
import org.redisson.api.RHyperLogLogAsync; import org.redisson.api.RHyperLogLogAsync;
import org.redisson.api.RKeysAsync; import org.redisson.api.RKeysAsync;
Expand Down Expand Up @@ -228,7 +229,7 @@ public List<?> execute() {
} }


@Override @Override
public Future<List<?>> executeAsync() { public RFuture<List<?>> executeAsync() {
return executorService.executeAsync(); return executorService.executeAsync();
} }


Expand Down
44 changes: 24 additions & 20 deletions redisson/src/main/java/org/redisson/RedissonBitSet.java
Expand Up @@ -22,13 +22,17 @@
import java.util.List; import java.util.List;


import org.redisson.api.RBitSet; import org.redisson.api.RBitSet;
import org.redisson.api.RFuture;
import org.redisson.client.codec.ByteArrayCodec; import org.redisson.client.codec.ByteArrayCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService; import org.redisson.command.CommandBatchService;


import io.netty.util.concurrent.Future; /**

*
* @author Nikita Koksharov
*
*/
public class RedissonBitSet extends RedissonExpirable implements RBitSet { public class RedissonBitSet extends RedissonExpirable implements RBitSet {


public RedissonBitSet(CommandAsyncExecutor connectionManager, String name) { public RedissonBitSet(CommandAsyncExecutor connectionManager, String name) {
Expand All @@ -51,7 +55,7 @@ public boolean get(long bitIndex) {
} }


@Override @Override
public Future<Boolean> getAsync(long bitIndex) { public RFuture<Boolean> getAsync(long bitIndex) {
return commandExecutor.readAsync(getName(), codec, RedisCommands.GETBIT, getName(), bitIndex); return commandExecutor.readAsync(getName(), codec, RedisCommands.GETBIT, getName(), bitIndex);
} }


Expand All @@ -76,7 +80,7 @@ public void set(long bitIndex, boolean value) {
} }


@Override @Override
public Future<Void> setAsync(long bitIndex, boolean value) { public RFuture<Void> setAsync(long bitIndex, boolean value) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), bitIndex, value ? 1 : 0); return commandExecutor.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), bitIndex, value ? 1 : 0);
} }


Expand All @@ -86,7 +90,7 @@ public byte[] toByteArray() {
} }


@Override @Override
public Future<byte[]> toByteArrayAsync() { public RFuture<byte[]> toByteArrayAsync() {
return commandExecutor.readAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName()); return commandExecutor.readAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName());
} }


Expand Down Expand Up @@ -135,7 +139,7 @@ public void not() {
get(notAsync()); get(notAsync());
} }


private Future<Void> opAsync(String op, String... bitSetNames) { private RFuture<Void> opAsync(String op, String... bitSetNames) {
List<Object> params = new ArrayList<Object>(bitSetNames.length + 3); List<Object> params = new ArrayList<Object>(bitSetNames.length + 3);
params.add(op); params.add(op);
params.add(getName()); params.add(getName());
Expand Down Expand Up @@ -178,7 +182,7 @@ public String toString() {
} }


@Override @Override
public Future<Long> lengthAsync() { public RFuture<Long> lengthAsync() {
return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_LONG, return commandExecutor.evalReadAsync(getName(), codec, RedisCommands.EVAL_LONG,
"local fromBit = redis.call('bitpos', KEYS[1], 1, -1);" "local fromBit = redis.call('bitpos', KEYS[1], 1, -1);"
+ "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;" + "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;"
Expand All @@ -192,15 +196,15 @@ public Future<Long> lengthAsync() {
} }


@Override @Override
public Future<Void> setAsync(long fromIndex, long toIndex, boolean value) { public RFuture<Void> setAsync(long fromIndex, long toIndex, boolean value) {
if (value) { if (value) {
return setAsync(fromIndex, toIndex); return setAsync(fromIndex, toIndex);
} }
return clearAsync(fromIndex, toIndex); return clearAsync(fromIndex, toIndex);
} }


@Override @Override
public Future<Void> clearAsync(long fromIndex, long toIndex) { public RFuture<Void> clearAsync(long fromIndex, long toIndex) {
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (long i = fromIndex; i < toIndex; i++) { for (long i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, 0); executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, 0);
Expand All @@ -209,17 +213,17 @@ public Future<Void> clearAsync(long fromIndex, long toIndex) {
} }


@Override @Override
public Future<Void> setAsync(BitSet bs) { public RFuture<Void> setAsync(BitSet bs) {
return commandExecutor.writeAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs)); return commandExecutor.writeAsync(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs));
} }


@Override @Override
public Future<Void> notAsync() { public RFuture<Void> notAsync() {
return opAsync("NOT"); return opAsync("NOT");
} }


@Override @Override
public Future<Void> setAsync(long fromIndex, long toIndex) { public RFuture<Void> setAsync(long fromIndex, long toIndex) {
CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager()); CommandBatchService executorService = new CommandBatchService(commandExecutor.getConnectionManager());
for (long i = fromIndex; i < toIndex; i++) { for (long i = fromIndex; i < toIndex; i++) {
executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, 1); executorService.writeAsync(getName(), codec, RedisCommands.SETBIT_VOID, getName(), i, 1);
Expand All @@ -228,42 +232,42 @@ public Future<Void> setAsync(long fromIndex, long toIndex) {
} }


@Override @Override
public Future<Integer> sizeAsync() { public RFuture<Integer> sizeAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.BITS_SIZE, getName()); return commandExecutor.readAsync(getName(), codec, RedisCommands.BITS_SIZE, getName());
} }


@Override @Override
public Future<Void> setAsync(long bitIndex) { public RFuture<Void> setAsync(long bitIndex) {
return setAsync(bitIndex, true); return setAsync(bitIndex, true);
} }


@Override @Override
public Future<Long> cardinalityAsync() { public RFuture<Long> cardinalityAsync() {
return commandExecutor.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName()); return commandExecutor.readAsync(getName(), codec, RedisCommands.BITCOUNT, getName());
} }


@Override @Override
public Future<Void> clearAsync(long bitIndex) { public RFuture<Void> clearAsync(long bitIndex) {
return setAsync(bitIndex, false); return setAsync(bitIndex, false);
} }


@Override @Override
public Future<Void> clearAsync() { public RFuture<Void> clearAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_VOID, getName()); return commandExecutor.writeAsync(getName(), RedisCommands.DEL_VOID, getName());
} }


@Override @Override
public Future<Void> orAsync(String... bitSetNames) { public RFuture<Void> orAsync(String... bitSetNames) {
return opAsync("OR", bitSetNames); return opAsync("OR", bitSetNames);
} }


@Override @Override
public Future<Void> andAsync(String... bitSetNames) { public RFuture<Void> andAsync(String... bitSetNames) {
return opAsync("AND", bitSetNames); return opAsync("AND", bitSetNames);
} }


@Override @Override
public Future<Void> xorAsync(String... bitSetNames) { public RFuture<Void> xorAsync(String... bitSetNames) {
return opAsync("XOR", bitSetNames); return opAsync("XOR", bitSetNames);
} }


Expand Down

0 comments on commit cbe9ae6

Please sign in to comment.