Skip to content

Commit

Permalink
Fix the bug that CLIENT REPLY OFF|SKIP cannot receive push notificati…
Browse files Browse the repository at this point in the history
…ons (redis#11875)

This bug seems to be there forever, CLIENT REPLY OFF|SKIP will
mark the client with CLIENT_REPLY_OFF or CLIENT_REPLY_SKIP flags.
With these flags, prepareClientToWrite called by addReply* will
return C_ERR directly. So the client can't receive the Pub/Sub
messages and any other push notifications, e.g client side tracking.

In this PR, we adding a CLIENT_PUSHING flag, disables the reply
silencing flags. When adding push replies, set the flag, after the reply,
clear the flag. Then add the flag check in prepareClientToWrite.

Fixes redis#11874

Note, the SUBSCRIBE command response is a bit awkward,
see redis/redis-doc#2327

Co-authored-by: Oran Agra <oran@redislabs.com>
(cherry picked from commit 416842e)
(cherry picked from commit f8ae7a4)
(cherry picked from commit 96814a32da61e5ed523864e00609a4aa6be065b3)
  • Loading branch information
enjoy-binbin authored and oranagra committed Apr 13, 2023
1 parent 2b85ac6 commit 6720926
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 21 deletions.
7 changes: 7 additions & 0 deletions src/debug.c
Expand Up @@ -681,9 +681,16 @@ NULL
* also have a normal reply type after the attribute. */
addReplyBulkCString(c,"Some real reply following the attribute");
} else if (!strcasecmp(name,"push")) {
if (c->resp < 3) {
addReplyError(c,"RESP2 is not supported by this command");
return;
}
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
addReplyPushLen(c,2);
addReplyBulkCString(c,"server-cpu-usage");
addReplyLongLong(c,42);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
/* Push replies are not synchronous replies, so we emit also a
* normal reply in order for blocking clients just discarding the
* push reply, to actually consume the reply and continue. */
Expand Down
7 changes: 5 additions & 2 deletions src/networking.c
Expand Up @@ -248,8 +248,10 @@ int prepareClientToWrite(client *c) {
/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

/* CLIENT REPLY OFF / SKIP handling: don't send replies. */
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
/* CLIENT REPLY OFF / SKIP handling: don't send replies.
* CLIENT_PUSHING handling: disables the reply silencing flags. */
if ((c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) &&
!(c->flags & CLIENT_PUSHING)) return C_ERR;

/* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
* is set. */
Expand Down Expand Up @@ -722,6 +724,7 @@ void addReplyAttributeLen(client *c, long length) {

void addReplyPushLen(client *c, long length) {
serverAssert(c->resp >= 3);
serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING);
addReplyAggregateLen(c,length,'>');
}

Expand Down
18 changes: 18 additions & 0 deletions src/pubsub.c
Expand Up @@ -41,19 +41,24 @@ int clientSubscriptionsCount(client *c);
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
if (msg) addReplyBulk(c,msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send a pubsub message of type "pmessage" to the client. The difference
* with the "message" type delivered by addReplyPubsubMessage() is that
* this message format also includes the pattern that matched the message. */
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[4]);
else
Expand All @@ -62,24 +67,30 @@ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
addReplyBulk(c,pat);
addReplyBulk(c,channel);
addReplyBulk(c,msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send the pubsub subscription notification to the client. */
void addReplyPubsubSubscribed(client *c, robj *channel) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send the pubsub unsubscription notification to the client.
* Channel can be NULL: this is useful when the client sends a mass
* unsubscribe command but there are no channels to unsubscribe from: we
* still send a notification. */
void addReplyPubsubUnsubscribed(client *c, robj *channel) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
Expand All @@ -90,24 +101,30 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel) {
else
addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send the pubsub pattern subscription notification to the client. */
void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* Send the pubsub pattern unsubscription notification to the client.
* Pattern can be NULL: this is useful when the client sends a mass
* punsubscribe command but there are no pattern to unsubscribe from: we
* still send a notification. */
void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
Expand All @@ -118,6 +135,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
else
addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/*-----------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Expand Up @@ -272,6 +272,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */
#define CLIENT_CLOSE_AFTER_COMMAND (1ULL<<40) /* Close after executing commands
* and writing entire reply. */
#define CLIENT_PUSHING (1ULL<<41) /* This client is pushing notifications. */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
Expand Down
9 changes: 9 additions & 0 deletions src/tracking.c
Expand Up @@ -209,6 +209,9 @@ void trackingRememberKeys(client *c) {
* - Following a flush command, to send a single RESP NULL to indicate
* that all keys are now invalid. */
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;

int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
Expand All @@ -222,10 +225,14 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
c = redir;
using_redirection = 1;
old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
}

/* Only send such info for clients in RESP version 3 or more. However
Expand All @@ -244,6 +251,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
* redirecting to another client. We can't send anything to
* it since RESP2 does not support push messages in the same
* connection. */
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}

Expand All @@ -254,6 +262,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyArrayLen(c,1);
addReplyBulkCBuffer(c,keyname,keylen);
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}

/* This function is called when a key is modified in Redis and in the case
Expand Down
48 changes: 30 additions & 18 deletions tests/support/test.tcl
Expand Up @@ -31,6 +31,15 @@ proc assert_match {pattern value} {
}
}

proc assert_failed {expected_err detail} {
if {$detail ne ""} {
set detail "(detail: $detail)"
} else {
set detail "(context: [info frame -2])"
}
error "assertion:$expected_err $detail"
}

proc assert_not_equal {value expected {detail ""}} {
if {!($expected ne $value)} {
assert_failed "Expected '$value' not equal to '$expected'" $detail
Expand All @@ -39,34 +48,37 @@ proc assert_not_equal {value expected {detail ""}} {

proc assert_equal {value expected {detail ""}} {
if {$expected ne $value} {
if {$detail ne ""} {
set detail "(detail: $detail)"
} else {
set detail "(context: [info frame -1])"
}
error "assertion:Expected '$value' to be equal to '$expected' $detail"
assert_failed "Expected '$value' to be equal to '$expected'" $detail
}
}

proc assert_lessthan {value expected {detail ""}} {
if {!($value < $expected)} {
if {$detail ne ""} {
set detail "(detail: $detail)"
} else {
set detail "(context: [info frame -1])"
}
error "assertion:Expected '$value' to be lessthan to '$expected' $detail"
assert_failed "Expected '$value' to be less than '$expected'" $detail
}
}

proc assert_lessthan_equal {value expected {detail ""}} {
if {!($value <= $expected)} {
assert_failed "Expected '$value' to be less than or equal to '$expected'" $detail
}
}

proc assert_morethan {value expected {detail ""}} {
if {!($value > $expected)} {
assert_failed "Expected '$value' to be more than '$expected'" $detail
}
}

proc assert_morethan_equal {value expected {detail ""}} {
if {!($value >= $expected)} {
assert_failed "Expected '$value' to be more than or equal to '$expected'" $detail
}
}

proc assert_range {value min max {detail ""}} {
if {!($value <= $max && $value >= $min)} {
if {$detail ne ""} {
set detail "(detail: $detail)"
} else {
set detail "(context: [info frame -1])"
}
error "assertion:Expected '$value' to be between to '$min' and '$max' $detail"
assert_failed "Expected '$value' to be between to '$min' and '$max'" $detail
}
}

Expand Down
42 changes: 42 additions & 0 deletions tests/unit/introspection.tcl
Expand Up @@ -3,6 +3,48 @@ start_server {tags {"introspection"}} {
r client list
} {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*}

test "CLIENT REPLY OFF/ON: disable all commands reply" {
set rd [redis_deferring_client]

# These replies were silenced.
$rd client reply off
$rd ping pong
$rd ping pong2

$rd client reply on
assert_equal {OK} [$rd read]
$rd ping pong3
assert_equal {pong3} [$rd read]

$rd close
}

test "CLIENT REPLY SKIP: skip the next command reply" {
set rd [redis_deferring_client]

# The first pong reply was silenced.
$rd client reply skip
$rd ping pong

$rd ping pong2
assert_equal {pong2} [$rd read]

$rd close
}

test "CLIENT REPLY ON: unset SKIP flag" {
set rd [redis_deferring_client]

$rd client reply skip
$rd client reply on
assert_equal {OK} [$rd read] ;# OK from CLIENT REPLY ON command

$rd ping
assert_equal {PONG} [$rd read]

$rd close
}

test {MONITOR can log executed commands} {
set rd [redis_deferring_client]
$rd monitor
Expand Down
28 changes: 28 additions & 0 deletions tests/unit/pubsub.tcl
Expand Up @@ -188,6 +188,30 @@ start_server {tags {"pubsub"}} {
$rd1 close
}

test "PubSub messages with CLIENT REPLY OFF" {
set rd [redis_deferring_client]
$rd hello 3
$rd read ;# Discard the hello reply

# Test that the subscribe/psubscribe notification is ok
$rd client reply off
assert_equal {1} [subscribe $rd channel]
assert_equal {2} [psubscribe $rd ch*]

# Test that the publish notification is ok
$rd client reply off
assert_equal 2 [r publish channel hello]
assert_equal {message channel hello} [$rd read]
assert_equal {pmessage ch* channel hello} [$rd read]

# Test that the unsubscribe/punsubscribe notification is ok
$rd client reply off
assert_equal {1} [unsubscribe $rd channel]
assert_equal {0} [punsubscribe $rd ch*]

$rd close
}

test "PUNSUBSCRIBE from non-subscribed channels" {
set rd1 [redis_deferring_client]
assert_equal {0 0 0} [punsubscribe $rd1 {foo.* bar.* quux.*}]
Expand Down Expand Up @@ -228,6 +252,7 @@ start_server {tags {"pubsub"}} {
test "Keyspace notifications: we receive keyspace notifications" {
r config set notify-keyspace-events KA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY OFF ;# Make sure it works even if replies are silenced
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal {pmessage * __keyspace@9__:foo set} [$rd1 read]
Expand All @@ -237,6 +262,7 @@ start_server {tags {"pubsub"}} {
test "Keyspace notifications: we receive keyevent notifications" {
r config set notify-keyspace-events EA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY SKIP ;# Make sure it works even if replies are silenced
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal {pmessage * __keyevent@9__:set foo} [$rd1 read]
Expand All @@ -246,6 +272,8 @@ start_server {tags {"pubsub"}} {
test "Keyspace notifications: we can receive both kind of events" {
r config set notify-keyspace-events KEA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY ON ;# Just coverage
assert_equal {OK} [$rd1 read]
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal {pmessage * __keyspace@9__:foo set} [$rd1 read]
Expand Down

0 comments on commit 6720926

Please sign in to comment.