Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed Dec 19, 2018
1 parent a2a7fbc commit e893a86
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 26 deletions.
Expand Up @@ -146,7 +146,7 @@ public interface RedisCommands {
RedisCommand<Set<Object>> ZRANGEBYSCORE = new RedisCommand<Set<Object>>("ZRANGEBYSCORE", new ObjectSetReplayDecoder<Object>());
RedisCommand<List<Object>> ZRANGEBYSCORE_LIST = new RedisCommand<List<Object>>("ZRANGEBYSCORE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZREVRANGE = new RedisCommand<List<Object>>("ZREVRANGE", new ObjectListReplayDecoder<Object>());
RedisCommand<List<Object>> ZREVRANGEBYSCORE = new RedisCommand<List<Object>>("ZREVRANGEBYSCORE", new ObjectListReplayDecoder<Object>());
RedisCommand<Set<Object>> ZREVRANGEBYSCORE = new RedisCommand<Set<Object>>("ZREVRANGEBYSCORE", new ObjectSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZREVRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZREVRANGEBYSCORE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZREVRANGEBYSCORE", new ScoredSortedSetReplayDecoder<Object>());
RedisCommand<List<ScoredEntry<Object>>> ZRANGE_ENTRY = new RedisCommand<List<ScoredEntry<Object>>>("ZRANGE", new ScoredSortedSetReplayDecoder<Object>());
Expand Down
Expand Up @@ -58,6 +58,8 @@ public interface CommandAsyncExecutor {

<T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);

<T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params);

<T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object ... params);

<T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object ... params);
Expand All @@ -70,8 +72,12 @@ public interface CommandAsyncExecutor {

<R, T> RFuture<R> writeAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);

<T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params);

<R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R> callback, Object ... params);

<T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, Codec codec, RedisCommand<T> command, Object... params);

<T, R> RFuture<R> evalReadAsync(RedisClient client, String name, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

<T, R> RFuture<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
Expand All @@ -90,8 +96,6 @@ public interface CommandAsyncExecutor {

<T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object ... params);

<T, R> RFuture<Collection<R>> readAllAsync(Collection<R> results, RedisCommand<T> command, Object ... params);

<R, T> RFuture<R> writeAllAsync(Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, Object... params);

<T> RFuture<Void> writeAllAsync(RedisCommand<T> command, Object ... params);
Expand Down
Expand Up @@ -245,13 +245,20 @@ public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand
return mainPromise;
}

@Override
public <T, R> RFuture<Collection<R>> readAllAsync(Codec codec, RedisCommand<T> command, Object... params) {
List<R> results = new ArrayList<R>();
return readAllAsync(results, codec, command, params);
}

@Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
List<R> results = new ArrayList<R>();
return readAllAsync(results, command, params);
return readAllAsync(results, connectionManager.getCodec(), command, params);
}

@Override
public <T, R> RFuture<Collection<R>> readAllAsync(final Collection<R> results, RedisCommand<T> command, Object... params) {
public <T, R> RFuture<Collection<R>> readAllAsync(final Collection<R> results, Codec codec, RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = createPromise();
final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(nodes.size());
Expand Down Expand Up @@ -284,11 +291,11 @@ public void operationComplete(Future<Object> future) throws Exception {
for (MasterSlaveEntry entry : nodes) {
RPromise<R> promise = new RedissonPromise<R>();
promise.addListener(listener);
async(true, new NodeSource(entry), connectionManager.getCodec(), command, params, promise, 0, true, null);
async(true, new NodeSource(entry), codec, command, params, promise, 0, true, null);
}
return mainPromise;
}

@Override
public <T, R> RFuture<R> readRandomAsync(Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
Expand Down
Expand Up @@ -138,7 +138,7 @@ public boolean removeListener(ChannelName channelName, int listenerId) {
return false;
}

private void removeListener(ChannelName channelName, RedisPubSubListener<?> listener) {
public void removeListener(ChannelName channelName, RedisPubSubListener<?> listener) {
Queue<RedisPubSubListener<?>> queue = channelListeners.get(channelName);
synchronized (queue) {
if (queue.remove(listener) && queue.isEmpty()) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.function.LongConsumer;

import org.redisson.RedissonMap;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;

Expand All @@ -37,13 +38,13 @@
* @param <V> value type
* @param <M> entry type
*/
public class RedissonMapReactiveIterator<K, V, M> implements Consumer<FluxSink<M>> {
public class MapReactiveIterator<K, V, M> implements Consumer<FluxSink<M>> {

private final RedissonMap<K, V> map;
private final String pattern;
private final int count;

public RedissonMapReactiveIterator(RedissonMap<K, V> map, String pattern, int count) {
public MapReactiveIterator(RedissonMap<K, V> map, String pattern, int count) {
this.map = map;
this.pattern = pattern;
this.count = count;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void accept(long value) {
};

protected void nextValues(FluxSink<M> emitter) {
map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count).addListener(new FutureListener<MapScanResult<Object, Object>>() {
scanIterator(client, nextIterPos).addListener(new FutureListener<MapScanResult<Object, Object>>() {

@Override
public void operationComplete(Future<MapScanResult<Object, Object>> future)
Expand Down Expand Up @@ -133,4 +134,8 @@ public V setValue(V value) {
};
}

public RFuture<MapScanResult<Object, Object>> scanIterator(RedisClient client, long nextIterPos) {
return map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count);
}

}
Expand Up @@ -19,15 +19,10 @@
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.redisson.RedissonKeys;
import org.redisson.client.RedisClient;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.connection.MasterSlaveEntry;

Expand All @@ -43,11 +38,11 @@
*/
public class RedissonKeysReactive {

private final CommandReactiveService commandExecutor;
private final CommandReactiveExecutor commandExecutor;

private final RedissonKeys instance;

public RedissonKeysReactive(CommandReactiveService commandExecutor) {
public RedissonKeysReactive(CommandReactiveExecutor commandExecutor) {
super();
instance = new RedissonKeys(commandExecutor);
this.commandExecutor = commandExecutor;
Expand Down
Expand Up @@ -56,7 +56,7 @@ public Publisher<Map.Entry<K, V>> entryIterator(String pattern) {
}

public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) mapCache, pattern, count));
return Flux.create(new MapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) mapCache, pattern, count));
}

public Publisher<V> valueIterator() {
Expand All @@ -72,7 +72,7 @@ public Publisher<V> valueIterator(int count) {
}

public Publisher<V> valueIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, V>((RedissonMap<K, V>) mapCache, pattern, count) {
return Flux.create(new MapReactiveIterator<K, V, V>((RedissonMap<K, V>) mapCache, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
Expand All @@ -93,7 +93,7 @@ public Publisher<K> keyIterator(int count) {
}

public Publisher<K> keyIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, K>((RedissonMap<K, V>) mapCache, pattern, count) {
return Flux.create(new MapReactiveIterator<K, V, K>((RedissonMap<K, V>) mapCache, pattern, count) {
@Override
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
Expand Down
Expand Up @@ -65,7 +65,7 @@ public Publisher<Entry<K, V>> entryIterator(String pattern) {
}

public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) instance, pattern, count));
return Flux.create(new MapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) instance, pattern, count));
}

public Publisher<V> valueIterator() {
Expand All @@ -81,7 +81,7 @@ public Publisher<V> valueIterator(int count) {
}

public Publisher<V> valueIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, V>((RedissonMap<K, V>) instance, pattern, count) {
return Flux.create(new MapReactiveIterator<K, V, V>((RedissonMap<K, V>) instance, pattern, count) {
@Override
V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue();
Expand All @@ -102,7 +102,7 @@ public Publisher<K> keyIterator(int count) {
}

public Publisher<K> keyIterator(String pattern, int count) {
return Flux.create(new RedissonMapReactiveIterator<K, V, K>((RedissonMap<K, V>) instance, pattern, count) {
return Flux.create(new MapReactiveIterator<K, V, K>((RedissonMap<K, V>) instance, pattern, count) {
@Override
K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey();
Expand Down
Expand Up @@ -19,8 +19,6 @@
import java.util.function.Consumer;
import java.util.function.LongConsumer;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.redisson.api.RFuture;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.ListScanResult;
Expand Down

0 comments on commit e893a86

Please sign in to comment.