Skip to content

Commit

Permalink
RMapCache iterator fixed. readAll methods fixed. #471
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Apr 19, 2016
1 parent 3a848c0 commit 904d58d
Show file tree
Hide file tree
Showing 10 changed files with 523 additions and 73 deletions.
153 changes: 135 additions & 18 deletions src/main/java/org/redisson/RedissonMapCache.java
Expand Up @@ -18,6 +18,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -33,16 +34,22 @@
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.convertor.VoidReplayConvertor;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.LongMultiDecoder;
import org.redisson.client.protocol.decoder.MapCacheScanResult;
import org.redisson.client.protocol.decoder.MapCacheScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.MapScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.decoder.ObjectListDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ScanObjectEntry;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.core.RMapCache;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

/**
* <p>Map-based cache with ability to set TTL for each entry via
Expand All @@ -69,7 +76,7 @@ public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCac
static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
static final RedisCommand<Boolean> EVAL_REPLACE_VALUE = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 7, Arrays.asList(ValueType.MAP_KEY, ValueType.MAP_VALUE, ValueType.MAP_VALUE));
private static final RedisCommand<Void> EVAL_HMSET = new RedisCommand<Void>("EVAL", new VoidReplayConvertor(), 4, ValueType.MAP);
private static final RedisCommand<MapScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapScanResult<Object, Object>>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
private static final RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL", new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapReplayDecoder(), new ObjectListReplayDecoder()), ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 9, ValueType.MAP, ValueType.MAP_VALUE);
Expand Down Expand Up @@ -168,10 +175,8 @@ public Future<Map<K, V>> getAllAsync(Set<K> keys) {

return commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args, 1), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local expireHead = redis.call('zrange', KEYS[2], 0, 0, 'withscores');" +
"local expireIdleHead = redis.call('zrange', KEYS[3], 0, 0, 'withscores');" +
"local maxDate = table.remove(ARGV, 1); " // index is the first parameter
+ "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= tonumber(maxDate); "
+ "local hasExpireIdle = #expireIdleHead == 2 and tonumber(expireIdleHead[2]) <= tonumber(maxDate); "
"local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter
+ "local hasExpire = #expireHead == 2 and tonumber(expireHead[2]) <= currentTime; "
+ "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); "
+ "for i = #map, 1, -1 do "
+ "local value = map[i]; "
Expand All @@ -182,18 +187,18 @@ public Future<Map<K, V>> getAllAsync(Set<K> keys) {

+ "if hasExpire then "
+ "local expireDate = redis.call('zscore', KEYS[2], key); "
+ "if expireDate ~= false and tonumber(expireDate) <= tonumber(maxDate) then "
+ "if expireDate ~= false and tonumber(expireDate) <= currentTime then "
+ "map[i] = false; "
+ "end; "
+ "end; "

+ "if hasExpireIdle and t ~= 0 then "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "if tonumber(expireIdle) > currentTime then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "redis.call('zadd', KEYS[3], t + currentTime, key); "
+ "else "
+ "map[i] = false; "
+ "end; "
Expand Down Expand Up @@ -526,9 +531,13 @@ public Future<Long> fastRemoveAsync(K ... keys) {

@Override
MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), new ScanCodec(codec), EVAL_HSCAN,
RedisCommand<MapCacheScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapCacheScanResult<Object, Object>>("EVAL",
new ListMultiDecoder(new LongMultiDecoder(), new ObjectMapDecoder(new ScanCodec(codec)), new ObjectListDecoder(codec), new MapCacheScanResultReplayDecoder()), ValueType.MAP);
Future<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN,
"local result = {}; "
+ "local idleKeys = {}; "
+ "local res = redis.call('hscan', KEYS[1], ARGV[2]); "
+ "local currentTime = tonumber(ARGV[1]); "
+ "for i, value in ipairs(res[2]) do "
+ "if i % 2 == 0 then "
+ "local key = res[2][i-1]; " +
Expand All @@ -542,22 +551,61 @@ MapScanResult<ScanObjectEntry, ScanObjectEntry> scanIterator(InetSocketAddress c
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "if tonumber(expireIdle) > currentTime and expireDate > currentTime then "
+ "table.insert(idleKeys, key); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "

+ "if expireDate > tonumber(ARGV[1]) then "
+ "if expireDate > currentTime then "
+ "table.insert(result, key); "
+ "table.insert(result, val); "
+ "end; "
+ "end; "
+ "end;"
+ "return {res[1], result};", Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), startPos);
+ "return {res[1], result, idleKeys};", Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis(), startPos);

f.addListener(new FutureListener<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>>() {
@Override
public void operationComplete(Future<MapCacheScanResult<ScanObjectEntry, ScanObjectEntry>> future)
throws Exception {
if (future.isSuccess()) {
MapCacheScanResult<ScanObjectEntry, ScanObjectEntry> res = future.getNow();
if (res.getIdleKeys().isEmpty()) {
return;
}

List<Object> args = new ArrayList<Object>(res.getIdleKeys().size() + 1);
args.add(System.currentTimeMillis());
args.addAll(res.getIdleKeys());

commandExecutor.evalWriteAsync(getName(), codec, new RedisCommand<Map<Object, Object>>("EVAL", new MapGetAllDecoder(args, 1), 7, ValueType.MAP_KEY, ValueType.MAP_VALUE),
"local currentTime = tonumber(table.remove(ARGV, 1)); " // index is the first parameter
+ "local map = redis.call('hmget', KEYS[1], unpack(ARGV)); "
+ "for i = #map, 1, -1 do "
+ "local value = map[i]; "
+ "if value ~= false then "
+ "local key = ARGV[i]; "
+ "local t, val = struct.unpack('dLc0', value); "

+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[2], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > currentTime then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[2], t + currentTime, key); "
+ "end; "
+ "end; "
+ "end; "
+ "end; "
+ "end; ",
Arrays.<Object>asList(getName(), getIdleSetName()), args.toArray());

}
}
});

return get(f);
}
Expand Down Expand Up @@ -691,4 +739,73 @@ public Future<Boolean> clearExpireAsync() {
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()));
}

@Override
public Future<Set<java.util.Map.Entry<K, V>>> readAllEntrySetAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_ENTRY,
"local s = redis.call('hgetall', KEYS[1]); "
+ "local result = {}; "
+ "for i, v in ipairs(s) do "
+ "if i % 2 == 0 then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local key = s[i-1];" +
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], key); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then "
+ "table.insert(result, key); "
+ "table.insert(result, val); "
+ "end; "
+ "end; "
+ "end;" +
"return result;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis());
}

@Override
public Future<Collection<V>> readAllValuesAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE_LIST,
"local s = redis.call('hgetall', KEYS[1]); "
+ "local result = {}; "
+ "for i, v in ipairs(s) do "
+ "if i % 2 == 0 then "
+ "local t, val = struct.unpack('dLc0', v); "
+ "local key = s[i-1];" +
"local expireDate = 92233720368547758; " +
"local expireDateScore = redis.call('zscore', KEYS[2], key); "
+ "if expireDateScore ~= false then "
+ "expireDate = tonumber(expireDateScore) "
+ "end; "
+ "if t ~= 0 then "
+ "local expireIdle = redis.call('zscore', KEYS[3], key); "
+ "if expireIdle ~= false then "
+ "if tonumber(expireIdle) > tonumber(ARGV[1]) then "
+ "local value = struct.pack('dLc0', t, string.len(val), val); "
+ "redis.call('hset', KEYS[1], key, value); "
+ "redis.call('zadd', KEYS[3], t + tonumber(ARGV[1]), key); "
+ "end; "
+ "expireDate = math.min(expireDate, tonumber(expireIdle)) "
+ "end; "
+ "end; "
+ "if expireDate > tonumber(ARGV[1]) then "
+ "table.insert(result, val); "
+ "end; "
+ "end; "
+ "end;" +
"return result;",
Arrays.<Object>asList(getName(), getTimeoutSetName(), getIdleSetName()), System.currentTimeMillis());
}

}
13 changes: 8 additions & 5 deletions src/main/java/org/redisson/client/handler/CommandDecoder.java
Expand Up @@ -35,6 +35,7 @@
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.decoder.ListMultiDecoder;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.client.protocol.decoder.NestedMultiDecoder;
import org.redisson.client.protocol.pubsub.Message;
Expand Down Expand Up @@ -90,7 +91,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
makeCheckpoint = false;
} else {
CommandData<Object, Object> cmd = (CommandData<Object, Object>)data;
if (cmd.getCommand().getReplayMultiDecoder() != null && NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())) {
if (cmd.getCommand().getReplayMultiDecoder() != null
&& (NestedMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass())
|| ListMultiDecoder.class.isAssignableFrom(cmd.getCommand().getReplayMultiDecoder().getClass()))) {
makeCheckpoint = false;
}
}
Expand Down Expand Up @@ -139,7 +142,7 @@ private void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCo
StateLevel firstLevel = state().getLevels().get(0);
StateLevel secondLevel = state().getLevels().get(1);

decodeMulti(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts());
decodeList(in, cmd, firstLevel.getParts(), ctx.channel(), secondLevel.getSize(), secondLevel.getParts());

Channel channel = ctx.channel();
MultiDecoder<Object> decoder = messageDecoder(cmd, firstLevel.getParts(), channel);
Expand All @@ -156,7 +159,7 @@ private void decodeFromCheckpoint(ChannelHandlerContext ctx, ByteBuf in, QueueCo
state().resetLevel();
decode(in, cmd, null, ctx.channel());
} else {
decodeMulti(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts());
decodeList(in, cmd, null, ctx.channel(), firstLevel.getSize(), firstLevel.getParts());
}
}
}
Expand Down Expand Up @@ -267,13 +270,13 @@ private void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> p
}
}

decodeMulti(in, data, parts, channel, size, respParts);
decodeList(in, data, parts, channel, size, respParts);
} else {
throw new IllegalStateException("Can't decode replay " + (char)code);
}
}

private void decodeMulti(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,
Channel channel, long size, List<Object> respParts)
throws IOException {
for (int i = respParts.size(); i < size; i++) {
Expand Down
Expand Up @@ -179,6 +179,7 @@ public interface RedisCommands {
RedisCommand<Set<Object>> EVAL_SET = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder());
RedisCommand<Object> EVAL_OBJECT = new RedisCommand<Object>("EVAL");
RedisCommand<Object> EVAL_MAP_VALUE = new RedisCommand<Object>("EVAL", ValueType.MAP_VALUE);
RedisCommand<Set<Entry<Object, Object>>> EVAL_MAP_ENTRY = new RedisCommand<Set<Entry<Object, Object>>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
RedisCommand<List<Object>> EVAL_MAP_VALUE_LIST = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), ValueType.MAP_VALUE);

RedisStrictCommand<Long> INCR = new RedisStrictCommand<Long>("INCR");
Expand Down

0 comments on commit 904d58d

Please sign in to comment.