Skip to content

Commit

Permalink
MasterSlaveConnectionManager#getEntry(java.net.InetSocketAddress) opt…
Browse files Browse the repository at this point in the history
…imization. #1081
  • Loading branch information
Nikita committed Oct 2, 2017
1 parent 0be813d commit 063dcab
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 47 deletions.
16 changes: 7 additions & 9 deletions redisson/src/main/java/org/redisson/RedissonKeys.java
Expand Up @@ -23,7 +23,6 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -207,7 +206,7 @@ public RFuture<Long> deleteByPatternAsync(final String pattern) {
final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise(); final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise();
final AtomicReference<Throwable> failed = new AtomicReference<Throwable>(); final AtomicReference<Throwable> failed = new AtomicReference<Throwable>();
final AtomicLong count = new AtomicLong(); final AtomicLong count = new AtomicLong();
Set<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet(); Collection<MasterSlaveEntry> entries = commandExecutor.getConnectionManager().getEntrySet();
final AtomicLong executed = new AtomicLong(entries.size()); final AtomicLong executed = new AtomicLong(entries.size());
final FutureListener<Long> listener = new FutureListener<Long>() { final FutureListener<Long> listener = new FutureListener<Long>() {
@Override @Override
Expand Down Expand Up @@ -301,14 +300,13 @@ private RFuture<Long> executeAsync(RedisStrictCommand<Long> command, String ...
Map<MasterSlaveEntry, List<String>> range2key = new HashMap<MasterSlaveEntry, List<String>>(); Map<MasterSlaveEntry, List<String>> range2key = new HashMap<MasterSlaveEntry, List<String>>();
for (String key : keys) { for (String key : keys) {
int slot = commandExecutor.getConnectionManager().calcSlot(key); int slot = commandExecutor.getConnectionManager().calcSlot(key);
for (MasterSlaveEntry entry : commandExecutor.getConnectionManager().getEntrySet()) { MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(slot);
List<String> list = range2key.get(entry); List<String> list = range2key.get(entry);
if (list == null) { if (list == null) {
list = new ArrayList<String>(); list = new ArrayList<String>();
range2key.put(entry, list); range2key.put(entry, list);
}
list.add(key);
} }
list.add(key);
} }


final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise(); final RPromise<Long> result = commandExecutor.getConnectionManager().newPromise();
Expand Down
Expand Up @@ -568,8 +568,8 @@ private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartit
} }


for (Integer slot : removedSlots) { for (Integer slot : removedSlots) {
MasterSlaveEntry entry = removeMaster(slot); MasterSlaveEntry entry = getEntry(slot);
entry.removeSlotRange(slot); removeMaster(slot);
if (entry.getSlotRanges().isEmpty()) { if (entry.getSlotRanges().isEmpty()) {
entry.shutdownMasterAsync(); entry.shutdownMasterAsync();
log.info("{} master and slaves for it removed", entry.getClient().getAddr()); log.info("{} master and slaves for it removed", entry.getClient().getAddr());
Expand All @@ -584,12 +584,11 @@ private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartit
} }
for (final Integer slot : addedSlots) { for (final Integer slot : addedSlots) {
ClusterPartition partition = find(newPartitions, slot); ClusterPartition partition = find(newPartitions, slot);
for (MasterSlaveEntry entry : getEntrySet()) { MasterSlaveEntry entry = getEntry(partition.getMasterAddr());
if (entry.getClient().getAddr().equals(partition.getMasterAddr())) { if (entry != null && entry.getClient().getAddr().equals(partition.getMasterAddr())) {
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, partition); lastPartitions.put(slot, partition);
break; break;
}
} }
} }
} }
Expand All @@ -611,7 +610,6 @@ private void checkSlotsMigration(Collection<ClusterPartition> newPartitions, Str
MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr()); MasterSlaveEntry entry = getEntry(currentPartition.getMasterAddr());


for (Integer slot : addedSlots) { for (Integer slot : addedSlots) {
entry.addSlotRange(slot);
addEntry(slot, entry); addEntry(slot, entry);
lastPartitions.put(slot, currentPartition); lastPartitions.put(slot, currentPartition);
} }
Expand All @@ -623,7 +621,6 @@ private void checkSlotsMigration(Collection<ClusterPartition> newPartitions, Str
removedSlots.removeAll(newPartition.getSlots()); removedSlots.removeAll(newPartition.getSlots());
for (Integer removeSlot : removedSlots) { for (Integer removeSlot : removedSlots) {
if (lastPartitions.remove(removeSlot, currentPartition)) { if (lastPartitions.remove(removeSlot, currentPartition)) {
entry.removeSlotRange(removeSlot);
removeMaster(removeSlot); removeMaster(removeSlot);
} }
} }
Expand Down
Expand Up @@ -16,11 +16,13 @@
package org.redisson.command; package org.redisson.command;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.AbstractMap;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
Expand Down Expand Up @@ -71,9 +73,6 @@
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;


/** /**
* *
Expand Down Expand Up @@ -198,7 +197,7 @@ public <T, R> RFuture<R> readAsync(InetSocketAddress client, String key, Codec c
@Override @Override
public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) { public <T, R> RFuture<Collection<R>> readAllAsync(RedisCommand<T> command, Object... params) {
final RPromise<Collection<R>> mainPromise = connectionManager.newPromise(); final RPromise<Collection<R>> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final List<R> results = new ArrayList<R>(); final List<R> results = new ArrayList<R>();
final AtomicInteger counter = new AtomicInteger(nodes.size()); final AtomicInteger counter = new AtomicInteger(nodes.size());
FutureListener<R> listener = new FutureListener<R>() { FutureListener<R> listener = new FutureListener<R>() {
Expand Down Expand Up @@ -288,7 +287,7 @@ public <R, T> RFuture<R> readAllAsync(RedisCommand<T> command, SlotCallback<T, R


private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) { private <T, R> RFuture<R> allAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise(); final RPromise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> nodes = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> nodes = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(nodes.size()); final AtomicInteger counter = new AtomicInteger(nodes.size());
FutureListener<T> listener = new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {
@Override @Override
Expand Down Expand Up @@ -413,7 +412,7 @@ public <T, R> RFuture<R> evalWriteAllAsync(RedisCommand<T> command, SlotCallback


public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) { public <T, R> RFuture<R> evalAllAsync(boolean readOnlyMode, RedisCommand<T> command, final SlotCallback<T, R> callback, String script, List<Object> keys, Object... params) {
final RPromise<R> mainPromise = connectionManager.newPromise(); final RPromise<R> mainPromise = connectionManager.newPromise();
final Set<MasterSlaveEntry> entries = connectionManager.getEntrySet(); final Collection<MasterSlaveEntry> entries = connectionManager.getEntrySet();
final AtomicInteger counter = new AtomicInteger(entries.size()); final AtomicInteger counter = new AtomicInteger(entries.size());
FutureListener<T> listener = new FutureListener<T>() { FutureListener<T> listener = new FutureListener<T>() {


Expand Down
Expand Up @@ -18,7 +18,6 @@
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


Expand Down Expand Up @@ -85,7 +84,7 @@ public interface ConnectionManager {


Codec getCodec(); Codec getCodec();


Set<MasterSlaveEntry> getEntrySet(); Collection<MasterSlaveEntry> getEntrySet();


MasterSlaveEntry getEntry(int slot); MasterSlaveEntry getEntry(int slot);


Expand Down
Expand Up @@ -24,9 +24,7 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue; import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -130,7 +128,8 @@ public boolean cancel() {


protected MasterSlaveServersConfig config; protected MasterSlaveServersConfig config;


private final Map<Integer, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap(); private final Map<Integer, MasterSlaveEntry> slot2entry = PlatformDependent.newConcurrentHashMap();
private final Map<InetSocketAddress, MasterSlaveEntry> addr2entry = PlatformDependent.newConcurrentHashMap();


private final RPromise<Boolean> shutdownPromise; private final RPromise<Boolean> shutdownPromise;


Expand Down Expand Up @@ -234,8 +233,9 @@ public Codec getCodec() {
return codec; return codec;
} }


public Set<MasterSlaveEntry> getEntrySet() { @Override
return new HashSet<MasterSlaveEntry>(entries.values()); public Collection<MasterSlaveEntry> getEntrySet() {
return addr2entry.values();
} }


protected void initTimer(MasterSlaveServersConfig config) { protected void initTimer(MasterSlaveServersConfig config) {
Expand Down Expand Up @@ -672,29 +672,34 @@ public boolean onStatus(PubSubType type, String channel) {


@Override @Override
public MasterSlaveEntry getEntry(InetSocketAddress addr) { public MasterSlaveEntry getEntry(InetSocketAddress addr) {
// TODO optimize return addr2entry.get(addr);
for (Entry<Integer, MasterSlaveEntry> entry : entries.entrySet()) {
if (entry.getValue().getClient().getAddr().equals(addr)) {
return entry.getValue();
}
}
return null;
} }


@Override
public MasterSlaveEntry getEntry(int slot) { public MasterSlaveEntry getEntry(int slot) {
return entries.get(slot); return slot2entry.get(slot);
} }


protected void changeMaster(int slot, URI address) { protected final void changeMaster(int slot, URI address) {
getEntry(slot).changeMaster(address); MasterSlaveEntry entry = getEntry(slot);
addr2entry.remove(entry.getClient().getAddr());
entry.changeMaster(address);
addr2entry.put(entry.getClient().getAddr(), entry);
} }


protected void addEntry(Integer slot, MasterSlaveEntry entry) { protected final void addEntry(Integer slot, MasterSlaveEntry entry) {
entries.put(slot, entry); slot2entry.put(slot, entry);
entry.addSlotRange(slot);
addr2entry.put(entry.getClient().getAddr(), entry);
} }


protected MasterSlaveEntry removeMaster(Integer slot) { protected MasterSlaveEntry removeMaster(Integer slot) {
return entries.remove(slot); MasterSlaveEntry entry = slot2entry.remove(slot);
entry.removeSlotRange(slot);
if (entry.getSlotRanges().isEmpty()) {
addr2entry.remove(entry.getClient().getAddr());
}
return entry;
} }


@Override @Override
Expand Down Expand Up @@ -812,7 +817,7 @@ public void shutdown(long quietPeriod, long timeout, TimeUnit unit) {
shutdownPromise.trySuccess(true); shutdownPromise.trySuccess(true);
shutdownLatch.awaitUninterruptibly(); shutdownLatch.awaitUninterruptibly();


for (MasterSlaveEntry entry : entries.values()) { for (MasterSlaveEntry entry : getEntrySet()) {
entry.shutdown(); entry.shutdown();
} }


Expand Down

0 comments on commit 063dcab

Please sign in to comment.