diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index 3361b8ce021..1aae2191273 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -207,7 +206,7 @@ public RFuture deleteByPatternAsync(final String pattern) { final RPromise result = commandExecutor.getConnectionManager().newPromise(); final AtomicReference failed = new AtomicReference(); final AtomicLong count = new AtomicLong(); - Set entries = commandExecutor.getConnectionManager().getEntrySet(); + Collection entries = commandExecutor.getConnectionManager().getEntrySet(); final AtomicLong executed = new AtomicLong(entries.size()); final FutureListener listener = new FutureListener() { @Override @@ -301,14 +300,13 @@ private RFuture executeAsync(RedisStrictCommand command, String ... Map> range2key = new HashMap>(); for (String key : keys) { int slot = commandExecutor.getConnectionManager().calcSlot(key); - for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { - List list = range2key.get(entry); - if (list == null) { - list = new ArrayList(); - range2key.put(entry, list); - } - list.add(key); + MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(slot); + List list = range2key.get(entry); + if (list == null) { + list = new ArrayList(); + range2key.put(entry, list); } + list.add(key); } final RPromise result = commandExecutor.getConnectionManager().newPromise(); diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index bb3e7d3521a..4eb0a1977b6 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -568,8 +568,8 @@ private void checkSlotsChange(ClusterServersConfig cfg, Collection newPartitions, Str MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr()); for (Integer slot : addedSlots) { - entry.addSlotRange(slot); addEntry(slot, entry); lastPartitions.put(slot, currentPartition); } @@ -623,7 +621,6 @@ private void checkSlotsMigration(Collection newPartitions, Str removedSlots.removeAll(newPartition.getSlots()); for (Integer removeSlot : removedSlots) { if (lastPartitions.remove(removeSlot, currentPartition)) { - entry.removeSlotRange(removeSlot); removeMaster(removeSlot); } } diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 784d6a278f3..cadcecefd0a 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -16,11 +16,13 @@ package org.redisson.command; import java.net.InetSocketAddress; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -71,9 +73,6 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; -import java.util.AbstractMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; /** * @@ -198,7 +197,7 @@ public RFuture readAsync(InetSocketAddress client, String key, Codec c @Override public RFuture> readAllAsync(RedisCommand command, Object... params) { final RPromise> mainPromise = connectionManager.newPromise(); - final Set nodes = connectionManager.getEntrySet(); + final Collection nodes = connectionManager.getEntrySet(); final List results = new ArrayList(); final AtomicInteger counter = new AtomicInteger(nodes.size()); FutureListener listener = new FutureListener() { @@ -288,7 +287,7 @@ public RFuture readAllAsync(RedisCommand command, SlotCallback RFuture allAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, Object... params) { final RPromise mainPromise = connectionManager.newPromise(); - final Set nodes = connectionManager.getEntrySet(); + final Collection nodes = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(nodes.size()); FutureListener listener = new FutureListener() { @Override @@ -413,7 +412,7 @@ public RFuture evalWriteAllAsync(RedisCommand command, SlotCallback public RFuture evalAllAsync(boolean readOnlyMode, RedisCommand command, final SlotCallback callback, String script, List keys, Object... params) { final RPromise mainPromise = connectionManager.newPromise(); - final Set entries = connectionManager.getEntrySet(); + final Collection entries = connectionManager.getEntrySet(); final AtomicInteger counter = new AtomicInteger(entries.size()); FutureListener listener = new FutureListener() { diff --git a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java index 6a6ee051753..b89edb115fe 100644 --- a/redisson/src/main/java/org/redisson/connection/ConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/ConnectionManager.java @@ -18,7 +18,6 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.Collection; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -85,7 +84,7 @@ public interface ConnectionManager { Codec getCodec(); - Set getEntrySet(); + Collection getEntrySet(); MasterSlaveEntry getEntry(int slot); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 5918060b9ad..69e357742aa 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -24,9 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Queue; -import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -130,7 +128,8 @@ public boolean cancel() { protected MasterSlaveServersConfig config; - private final Map entries = PlatformDependent.newConcurrentHashMap(); + private final Map slot2entry = PlatformDependent.newConcurrentHashMap(); + private final Map addr2entry = PlatformDependent.newConcurrentHashMap(); private final RPromise shutdownPromise; @@ -234,8 +233,9 @@ public Codec getCodec() { return codec; } - public Set getEntrySet() { - return new HashSet(entries.values()); + @Override + public Collection getEntrySet() { + return addr2entry.values(); } protected void initTimer(MasterSlaveServersConfig config) { @@ -672,29 +672,34 @@ public boolean onStatus(PubSubType type, String channel) { @Override public MasterSlaveEntry getEntry(InetSocketAddress addr) { - // TODO optimize - for (Entry entry : entries.entrySet()) { - if (entry.getValue().getClient().getAddr().equals(addr)) { - return entry.getValue(); - } - } - return null; + return addr2entry.get(addr); } + @Override public MasterSlaveEntry getEntry(int slot) { - return entries.get(slot); + return slot2entry.get(slot); } - protected void changeMaster(int slot, URI address) { - getEntry(slot).changeMaster(address); + protected final void changeMaster(int slot, URI address) { + MasterSlaveEntry entry = getEntry(slot); + addr2entry.remove(entry.getClient().getAddr()); + entry.changeMaster(address); + addr2entry.put(entry.getClient().getAddr(), entry); } - protected void addEntry(Integer slot, MasterSlaveEntry entry) { - entries.put(slot, entry); + protected final void addEntry(Integer slot, MasterSlaveEntry entry) { + slot2entry.put(slot, entry); + entry.addSlotRange(slot); + addr2entry.put(entry.getClient().getAddr(), entry); } protected MasterSlaveEntry removeMaster(Integer slot) { - return entries.remove(slot); + MasterSlaveEntry entry = slot2entry.remove(slot); + entry.removeSlotRange(slot); + if (entry.getSlotRanges().isEmpty()) { + addr2entry.remove(entry.getClient().getAddr()); + } + return entry; } @Override @@ -812,7 +817,7 @@ public void shutdown(long quietPeriod, long timeout, TimeUnit unit) { shutdownPromise.trySuccess(true); shutdownLatch.awaitUninterruptibly(); - for (MasterSlaveEntry entry : entries.values()) { + for (MasterSlaveEntry entry : getEntrySet()) { entry.shutdown(); }