Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Oct 15, 2018
1 parent 65455d3 commit ad57907
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 74 deletions.
35 changes: 0 additions & 35 deletions redisson/src/main/java/org/redisson/reactive/MapReactive.java

This file was deleted.

Expand Up @@ -19,11 +19,8 @@
import java.util.Map.Entry; import java.util.Map.Entry;


import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonMapCache; import org.redisson.RedissonMap;
import org.redisson.api.RFuture;
import org.redisson.api.RMapCache; import org.redisson.api.RMapCache;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;


/** /**
* *
Expand All @@ -32,19 +29,14 @@
* @param <K> key * @param <K> key
* @param <V> value * @param <V> value
*/ */
public class RedissonMapCacheReactive<K, V> implements MapReactive<K, V> { public class RedissonMapCacheReactive<K, V> {


private final RMapCache<K, V> mapCache; private final RMapCache<K, V> mapCache;


public RedissonMapCacheReactive(RMapCache<K, V> mapCache) { public RedissonMapCacheReactive(RMapCache<K, V> mapCache) {
this.mapCache = mapCache; this.mapCache = mapCache;
} }


@Override
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(RedisClient client, long startPos, String pattern, int count) {
return ((RedissonMapCache<K, V>)mapCache).scanIteratorAsync(mapCache.getName(), client, startPos, pattern, count);
}

public Publisher<Map.Entry<K, V>> entryIterator() { public Publisher<Map.Entry<K, V>> entryIterator() {
return entryIterator(null); return entryIterator(null);
} }
Expand All @@ -58,7 +50,7 @@ public Publisher<Map.Entry<K, V>> entryIterator(String pattern) {
} }


public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) { public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this, pattern, count).stream(); return new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) mapCache, pattern, count).stream();
} }


public Publisher<V> valueIterator() { public Publisher<V> valueIterator() {
Expand All @@ -74,7 +66,7 @@ public Publisher<V> valueIterator(int count) {
} }


public Publisher<V> valueIterator(String pattern, int count) { public Publisher<V> valueIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, V>(this, pattern, count) { return new RedissonMapReactiveIterator<K, V, V>((RedissonMap<K, V>) mapCache, pattern, count) {
@Override @Override
V getValue(Entry<Object, Object> entry) { V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue(); return (V) entry.getValue();
Expand All @@ -95,17 +87,12 @@ public Publisher<K> keyIterator(int count) {
} }


public Publisher<K> keyIterator(String pattern, int count) { public Publisher<K> keyIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, K>(this, pattern, count) { return new RedissonMapReactiveIterator<K, V, K>((RedissonMap<K, V>) mapCache, pattern, count) {
@Override @Override
K getValue(Entry<Object, Object> entry) { K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey(); return (K) entry.getKey();
} }
}.stream(); }.stream();
} }


@Override
public V putSync(K key, V value) {
return mapCache.put(key, value);
}

} }
Expand Up @@ -20,10 +20,7 @@


import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.redisson.RedissonMap; import org.redisson.RedissonMap;
import org.redisson.api.RFuture;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult;




/** /**
Expand All @@ -35,24 +32,14 @@
* @param <K> key * @param <K> key
* @param <V> value * @param <V> value
*/ */
public class RedissonMapReactive<K, V> implements MapReactive<K, V> { public class RedissonMapReactive<K, V> {


private final RMap<K, V> instance; private final RMap<K, V> instance;


public RedissonMapReactive(RMap<K, V> instance) { public RedissonMapReactive(RMap<K, V> instance) {
this.instance = instance; this.instance = instance;
} }


@Override
public RFuture<MapScanResult<Object, Object>> scanIteratorAsync(final RedisClient client, final long startPos, final String pattern, final int count) {
return ((RedissonMap<K, V>)instance).scanIteratorAsync(instance.getName(), client, startPos, pattern, count);
}

@Override
public V putSync(K key, V value) {
return instance.put(key, value);
}

public Publisher<Map.Entry<K, V>> entryIterator() { public Publisher<Map.Entry<K, V>> entryIterator() {
return entryIterator(null); return entryIterator(null);
} }
Expand All @@ -66,7 +53,7 @@ public Publisher<Entry<K, V>> entryIterator(String pattern) {
} }


public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) { public Publisher<Map.Entry<K, V>> entryIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>(this, pattern, count).stream(); return new RedissonMapReactiveIterator<K, V, Map.Entry<K, V>>((RedissonMap<K, V>) instance, pattern, count).stream();
} }


public Publisher<V> valueIterator() { public Publisher<V> valueIterator() {
Expand All @@ -82,7 +69,7 @@ public Publisher<V> valueIterator(int count) {
} }


public Publisher<V> valueIterator(String pattern, int count) { public Publisher<V> valueIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, V>(this, pattern, count) { return new RedissonMapReactiveIterator<K, V, V>((RedissonMap<K, V>) instance, pattern, count) {
@Override @Override
V getValue(Entry<Object, Object> entry) { V getValue(Entry<Object, Object> entry) {
return (V) entry.getValue(); return (V) entry.getValue();
Expand All @@ -103,7 +90,7 @@ public Publisher<K> keyIterator(int count) {
} }


public Publisher<K> keyIterator(String pattern, int count) { public Publisher<K> keyIterator(String pattern, int count) {
return new RedissonMapReactiveIterator<K, V, K>(this, pattern, count) { return new RedissonMapReactiveIterator<K, V, K>((RedissonMap<K, V>) instance, pattern, count) {
@Override @Override
K getValue(Entry<Object, Object> entry) { K getValue(Entry<Object, Object> entry) {
return (K) entry.getKey(); return (K) entry.getKey();
Expand Down
Expand Up @@ -20,6 +20,7 @@


import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.redisson.RedissonMap;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.protocol.decoder.MapScanResult; import org.redisson.client.protocol.decoder.MapScanResult;


Expand All @@ -38,11 +39,11 @@
*/ */
public class RedissonMapReactiveIterator<K, V, M> { public class RedissonMapReactiveIterator<K, V, M> {


private final MapReactive<K, V> map; private final RedissonMap<K, V> map;
private final String pattern; private final String pattern;
private final int count; private final int count;


public RedissonMapReactiveIterator(MapReactive<K, V> map, String pattern, int count) { public RedissonMapReactiveIterator(RedissonMap<K, V> map, String pattern, int count) {
this.map = map; this.map = map;
this.pattern = pattern; this.pattern = pattern;
this.count = count; this.count = count;
Expand All @@ -68,7 +69,7 @@ protected void onRequest(final long n) {


protected void nextValues() { protected void nextValues() {
final ReactiveSubscription<M> m = this; final ReactiveSubscription<M> m = this;
map.scanIteratorAsync(client, nextIterPos, pattern, count).addListener(new FutureListener<MapScanResult<Object, Object>>() { map.scanIteratorAsync(map.getName(), client, nextIterPos, pattern, count).addListener(new FutureListener<MapScanResult<Object, Object>>() {


@Override @Override
public void operationComplete(Future<MapScanResult<Object, Object>> future) public void operationComplete(Future<MapScanResult<Object, Object>> future)
Expand Down Expand Up @@ -122,7 +123,7 @@ M getValue(final Entry<Object, Object> entry) {


@Override @Override
public V setValue(V value) { public V setValue(V value) {
return map.putSync((K) entry.getKey(), value); return map.put((K) entry.getKey(), value);
} }


}; };
Expand Down

0 comments on commit ad57907

Please sign in to comment.