Skip to content

Commit

Permalink
Improvement - reduced memory consumption by ClusterConnectionManager. #…
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed May 23, 2019
1 parent 18a6cea commit b777fed
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 25 deletions.
Expand Up @@ -19,6 +19,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -399,7 +400,7 @@ private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
continue;
}

MasterSlaveEntry entry = getEntry(currentPart.getSlots().iterator().next());
MasterSlaveEntry entry = getEntry(currentPart.slots().nextSetBit(0));
// should be invoked first in order to remove stale failedSlaveAddresses
Set<URI> addedSlaves = addRemoveSlaves(entry, currentPart, newPart);
// Do some slaves have changed state from failed to alive?
Expand Down Expand Up @@ -464,7 +465,7 @@ private Set<URI> addRemoveSlaves(MasterSlaveEntry entry, ClusterPartition curren
private int slotsAmount(Collection<ClusterPartition> partitions) {
int result = 0;
for (ClusterPartition clusterPartition : partitions) {
result += clusterPartition.getSlots().size();
result += clusterPartition.getSlotsAmount();
}
return result;
}
Expand Down Expand Up @@ -558,7 +559,7 @@ private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartit
for (Integer slot : lastPartitions.keySet()) {
boolean found = false;
for (ClusterPartition clusterPartition : newPartitions) {
if (clusterPartition.getSlots().contains(slot)) {
if (clusterPartition.hasSlot(slot)) {
found = true;
break;
}
Expand All @@ -580,58 +581,58 @@ private void checkSlotsChange(ClusterServersConfig cfg, Collection<ClusterPartit
}
}

Set<Integer> addedSlots = new HashSet<Integer>();
BitSet addedSlots = new BitSet();
for (ClusterPartition clusterPartition : newPartitions) {
for (Integer slot : clusterPartition.getSlots()) {
if (!lastPartitions.containsKey(slot)) {
addedSlots.add(slot);
addedSlots.set(slot);
}
}
}
if (!addedSlots.isEmpty()) {
log.info("{} slots found to add", addedSlots.size());
}
for (Integer slot : addedSlots) {
for (Integer slot : (Iterable<Integer>) addedSlots.stream()::iterator) {
ClusterPartition partition = find(newPartitions, slot);

Set<Integer> oldSlots = new HashSet<Integer>(partition.getSlots());
oldSlots.removeAll(addedSlots);
BitSet oldSlots = partition.copySlots();
oldSlots.andNot(addedSlots);
if (oldSlots.isEmpty()) {
continue;
}

MasterSlaveEntry entry = getEntry(oldSlots.iterator().next());
MasterSlaveEntry entry = getEntry(oldSlots.nextSetBit(0));
if (entry != null) {
addEntry(slot, entry);
lastPartitions.put(slot, partition);
}
}
}

private void checkSlotsMigration(Collection<ClusterPartition> newPartitions) {
for (ClusterPartition currentPartition : getLastPartitions()) {
for (ClusterPartition newPartition : newPartitions) {
if (!currentPartition.getNodeId().equals(newPartition.getNodeId())) {
continue;
}

MasterSlaveEntry entry = getEntry(currentPartition.getSlots().iterator().next());
Set<Integer> addedSlots = new HashSet<Integer>(newPartition.getSlots());
addedSlots.removeAll(currentPartition.getSlots());
MasterSlaveEntry entry = getEntry(currentPartition.slots().nextSetBit(0));
BitSet addedSlots = newPartition.copySlots();
addedSlots.andNot(currentPartition.slots());
currentPartition.addSlots(addedSlots);


for (Integer slot : addedSlots) {
for (Integer slot : (Iterable<Integer>) addedSlots.stream()::iterator) {
addEntry(slot, entry);
lastPartitions.put(slot, currentPartition);
}
if (!addedSlots.isEmpty()) {
log.info("{} slots added to {}", addedSlots.size(), currentPartition.getMasterAddress());
}

Set<Integer> removedSlots = new HashSet<Integer>(currentPartition.getSlots());
removedSlots.removeAll(newPartition.getSlots());
for (Integer removeSlot : removedSlots) {
BitSet removedSlots = currentPartition.copySlots();
removedSlots.andNot(newPartition.slots());
for (Integer removeSlot : (Iterable<Integer>) removedSlots.stream()::iterator) {
if (lastPartitions.remove(removeSlot, currentPartition)) {
removeEntry(removeSlot);
}
Expand Down
34 changes: 26 additions & 8 deletions redisson/src/main/java/org/redisson/cluster/ClusterPartition.java
Expand Up @@ -16,6 +16,7 @@
package org.redisson.cluster;

import java.net.URI;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -37,7 +38,7 @@ public enum Type {MASTER, SLAVE}
private final Set<URI> slaveAddresses = new HashSet<URI>();
private final Set<URI> failedSlaves = new HashSet<URI>();

private final Set<Integer> slots = new HashSet<Integer>();
private final BitSet slots = new BitSet();
private final Set<ClusterSlotRange> slotRanges = new HashSet<ClusterSlotRange>();

private ClusterPartition parent;
Expand Down Expand Up @@ -74,36 +75,53 @@ public boolean isMasterFail() {
return masterFail;
}

public void addSlots(Set<Integer> slots) {
this.slots.addAll(slots);
public void addSlots(BitSet slots) {
this.slots.or(slots);
}

public void removeSlots(Set<Integer> slots) {
this.slots.removeAll(slots);
public void removeSlots(BitSet slots) {
this.slots.andNot(slots);
}

public void addSlotRanges(Set<ClusterSlotRange> ranges) {
for (ClusterSlotRange clusterSlotRange : ranges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.add(i);
slots.set(i);
}
}
slotRanges.addAll(ranges);
}
public void removeSlotRanges(Set<ClusterSlotRange> ranges) {
for (ClusterSlotRange clusterSlotRange : ranges) {
for (int i = clusterSlotRange.getStartSlot(); i < clusterSlotRange.getEndSlot() + 1; i++) {
slots.remove(i);
slots.clear(i);
}
}
slotRanges.removeAll(ranges);
}
public Set<ClusterSlotRange> getSlotRanges() {
return slotRanges;
}
public Set<Integer> getSlots() {

public Iterable<Integer> getSlots() {
return (Iterable<Integer>) slots.stream()::iterator;
}

public BitSet slots() {
return slots;
}

public BitSet copySlots() {
return (BitSet) slots.clone();
}

public boolean hasSlot(int slot) {
return slots.get(slot);
}

public int getSlotsAmount() {
return slots.cardinality();
}

public URI getMasterAddress() {
return masterAddress;
Expand Down

0 comments on commit b777fed

Please sign in to comment.