From 9ffc35c98e4629faa0faa4f17a9196da7b4ef9c7 Mon Sep 17 00:00:00 2001 From: "debing.sun" Date: Tue, 21 May 2024 09:25:13 +0800 Subject: [PATCH] Have consistent behavior of SPUBLISH within multi/exec like regular command (#13276) This PR is based on the commits from PR #12944. Allow SPUBLISH command within multi/exec on replica Behavior on unstable: ``` 127.0.0.1:6380> CLUSTER NODES 39ce8aa20f1f0d91f1a88d976ee1926dfefcdf1a 127.0.0.1:6380@16380 myself,slave 8b0feb120b68aac489d6a5af9c77dc40d71bc792 0 0 0 connected 8b0feb120b68aac489d6a5af9c77dc40d71bc792 127.0.0.1:6379@16379 master - 0 1705091681202 0 connected 0-16383 127.0.0.1:6380> SPUBLISH hello world (integer) 0 127.0.0.1:6380> MULTI OK 127.0.0.1:6380(TX)> SPUBLISH hello world QUEUED 127.0.0.1:6380(TX)> EXEC (error) MOVED 866 127.0.0.1:6379 ``` With this change: ``` 127.0.0.1:6380> SPUBLISH hello world (integer) 0 127.0.0.1:6380> MULTI OK 127.0.0.1:6380(TX)> SPUBLISH hello world QUEUED 127.0.0.1:6380(TX)> EXEC 1) (integer) 0 ``` --------- Co-authored-by: Harkrishn Patro Co-authored-by: oranagra --- src/cluster.c | 20 +++++--- tests/test_helper.tcl | 1 + tests/unit/cluster/sharded-pubsub.tcl | 66 +++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 7 deletions(-) create mode 100644 tests/unit/cluster/sharded-pubsub.tcl diff --git a/src/cluster.c b/src/cluster.c index d8c05d383fa2..6674c3aa0d00 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4,6 +4,8 @@ * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ /* @@ -978,6 +980,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in multiCmd mc; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0, existing_keys = 0; + int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */ /* Allow any key to be set if a module disabled cluster redirections. */ if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) @@ -1009,10 +1012,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in mc.cmd = cmd; } - int is_pubsubshard = cmd->proc == ssubscribeCommand || - cmd->proc == sunsubscribeCommand || - cmd->proc == spublishCommand; - /* Check that all the keys are in the same hash slot, and obtain this * slot and the node associated. */ for (i = 0; i < ms->count; i++) { @@ -1025,6 +1024,13 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in margc = ms->commands[i].argc; margv = ms->commands[i].argv; + /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ + if (!pubsubshard_included && + doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) + { + pubsubshard_included = 1; + } + getKeysResult result = GETKEYS_RESULT_INIT; numkeys = getKeysFromCommand(mcmd,margv,margc,&result); keyindex = result.keys; @@ -1088,7 +1094,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * node until the migration completes with CLUSTER SETSLOT * NODE . */ int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE; - if ((migrating_slot || importing_slot) && !is_pubsubshard) + if ((migrating_slot || importing_slot) && !pubsubshard_included) { if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++; else existing_keys++; @@ -1105,7 +1111,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in /* Cluster is globally down but we got keys? We only serve the request * if it is a read command and when allow_reads_when_down is enabled. */ if (!isClusterHealthy()) { - if (is_pubsubshard) { + if (pubsubshard_included) { if (!server.cluster_allow_pubsubshard_when_down) { if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; return NULL; @@ -1168,7 +1174,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * is serving, we can reply without redirection. */ int is_write_command = (cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); - if (((c->flags & CLIENT_READONLY) || is_pubsubshard) && + if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == n) diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index c33ec73473f1..ffe89862fbda 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -109,6 +109,7 @@ set ::all_tests { unit/cluster/links unit/cluster/cluster-response-tls unit/cluster/failure-marking + unit/cluster/sharded-pubsub } # Index to the next test to run in the ::all_tests list. set ::next_test 0 diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl new file mode 100644 index 000000000000..66a1ca2fb493 --- /dev/null +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -0,0 +1,66 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Licensed under your choice of the Redis Source Available License 2.0 +# (RSALv2) or the Server Side Public License v1 (SSPLv1). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + +start_cluster 1 1 {tags {external:skip cluster}} { + set primary_id 0 + set replica1_id 1 + + set primary [Rn $primary_id] + set replica [Rn $replica1_id] + + test "Sharded pubsub publish behavior within multi/exec" { + foreach {node} {primary replica} { + set node [set $node] + $node MULTI + $node SPUBLISH ch1 "hello" + $node EXEC + } + } + + test "Sharded pubsub within multi/exec with cross slot operation" { + $primary MULTI + $primary SPUBLISH ch1 "hello" + $primary GET foo + catch {[$primary EXEC]} err + assert_match {CROSSSLOT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with read operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary GET foo + $primary EXEC + } {0 {}} + + test "Sharded pubsub publish behavior within multi/exec with read operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica GET foo]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with write operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary SET foo bar + $primary EXEC + } {0 OK} + + test "Sharded pubsub publish behavior within multi/exec with write operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica SET foo bar]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } +}