Skip to content

Commit

Permalink
LocalCachedMap.putAll optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Mar 4, 2017
1 parent 204a594 commit 9b5b7f5
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions redisson/src/main/java/org/redisson/RedissonLocalCachedMap.java
Expand Up @@ -53,7 +53,8 @@
import org.redisson.client.protocol.decoder.ObjectMapEntryReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.misc.*;
import org.redisson.misc.Hash;
import org.redisson.misc.RPromise;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
Expand All @@ -73,23 +74,23 @@ public static class LocalCachedMapClear implements Serializable {
public static class LocalCachedMapInvalidate implements Serializable {

private byte[] excludedId;
private byte[] keyHash;
private List<byte[]> keyHashes;

public LocalCachedMapInvalidate() {
}

public LocalCachedMapInvalidate(byte[] excludedId, byte[] keyHash) {
public LocalCachedMapInvalidate(byte[] excludedId, byte[]... keyHash) {
super();
this.keyHash = keyHash;
this.keyHashes = Arrays.asList(keyHash);
this.excludedId = excludedId;
}

public byte[] getExcludedId() {
return excludedId;
}

public byte[] getKeyHash() {
return keyHash;
public Collection<byte[]> getKeyHashes() {
return keyHashes;
}

}
Expand Down Expand Up @@ -230,8 +231,10 @@ public void onMessage(String channel, Object msg) {
if (msg instanceof LocalCachedMapInvalidate) {
LocalCachedMapInvalidate invalidateMsg = (LocalCachedMapInvalidate)msg;
if (!Arrays.equals(invalidateMsg.getExcludedId(), instanceId)) {
CacheKey key = new CacheKey(invalidateMsg.getKeyHash());
cache.remove(key);
for (byte[] keyHash : invalidateMsg.getKeyHashes()) {
CacheKey key = new CacheKey(keyHash);
cache.remove(key);
}
}
}
}
Expand Down Expand Up @@ -725,27 +728,30 @@ public RFuture<Void> putAllAsync(final Map<? extends K, ? extends V> map) {
}

List<Object> params = new ArrayList<Object>(map.size()*3);
List<Object> msgs = new ArrayList<Object>(map.size());
params.add(invalidateEntryOnChange);
params.add(map.size()*2);
byte[][] hashes = new byte[map.size()][];
int i = 0;
for (java.util.Map.Entry<? extends K, ? extends V> t : map.entrySet()) {
byte[] mapKey = encodeMapKey(t.getKey());
byte[] mapValue = encodeMapValue(t.getValue());
params.add(mapKey);
params.add(mapValue);
CacheKey cacheKey = toCacheKey(mapKey);
byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, cacheKey.getKeyHash()));
msgs.add(msgEncoded);
hashes[i] = cacheKey.getKeyHash();
i++;
}
params.addAll(msgs);

byte[] msgEncoded = encode(new LocalCachedMapInvalidate(instanceId, hashes));
params.add(msgEncoded);

final RPromise<Void> result = newPromise();
RFuture<Void> future = commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
"redis.call('hmset', KEYS[1], unpack(ARGV, 3, tonumber(ARGV[2]) + 2));"
+ "if ARGV[1] == '1' then "
+ "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do "
+ "redis.call('publish', KEYS[2], ARGV[i]); "
+ "end; "
// + "for i = tonumber(ARGV[2]) + 3, #ARGV, 1 do "
+ "redis.call('publish', KEYS[2], ARGV[#ARGV]); "
// + "end; "
+ "end;",
Arrays.<Object>asList(getName(), invalidationTopic.getChannelNames().get(0)), params.toArray());

Expand Down

0 comments on commit 9b5b7f5

Please sign in to comment.