Skip to content

Commit

Permalink
Code cleanup. #210
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 1, 2015
1 parent f47006a commit ec3b9fa
Show file tree
Hide file tree
Showing 20 changed files with 175 additions and 179 deletions.
14 changes: 7 additions & 7 deletions src/main/java/org/redisson/RedissonAtomicLongReactive.java
Expand Up @@ -39,12 +39,12 @@ protected RedissonAtomicLongReactive(CommandReactiveExecutor commandExecutor, St

@Override
public Publisher<Long> addAndGet(long delta) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.INCRBY, getName(), delta);
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.INCRBY, getName(), delta);
}

@Override
public Publisher<Boolean> compareAndSet(long expect, long update) {
return commandExecutor.evalWriteObservable(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return true "
Expand All @@ -55,7 +55,7 @@ public Publisher<Boolean> compareAndSet(long expect, long update) {

@Override
public Publisher<Long> decrementAndGet() {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName());
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.DECR, getName());
}

@Override
Expand All @@ -65,7 +65,7 @@ public Publisher<Long> get() {

@Override
public Publisher<Long> getAndAdd(long delta) {
return commandExecutor.evalWriteObservable(getName(),
return commandExecutor.evalWriteReactive(getName(),
StringCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local v = redis.call('get', KEYS[1]) or 0; "
+ "redis.call('set', KEYS[1], v + ARGV[1]); "
Expand All @@ -76,15 +76,15 @@ public Publisher<Long> getAndAdd(long delta) {

@Override
public Publisher<Long> getAndSet(long newValue) {
return commandExecutor.evalWriteObservable(getName(),
return commandExecutor.evalWriteReactive(getName(),
StringCodec.INSTANCE, RedisCommands.EVAL_LONG,
"local v = redis.call('get', KEYS[1]) or 0; redis.call('set', KEYS[1], ARGV[1]); return tonumber(v)",
Collections.<Object>singletonList(getName()), newValue);
}

@Override
public Publisher<Long> incrementAndGet() {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.INCR, getName());
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.INCR, getName());
}

@Override
Expand All @@ -99,7 +99,7 @@ public Publisher<Long> getAndDecrement() {

@Override
public Publisher<Void> set(long newValue) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), newValue);
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.SET, getName(), newValue);
}

public String toString() {
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/org/redisson/RedissonBitSetReactive.java
Expand Up @@ -38,15 +38,15 @@ protected RedissonBitSetReactive(CommandReactiveExecutor connectionManager, Stri
}

public Publisher<Boolean> get(int bitIndex) {
return commandExecutor.readObservable(getName(), codec, RedisCommands.GETBIT, getName(), bitIndex);
return commandExecutor.readReactive(getName(), codec, RedisCommands.GETBIT, getName(), bitIndex);
}

public Publisher<Void> set(int bitIndex, boolean value) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.SETBIT, getName(), bitIndex, value ? 1 : 0);
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SETBIT, getName(), bitIndex, value ? 1 : 0);
}

public Publisher<byte[]> toByteArray() {
return commandExecutor.readObservable(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName());
return commandExecutor.readReactive(getName(), ByteArrayCodec.INSTANCE, RedisCommands.GET, getName());
}

private Publisher<Void> op(String op, String... bitSetNames) {
Expand All @@ -55,11 +55,11 @@ private Publisher<Void> op(String op, String... bitSetNames) {
params.add(getName());
params.add(getName());
params.addAll(Arrays.asList(bitSetNames));
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BITOP, params.toArray());
return commandExecutor.writeReactive(getName(), codec, RedisCommands.BITOP, params.toArray());
}

public Publisher<BitSet> asBitSet() {
return commandExecutor.readObservable(getName(), BitSetCodec.INSTANCE, RedisCommands.GET, getName());
return commandExecutor.readReactive(getName(), BitSetCodec.INSTANCE, RedisCommands.GET, getName());
}

//Copied from: https://github.com/xetorthio/jedis/issues/301
Expand All @@ -76,7 +76,7 @@ private static byte[] toByteArrayReverse(BitSet bits) {

@Override
public Publisher<Integer> length() {
return commandExecutor.evalReadObservable(getName(), codec, RedisCommands.EVAL_INTEGER,
return commandExecutor.evalReadReactive(getName(), codec, RedisCommands.EVAL_INTEGER,
"local fromBit = redis.call('bitpos', KEYS[1], 1, -1);"
+ "local toBit = 8*(fromBit/8 + 1) - fromBit % 8;"
+ "for i = toBit, fromBit, -1 do "
Expand Down Expand Up @@ -107,7 +107,7 @@ public Publisher<Void> clear(int fromIndex, int toIndex) {

@Override
public Publisher<Void> set(BitSet bs) {
return commandExecutor.writeObservable(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs));
return commandExecutor.writeReactive(getName(), ByteArrayCodec.INSTANCE, RedisCommands.SET, getName(), toByteArrayReverse(bs));
}

@Override
Expand All @@ -126,7 +126,7 @@ public Publisher<Void> set(int fromIndex, int toIndex) {

@Override
public Publisher<Integer> size() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.BITS_SIZE, getName());
return commandExecutor.readReactive(getName(), codec, RedisCommands.BITS_SIZE, getName());
}

@Override
Expand All @@ -136,7 +136,7 @@ public Publisher<Void> set(int bitIndex) {

@Override
public Publisher<Integer> cardinality() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.BITCOUNT, getName());
return commandExecutor.readReactive(getName(), codec, RedisCommands.BITCOUNT, getName());
}

@Override
Expand All @@ -146,7 +146,7 @@ public Publisher<Void> clear(int bitIndex) {

@Override
public Publisher<Void> clear() {
return commandExecutor.writeObservable(getName(), RedisCommands.DEL_VOID, getName());
return commandExecutor.writeReactive(getName(), RedisCommands.DEL_VOID, getName());
}

@Override
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/org/redisson/RedissonBlockingQueueReactive.java
Expand Up @@ -56,12 +56,12 @@ public Publisher<Long> put(V e) {

@Override
public Publisher<V> take() {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
return commandExecutor.writeReactive(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), 0);
}

@Override
public Publisher<V> poll(long timeout, TimeUnit unit) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
return commandExecutor.writeReactive(getName(), codec, RedisCommands.BLPOP_VALUE, getName(), unit.toSeconds(timeout));
}

/*
Expand All @@ -76,12 +76,12 @@ public Publisher<V> pollFromAny(long timeout, TimeUnit unit, String ... queueNam
params.add(name);
}
params.add(unit.toSeconds(timeout));
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray());
return commandExecutor.writeReactive(getName(), codec, RedisCommands.BLPOP_VALUE, params.toArray());
}

@Override
public Publisher<V> pollLastAndOfferFirstTo(String queueName, long timeout, TimeUnit unit) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
return commandExecutor.writeReactive(getName(), codec, RedisCommands.BRPOPLPUSH, getName(), queueName, unit.toSeconds(timeout));
}

@Override
Expand All @@ -90,7 +90,7 @@ public Publisher<Integer> drainTo(Collection<? super V> c) {
throw new NullPointerException();
}

return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local vals = redis.call('lrange', KEYS[1], 0, -1); " +
"redis.call('ltrim', KEYS[1], -1, 0); " +
"return vals", Collections.<Object>singletonList(getName()));
Expand All @@ -101,7 +101,7 @@ public Publisher<Integer> drainTo(Collection<? super V> c, int maxElements) {
if (c == null) {
throw new NullPointerException();
}
return commandExecutor.evalWriteObservable(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
return commandExecutor.evalWriteReactive(getName(), codec, new RedisCommand<Object>("EVAL", new ListDrainToDecoder(c)),
"local elemNum = math.min(ARGV[1], redis.call('llen', KEYS[1])) - 1;" +
"local vals = redis.call('lrange', KEYS[1], 0, elemNum); " +
"redis.call('ltrim', KEYS[1], elemNum + 1, -1); " +
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/redisson/RedissonBucketReactive.java
Expand Up @@ -35,22 +35,22 @@ protected RedissonBucketReactive(Codec codec, CommandReactiveExecutor connection

@Override
public Publisher<V> get() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.GET, getName());
return commandExecutor.readReactive(getName(), codec, RedisCommands.GET, getName());
}

@Override
public Publisher<Void> set(V value) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.SET, getName(), value);
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SET, getName(), value);
}

@Override
public Publisher<Void> set(V value, long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value);
return commandExecutor.writeReactive(getName(), codec, RedisCommands.SETEX, getName(), timeUnit.toSeconds(timeToLive), value);
}

@Override
public Publisher<Boolean> exists() {
return commandExecutor.readObservable(getName(), codec, RedisCommands.EXISTS, getName());
return commandExecutor.readReactive(getName(), codec, RedisCommands.EXISTS, getName());
}

}
12 changes: 6 additions & 6 deletions src/main/java/org/redisson/RedissonDequeReactive.java
Expand Up @@ -51,22 +51,22 @@ public RedissonDequeReactive(Codec codec, CommandReactiveExecutor commandExecuto

@Override
public Publisher<Void> addFirst(V e) {
return commandExecutor.writeObservable(getName(), codec, LPUSH_VOID, getName(), e);
return commandExecutor.writeReactive(getName(), codec, LPUSH_VOID, getName(), e);
}

@Override
public Publisher<Void> addLast(V e) {
return commandExecutor.writeObservable(getName(), codec, RPUSH_VOID, getName(), e);
return commandExecutor.writeReactive(getName(), codec, RPUSH_VOID, getName(), e);
}

@Override
public Publisher<V> getLast() {
return commandExecutor.readObservable(getName(), codec, LRANGE_SINGLE, getName(), -1, -1);
return commandExecutor.readReactive(getName(), codec, LRANGE_SINGLE, getName(), -1, -1);
}

@Override
public Publisher<Boolean> offerFirst(V e) {
return commandExecutor.writeObservable(getName(), codec, LPUSH_BOOLEAN, getName(), e);
return commandExecutor.writeReactive(getName(), codec, LPUSH_BOOLEAN, getName(), e);
}

@Override
Expand All @@ -91,7 +91,7 @@ public Publisher<V> pollFirst() {

@Override
public Publisher<V> pollLast() {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.RPOP, getName());
return commandExecutor.writeReactive(getName(), codec, RedisCommands.RPOP, getName());
}

@Override
Expand All @@ -116,7 +116,7 @@ public Publisher<V> removeFirst() {

@Override
public Publisher<V> removeLast() {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.RPOP, getName());
return commandExecutor.writeReactive(getName(), codec, RedisCommands.RPOP, getName());
}

@Override
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/redisson/RedissonExpirableReactive.java
Expand Up @@ -38,12 +38,12 @@ abstract class RedissonExpirableReactive extends RedissonObjectReactive implemen

@Override
public Publisher<Boolean> expire(long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.EXPIRE, getName(), timeUnit.toSeconds(timeToLive));
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.EXPIRE, getName(), timeUnit.toSeconds(timeToLive));
}

@Override
public Publisher<Boolean> expireAt(long timestamp) {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.EXPIREAT, getName(), timestamp);
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.EXPIREAT, getName(), timestamp);
}

@Override
Expand All @@ -53,12 +53,12 @@ public Publisher<Boolean> expireAt(Date timestamp) {

@Override
public Publisher<Boolean> clearExpire() {
return commandExecutor.writeObservable(getName(), StringCodec.INSTANCE, RedisCommands.PERSIST, getName());
return commandExecutor.writeReactive(getName(), StringCodec.INSTANCE, RedisCommands.PERSIST, getName());
}

@Override
public Publisher<Long> remainTimeToLive() {
return commandExecutor.readObservable(getName(), StringCodec.INSTANCE, RedisCommands.TTL, getName());
return commandExecutor.readReactive(getName(), StringCodec.INSTANCE, RedisCommands.TTL, getName());
}

}
10 changes: 5 additions & 5 deletions src/main/java/org/redisson/RedissonHyperLogLogReactive.java
Expand Up @@ -38,36 +38,36 @@ protected RedissonHyperLogLogReactive(Codec codec, CommandReactiveExecutor comma

@Override
public Publisher<Boolean> add(V obj) {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFADD, getName(), obj);
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), obj);
}

@Override
public Publisher<Boolean> addAll(Collection<V> objects) {
List<Object> args = new ArrayList<Object>(objects.size() + 1);
args.add(getName());
args.addAll(objects);
return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFADD, getName(), args.toArray());
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFADD, getName(), args.toArray());
}

@Override
public Publisher<Long> count() {
return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFCOUNT, getName());
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, getName());
}

@Override
public Publisher<Long> countWith(String... otherLogNames) {
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
args.add(getName());
args.addAll(Arrays.asList(otherLogNames));
return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFCOUNT, args.toArray());
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFCOUNT, args.toArray());
}

@Override
public Publisher<Void> mergeWith(String... otherLogNames) {
List<Object> args = new ArrayList<Object>(otherLogNames.length + 1);
args.add(getName());
args.addAll(Arrays.asList(otherLogNames));
return commandExecutor.writeObservable(getName(), codec, RedisCommands.PFMERGE, args.toArray());
return commandExecutor.writeReactive(getName(), codec, RedisCommands.PFMERGE, args.toArray());
}

}
14 changes: 7 additions & 7 deletions src/main/java/org/redisson/RedissonKeysReactive.java
Expand Up @@ -47,7 +47,7 @@ public RedissonKeysReactive(CommandReactiveService commandExecutor) {

@Override
public Publisher<Integer> getSlot(String key) {
return commandExecutor.readObservable(null, RedisCommands.KEYSLOT, key);
return commandExecutor.readReactive(null, RedisCommands.KEYSLOT, key);
}

@Override
Expand All @@ -66,9 +66,9 @@ public Publisher<String> getKeys() {

private Publisher<ListScanResult<String>> scanIterator(int slot, long startPos, String pattern) {
if (pattern == null) {
return commandExecutor.writeObservable(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
return commandExecutor.writeReactive(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos);
}
return commandExecutor.writeObservable(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
return commandExecutor.writeReactive(slot, StringCodec.INSTANCE, RedisCommands.SCAN, startPos, "MATCH", pattern);
}

private Publisher<String> createKeysIterator(final int slot, final String pattern) {
Expand Down Expand Up @@ -163,12 +163,12 @@ public void onComplete() {
*/
@Override
public Publisher<Collection<String>> findKeysByPattern(String pattern) {
return commandExecutor.readAllObservable(RedisCommands.KEYS, pattern);
return commandExecutor.readAllReactive(RedisCommands.KEYS, pattern);
}

@Override
public Publisher<String> randomKey() {
return commandExecutor.readRandomObservable(RedisCommands.RANDOM_KEY);
return commandExecutor.readRandomReactive(RedisCommands.RANDOM_KEY);
}

/**
Expand All @@ -184,7 +184,7 @@ public Publisher<String> randomKey() {
*/
@Override
public Publisher<Long> deleteByPattern(String pattern) {
return commandExecutor.evalWriteAllObservable(RedisCommands.EVAL_LONG, new SlotCallback<Long, Long>() {
return commandExecutor.evalWriteAllReactive(RedisCommands.EVAL_LONG, new SlotCallback<Long, Long>() {
AtomicLong results = new AtomicLong();
@Override
public void onSlotResult(Long result) {
Expand All @@ -211,7 +211,7 @@ public Long onFinish() {
*/
@Override
public Publisher<Long> delete(String ... keys) {
return commandExecutor.writeAllObservable(RedisCommands.DEL, new SlotCallback<Long, Long>() {
return commandExecutor.writeAllReactive(RedisCommands.DEL, new SlotCallback<Long, Long>() {
AtomicLong results = new AtomicLong();
@Override
public void onSlotResult(Long result) {
Expand Down

0 comments on commit ec3b9fa

Please sign in to comment.