Skip to content

Commit

Permalink
Make cluster meet reliable under link failures
Browse files Browse the repository at this point in the history
When there is a link failure while an ongoing MEET request is sent the
sending node stops sending anymore MEET and starts sending PINGs. Since
every node responds to PINGs from unknown nodes with a PONG, the
receiving node never adds the sending node. But the sending node adds
the receiving node when it sees a PONG. This can lead to asymmetry in
cluster membership. This changes makes the sender keep sending MEET
until it sees a PONG, avoiding the asymmetry.
  • Loading branch information
srgsanky committed May 8, 2024
1 parent 4e944ce commit 49a884c
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 17 deletions.
32 changes: 24 additions & 8 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2975,7 +2975,13 @@ int clusterIsValidPacket(clusterLink *link) {
int clusterProcessPacket(clusterLink *link) {

/* Validate that the packet is well-formed */
if (!clusterIsValidPacket(link)) return 1;
if (!clusterIsValidPacket(link)) {
if (server.cluster_close_link_on_packet_drop) {
freeClusterLink(link);
return 0;
}
return 1;
}

clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
uint16_t type = ntohs(hdr->type);
Expand Down Expand Up @@ -3088,6 +3094,12 @@ int clusterProcessPacket(clusterLink *link) {
serverLog(LL_DEBUG,"%s packet received: %.40s",
clusterGetMessageTypeString(type),
link->node ? link->node->name : "NULL");

if (sender && (sender->flags & CLUSTER_NODE_MEET)) {
// Once we get a response for MEET from the sender, we can stop sending more MEET
sender->flags &= ~CLUSTER_NODE_MEET;
}

if (!link->inbound) {
if (nodeInHandshake(link->node)) {
/* If we already have this node, try to change the
Expand Down Expand Up @@ -3613,13 +3625,17 @@ void clusterLinkConnectHandler(connection *conn) {
* replaced by the clusterSendPing() call. */
node->ping_sent = old_ping_sent;
}
/* We can clear the flag after the first packet is sent.
* If we'll never receive a PONG, we'll never send new packets
* to this node. Instead after the PONG is received and we
* are no longer in meet/handshake status, we want to send
* normal PING packets. */
node->flags &= ~CLUSTER_NODE_MEET;

/* NOTE: We cannot clear the MEET flag from the node until we get a response
* from the other node. If the MEET packet is not accepted by the other node
* due to link failure, we want to continue sending MEET. If we don't
* continue sending MEET, this node will know about the other node, but the
* other node will never add this node. Every node always responds to PINGs
* from unknown nodes with a PONG, so this node will know about the other
* node and continue sending PINGs. But the other node won't add this node
* until it sees a MEET (or it gets to know about this node from a trusted
* third node). In this case, clearing the MEET flag here leads to asymmetry
* in the cluster membership.
*/
serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->cport);
}
Expand Down
6 changes: 6 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ void debugCommand(client *c) {
" Show low level info about `key` and associated value.",
"DROP-CLUSTER-PACKET-FILTER <packet-type>",
" Drop all packets that match the filtered type. Set to -1 allow all packets.",
"CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>",
" This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type."
" When set to 1, the cluster link is closed after dropping a packet based on the filter."
"OOM",
" Crash the server simulating an out-of-memory error.",
"PANIC",
Expand Down Expand Up @@ -603,6 +606,9 @@ NULL
return;
server.cluster_drop_packet_filter = packet_type;
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "close-cluster-link-on-packet-drop") && c->argc == 3) {
server.cluster_close_link_on_packet_drop = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
dictEntry *de;
robj *val;
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2027,6 +2027,8 @@ struct valkeyServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
int cluster_close_link_on_packet_drop; /* Debug config that goes along with cluster_drop_packet_filter.
When set, the link is closed on packet drop. */
/* Scripting */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
int pre_command_oom_state; /* OOM before command (script?) was started */
Expand Down
54 changes: 54 additions & 0 deletions tests/cluster/tests/30-reliable-meet.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
set do_not_join_cluster_nodes 1

source "../tests/includes/init-tests.tcl"

# Create a cluster composed of the specified number of primaries.
proc create_primaries_only_cluster {primaries} {
cluster_allocate_slots $primaries
set ::cluster_master_nodes $primaries
}

set CLUSTER_PACKET_TYPE_MEET 2
set CLUSTER_PACKET_TYPE_NONE -1

set a 0
set b 1

test "Cluster nodes haven't met each other" {
assert {[llength [get_cluster_nodes $a]] == 1}
assert {[llength [get_cluster_nodes $b]] == 1}
}

test "Create a 2 nodes cluster with 2 shards" {
create_primaries_only_cluster 2
}

test "MEET is reliabile when target drops the initial MEETs" {
set b_port [get_instance_attrib valkey $b port]

# Make b drop the initial MEET messages due to link failure
R $b DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET
R $b DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1

R $a CLUSTER MEET 127.0.0.1 $b_port

wait_for_condition 1000 50 {
[CI $b cluster_stats_messages_meet_received] >= 3
} else {
fail "Cluster node $a never sent multiple MEETs to $b"
}

# Make sure the nodes still don't know about each other
assert {[llength [get_cluster_nodes $a connected]] == 1}
assert {[llength [get_cluster_nodes $b connected]] == 1}

R $b DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE

# If the MEET is reliable, both a and b will turn to cluster state ok
wait_for_condition 1000 50 {
[CI $a cluster_state] eq {ok} && [CI $b cluster_state] eq {ok}
} else {
fail "$a cluster_state:[CI $a cluster_state], $b cluster_state: [CI $b cluster_state]"
}
}

21 changes: 12 additions & 9 deletions tests/cluster/tests/includes/init-tests.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ test "Cluster nodes hard reset" {
R $id config set repl-diskless-load disabled
R $id config set cluster-announce-hostname ""
R $id DEBUG DROP-CLUSTER-PACKET-FILTER -1
R $id DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 0
R $id config rewrite
}
}
Expand Down Expand Up @@ -73,16 +74,18 @@ proc join_nodes_in_cluster {} {
return 1
}

test "Cluster Join and auto-discovery test" {
# Use multiple attempts since sometimes nodes timeout
# while attempting to connect.
for {set attempts 3} {$attempts > 0} {incr attempts -1} {
if {[join_nodes_in_cluster] == 1} {
break
if {![info exists do_not_join_cluster_nodes] || !$do_not_join_cluster_nodes} {
test "Cluster Join and auto-discovery test" {
# Use multiple attempts since sometimes nodes timeout
# while attempting to connect.
for {set attempts 3} {$attempts > 0} {incr attempts -1} {
if {[join_nodes_in_cluster] == 1} {
break
}
}
if {$attempts == 0} {
fail "Cluster failed to form full mesh"
}
}
if {$attempts == 0} {
fail "Cluster failed to form full mesh"
}
}

Expand Down

0 comments on commit 49a884c

Please sign in to comment.