Skip to content

Commit

Permalink
Add synchronization to Partitions #333
Browse files Browse the repository at this point in the history
All non-threadsafe access to Partitions is now synchronized. Functional blocks that change the state of Partitions are grouped into atomar blocks that prevent reads to stale data and reads of partially updated data.
  • Loading branch information
mp911de committed Aug 12, 2016
1 parent cf30acc commit 713c97d
Showing 1 changed file with 93 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;

import com.lambdaworks.redis.cluster.SlotHash;
import com.lambdaworks.redis.internal.LettuceAssert;

/**
* Cluster topology view. An instance of {@link Partitions} provides access to the partitions of a Redis Cluster. A partition is
Expand Down Expand Up @@ -33,9 +34,12 @@
*/
public class Partitions implements Collection<RedisClusterNode> {

private List<RedisClusterNode> partitions = new ArrayList<>();
private final List<RedisClusterNode> partitions = new ArrayList<>();
private final static RedisClusterNode[] EMPTY = new RedisClusterNode[SlotHash.SLOT_COUNT];
private final static RedisClusterNode[] NO_NODES = new RedisClusterNode[0];

private volatile RedisClusterNode slotCache[] = EMPTY;
private volatile RedisClusterNode nodes[] = NO_NODES;

/**
* Retrieve a {@link RedisClusterNode} by its slot number. This method does not distinguish between masters and slaves.
Expand All @@ -54,7 +58,9 @@ public RedisClusterNode getPartitionBySlot(int slot) {
* @return RedisClusterNode or {@literal null}
*/
public RedisClusterNode getPartitionByNodeId(String nodeId) {
for (RedisClusterNode partition : partitions) {

RedisClusterNode nodes[] = this.nodes;
for (RedisClusterNode partition : nodes) {
if (partition.getNodeId().equals(nodeId)) {
return partition;
}
Expand All @@ -65,20 +71,29 @@ public RedisClusterNode getPartitionByNodeId(String nodeId) {
/**
* Update the partition cache. Updates are necessary after the partition details have changed.
*/
public synchronized void updateCache() {
public void updateCache() {

if(partitions.isEmpty()) {
this.slotCache = EMPTY;
return;
}
synchronized (partitions) {
if (partitions.isEmpty()) {
this.slotCache = EMPTY;
return;
}

RedisClusterNode[] slotCache = new RedisClusterNode[SlotHash.SLOT_COUNT];
RedisClusterNode[] nodes = new RedisClusterNode[partitions.size()];

int i = 0;
for (RedisClusterNode partition : partitions) {

RedisClusterNode[] slotCache = new RedisClusterNode[SlotHash.SLOT_COUNT];
for (RedisClusterNode partition : partitions) {
for (Integer integer : partition.getSlots()) {
slotCache[integer.intValue()] = partition;
nodes[i++] = partition;
for (Integer integer : partition.getSlots()) {
slotCache[integer.intValue()] = partition;
}
}

this.slotCache = slotCache;
this.nodes = nodes;
}
this.slotCache = slotCache;
}

@Override
Expand All @@ -91,16 +106,13 @@ public List<RedisClusterNode> getPartitions() {
}

public void addPartition(RedisClusterNode partition) {
slotCache = EMPTY;
partitions.add(partition);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName());
sb.append(" ").append(partitions);
return sb.toString();
LettuceAssert.notNull(partition, "Partition must not be null");

synchronized (this) {
slotCache = EMPTY;
partitions.add(partition);
}
}

@Override
Expand All @@ -109,7 +121,7 @@ public int size() {
}

public RedisClusterNode getPartition(int index) {
return getPartitions().get(index);
return partitions.get(index);
}

/**
Expand All @@ -118,9 +130,12 @@ public RedisClusterNode getPartition(int index) {
* @param partitions list of new partitions
*/
public void reload(List<RedisClusterNode> partitions) {
this.partitions.clear();
this.partitions.addAll(partitions);
updateCache();

synchronized (partitions) {
this.partitions.clear();
this.partitions.addAll(partitions);
updateCache();
}
}

@Override
Expand All @@ -135,57 +150,91 @@ public boolean contains(Object o) {

@Override
public boolean addAll(Collection<? extends RedisClusterNode> c) {
boolean b = partitions.addAll(c);
updateCache();
return b;

synchronized (partitions) {
boolean b = partitions.addAll(c);
updateCache();
return b;
}
}

@Override
public boolean removeAll(Collection<?> c) {
boolean b = getPartitions().removeAll(c);
updateCache();
return b;

synchronized (partitions) {
boolean b = getPartitions().removeAll(c);
updateCache();
return b;
}
}

@Override
public boolean retainAll(Collection<?> c) {
boolean b = getPartitions().retainAll(c);
updateCache();
return b;

synchronized (partitions) {
boolean b = getPartitions().retainAll(c);
updateCache();
return b;
}
}

@Override
public void clear() {
getPartitions().clear();
updateCache();

synchronized (partitions) {
getPartitions().clear();
updateCache();
}
}

@Override
public Object[] toArray() {
return getPartitions().toArray();

synchronized (partitions) {
return getPartitions().toArray();
}
}

@Override
public <T> T[] toArray(T[] a) {
return getPartitions().toArray(a);

synchronized (partitions) {
return getPartitions().toArray(a);
}
}

@Override
public boolean add(RedisClusterNode redisClusterNode) {
boolean add = getPartitions().add(redisClusterNode);
updateCache();
return add;

synchronized (partitions) {
LettuceAssert.notNull(redisClusterNode, "RedisClusterNode must not be null");

boolean add = getPartitions().add(redisClusterNode);
updateCache();
return add;
}
}

@Override
public boolean remove(Object o) {
boolean remove = getPartitions().remove(o);
updateCache();
return remove;

synchronized (partitions) {
boolean remove = getPartitions().remove(o);
updateCache();
return remove;
}
}

@Override
public boolean containsAll(Collection<?> c) {
return getPartitions().containsAll(c);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName());
sb.append(" ").append(partitions);
return sb.toString();
}
}

0 comments on commit 713c97d

Please sign in to comment.