
Make cluster meet reliable under link failures #461

Merged (11 commits) on Jun 17, 2024
35 changes: 28 additions & 7 deletions src/cluster_legacy.c
@@ -2845,7 +2845,16 @@ int clusterIsValidPacket(clusterLink *link) {
  * received from the wrong sender ID). */
 int clusterProcessPacket(clusterLink *link) {
     /* Validate that the packet is well-formed */
-    if (!clusterIsValidPacket(link)) return 1;
+    if (!clusterIsValidPacket(link)) {
+        clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
+        uint16_t type = ntohs(hdr->type);
+        if (server.debug_cluster_close_link_on_packet_drop && type == server.cluster_drop_packet_filter) {
+            freeClusterLink(link);
+            serverLog(LL_WARNING, "Closing link for matching packet type %hu", type);
+            return 0;
+        }
+        return 1;
+    }
 
     clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
     uint16_t type = ntohs(hdr->type);
@@ -2943,6 +2952,13 @@ int clusterProcessPacket(clusterLink *link) {
     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
         serverLog(LL_DEBUG, "%s packet received: %.40s", clusterGetMessageTypeString(type),
                   link->node ? link->node->name : "NULL");
+
+        if (sender && (sender->flags & CLUSTER_NODE_MEET)) {
+            /* Once we get a response for MEET from the sender, we can stop sending more MEET. */
+            sender->flags &= ~CLUSTER_NODE_MEET;
+            serverLog(LL_NOTICE, "Successfully completed handshake with %.40s (%s)", sender->name,
+                      sender->human_nodename);
Comment on lines +2959 to +2960 (Member): Not a blocker for this PR, but I think it would be good if we could standardize the logging statements, at least in cluster.c and cluster_legacy.c (which, by the way, should be renamed, since we don't intend to deprecate the current cluster implementation any time soon, even considering the V2 work). There has been discussion in the past about using a more "structural" logging mechanism as well, but I now think "incremental perfection" is more practical. What I would like to see in the server log, in the cluster topology area, is:

"who" received a message from "whom" about "which node"'s "what" properties have changed from "which value" to "which other value" at "when".

Ideally, an admin can just look at a single log statement to gather all the information.
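A sketch of what such a standardized log line could look like. All names here are hypothetical illustrations of the reviewer's proposed format, not existing valkey APIs; a real implementation would route through serverLog instead of a caller-supplied buffer.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical sketch of the standardized topology log line described above:
 * "who" received a message from "whom" about "which node"'s "what" property
 * changing from "which value" to "which other value" at "when". This only
 * illustrates the shape of the statement; nothing here exists in the codebase. */
static const char *format_topology_change(char *buf, size_t buflen,
                                          const char *receiver, const char *msgtype,
                                          const char *sender, const char *subject,
                                          const char *property, const char *old_val,
                                          const char *new_val, const char *when) {
    snprintf(buf, buflen,
             "%.40s received %s from %.40s: node %.40s's %s changed from %s to %s at %s",
             receiver, msgtype, sender, subject, property, old_val, new_val, when);
    return buf;
}
```

With this shape, an admin can recover the full topology transition from a single line without correlating multiple log statements.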

Reply (Member): Moved the discussion to #656.

+        }
         if (!link->inbound) {
             if (nodeInHandshake(link->node)) {
                 /* If we already have this node, try to change the
@@ -3408,12 +3424,17 @@ void clusterLinkConnectHandler(connection *conn) {
          * replaced by the clusterSendPing() call. */
         node->ping_sent = old_ping_sent;
     }
-    /* We can clear the flag after the first packet is sent.
-     * If we'll never receive a PONG, we'll never send new packets
-     * to this node. Instead after the PONG is received and we
-     * are no longer in meet/handshake status, we want to send
-     * normal PING packets. */
-    node->flags &= ~CLUSTER_NODE_MEET;
+    /* NOTE: Assume the current node is A and is asked to MEET another node B.
+     * Once A sends MEET to B, it cannot clear the MEET flag for B until it
+     * gets a response from B. If the MEET packet is not accepted by B due to
+     * link failure, A must continue sending MEET. If A doesn't continue sending
+     * MEET, A will know about B, but B will never add A. Every node always
+     * responds to PINGs from unknown nodes with a PONG, so A will know about B
+     * and continue sending PINGs. But B won't add A until it sees a MEET (or it
+     * gets to know about A from a trusted third node C). In this case, clearing
+     * the MEET flag here leads to asymmetry in the cluster membership. So, we
+     * clear the MEET flag in clusterProcessPacket.
+     */
Comment on lines +3427 to +3437 (Member): nit - I would suggest pulling the conclusion (the last sentence of the comment block) up to the top and then explaining the rationale:
Suggested change
-    /* NOTE: Assume the current node is A and is asked to MEET another node B.
-     * Once A sends MEET to B, it cannot clear the MEET flag for B until it
-     * gets a response from B. If the MEET packet is not accepted by B due to
-     * link failure, A must continue sending MEET. If A doesn't continue sending
-     * MEET, A will know about B, but B will never add A. Every node always
-     * responds to PINGs from unknown nodes with a PONG, so A will know about B
-     * and continue sending PINGs. But B won't add A until it sees a MEET (or it
-     * gets to know about A from a trusted third node C). In this case, clearing
-     * the MEET flag here leads to asymmetry in the cluster membership. So, we
-     * clear the MEET flag in clusterProcessPacket.
-     */
+    /* NOTE: We clear the `CLUSTER_NODE_MEET` flag in clusterProcessPacket.
+     *
+     * Assume the current node is A and is asked to MEET another node B.
+     * Once A sends MEET to B, it cannot clear the MEET flag for B until it
+     * gets a response from B. If the MEET packet is not accepted by B due to
+     * link failure, A must continue sending MEET. If A doesn't continue sending
+     * MEET, A will know about B, but B will never add A. Every node always
+     * responds to PINGs from unknown nodes with a PONG, so A will know about B
+     * and continue sending PINGs. But B won't add A until it sees a MEET (or it
+     * gets to know about A from a trusted third node C). In this case, clearing
+     * the MEET flag here leads to asymmetry in the cluster membership. */


     serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport);
 }
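The asymmetry described in the NOTE comment above can be shown with a toy simulation. This is plain C written for illustration only (none of these names come from valkey): it contrasts the old behavior (clear the MEET flag as soon as the first MEET is sent) with the new one (keep the flag until a response is processed) when the target drops the first few MEET packets.

```c
#include <stdbool.h>

/* Toy model of the A/B handshake from the comment above. A is asked to
 * MEET B, and B drops the first `drops` inbound MEETs (link failure). */
typedef struct {
    bool meet_flag; /* A still owes B a MEET */
    bool a_knows_b;
    bool b_knows_a;
} sim_state;

static void simulate(sim_state *s, int drops, bool clear_on_send) {
    s->a_knows_b = true; /* CLUSTER MEET gave A the address of B */
    s->b_knows_a = false;
    s->meet_flag = true;
    for (int round = 0; round < 10; round++) {
        bool sent_meet = s->meet_flag;
        if (clear_on_send) s->meet_flag = false; /* old behavior */
        if (sent_meet && drops > 0) {
            drops--; /* B drops this MEET; A gets no response */
            continue;
        }
        if (sent_meet) {
            s->b_knows_a = true;  /* B accepts the MEET and adds A */
            s->meet_flag = false; /* A clears the flag on B's response */
        }
        /* Otherwise A sends a plain PING; B replies PONG to the unknown
         * sender but does not add it, so b_knows_a stays false. */
    }
}
```

With `clear_on_send` set, A falls back to plain PINGs after the first (dropped) MEET and B never adds A: exactly the membership asymmetry the fix removes.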
6 changes: 6 additions & 0 deletions src/debug.c
@@ -429,6 +429,9 @@ void debugCommand(client *c) {
         "    Show low level info about `key` and associated value.",
         "DROP-CLUSTER-PACKET-FILTER <packet-type>",
         "    Drop all packets that match the filtered type. Set to -1 to allow all packets.",
+        "CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>",
+        "    This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type."
Suggested change (Member):
-        "    This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type."
+        "    This is effective only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type."
+        "    When set to 1, the cluster link is closed after dropping a packet based on the filter.",
         "OOM",
         "    Crash the server simulating an out-of-memory error.",
         "PANIC",
@@ -593,6 +596,9 @@ void debugCommand(client *c) {
         if (getLongFromObjectOrReply(c, c->argv[2], &packet_type, NULL) != C_OK) return;
         server.cluster_drop_packet_filter = packet_type;
         addReply(c, shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr, "close-cluster-link-on-packet-drop") && c->argc == 3) {
+        server.debug_cluster_close_link_on_packet_drop = atoi(c->argv[2]->ptr);
+        addReply(c, shared.ok);
     } else if (!strcasecmp(c->argv[1]->ptr, "object") && c->argc == 3) {
         dictEntry *de;
         robj *val;
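The two DEBUG knobs compose as follows: DROP-CLUSTER-PACKET-FILTER selects which packet type to drop (-1 disables the filter), and CLOSE-CLUSTER-LINK-ON-PACKET-DROP additionally closes the link on a drop, forcing the sender to reconnect and retransmit. A minimal standalone model of that decision (mirroring the condition added in clusterProcessPacket; the function name is invented for illustration):

```c
#include <stdbool.h>

/* Returns true when a dropped packet of `type` should also close the link,
 * given the DROP-CLUSTER-PACKET-FILTER value (-1 = filter disabled) and the
 * CLOSE-CLUSTER-LINK-ON-PACKET-DROP flag. Matches the check added in
 * clusterProcessPacket: both the flag and a matching filter are required. */
static bool should_close_link_on_drop(int drop_filter, bool close_on_drop, int type) {
    return close_on_drop && drop_filter != -1 && type == drop_filter;
}
```

Closing the link matters for the tests below: without it, the peer's link stays up and a retried MEET is never re-sent over a fresh connection.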
2 changes: 2 additions & 0 deletions src/server.h
@@ -2068,6 +2068,8 @@ struct valkeyServer {
     unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
     int cluster_drop_packet_filter; /* Debug config that allows tactically
                                      * dropping packets of a specific type */
+    /* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
+    uint32_t debug_cluster_close_link_on_packet_drop : 1;
     sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX];
     /* Scripting */
     mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
83 changes: 83 additions & 0 deletions tests/unit/cluster/cluster-multiple-meets.tcl
@@ -0,0 +1,83 @@
# make sure the test infra won't use SELECT
set old_singledb $::singledb
Review comment (Member): This is not strictly needed, right? I saw the servers are started with cluster-enabled.

Reply (madolson, Member, Jun 17, 2024): This is needed (for now at least) since normally we select a random db. We could key it off of the clustering tests though.

set ::singledb 1

tags {tls:skip external:skip cluster} {
    set base_conf [list cluster-enabled yes]
    start_multiple_servers 2 [list overrides $base_conf] {
        test "Cluster nodes are reachable" {
Review comment (Member): Generalize this into a helper function? unit/cluster/base.tcl has a similar test: https://github.com/valkey-io/valkey/blob/unstable/tests/unit/cluster/base.tcl#L12

            for {set id 0} {$id < [llength $::servers]} {incr id} {
                # Every node should be reachable.
                wait_for_condition 1000 50 {
                    ([catch {R $id ping} ping_reply] == 0) &&
                    ($ping_reply eq {PONG})
                } else {
                    catch {R $id ping} err
                    fail "Node #$id keeps replying '$err' to PING."
                }
            }
        }

        test "Before slots allocation, all nodes report cluster failure" {
            wait_for_cluster_state fail
        }

        set CLUSTER_PACKET_TYPE_PONG 1
        set CLUSTER_PACKET_TYPE_NONE -1
Comment on lines +25 to +26 (Member): nit - these definitions are a bit too far from where they are referenced, and they are only used once. It feels like the idea is to treat them as constants? If so, implementing them as a proc in a helper tcl file would help deliver that "read-only" impression and increase their usage.


        test "Cluster nodes haven't met each other" {
            assert {[llength [get_cluster_nodes 1]] == 1}
            assert {[llength [get_cluster_nodes 0]] == 1}
        }

        test "Allocate slots" {
            cluster_allocate_slots 2 0 ;# primaries replicas
        }

        test "Multiple MEETs from Node 1 to Node 0 should work" {
            # Make 1 drop the PONG responses to MEET
            R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_PONG
            # It is important to close the connection on drop, otherwise a subsequent MEET won't be sent
            R 1 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1

            R 1 CLUSTER MEET 127.0.0.1 [srv 0 port]

            # Wait for at least a few MEETs to be sent so that we are sure that 1 is dropping the response to MEET.
            wait_for_condition 1000 50 {
                [CI 0 cluster_stats_messages_meet_received] > 1 &&
                [CI 1 cluster_state] eq {fail} && [CI 0 cluster_state] eq {ok}
            } else {
                fail "Cluster node 1 never sent multiple MEETs to 0"
            }

            # 0 will be connected to 1, but 1 won't see that 0 is connected
            assert {[llength [get_cluster_nodes 1 connected]] == 1}
            assert {[llength [get_cluster_nodes 0 connected]] == 2}

            # Drop incoming and outgoing links from/to 1
            R 0 DEBUG CLUSTERLINK KILL ALL [R 1 CLUSTER MYID]

            # Wait for 0 to know about 1 again after 1 sends a MEET
            wait_for_condition 1000 50 {
                [llength [get_cluster_nodes 0 connected]] == 2
            } else {
                fail "Cluster node 0 never learned about node 1 again after the links were killed"
            }

            # Undo packet drop
            R 1 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE
            R 1 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 0

            # Both nodes 0 and 1 will turn to cluster state ok
            wait_for_condition 1000 50 {
                [CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} &&
                [CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received]
            } else {
                fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]"
            }
        }
    } ;# stop servers
} ;# tags

set ::singledb $old_singledb

71 changes: 71 additions & 0 deletions tests/unit/cluster/cluster-reliable-meet.tcl
@@ -0,0 +1,71 @@
# make sure the test infra won't use SELECT
set old_singledb $::singledb
set ::singledb 1

tags {tls:skip external:skip cluster} {
    set base_conf [list cluster-enabled yes]
    start_multiple_servers 2 [list overrides $base_conf] {
        test "Cluster nodes are reachable" {
            for {set id 0} {$id < [llength $::servers]} {incr id} {
                # Every node should be reachable.
                wait_for_condition 1000 50 {
                    ([catch {R $id ping} ping_reply] == 0) &&
                    ($ping_reply eq {PONG})
                } else {
                    catch {R $id ping} err
                    fail "Node #$id keeps replying '$err' to PING."
                }
            }
        }

        test "Before slots allocation, all nodes report cluster failure" {
            wait_for_cluster_state fail
        }

        set CLUSTER_PACKET_TYPE_MEET 2
        set CLUSTER_PACKET_TYPE_NONE -1

        test "Cluster nodes haven't met each other" {
            assert {[llength [get_cluster_nodes 1]] == 1}
            assert {[llength [get_cluster_nodes 0]] == 1}
        }

        test "Allocate slots" {
            cluster_allocate_slots 2 0
        }

        test "MEET is reliable when target drops the initial MEETs" {
            # Make 0 drop the initial MEET messages due to link failure
            R 0 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_MEET
            R 0 DEBUG CLOSE-CLUSTER-LINK-ON-PACKET-DROP 1

            R 1 CLUSTER MEET 127.0.0.1 [srv 0 port]

            # Wait for at least a few MEETs to be sent so that we are sure that 0 is
            # dropping them.
            wait_for_condition 1000 50 {
                [CI 0 cluster_stats_messages_meet_received] >= 3
            } else {
                fail "Cluster node 1 never sent multiple MEETs to 0"
            }

            # Make sure the nodes still don't know about each other
            assert {[llength [get_cluster_nodes 1 connected]] == 1}
            assert {[llength [get_cluster_nodes 0 connected]] == 1}

            R 0 DEBUG DROP-CLUSTER-PACKET-FILTER $CLUSTER_PACKET_TYPE_NONE

            # If the MEET is reliable, both nodes 0 and 1 will turn to cluster state ok
            wait_for_condition 1000 50 {
                [CI 1 cluster_state] eq {ok} && [CI 0 cluster_state] eq {ok} &&
                [CI 0 cluster_stats_messages_meet_received] >= 4 &&
                [CI 1 cluster_stats_messages_meet_sent] == [CI 0 cluster_stats_messages_meet_received]
            } else {
                fail "1 cluster_state:[CI 1 cluster_state], 0 cluster_state: [CI 0 cluster_state]"
            }
        }
    } ;# stop servers
} ;# tags

set ::singledb $old_singledb
