Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Feb 8, 2017
1 parent 3ab9de7 commit 53f98a3
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 27 deletions.
4 changes: 4 additions & 0 deletions redisson/src/main/java/org/redisson/RedissonObject.java
Expand Up @@ -81,6 +81,10 @@ protected <V> RFuture<V> newSucceededFuture(V result) {
public String getName() {
return name;
}

protected String getName(Object o) {
return getName();
}

@Override
public void rename(String newName) {
Expand Down
5 changes: 3 additions & 2 deletions redisson/src/main/java/org/redisson/RedissonSet.java
Expand Up @@ -44,7 +44,7 @@
*
* @param <V> value
*/
public class RedissonSet<V> extends RedissonExpirable implements RSet<V> {
public class RedissonSet<V> extends RedissonExpirable implements RSet<V>, ScanIterator {

protected RedissonSet(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
Expand Down Expand Up @@ -83,7 +83,8 @@ protected String getName(Object o) {
return getName();
}

ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
@Override
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = commandExecutor.readAsync(client, name, new ScanCodec(codec), RedisCommands.SSCAN, name, startPos);
return get(f);
}
Expand Down
47 changes: 23 additions & 24 deletions redisson/src/main/java/org/redisson/RedissonSetCache.java
Expand Up @@ -57,8 +57,16 @@
*
* @param <V> value
*/
public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<V> {
public class RedissonSetCache<V> extends RedissonExpirable implements RSetCache<V>, ScanIterator {

RedissonSetCache(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}

RedissonSetCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
}

public RedissonSetCache(EvictionScheduler evictionScheduler, CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
evictionScheduler.schedule(getName());
Expand Down Expand Up @@ -91,7 +99,7 @@ public boolean contains(Object o) {

@Override
public RFuture<Boolean> containsAsync(Object o) {
return commandExecutor.evalReadAsync(getName(), codec, new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
return commandExecutor.evalReadAsync(getName(o), codec, new RedisStrictCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 5),
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[2]); " +
"if expireDateScore ~= false then " +
"if tonumber(expireDateScore) <= tonumber(ARGV[1]) then " +
Expand All @@ -102,16 +110,16 @@ public RFuture<Boolean> containsAsync(Object o) {
"else " +
"return 0;" +
"end; ",
Arrays.<Object>asList(getName()), System.currentTimeMillis(), o);
Arrays.<Object>asList(getName(o)), System.currentTimeMillis(), o);
}

ListScanResult<ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(client, startPos);
public ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos) {
RFuture<ListScanResult<ScanObjectEntry>> f = scanIteratorAsync(name, client, startPos);
return get(f);
}

public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(InetSocketAddress client, long startPos) {
return commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), RedisCommands.EVAL_ZSCAN,
public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(String name, InetSocketAddress client, long startPos) {
return commandExecutor.evalReadAsync(client, name, new ScanCodec(codec), RedisCommands.EVAL_ZSCAN,
"local result = {}; "
+ "local res = redis.call('zscan', KEYS[1], ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
Expand All @@ -122,7 +130,7 @@ public RFuture<ListScanResult<ScanObjectEntry>> scanIteratorAsync(InetSocketAddr
+ "end; "
+ "end;"
+ "end;"
+ "return {res[1], result};", Arrays.<Object>asList(getName()), startPos, System.currentTimeMillis());
+ "return {res[1], result};", Arrays.<Object>asList(name), startPos, System.currentTimeMillis());
}

@Override
Expand All @@ -131,7 +139,7 @@ public Iterator<V> iterator() {

@Override
ListScanResult<ScanObjectEntry> iterator(InetSocketAddress client, long nextIterPos) {
return scanIterator(client, nextIterPos);
return scanIterator(getName(), client, nextIterPos);
}

@Override
Expand All @@ -149,27 +157,18 @@ public Set<V> readAll() {

@Override
public RFuture<Set<V>> readAllAsync() {
return (RFuture<Set<V>>)readAllAsync(RedisCommands.ZRANGEBYSCORE);
}

private RFuture<?> readAllAsync(RedisCommand<? extends Collection<?>> command) {
return commandExecutor.readAsync(getName(), codec, command, getName(), System.currentTimeMillis(), 92233720368547758L);
}


private RFuture<List<Object>> readAllasListAsync() {
return (RFuture<List<Object>>)readAllAsync(RedisCommands.ZRANGEBYSCORE_LIST);
return commandExecutor.readAsync(getName(), codec, RedisCommands.ZRANGEBYSCORE, getName(), System.currentTimeMillis(), 92233720368547758L);
}

@Override
public Object[] toArray() {
List<Object> res = get(readAllasListAsync());
Set<V> res = get(readAllAsync());
return res.toArray();
}

@Override
public <T> T[] toArray(T[] a) {
List<Object> res = get(readAllasListAsync());
Set<V> res = get(readAllAsync());
return res.toArray(a);
}

Expand Down Expand Up @@ -199,14 +198,14 @@ public RFuture<Boolean> addAsync(V value, long ttl, TimeUnit unit) {
byte[] objectState = encode(value);

long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(getName(value), codec, RedisCommands.EVAL_BOOLEAN,
"local expireDateScore = redis.call('zscore', KEYS[1], ARGV[3]); " +
"redis.call('zadd', KEYS[1], ARGV[2], ARGV[3]); " +
"if expireDateScore ~= false and tonumber(expireDateScore) > tonumber(ARGV[1]) then " +
"return 0;" +
"end; " +
"return 1; ",
Arrays.<Object>asList(getName()), System.currentTimeMillis(), timeoutDate, objectState);
Arrays.<Object>asList(getName(value)), System.currentTimeMillis(), timeoutDate, objectState);
}

@Override
Expand All @@ -216,7 +215,7 @@ public RFuture<Boolean> addAsync(V value) {

@Override
public RFuture<Boolean> removeAsync(Object o) {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.ZREM, getName(), o);
return commandExecutor.writeAsync(getName(o), codec, RedisCommands.ZREM, getName(o), o);
}

@Override
Expand Down
29 changes: 29 additions & 0 deletions redisson/src/main/java/org/redisson/ScanIterator.java
@@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;

import java.net.InetSocketAddress;

import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ScanObjectEntry;

public interface ScanIterator {

ListScanResult<ScanObjectEntry> scanIterator(String name, InetSocketAddress client, long startPos);

boolean remove(Object value);

}
Expand Up @@ -78,7 +78,7 @@ public Publisher<Boolean> contains(Object o) {
}

Publisher<ListScanResult<ScanObjectEntry>> scanIterator(InetSocketAddress client, long startPos) {
return reactive(instance.scanIteratorAsync(client, startPos));
return reactive(instance.scanIteratorAsync(getName(), client, startPos));
}

@Override
Expand Down

0 comments on commit 53f98a3

Please sign in to comment.