Skip to content

Commit

Permalink
RCache optimization. #195
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 6, 2015
1 parent a07e4d4 commit 0693c3b
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 89 deletions.
5 changes: 5 additions & 0 deletions src/main/java/org/redisson/Redisson.java
Expand Up @@ -194,6 +194,11 @@ public <K, V> RCache<K, V> getCache(String name) {
return new RedissonCache<K, V>(commandExecutor, name);
}

@Override
public <K, V> RCache<K, V> getCache(String name, Codec codec) {
return new RedissonCache<K, V>(codec, commandExecutor, name);
}

@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, commandExecutor, name);
Expand Down
178 changes: 96 additions & 82 deletions src/main/java/org/redisson/RedissonCache.java
Expand Up @@ -18,7 +18,6 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -27,20 +26,25 @@
import java.util.concurrent.TimeUnit;

import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommand.ValueType;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.convertor.BooleanReplayConvertor;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.convertor.LongReplayConvertor;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectMapReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.TTLMapValueReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.MapGetAllDecoder;
import org.redisson.core.RCache;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

/**
* Distributed and concurrent implementation of {@link java.util.concurrent.ConcurrentMap}
Expand All @@ -59,50 +63,60 @@ public class RedissonCache<K, V> extends RedissonMap<K, V> implements RCache<K,
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_GET_TTL = new RedisCommand<Object>("EVAL", 6, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<List<Object>> EVAL_GET_TTL = new RedisCommand<List<Object>>("EVAL", new TTLMapValueReplayDecoder<Object>(), 5, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<List<Object>> EVAL_CONTAINS = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5);

private static final RedisCommand<Boolean> EVAL_CONTAINS = new RedisCommand<Boolean>("EVAL", new BooleanReplayConvertor(), 6);
private static final RedisCommand<Set<Object>> EVAL_HKEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder(), ValueType.MAP_KEY);
private static final RedisCommand<List<Object>> EVAL_HVALS = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), ValueType.MAP_VALUE);
private static final RedisCommand<Map<Object, Object>> EVAL_HGETALL = new RedisCommand<Map<Object, Object>>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 2);
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 2, ValueType.MAP_KEY);

private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);

protected RedissonCache(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
}

public RedissonCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonCache(Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);

// commandExecutor.getConnectionManager().getGroup().scheduleWithFixedDelay(new Runnable() {
// @Override
// public void run() {
// commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
// "local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, 1000); "
// + "if table.getn(expiredKeys) > 0 then "
// + "expiredKeys = unpack(expiredKeys); "
// + "redis.call('zrem', KEYS[2], expiredKeys); "
// + "redis.call('hdel', KEYS[1], expiredKeys); "
// + "end; ",
// Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
// }
// }, initialDelay, delay, unit)
}

@Override
public Future<Integer> sizeAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_INTEGER,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
+ "if table.getn(expiredKeys) > 0 then "
+ "expiredKeys = unpack(expiredKeys); "
+ "redis.call('zrem', KEYS[2], expiredKeys); "
+ "redis.call('hdel', KEYS[1], expiredKeys); "
+ "end; "
+ "return redis.call('hlen', KEYS[1]);",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
return commandExecutor.readAsync(getName(), codec, RedisCommands.HLEN, getName());
}

@Override
public Future<Boolean> containsKeyAsync(Object key) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_CONTAINS,
"local v = redis.call('hexists', KEYS[1], ARGV[2]); "
+ "if v == 1 then "
+ "local expireDate = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDate ~= false and expireDate <= ARGV[1] then "
+ "redis.call('zrem', KEYS[2], ARGV[2]); "
+ "redis.call('hdel', KEYS[1], ARGV[2]); "
+ "return false;"
+ "end;"
+ "return true;"
+ "end;"
+ "return false;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key);
Promise<Boolean> result = newPromise();

Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_CONTAINS,
"local value = redis.call('hexists', KEYS[1], ARGV[1]); " +
"local expireDate = 92233720368547758; " +
"if value == 1 then " +
"expireDate = redis.call('zscore', KEYS[2], ARGV[1]); "
+ "if expireDate ~= false then "
+ "expireDate = tonumber(expireDate) "
+ "end; " +
"end;" +
"return {expireDate, value}; ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), key);

addExpireListener(result, future, new BooleanReplayConvertor(), false);

return result;
}

@Override
Expand Down Expand Up @@ -153,50 +167,8 @@ public Future<Map<K, V>> getAllAsync(Set<K> keys) {
Arrays.<Object>asList(getName(), getTimeoutSetName()), args.toArray());
}

@Override
public Future<Set<K>> keySetAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HKEYS,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
+ "if table.getn(expiredKeys) > 0 then "
+ "expiredKeys = unpack(expiredKeys); "
+ "redis.call('zrem', KEYS[2], expiredKeys); "
+ "redis.call('hdel', KEYS[1], expiredKeys); "
+ "end; "
+ "return redis.call('hkeys', KEYS[1]);",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
}

@Override
public Future<Collection<V>> valuesAsync() {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_HVALS,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
+ "if table.getn(expiredKeys) > 0 then "
+ "expiredKeys = unpack(expiredKeys); "
+ "redis.call('zrem', KEYS[2], expiredKeys); "
+ "redis.call('hdel', KEYS[1], expiredKeys); "
+ "end; "
+ "return redis.call('hvals', KEYS[1]);",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());
}

@Override
public Set<java.util.Map.Entry<K, V>> entrySet() {
Future<Map<K, V>> f = commandExecutor.evalWriteAsync(getName(), codec, EVAL_HGETALL,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
+ "if table.getn(expiredKeys) > 0 then "
+ "expiredKeys = unpack(expiredKeys); "
+ "redis.call('zrem', KEYS[2], expiredKeys); "
+ "redis.call('hdel', KEYS[1], expiredKeys); "
+ "end; "
+ "return redis.call('hgetall', KEYS[1]);",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis());

Map<K, V> map = get(f);
return map.entrySet();
}

public V putIfAbsent(K key, V value, long ttl, TimeUnit unit) {
return get(putIfAbsent(key, value, ttl, unit));
return get(putIfAbsentAsync(key, value, ttl, unit));
}

public Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit) {
Expand Down Expand Up @@ -229,16 +201,58 @@ public Future<Long> removeAsync(Object key, Object value) {
}

public Future<V> getAsync(K key) {
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_GET_TTL,
"local v = redis.call('hget', KEYS[1], ARGV[2]); "
+ "local expireDate = redis.call('zscore', KEYS[2], ARGV[2]); "
+ "if expireDate ~= false and expireDate <= ARGV[1] then "
+ "redis.call('zrem', KEYS[2], ARGV[2]); "
+ "redis.call('hdel', KEYS[1], ARGV[2]); "
+ "return nil;"
+ "end;"
+ "return v;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), System.currentTimeMillis(), key);
Promise<V> result = newPromise();

Future<List<Object>> future = commandExecutor.evalReadAsync(getName(), codec, EVAL_GET_TTL,
"local value = redis.call('hget', KEYS[1], ARGV[1]); " +
"local expireDate = redis.call('zscore', KEYS[2], ARGV[1]); "
+ "if expireDate == false then "
+ "expireDate = 92233720368547758; "
+ "end; " +
"return {expireDate, value}; ",
Arrays.<Object>asList(getName(), getTimeoutSetName()), key);

addExpireListener(result, future, null, null);

return result;
}

private <T> void addExpireListener(final Promise<T> result, Future<List<Object>> future, final Convertor<T> convertor, final T nullValue) {
future.addListener(new FutureListener<List<Object>>() {
@Override
public void operationComplete(Future<List<Object>> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
return;
}

List<Object> res = future.getNow();
Long expireDate = (Long) res.get(0);
long currentDate = System.currentTimeMillis();
if (expireDate <= currentDate) {
result.setSuccess(nullValue);
expireMap(currentDate);
return;
}

if (convertor != null) {
result.setSuccess((T) convertor.convert(res.get(1)));
} else {
result.setSuccess((T) res.get(1));
}
}
});
}

private void expireMap(long currentDate) {
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL_REMOVE_EXPIRED,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1]); "
+ "if table.getn(expiredKeys) > 0 then "
+ "expiredKeys = unpack(expiredKeys); "
+ "redis.call('zrem', KEYS[2], expiredKeys); "
+ "redis.call('hdel', KEYS[1], expiredKeys); "
+ "end;",
Arrays.<Object>asList(getName(), getTimeoutSetName()), currentDate);
}

public V put(K key, V value, long ttl, TimeUnit unit) {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/redisson/RedissonClient.java
Expand Up @@ -52,6 +52,8 @@
*/
public interface RedissonClient {

<K, V> RCache<K, V> getCache(String name, Codec codec);

<K, V> RCache<K, V> getCache(String name);

/**
Expand Down
17 changes: 10 additions & 7 deletions src/main/java/org/redisson/client/protocol/RedisCommand.java
Expand Up @@ -165,22 +165,25 @@ public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, int objectP
}

public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder) {
this(name, replayMultiDecoder, -1);
this(name, replayMultiDecoder, null, -1);
}

public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, Convertor<R> convertor) {
this(name, replayMultiDecoder, -1);
this.convertor = convertor;
this(name, replayMultiDecoder, convertor, -1);
}

public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, Convertor<R> convertor, int inParamIndex) {
this(name, replayMultiDecoder, inParamIndex);
this.convertor = convertor;
}

public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, int objectParamIndex) {
this(name, null, replayMultiDecoder, null, objectParamIndex);
public RedisCommand(String name, MultiDecoder<R> replayMultiDecoder, int inParamIndex) {
this(name, null, replayMultiDecoder, null, inParamIndex);
}

public RedisCommand(String name, String subName, MultiDecoder<R> replayMultiDecoder,
int objectParamIndex) {
this(name, subName, replayMultiDecoder, null, objectParamIndex);
int inParamIndex) {
this(name, subName, replayMultiDecoder, null, inParamIndex);
}

public RedisCommand(String name, String subName, MultiDecoder<R> replayMultiDecoder, Decoder<R> reponseDecoder, int inParamIndex) {
Expand Down
@@ -0,0 +1,42 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;

import java.util.List;

import org.redisson.client.handler.State;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

public class TTLMapValueReplayDecoder<T> implements MultiDecoder<List<T>> {

@Override
public Object decode(ByteBuf buf, State state) {
return Long.valueOf(buf.toString(CharsetUtil.UTF_8));
}

@Override
public List<T> decode(List<Object> parts, State state) {
return (List<T>) parts;
}

@Override
public boolean isApplicable(int paramNum, State state) {
return paramNum == 0;
}

}

0 comments on commit 0693c3b

Please sign in to comment.