diff --git a/src/cluster.c b/src/cluster.c index d8c05d383fa2..6674c3aa0d00 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4,6 +4,8 @@ * * Licensed under your choice of the Redis Source Available License 2.0 * (RSALv2) or the Server Side Public License v1 (SSPLv1). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. */ /* @@ -978,6 +980,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in multiCmd mc; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0, existing_keys = 0; + int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */ /* Allow any key to be set if a module disabled cluster redirections. */ if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) @@ -1009,10 +1012,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in mc.cmd = cmd; } - int is_pubsubshard = cmd->proc == ssubscribeCommand || - cmd->proc == sunsubscribeCommand || - cmd->proc == spublishCommand; - /* Check that all the keys are in the same hash slot, and obtain this * slot and the node associated. */ for (i = 0; i < ms->count; i++) { @@ -1025,6 +1024,13 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in margc = ms->commands[i].argc; margv = ms->commands[i].argv; + /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ + if (!pubsubshard_included && + doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) + { + pubsubshard_included = 1; + } + getKeysResult result = GETKEYS_RESULT_INIT; numkeys = getKeysFromCommand(mcmd,margv,margc,&result); keyindex = result.keys; @@ -1088,7 +1094,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * node until the migration completes with CLUSTER SETSLOT * NODE . */ int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE; - if ((migrating_slot || importing_slot) && !is_pubsubshard) + if ((migrating_slot || importing_slot) && !pubsubshard_included) { if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++; else existing_keys++; @@ -1105,7 +1111,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in /* Cluster is globally down but we got keys? We only serve the request * if it is a read command and when allow_reads_when_down is enabled. */ if (!isClusterHealthy()) { - if (is_pubsubshard) { + if (pubsubshard_included) { if (!server.cluster_allow_pubsubshard_when_down) { if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; return NULL; @@ -1168,7 +1174,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * is serving, we can reply without redirection. */ int is_write_command = (cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); - if (((c->flags & CLIENT_READONLY) || is_pubsubshard) && + if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == n) diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index c33ec73473f1..ffe89862fbda 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -109,6 +109,7 @@ set ::all_tests { unit/cluster/links unit/cluster/cluster-response-tls unit/cluster/failure-marking + unit/cluster/sharded-pubsub } # Index to the next test to run in the ::all_tests list. set ::next_test 0 diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl new file mode 100644 index 000000000000..66a1ca2fb493 --- /dev/null +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -0,0 +1,66 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Licensed under your choice of the Redis Source Available License 2.0 +# (RSALv2) or the Server Side Public License v1 (SSPLv1). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + +start_cluster 1 1 {tags {external:skip cluster}} { + set primary_id 0 + set replica1_id 1 + + set primary [Rn $primary_id] + set replica [Rn $replica1_id] + + test "Sharded pubsub publish behavior within multi/exec" { + foreach {node} {primary replica} { + set node [set $node] + $node MULTI + $node SPUBLISH ch1 "hello" + $node EXEC + } + } + + test "Sharded pubsub within multi/exec with cross slot operation" { + $primary MULTI + $primary SPUBLISH ch1 "hello" + $primary GET foo + catch {[$primary EXEC]} err + assert_match {CROSSSLOT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with read operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary GET foo + $primary EXEC + } {0 {}} + + test "Sharded pubsub publish behavior within multi/exec with read operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica GET foo]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with write operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary SET foo bar + $primary EXEC + } {0 OK} + + test "Sharded pubsub publish behavior within multi/exec with write operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica SET foo bar]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } +}