Skip to content

Commit

Permalink
RCacheReactive implemented. #318
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 9, 2015
1 parent c6255fa commit 4cc4785
Show file tree
Hide file tree
Showing 23 changed files with 1,097 additions and 201 deletions.
10 changes: 5 additions & 5 deletions src/main/java/org/redisson/Redisson.java
Expand Up @@ -40,7 +40,7 @@
import org.redisson.core.RBitSet;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBucket;
import org.redisson.core.RCache;
import org.redisson.core.RMapCache;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RHyperLogLog;
Expand Down Expand Up @@ -197,13 +197,13 @@ public <K, V> RMap<K, V> getMap(String name) {
}

@Override
public <K, V> RCache<K, V> getCache(String name) {
return new RedissonCache<K, V>(commandExecutor, name);
public <K, V> RMapCache<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(commandExecutor, name);
}

@Override
public <K, V> RCache<K, V> getCache(String name, Codec codec) {
return new RedissonCache<K, V>(codec, commandExecutor, name);
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, commandExecutor, name);
}

@Override
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/org/redisson/RedissonBatch.java
Expand Up @@ -25,7 +25,7 @@
import org.redisson.core.RBitSetAsync;
import org.redisson.core.RBlockingQueueAsync;
import org.redisson.core.RBucketAsync;
import org.redisson.core.RCacheAsync;
import org.redisson.core.RMapCacheAsync;
import org.redisson.core.RDequeAsync;
import org.redisson.core.RHyperLogLogAsync;
import org.redisson.core.RKeysAsync;
Expand Down Expand Up @@ -164,13 +164,13 @@ public RBitSetAsync getBitSet(String name) {
}

@Override
public <K, V> RCacheAsync<K, V> getCache(String name, Codec codec) {
return new RedissonCache<K, V>(codec, executorService, name);
public <K, V> RMapCacheAsync<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, executorService, name);
}

@Override
public <K, V> RCacheAsync<K, V> getCache(String name) {
return new RedissonCache<K, V>(executorService, name);
public <K, V> RMapCacheAsync<K, V> getMapCache(String name) {
return new RedissonMapCache<K, V>(executorService, name);
}

@Override
Expand Down
Expand Up @@ -23,49 +23,55 @@

import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.internal.PlatformDependent;

/**
* Eviction scheduler for RCache object.
* Eviction scheduler for RMapCache object.
* Deletes expired entries in time interval between 5 seconds to 2 hours.
* It analyzes deleted amount of expired keys
* and 'tune' next execution delay depending on it.
*
* @author Nikita Koksharov
*
*/
public class RedissonCacheEvictScheduler {
public class RedissonCacheEvictionScheduler {

public static class RedissonCacheTask implements Runnable {

final RedissonCache<?, ?> cache;
final String name;
final String timeoutSetName;
final CommandAsyncExecutor executor;
final Deque<Integer> sizeHistory = new LinkedList<Integer>();
int delay = 10;

int minDelay = 5;
int maxDelay = 2*60*60;
int keysLimit = 500;

public RedissonCacheTask(RedissonCache<?, ?> cache) {
this.cache = cache;
public RedissonCacheTask(String name, String timeoutSetName, CommandAsyncExecutor executor) {
this.name = name;
this.timeoutSetName = timeoutSetName;
this.executor = executor;
}

public void schedule() {
cache.commandExecutor.getConnectionManager().getGroup().schedule(this, delay, TimeUnit.SECONDS);
executor.getConnectionManager().getGroup().schedule(this, delay, TimeUnit.SECONDS);
}

@Override
public void run() {
Future<Integer> future = cache.commandExecutor.evalWriteAsync(cache.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
Future<Integer> future = executor.evalWriteAsync(name, LongCodec.INSTANCE, RedisCommands.EVAL_INTEGER,
"local expiredKeys = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredKeys > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredKeys)); "
+ "redis.call('hdel', KEYS[1], unpack(expiredKeys)); "
+ "end; "
+ "return #expiredKeys;",
Arrays.<Object>asList(cache.getName(), cache.getTimeoutSetName()), System.currentTimeMillis(), keysLimit);
Arrays.<Object>asList(name, timeoutSetName), System.currentTimeMillis(), keysLimit);

future.addListener(new FutureListener<Integer>() {
@Override
Expand Down Expand Up @@ -111,9 +117,9 @@ public void operationComplete(Future<Integer> future) throws Exception {

private final ConcurrentMap<String, RedissonCacheTask> tasks = PlatformDependent.newConcurrentHashMap();

public void schedule(RedissonCache<?, ?> cache) {
RedissonCacheTask task = new RedissonCacheTask(cache);
RedissonCacheTask prevTask = tasks.putIfAbsent(cache.getName(), task);
public void schedule(String name, String timeoutSetName, CommandAsyncExecutor executor) {
RedissonCacheTask task = new RedissonCacheTask(name, timeoutSetName, executor);
RedissonCacheTask prevTask = tasks.putIfAbsent(name, task);
if (prevTask == null) {
task.schedule();
}
Expand Down
25 changes: 18 additions & 7 deletions src/main/java/org/redisson/RedissonClient.java
Expand Up @@ -16,7 +16,6 @@
package org.redisson;

import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;

import org.redisson.client.codec.Codec;
import org.redisson.core.ClusterNode;
Expand All @@ -27,7 +26,7 @@
import org.redisson.core.RBitSet;
import org.redisson.core.RBlockingQueue;
import org.redisson.core.RBucket;
import org.redisson.core.RCache;
import org.redisson.core.RMapCache;
import org.redisson.core.RCountDownLatch;
import org.redisson.core.RDeque;
import org.redisson.core.RHyperLogLog;
Expand All @@ -54,25 +53,37 @@
*/
public interface RedissonClient {

/**
* Returns readWriteLock instance by name.
*
* @param name
* @return
*/
RReadWriteLock getReadWriteLock(String name);

/**
* Returns map-based cache instance with eviction support by name
* using provided codec for both cache keys and values.
* Returns map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
* Supports entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<K, V> RCache<K, V> getCache(String name, Codec codec);
<K, V> RMapCache<K, V> getMapCache(String name, Codec codec);

/**
* Returns map-based cache instance with eviction support by name.
* Returns map-based cache instance by name.
* Supports entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getMap(String)}.</p>
*
* @param name
* @return
*/
<K, V> RCache<K, V> getCache(String name);
<K, V> RMapCache<K, V> getMapCache(String name);

/**
* Returns object holder instance by name.
Expand Down
Expand Up @@ -40,7 +40,7 @@
import org.redisson.client.protocol.decoder.TTLMapValueReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.connection.decoder.CacheGetAllDecoder;
import org.redisson.core.RCache;
import org.redisson.core.RMapCache;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
Expand All @@ -58,32 +58,28 @@
* @param <K> key
* @param <V> value
*/
public class RedissonCache<K, V> extends RedissonMap<K, V> implements RCache<K, V> {
public class RedissonMapCache<K, V> extends RedissonMap<K, V> implements RMapCache<K, V> {

private static final RedisCommand<MapScanResult<Object, Object>> EVAL_HSCAN = new RedisCommand<MapScanResult<Object, Object>>("EVAL", new NestedMultiDecoder(new ObjectMapReplayDecoder(), new MapScanResultReplayDecoder()), ValueType.MAP);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REPLACE = new RedisCommand<Object>("EVAL", 4, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_REMOVE_VALUE = new RedisCommand<Long>("EVAL", new LongReplayConvertor(), 5, ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = EVAL_REPLACE;
private static final RedisCommand<Object> EVAL_PUT_TTL = new RedisCommand<Object>("EVAL", 6, ValueType.MAP, ValueType.MAP_VALUE);
private static final RedisCommand<List<Object>> EVAL_GET_TTL = new RedisCommand<List<Object>>("EVAL", new TTLMapValueReplayDecoder<Object>(), 5, ValueType.MAP_KEY, ValueType.MAP_VALUE);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_KEY = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_KEY);
private static final RedisCommand<List<Object>> EVAL_CONTAINS_VALUE = new RedisCommand<List<Object>>("EVAL", new ObjectListReplayDecoder<Object>(), 5, ValueType.MAP_VALUE);
private static final RedisCommand<Long> EVAL_FAST_REMOVE = new RedisCommand<Long>("EVAL", 5, ValueType.MAP_KEY);
private static final RedisCommand<Long> EVAL_REMOVE_EXPIRED = new RedisCommand<Long>("EVAL", 5);

private static final RedissonCacheEvictScheduler SCHEDULER = new RedissonCacheEvictScheduler();
private static final RedissonCacheEvictionScheduler SCHEDULER = new RedissonCacheEvictionScheduler();

{
SCHEDULER.schedule(this);
}

protected RedissonCache(CommandAsyncExecutor commandExecutor, String name) {
protected RedissonMapCache(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor);
}

public RedissonCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
public RedissonMapCache(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
super(codec, commandExecutor, name);
SCHEDULER.schedule(getName(), getTimeoutSetName(), commandExecutor);
}

@Override
Expand Down Expand Up @@ -183,24 +179,26 @@ public void operationComplete(Future<List<Object>> future) throws Exception {

}

@Override
public V putIfAbsent(K key, V value, long ttl, TimeUnit unit) {
return get(putIfAbsentAsync(key, value, ttl, unit));
}

@Override
public Future<V> putIfAbsentAsync(K key, V value, long ttl, TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("TimeUnit param can't be null");
}

long timeoutDate = System.currentTimeMillis() + unit.toMillis(ttl);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
+ "return nil "
+ "else "
+ "return redis.call('hget', KEYS[1], ARGV[1]) "
+ "end",
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT_TTL,
"if redis.call('hexists', KEYS[1], ARGV[2]) == 0 then "
+ "redis.call('zadd', KEYS[2], ARGV[1], ARGV[2]); "
+ "redis.call('hset', KEYS[1], ARGV[2], ARGV[3]); "
+ "return nil "
+ "else "
+ "return redis.call('hget', KEYS[1], ARGV[2]) "
+ "end",
Arrays.<Object>asList(getName(), getTimeoutSetName()), timeoutDate, key, value);
}

Expand All @@ -216,6 +214,7 @@ public Future<Long> removeAsync(Object key, Object value) {
Arrays.<Object>asList(getName(), getTimeoutSetName()), key, value);
}

@Override
public Future<V> getAsync(K key) {
Promise<V> result = newPromise();

Expand Down Expand Up @@ -270,10 +269,12 @@ private void expireMap(long currentDate) {
Arrays.<Object>asList(getName(), getTimeoutSetName()), currentDate);
}

@Override
public V put(K key, V value, long ttl, TimeUnit unit) {
return get(putAsync(key, value, ttl, unit));
}

@Override
public Future<V> putAsync(K key, V value, long ttl, TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("TimeUnit param can't be null");
Expand Down Expand Up @@ -315,6 +316,7 @@ public Future<Long> fastRemoveAsync(K ... keys) {
Arrays.<Object>asList(getName(), getTimeoutSetName()), keys);
}

@Override
MapScanResult<Object, V> scanIterator(InetSocketAddress client, long startPos) {
Future<MapScanResult<Object, V>> f = commandExecutor.evalReadAsync(client, getName(), codec, EVAL_HSCAN,
"local result = {}; "
Expand Down
19 changes: 13 additions & 6 deletions src/main/java/org/redisson/RedissonReactive.java
Expand Up @@ -24,6 +24,7 @@
import org.redisson.api.RBitSetReactive;
import org.redisson.api.RBlockingQueueReactive;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RMapCacheReactive;
import org.redisson.api.RDequeReactive;
import org.redisson.api.RHyperLogLogReactive;
import org.redisson.api.RKeysReactive;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.redisson.reactive.RedissonBitSetReactive;
import org.redisson.reactive.RedissonBlockingQueueReactive;
import org.redisson.reactive.RedissonBucketReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
import org.redisson.reactive.RedissonDequeReactive;
import org.redisson.reactive.RedissonHyperLogLogReactive;
import org.redisson.reactive.RedissonKeysReactive;
Expand Down Expand Up @@ -102,12 +104,17 @@ public class RedissonReactive implements RedissonReactiveClient {
}


/**
* Returns object holder by name
*
* @param name of object
* @return
*/
@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCacheReactive<K, V>(codec, commandExecutor, name);
}


@Override
public <K, V> RMapCacheReactive<K, V> getMapCache(String name) {
return new RedissonMapCacheReactive<K, V>(commandExecutor, name);
}

@Override
public <V> RBucketReactive<V> getBucket(String name) {
return new RedissonBucketReactive<V>(commandExecutor, name);
Expand Down
27 changes: 24 additions & 3 deletions src/main/java/org/redisson/api/RBatchReactive.java
Expand Up @@ -18,11 +18,8 @@
import java.util.List;

import org.reactivestreams.Publisher;
import org.redisson.client.RedisException;
import org.redisson.client.codec.Codec;

import io.netty.util.concurrent.Future;

/**
* Interface for using pipeline feature.
*
Expand All @@ -37,6 +34,30 @@
*/
public interface RBatchReactive {

/**
* Returns map-based cache instance by <code>name</code>
* using provided <code>codec</code> for both cache keys and values.
* Supports entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getMap(String, Codec)}.</p>
*
* @param name
* @param codec
* @return
*/
<K, V> RMapCacheReactive<K, V> getMapCache(String name, Codec codec);

/**
* Returns map-based cache instance by <code>name</code>.
* Supports entry eviction with a given TTL value.
*
* <p>If eviction is not required then it's better to use regular map {@link #getMap(String)}.</p>
*
* @param name
* @return
*/
<K, V> RMapCacheReactive<K, V> getMapCache(String name);

/**
* Returns object holder by name
*
Expand Down

0 comments on commit 4cc4785

Please sign in to comment.