Skip to content

Commit

Permalink
Add support for topology consensus #355
Browse files Browse the repository at this point in the history
Previously, cluster topology refreshing could get stuck on a node that was previously discovered but got removed from the cluster. This was possible because multiple views were obtained and any arbitrary topology view was chosen.

Lettuce now implements two consensus algorithms: Healthy Majority and Known Majority. Healthy Majority is applied on the very first topology retrieval, Known Majority for all subsequent topology refreshes.

Healthy Majority votes for the topology view containing the most healthy nodes (nodes without FAIL/PFAIL/NOADDR flags) so that the healthiest view is used. Known Majority selects the topology view that contains the most previously known nodes. This consensus works when adding or removing nodes one by one, or even several nodes at once. If a cluster is split into partitions of equal size, the client can still get stuck on either side; that issue can be solved by disabling dynamic refresh sources and specifying stable cluster seed nodes.
  • Loading branch information
mp911de committed Sep 16, 2016
1 parent 98dd531 commit 5126790
Show file tree
Hide file tree
Showing 8 changed files with 427 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.lambdaworks.redis.cluster;

import java.util.Map;

import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;

/**
 * Strategy API that picks the {@link com.lambdaworks.redis.cluster.models.partitions.Partitions topology view}
 * {@link RedisClusterClient} should work with.
 * <p>
 * A {@link PartitionsConsensus} receives the {@link Partitions} currently in use together with a {@link java.util.Map} of
 * freshly retrieved {@link Partitions} and decides which view wins. An implementation is free to hand back one of the
 * {@link Partitions} objects it was given or to build a new one.
 *
 * @author Mark Paluch
 * @since 4.2
 * @see com.lambdaworks.redis.cluster.models.partitions.Partitions
 * @see RedisClusterClient
 */
abstract class PartitionsConsensus {

    /**
     * Consensus strategy that prefers the topology view containing the most active nodes.
     */
    public static final PartitionsConsensus HEALTHY_MAJORITY = new PartitionsConsensusImpl.HealthyMajority();

    /**
     * Consensus strategy that prefers the topology view containing the most previously known nodes.
     */
    public static final PartitionsConsensus KNOWN_MAJORITY = new PartitionsConsensusImpl.KnownMajority();

    /**
     * Decide which {@link Partitions} {@link RedisClusterClient} should use.
     *
     * @param current the topology view currently in use. {@link #KNOWN_MAJORITY} iterates over it and therefore requires a
     *        non-{@literal null} value; {@link #HEALTHY_MAJORITY} is invoked with {@literal null} when no view exists yet.
     * @param topologyViews the newly retrieved views, must not be {@literal null}.
     * @return the {@link Partitions} that {@link RedisClusterClient} should use.
     */
    abstract Partitions getPartitions(Partitions current, Map<RedisURI, Partitions> topologyViews);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.lambdaworks.redis.cluster;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
 * Implementations for {@link PartitionsConsensus}.
 * <p>
 * Both strategies follow the same scheme: every retrieved topology view receives a number of votes, and the view with the
 * most votes is returned. Views with an equal number of votes are chosen between randomly. If no views were retrieved, the
 * {@code current} argument is returned unchanged.
 *
 * @author Mark Paluch
 * @since 4.2
 */
class PartitionsConsensusImpl {

    /**
     * Votes for {@link Partitions} that contains the most known (previously existing) nodes.
     */
    static final class KnownMajority extends PartitionsConsensus {

        /**
         * Select the view that shares the most node ids with {@code current}.
         *
         * @param current the currently used topology view; iterated to obtain the known node ids.
         * @param topologyViews the newly retrieved views.
         * @return {@code current} if {@code topologyViews} is empty, otherwise the view with the most known nodes.
         */
        @Override
        Partitions getPartitions(Partitions current, Map<RedisURI, Partitions> topologyViews) {

            if (topologyViews.isEmpty()) {
                return current;
            }

            List<VotedPartitions> votedList = new ArrayList<>(topologyViews.size());

            for (Partitions partitions : topologyViews.values()) {
                votedList.add(new VotedPartitions(countKnownNodes(current, partitions), partitions));
            }

            return selectMostVoted(votedList);
        }

        /**
         * Count the nodes of {@code current} whose node id can be resolved within {@code candidate}.
         */
        private static int countKnownNodes(Partitions current, Partitions candidate) {

            int knownNodes = 0;

            for (RedisClusterNode knownNode : current) {
                if (candidate.getPartitionByNodeId(knownNode.getNodeId()) != null) {
                    knownNodes++;
                }
            }

            return knownNodes;
        }
    }

    /**
     * Votes for {@link Partitions} that contains the most active (in total) nodes.
     */
    static final class HealthyMajority extends PartitionsConsensus {

        /**
         * Select the view with the most nodes that carry none of the {@code FAIL}, {@code EVENTUAL_FAIL} or {@code NOADDR}
         * flags.
         *
         * @param current only returned as-is if {@code topologyViews} is empty; not inspected otherwise.
         * @param topologyViews the newly retrieved views.
         * @return {@code current} if {@code topologyViews} is empty, otherwise the view with the most healthy nodes.
         */
        @Override
        Partitions getPartitions(Partitions current, Map<RedisURI, Partitions> topologyViews) {

            if (topologyViews.isEmpty()) {
                return current;
            }

            List<VotedPartitions> votedList = new ArrayList<>(topologyViews.size());

            for (Partitions partitions : topologyViews.values()) {

                int votes = 0;

                for (RedisClusterNode node : partitions) {
                    if (isHealthy(node)) {
                        votes++;
                    }
                }

                votedList.add(new VotedPartitions(votes, partitions));
            }

            return selectMostVoted(votedList);
        }

        /**
         * A node counts as healthy if it is flagged neither {@code FAIL}, {@code EVENTUAL_FAIL} nor {@code NOADDR}.
         */
        private static boolean isHealthy(RedisClusterNode node) {
            return !(node.is(RedisClusterNode.NodeFlag.FAIL) || node.is(RedisClusterNode.NodeFlag.EVENTUAL_FAIL)
                    || node.is(RedisClusterNode.NodeFlag.NOADDR));
        }
    }

    /**
     * Return the {@link Partitions} with the highest vote count.
     * <p>
     * The list is shuffled before it is sorted. {@link Collections#sort} is stable, so entries with equal votes keep their
     * shuffled order and ties are therefore broken randomly instead of always favoring the same view.
     *
     * @param votedList non-empty, mutable list of candidates; it is reordered in place.
     * @return the winning topology view.
     */
    private static Partitions selectMostVoted(List<VotedPartitions> votedList) {

        Collections.shuffle(votedList);
        Collections.sort(votedList, (o1, o2) -> Integer.compare(o2.votes, o1.votes));

        return votedList.get(0).partitions;
    }

    /**
     * Value holder pairing a topology view with the number of votes it received.
     */
    static final class VotedPartitions {

        final int votes;
        final Partitions partitions;

        public VotedPartitions(int votes, Partitions partitions) {
            this.votes = votes;
            this.partitions = partitions;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ protected Partitions loadPartitions() {
throw new RedisException("Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource);
}

Partitions loadedPartitions = partitions.values().iterator().next();
Partitions loadedPartitions = determinePartitions(this.partitions, partitions);
RedisURI viewedBy = refresh.getViewedBy(partitions, loadedPartitions);

for (RedisClusterNode partition : loadedPartitions) {
Expand All @@ -760,6 +760,22 @@ protected Partitions loadPartitions() {
return loadedPartitions;
}

/**
 * Chooses the {@link Partitions topology view} to use from the current view and the obtained topology views.
 * <p>
 * Without a current view, {@link PartitionsConsensus#HEALTHY_MAJORITY} decides; otherwise
 * {@link PartitionsConsensus#KNOWN_MAJORITY} is applied against the current view.
 *
 * @param current the current topology view. May be {@literal null} if {@link RedisClusterClient} has no topology view yet.
 * @param topologyViews the obtained topology views
 * @return the {@link Partitions topology view} to use.
 */
protected Partitions determinePartitions(Partitions current, Map<RedisURI, Partitions> topologyViews) {

    PartitionsConsensus consensus = current != null ? PartitionsConsensus.KNOWN_MAJORITY
            : PartitionsConsensus.HEALTHY_MAJORITY;

    // current is null exactly when HEALTHY_MAJORITY was selected
    return consensus.getPartitions(current, topologyViews);
}

private void activateTopologyRefreshIfNeeded() {

if (getOptions() instanceof ClusterClientOptions) {
Expand All @@ -771,9 +787,8 @@ private void activateTopologyRefreshIfNeeded() {
}

if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
ScheduledFuture<?> scheduledFuture = genericWorkerPool
.scheduleAtFixedRate(clusterTopologyRefreshScheduler, options.getRefreshPeriod(),
options.getRefreshPeriod(), options.getRefreshPeriodUnit());
ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(clusterTopologyRefreshScheduler,
options.getRefreshPeriod(), options.getRefreshPeriod(), options.getRefreshPeriodUnit());
clusterTopologyRefreshFuture.set(scheduledFuture);
}
}
Expand Down Expand Up @@ -843,7 +858,7 @@ public ClientResources getResources() {
@Override
public void shutdown(long quietPeriod, long timeout, TimeUnit timeUnit) {

if(clusterTopologyRefreshActivated.compareAndSet(true, false)){
if (clusterTopologyRefreshActivated.compareAndSet(true, false)) {

ScheduledFuture<?> scheduledFuture = clusterTopologyRefreshFuture.get();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.lambdaworks.redis.cluster;

import static com.lambdaworks.redis.cluster.PartitionsConsensusTestSupport.createMap;
import static com.lambdaworks.redis.cluster.PartitionsConsensusTestSupport.createNode;
import static com.lambdaworks.redis.cluster.PartitionsConsensusTestSupport.createPartitions;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import org.junit.Test;

import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;

/**
 * Tests for {@link PartitionsConsensus#HEALTHY_MAJORITY}: the selected view must be the one with the most nodes that are
 * not flagged {@code FAIL}.
 *
 * @author Mark Paluch
 */
public class HealthyMajorityPartitionsConsensusTest {

    private RedisClusterNode node1 = createNode(1);
    private RedisClusterNode node2 = createNode(2);
    private RedisClusterNode node3 = createNode(3);
    private RedisClusterNode node4 = createNode(4);
    private RedisClusterNode node5 = createNode(5);

    /**
     * All views list the same five nodes, so any of the views is an acceptable outcome.
     */
    @Test
    public void sameSharedViewShouldDecideForHealthyNodes() throws Exception {

        Partitions first = createPartitions(node1, node2, node3, node4, node5);
        Partitions second = createPartitions(node1, node2, node3, node4, node5);
        Partitions third = createPartitions(node1, node2, node3, node4, node5);

        Map<RedisURI, Partitions> views = createMap(first, second, third);

        Partitions selected = PartitionsConsensus.HEALTHY_MAJORITY.getPartitions(null, views);

        assertThat(Arrays.asList(first, second, third)).contains(selected);
    }

    /**
     * Only node1 stays unflagged, so the sole view listing node1 must be selected.
     */
    @Test
    public void unhealthyNodeViewShouldDecideForHealthyNodes() throws Exception {

        Partitions first = createPartitions(node1, node2);
        Partitions second = createPartitions(node2, node3, node4, node5);
        Partitions third = createPartitions(node2, node3, node4, node5);

        Map<RedisURI, Partitions> views = createMap(first, second, third);

        markFailed(node2, node3, node4, node5);

        Partitions selected = PartitionsConsensus.HEALTHY_MAJORITY.getPartitions(null, views);

        assertThat(selected).isSameAs(first);
    }

    /**
     * The first view holds only flagged nodes and the second is empty; the third keeps two unflagged nodes and must be
     * selected.
     */
    @Test
    public void splitNodeViewShouldDecideForHealthyNodes() throws Exception {

        Partitions first = createPartitions(node1, node2, node3);
        Partitions second = createPartitions();
        Partitions third = createPartitions(node3, node4, node5);

        Map<RedisURI, Partitions> views = createMap(first, second, third);

        markFailed(node1, node2, node3);

        Partitions selected = PartitionsConsensus.HEALTHY_MAJORITY.getPartitions(null, views);

        assertThat(selected).isSameAs(third);
    }

    /**
     * The first and third view each keep exactly one unflagged node while the second keeps none, so either the first or
     * the third view may be selected.
     */
    @Test
    public void splitUnhealthyNodeViewShouldDecideForHealthyNodes() throws Exception {

        Partitions first = createPartitions(node1, node2);
        Partitions second = createPartitions(node2, node3);
        Partitions third = createPartitions(node3, node4, node5);

        Map<RedisURI, Partitions> views = createMap(first, second, third);

        markFailed(node2, node3, node4);

        Partitions selected = PartitionsConsensus.HEALTHY_MAJORITY.getPartitions(null, views);

        assertThat(Arrays.asList(first, third)).contains(selected);
    }

    /**
     * Replace the flags of each given node with the single {@code FAIL} flag, in argument order.
     */
    private static void markFailed(RedisClusterNode... nodes) {

        for (RedisClusterNode node : nodes) {
            node.setFlags(Collections.singleton(RedisClusterNode.NodeFlag.FAIL));
        }
    }
}
Loading

0 comments on commit 5126790

Please sign in to comment.