Skip to content

Commit

Permalink
MapWriter (write-trough) support for RMap added. #927
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jun 20, 2017
1 parent 2da21fb commit 369e306
Show file tree
Hide file tree
Showing 13 changed files with 1,129 additions and 208 deletions.
158 changes: 116 additions & 42 deletions redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ public String toString() {
private static final RedisCommand<Set<Object>> ALL_KEYS = new RedisCommand<Set<Object>>("EVAL", new ObjectSetReplayDecoder<Object>(), ValueType.MAP_KEY);
private static final RedisCommand<Set<Entry<Object, Object>>> ALL_ENTRIES = new RedisCommand<Set<Entry<Object, Object>>>("EVAL", new ObjectMapEntryReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Map<Object, Object>> ALL_MAP = new RedisCommand<Map<Object, Object>>("EVAL", new ObjectMapReplayDecoder(), ValueType.MAP);
private static final RedisCommand<Object> EVAL_PUT = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);
private static final RedisCommand<Object> EVAL_REMOVE = new RedisCommand<Object>("EVAL", -1, ValueType.OBJECT, ValueType.MAP_VALUE);

private long cacheUpdateLogTime = TimeUnit.MINUTES.toMillis(10);
private byte[] instanceId;
Expand Down Expand Up @@ -416,21 +414,14 @@ protected static byte[] generateLogEntryId(byte[] keyHash) {


@Override
public RFuture<V> putAsync(K key, V value) {
if (key == null) {
throw new NullPointerException();
}
if (value == null) {
throw new NullPointerException();
}

public RFuture<V> putOperationAsync(K key, V value) {
byte[] mapKey = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
CacheValue cacheValue = new CacheValue(key, value);
cache.put(cacheKey, cacheValue);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_PUT,
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hset', KEYS[1], ARGV[1], ARGV[2]) == 0 then "
+ "if ARGV[4] == '1' then "
Expand All @@ -447,14 +438,7 @@ public RFuture<V> putAsync(K key, V value) {
}

@Override
public RFuture<Boolean> fastPutAsync(K key, V value) {
if (key == null) {
throw new NullPointerException();
}
if (value == null) {
throw new NullPointerException();
}

protected RFuture<Boolean> fastPutOperationAsync(K key, V value) {
byte[] encodedKey = encodeMapKey(key);
byte[] encodedValue = encodeMapValue(value);
CacheKey cacheKey = toCacheKey(encodedKey);
Expand Down Expand Up @@ -489,17 +473,13 @@ public void destroy() {
}

@Override
public RFuture<V> removeAsync(K key) {
if (key == null) {
throw new NullPointerException();
}

public RFuture<V> removeOperationAsync(K key) {
byte[] keyEncoded = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyEncoded);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
cache.remove(cacheKey);
return commandExecutor.evalWriteAsync(getName(), codec, EVAL_REMOVE,
return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_MAP_VALUE,
"local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "if redis.call('hdel', KEYS[1], ARGV[1]) == 1 then "
+ "if ARGV[3] == '1' then "
Expand All @@ -516,10 +496,87 @@ public RFuture<V> removeAsync(K key) {
}

@Override
public RFuture<Long> fastRemoveAsync(K ... keys) {
if (keys == null) {
throw new NullPointerException();
protected RFuture<List<Long>> fastRemoveOperationBatchAsync(@SuppressWarnings("unchecked") K... keys) {
if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
params.add(keyEncoded);

CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);
}

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for j = 1, #ARGV, 2 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[j]);"
+ "if val == 1 then "
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "end;"
+ "table.insert(result, val);"
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)),
params.toArray());
}

if (invalidateEntryOnChange == 2) {
List<Object> params = new ArrayList<Object>(keys.length*3);
params.add(System.currentTimeMillis());
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
params.add(keyEncoded);

CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
params.add(msgEncoded);

byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
params.add(entryId);
}

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for j = 2, #ARGV, 3 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[j]);"
+ "if val == 1 then "
+ "redis.call('zadd', KEYS[3], ARGV[1], ARGV[j+2]);"
+ "redis.call('publish', KEYS[2], ARGV[j+1]); "
+ "end;"
+ "table.insert(result, val);"
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
params.toArray());
}

List<Object> params = new ArrayList<Object>(keys.length);
for (K k : keys) {
byte[] keyEncoded = encodeMapKey(k);
params.add(keyEncoded);

CacheKey cacheKey = toCacheKey(keyEncoded);
cache.remove(cacheKey);
}

RFuture<List<Long>> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LIST,
"local result = {}; " +
"for i = 1, #ARGV, 1 do "
+ "local val = redis.call('hdel', KEYS[1], ARGV[i]); "
+ "table.insert(result, val); "
+ "end;"
+ "return result;",
Arrays.<Object>asList(getName()),
params.toArray());
return future;
}

@Override
protected RFuture<Long> fastRemoveOperationAsync(K ... keys) {

if (invalidateEntryOnChange == 1) {
List<Object> params = new ArrayList<Object>(keys.length*2);
Expand Down Expand Up @@ -892,11 +949,7 @@ private void cacheMap(Map<?, ?> map) {
}

@Override
public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
if (map.isEmpty()) {
return newSucceededFuture(null);
}

public RFuture<Void> putAllOperationAsync(final Map<? extends K, ? extends V> map) {
List<Object> params = new ArrayList<Object>(map.size()*3);
params.add(invalidateEntryOnChange);
params.add(map.size()*2);
Expand Down Expand Up @@ -952,7 +1005,7 @@ public void operationComplete(Future<Void> future) throws Exception {
}

@Override
public RFuture<V> addAndGetAsync(final K key, Number value) {
public RFuture<V> addAndGetOperationAsync(final K key, Number value) {
final byte[] keyState = encodeMapKey(key);
CacheKey cacheKey = toCacheKey(keyState);
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
Expand Down Expand Up @@ -1151,14 +1204,14 @@ private <R> RFuture<R> readAll(RedisCommand<?> evalCommandType, List<Object> map
}

@Override
public RFuture<V> replaceAsync(final K key, final V value) {
protected RFuture<V> replaceOperationAsync(K key, V value) {
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));

RFuture<V> future = commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
return commandExecutor.evalWriteAsync(getName(key), codec, RedisCommands.EVAL_MAP_VALUE,
"if redis.call('hexists', KEYS[1], ARGV[1]) == 1 then "
+ "local v = redis.call('hget', KEYS[1], ARGV[1]); "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[2]); "
Expand All @@ -1177,7 +1230,14 @@ public RFuture<V> replaceAsync(final K key, final V value) {
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}

@Override
public RFuture<V> replaceAsync(final K key, final V value) {
final byte[] keyState = encodeMapKey(key);
final CacheKey cacheKey = toCacheKey(keyState);

RFuture<V> future = super.replaceAsync(key, value);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
Expand All @@ -1186,25 +1246,24 @@ public void operationComplete(Future<V> future) throws Exception {
}

if (future.getNow() != null) {
CacheKey cacheKey = toCacheKey(key);
cache.put(cacheKey, new CacheValue(key, value));
}
}
});

return future;
}

@Override
public RFuture<Boolean> replaceAsync(final K key, V oldValue, final V newValue) {
protected RFuture<Boolean> replaceOperationAsync(K key, V oldValue, V newValue) {
final byte[] keyState = encodeMapKey(key);
byte[] oldValueState = encodeMapValue(oldValue);
byte[] newValueState = encodeMapValue(newValue);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));

RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "redis.call('hset', KEYS[1], ARGV[1], ARGV[3]); "
+ "if ARGV[4] == '1' then "
Expand All @@ -1220,7 +1279,14 @@ public RFuture<Boolean> replaceAsync(final K key, V oldValue, final V newValue)
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, oldValueState, newValueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}

@Override
public RFuture<Boolean> replaceAsync(final K key, V oldValue, final V newValue) {
final byte[] keyState = encodeMapKey(key);
final CacheKey cacheKey = toCacheKey(keyState);

RFuture<Boolean> future = super.replaceAsync(key, oldValue, newValue);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
Expand All @@ -1238,14 +1304,14 @@ public void operationComplete(Future<Boolean> future) throws Exception {
}

@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
protected RFuture<Boolean> removeOperationAsync(Object key, Object value) {
final byte[] keyState = encodeMapKey(key);
byte[] valueState = encodeMapValue(value);
final CacheKey cacheKey = toCacheKey(keyState);
byte[] entryId = generateLogEntryId(cacheKey.getKeyHash());
byte[] msg = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));

RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
return commandExecutor.evalWriteAsync(getName(key), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('hget', KEYS[1], ARGV[1]) == ARGV[2] then "
+ "if ARGV[3] == '1' then "
+ "redis.call('publish', KEYS[2], ARGV[4]); "
Expand All @@ -1260,6 +1326,13 @@ public RFuture<Boolean> removeAsync(Object key, Object value) {
+ "end",
Arrays.<Object>asList(getName(key), invalidationTopic.getChannelNames().get(0), getUpdatesLogName()),
keyState, valueState, invalidateEntryOnChange, msg, System.currentTimeMillis(), entryId);
}

@Override
public RFuture<Boolean> removeAsync(Object key, Object value) {
final byte[] keyState = encodeMapKey(key);
final CacheKey cacheKey = toCacheKey(keyState);
RFuture<Boolean> future = super.removeAsync(key, value);

future.addListener(new FutureListener<Boolean>() {
@Override
Expand All @@ -1276,6 +1349,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
return future;
}


@Override
public RFuture<V> putIfAbsentAsync(final K key, final V value) {
RFuture<V> future = super.putIfAbsentAsync(key, value);
Expand Down

0 comments on commit 369e306

Please sign in to comment.