Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the bug that CLIENT REPLY OFF|SKIP cannot receive push notifications #11875

Merged
merged 12 commits into from
Mar 12, 2023
3 changes: 3 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -807,9 +807,12 @@ NULL
addReplyError(c,"RESP2 is not supported by this command");
return;
}
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
addReplyPushLen(c,2);
addReplyBulkCString(c,"server-cpu-usage");
addReplyLongLong(c,42);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
/* Push replies are not synchronous replies, so we emit also a
* normal reply in order for blocking clients just discarding the
* push reply, to actually consume the reply and continue. */
Expand Down
7 changes: 5 additions & 2 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,10 @@ int prepareClientToWrite(client *c) {
/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

/* CLIENT REPLY OFF / SKIP handling: don't send replies. */
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
/* CLIENT REPLY OFF / SKIP handling: don't send replies.
* CLIENT_PUSHING handling: disables the reply silencing flags. */
if ((c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) &&
!(c->flags & CLIENT_PUSHING)) return C_ERR;

/* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
* is set. */
Expand Down Expand Up @@ -963,6 +965,7 @@ void addReplyAttributeLen(client *c, long length) {

void addReplyPushLen(client *c, long length) {
serverAssert(c->resp >= 3);
serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING);
addReplyAggregateLen(c,length,'>');
}

Expand Down
18 changes: 18 additions & 0 deletions src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,24 @@ pubsubtype pubSubShardType = {
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,message_bulk);
addReplyBulk(c,channel);
if (msg) addReplyBulk(c,msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send a pubsub message of type "pmessage" to the client. The difference
* with the "message" type delivered by addReplyPubsubMessage() is that
* this message format also includes the pattern that matched the message. */
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[4]);
else
Expand All @@ -126,24 +131,30 @@ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
addReplyBulk(c,pat);
addReplyBulk(c,channel);
addReplyBulk(c,msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send the pubsub subscription notification to the client. */
void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,*type.subscribeMsg);
addReplyBulk(c,channel);
addReplyLongLong(c,type.subscriptionCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send the pubsub unsubscription notification to the client.
* Channel can be NULL: this is useful when the client sends a mass
* unsubscribe command but there are no channels to unsubscribe from: we
* still send a notification. */
void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
Expand All @@ -154,24 +165,30 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
else
addReplyNull(c);
addReplyLongLong(c,type.subscriptionCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send the pubsub pattern subscription notification to the client. */
void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send the pubsub pattern unsubscription notification to the client.
* Pattern can be NULL: this is useful when the client sends a mass
* punsubscribe command but there are no pattern to unsubscribe from: we
* still send a notification. */
void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
Expand All @@ -182,6 +199,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
else
addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/*-----------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_ALLOW_OOM (1ULL<<44) /* Client used by RM_Call is allowed to fully execute
scripts even when in OOM */
#define CLIENT_NO_TOUCH (1ULL<<45) /* This client will not touch LFU/LRU stats. */
#define CLIENT_PUSHING (1ULL<<46) /* This client is pushing notifications. */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
9 changes: 9 additions & 0 deletions src/tracking.c
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ void trackingRememberKeys(client *tracking, client *executing) {
* - Following a flush command, to send a single RESP NULL to indicate
* that all keys are now invalid. */
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;

int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
Expand All @@ -279,10 +282,14 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
c = redir;
using_redirection = 1;
old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
}

/* Only send such info for clients in RESP version 3 or more. However
Expand All @@ -301,6 +308,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
* redirecting to another client. We can't send anything to
* it since RESP2 does not support push messages in the same
* connection. */
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}

Expand All @@ -312,6 +320,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c,keyname,keylen);
}
updateClientMemUsageAndBucket(c);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* This function is called when a key is modified in Redis and in the case
Expand Down
37 changes: 37 additions & 0 deletions tests/unit/pubsub.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,39 @@ start_server {tags {"pubsub network"}} {
$rd1 close
}

foreach type {OFF SKIP} {
test "PubSub messages with CLIENT REPLY $type" {
set rd [redis_deferring_client]
$rd hello 3
$rd read ;# Discard the hello reply

# Test that the subscribe/psubscribe notification is ok
$rd client reply $type
assert_equal {1} [subscribe $rd channel]
assert_equal {2} [psubscribe $rd ch*]
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved

# Test that the publish notification is ok
$rd client reply $type
assert_equal 2 [r publish channel hello]
assert_equal {message channel hello} [$rd read]
assert_equal {pmessage ch* channel hello} [$rd read]

# Test that the unsubscribe/punsubscribe notification is ok
$rd client reply $type
assert_equal {1} [unsubscribe $rd channel]
assert_equal {0} [punsubscribe $rd ch*]

if {$type == "SKIP"} {
$rd client reply $type
$rd ping pong1
$rd ping pong2
assert_equal {pong2} [$rd read]
}
oranagra marked this conversation as resolved.
Show resolved Hide resolved

$rd close
}
}

test "PUNSUBSCRIBE from non-subscribed channels" {
set rd1 [redis_deferring_client]
assert_equal {0 0 0} [punsubscribe $rd1 {foo.* bar.* quux.*}]
Expand Down Expand Up @@ -206,6 +239,7 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we receive keyspace notifications" {
r config set notify-keyspace-events KA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY OFF ;# Test the event notification is ok
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyspace@${db}__:foo set" [$rd1 read]
Expand All @@ -215,6 +249,7 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we receive keyevent notifications" {
r config set notify-keyspace-events EA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY SKIP ;# Test the event notification is ok
enjoy-binbin marked this conversation as resolved.
Show resolved Hide resolved
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyevent@${db}__:set foo" [$rd1 read]
Expand All @@ -224,6 +259,8 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we can receive both kind of events" {
r config set notify-keyspace-events KEA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY ON ;# Test the event notification is ok
assert_equal {OK} [$rd1 read]
oranagra marked this conversation as resolved.
Show resolved Hide resolved
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyspace@${db}__:foo set" [$rd1 read]
Expand Down
39 changes: 36 additions & 3 deletions tests/unit/pubsubshard.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd2 close
}

test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" {
test "SPUBLISH/SSUBSCRIBE after UNSUBSCRIBE without arguments" {
set rd1 [redis_deferring_client]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {2} [ssubscribe $rd1 {chan2}]
Expand All @@ -54,7 +54,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd1 close
}

test "SUBSCRIBE to one channel more than once" {
test "SSUBSCRIBE to one channel more than once" {
set rd1 [redis_deferring_client]
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r SPUBLISH chan1 hello]
Expand All @@ -64,7 +64,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd1 close
}

test "UNSUBSCRIBE from non-subscribed channels" {
test "SUNSUBSCRIBE from non-subscribed channels" {
set rd1 [redis_deferring_client]
assert_equal {0} [sunsubscribe $rd1 {foo}]
assert_equal {0} [sunsubscribe $rd1 {bar}]
Expand Down Expand Up @@ -105,6 +105,39 @@ start_server {tags {"pubsubshard external:skip"}} {
assert_equal "chan1 1" [r pubsub numsub chan1]
assert_equal "chan1" [r pubsub shardchannels]
assert_equal "chan1" [r pubsub channels]

$rd1 close
$rd2 close
}

foreach type {OFF SKIP} {
test "PubSubShard with CLIENT REPLY $type" {
set rd [redis_deferring_client]
$rd hello 3
$rd read ;# Discard the hello reply

# Test that the ssubscribe notification is ok
$rd client reply $type
assert_equal {1} [ssubscribe $rd channel]

# Test that the spublish notification is ok
$rd client reply $type
assert_equal 1 [r spublish channel hello]
assert_equal {smessage channel hello} [$rd read]

# Test that sunsubscribe notification is ok
$rd client reply $type
assert_equal {0} [sunsubscribe $rd channel]

if {$type == "SKIP"} {
$rd client reply $type
$rd ping pong1
$rd ping pong2
assert_equal {pong2} [$rd read]
}

$rd close
}
}
}

Expand Down
75 changes: 75 additions & 0 deletions tests/unit/tracking.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,81 @@ start_server {tags {"tracking network"}} {
r debug pause-cron 0
} {OK} {needs:debug}

foreach resp {3 2} {
test "RESP$resp based basic invalidation with client reply off" {
clean_all

$rd hello $resp
$rd read
$rd client tracking on
$rd read

$rd_sg set foo bar
$rd get foo
$rd read

$rd client reply off

$rd_sg set foo bar2

if {$resp == 3} {
assert_equal {invalidate foo} [$rd read]
}
oranagra marked this conversation as resolved.
Show resolved Hide resolved
}
}

test {RESP3 based basic redirect invalidation with client reply off} {
clean_all

set rd_redir [redis_deferring_client]
$rd_redir hello 3
$rd_redir read

$rd_redir subscribe __redis__:invalidate
$rd_redir read
oranagra marked this conversation as resolved.
Show resolved Hide resolved

$rd_redir client id
set rd_redir_id [$rd_redir read]

$rd client tracking on redirect $rd_redir_id
$rd read

$rd_sg set foo bar
$rd get foo
$rd read

$rd_redir client reply off

$rd_sg set foo bar2
assert_equal {invalidate foo} [$rd_redir read]

$rd_redir close
}

test {RESP3 based basic tracking-redir-broken with client reply off} {
clean_all

$rd hello 3
$rd read
$rd client tracking on redirect $redir_id
$rd read

$rd_sg set foo bar
$rd get foo
$rd read

$rd client reply off

$rd_redirection quit
$rd_redirection read

$rd_sg set foo bar2

set res [lsearch -exact [$rd read] "tracking-redir-broken"]
assert_morethan_equal $res 0
}

$rd_redirection close
$rd_sg close
$rd close
}