Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make tracking invalidation messages always after command's reply #9422

Merged
merged 12 commits into from Oct 7, 2021
16 changes: 16 additions & 0 deletions src/blocked.c
Expand Up @@ -295,6 +295,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* call. */
if (dstkey) incrRefCount(dstkey);

client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
serveClientBlockedOnList(receiver, o,
Expand All @@ -303,6 +305,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
&deleted);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;

if (dstkey) decrRefCount(dstkey);

Expand Down Expand Up @@ -343,11 +347,15 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
? 1 : 0;
int reply_nil_when_empty = use_nested_array;

client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;

/* Replicate the command. */
int argc = 2;
Expand Down Expand Up @@ -442,6 +450,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
}
}

client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
/* Emit the two elements sub-array consisting of
Expand Down Expand Up @@ -470,6 +480,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
* valid, so we must do the setup above before
* this call. */
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
}
}
}
Expand Down Expand Up @@ -514,12 +526,16 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
* different modules with different triggers to consider if a key
* is ready or not. This means we can't exit the loop but need
* to continue after the first failure. */
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));

moduleUnblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/db.c
Expand Up @@ -556,7 +556,7 @@ long long dbTotalServerKeyCount() {
* a context of a client. */
void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key);
trackingInvalidateKey(c,key);
trackingInvalidateKey(c,key,1);
}

void signalFlushedDb(int dbid, int async) {
Expand Down
28 changes: 28 additions & 0 deletions src/server.c
Expand Up @@ -2933,6 +2933,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* our clients. */
updateFailoverStatus();

/* Since we rely on current_client to send scheduled invalidation messages
* we have to flush them after each command, so when we get here, the list
* must be empty. */
serverAssert(listLength(server.tracking_pending_keys) == 0);

/* Send the invalidation messages to clients participating to the
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
Expand Down Expand Up @@ -3655,6 +3660,7 @@ void initServer(void) {
server.current_client = NULL;
server.errors = raxNew();
server.fixed_time_expire = 0;
server.in_nested_call = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
Expand All @@ -3667,6 +3673,7 @@ void initServer(void) {
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.tracking_pending_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.client_pause_type = CLIENT_PAUSE_OFF;
Expand Down Expand Up @@ -4314,6 +4321,7 @@ void call(client *c, int flags) {
if (server.fixed_time_expire++ == 0) {
updateCachedTime(0);
}
server.in_nested_call++;

elapsedStart(&call_timer);
c->cmd->proc(c);
Expand All @@ -4322,6 +4330,8 @@ void call(client *c, int flags) {
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;

server.in_nested_call--;

/* Update failed command calls if required.
* We leverage a static variable (prev_err_count) to retain
* the counter across nested function calls and avoid logging
Expand Down Expand Up @@ -4496,6 +4506,9 @@ void call(client *c, int flags) {
size_t zmalloc_used = zmalloc_used_memory();
if (zmalloc_used > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used;

/* Do some maintenance job and cleanup */
afterCommand(c);
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
}

/* Used when a command that is ready for execution needs to be rejected, due to
Expand Down Expand Up @@ -4533,6 +4546,14 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
}
}

/* This is called after a command in call, we can do some maintenance job in it. */
void afterCommand(client *c) {
UNUSED(c);
/* Flush pending invalidation messages only when we are not in nested call.
* So the messages are not interleaved with transaction response. */
if (!server.in_nested_call) trackingHandlePendingKeyInvalidations();
}

/* Returns 1 for commands that may have key names in their arguments, but the legacy range
* spec doesn't cover all of them. */
void populateCommandMovableKeys(struct redisCommand *cmd) {
Expand Down Expand Up @@ -4710,6 +4731,13 @@ int processCommand(client *c) {
* propagation of DELs due to eviction. */
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = (performEvictions() == EVICT_FAIL);

/* performEvictions may evict keys, so we need flush pending tracking
* invalidation keys. If we don't do this, we may get an invalidation
* message after we perform operation on the key, where in fact this
* message belongs to the old value of the key before it gets evicted.*/
trackingHandlePendingKeyInvalidations();

/* performEvictions may flush slave output buffers. This may result
* in a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;
Expand Down
8 changes: 7 additions & 1 deletion src/server.h
Expand Up @@ -1312,6 +1312,7 @@ struct redisServer {

rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
int in_nested_call; /* If > 0, in a nested call of a call */
rax *clients_index; /* Active clients dictionary by client ID. */
pause_type client_pause_type; /* True if clients are currently paused */
list *paused_clients; /* List of pause clients */
Expand Down Expand Up @@ -1596,6 +1597,7 @@ struct redisServer {
/* Client side caching. */
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
size_t tracking_table_max_keys; /* Max number of keys in tracking table. */
list *tracking_pending_keys; /* tracking invalidation keys pending to flush */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
Expand Down Expand Up @@ -2105,7 +2107,9 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKey(client *c, robj *keyobj, int bcast);
void trackingScheduleKeyInvalidation(uint64_t client_id, robj *keyobj);
void trackingHandlePendingKeyInvalidations(void);
void trackingInvalidateKeysOnFlush(int async);
void freeTrackingRadixTree(rax *rt);
void freeTrackingRadixTreeAsync(rax *rt);
Expand Down Expand Up @@ -2429,6 +2433,8 @@ void preventCommandAOF(client *c);
void preventCommandReplication(client *c);
void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration);
int prepareForShutdown(int flags);
void afterCommand(client *c);
int inNestedCall(void);
#ifdef __GNUC__
void _serverLog(int level, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
Expand Down
43 changes: 34 additions & 9 deletions src/tracking.c
Expand Up @@ -348,13 +348,16 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
* of memory pressure: in that case the key didn't really change, so we want
* just to notify the clients that are in the table for this key, that would
* otherwise miss the fact we are no longer tracking the key for them. */
void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {
if (TrackingTable == NULL) return;

unsigned char *key = (unsigned char*)keyobj->ptr;
size_t keylen = sdslen(keyobj->ptr);

if (bcast && raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(c,key,keylen);
trackingRememberKeyToBroadcast(c,(char *)key,keylen);

rax *ids = raxFind(TrackingTable,(unsigned char*)key,keylen);
rax *ids = raxFind(TrackingTable,key,keylen);
if (ids == raxNotFound) return;

raxIterator ri;
Expand Down Expand Up @@ -384,7 +387,15 @@ void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
continue;
}

sendTrackingMessage(target,key,keylen,0);
/* If target is current client, we need schedule key invalidation.
* As the invalidation messages may be interleaved with command
* response and should after command response */
if (target == server.current_client){
incrRefCount(keyobj);
listAddNodeTail(server.tracking_pending_keys, keyobj);
} else {
sendTrackingMessage(target,(char *)keyobj->ptr,sdslen(keyobj->ptr),0);
}
}
raxStop(&ri);

Expand All @@ -395,10 +406,22 @@ void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
}

/* Wrapper (the one actually called across the core) to pass the key
* as object. */
void trackingInvalidateKey(client *c, robj *keyobj) {
trackingInvalidateKeyRaw(c,keyobj->ptr,sdslen(keyobj->ptr),1);
void trackingHandlePendingKeyInvalidations() {
if (!listLength(server.tracking_pending_keys)) return;

listNode *ln;
listIter li;

listRewind(server.tracking_pending_keys,&li);
while ((ln = listNext(&li)) != NULL) {
robj *key = listNodeValue(ln);
/* current_client maybe freed, so we need to send invalidation
* message only when current_client is still alive */
if (server.current_client != NULL)
oranagra marked this conversation as resolved.
Show resolved Hide resolved
sendTrackingMessage(server.current_client,(char *)key->ptr,sdslen(key->ptr),0);
decrRefCount(key);
}
listEmpty(server.tracking_pending_keys);
}

/* This function is called when one or all the Redis databases are
Expand Down Expand Up @@ -475,7 +498,9 @@ void trackingLimitUsedSlots(void) {
raxSeek(&ri,"^",NULL,0);
raxRandomWalk(&ri,0);
if (raxEOF(&ri)) break;
trackingInvalidateKeyRaw(NULL,(char*)ri.key,ri.key_len,0);
robj *keyobj = createStringObject((char*)ri.key,ri.key_len);
trackingInvalidateKey(NULL,keyobj,0);
decrRefCount(keyobj);
if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0;
raxStop(&ri);
Expand Down
95 changes: 95 additions & 0 deletions tests/unit/tracking.tcl
Expand Up @@ -369,6 +369,101 @@ start_server {tags {"tracking network"}} {
$r CLIENT TRACKING OFF
}

test {hdel deliver invlidate message after response in the same connection} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
r HSET myhash f 1
r HGET myhash f
set res [r HDEL myhash f]
assert_equal $res 1
set res [r read]
assert_equal $res {invalidate myhash}
}

test {Tracking invalidation message is not interleaved with multiple keys response} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
# We need disable active expire, so we can trigger lazy expire
r DEBUG SET-ACTIVE-EXPIRE 0
r MULTI
r MSET x{t} 1 y{t} 2
r PEXPIRE y{t} 100
r GET y{t}
r EXEC
after 110
# Read expired key y{t}, generate invalidate message about this key
set res [r MGET x{t} y{t}]
assert_equal $res {1 {}}
# Consume the invalidate message which is after command response
set res [r read]
assert_equal $res {invalidate y{t}}
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}

test {Tracking invalidation message is not interleaved with transaction response} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
r MSET a{t} 1 b{t} 2
r GET a{t}
# Start a transaction, make a{t} generate an invalidate message
r MULTI
r INCR a{t}
r GET b{t}
set res [r EXEC]
assert_equal $res {2 2}
set res [r read]
# Consume the invalidate message which is after command response
assert_equal $res {invalidate a{t}}
}

test {Tracking invalidation message of eviction keys should be before response} {
# Get the current memory limit and calculate a new limit.
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
set used [s used_memory]
set limit [expr {$used+100*1024}]
set old_policy [lindex [r config get maxmemory-policy] 1]
r config set maxmemory $limit
# We set policy volatile-random, so only keys with ttl will be evicted
r config set maxmemory-policy volatile-random
# Add a volatile key and tracking it.
r setex volatile-key 10000 x
r get volatile-key
# We use SETBIT here, so we can set a big key and get the used_memory
# bigger than maxmemory. Next command will evict volatile keys. We
# can't use SET, as SET uses big input buffer, so it will fail.
r setbit big-key 1000000 0
# volatile-key is evicted before response.
set res [r getbit big-key 0]
assert_equal $res {invalidate volatile-key}
set res [r read]
assert_equal $res 0
r config set maxmemory-policy $old_policy
r config set maxmemory 0
}

test {Unblocked BLMOVE gets notification after response} {
r RPUSH list2{t} a
$rd HELLO 3
$rd read
$rd CLIENT TRACKING on
$rd read
# Tracking key list2{t}
$rd LRANGE list2{t} 0 -1
$rd read
# We block on list1{t}
$rd BLMOVE list1{t} list2{t} left left 0
wait_for_blocked_clients_count 1
# unblock $rd, list2{t} gets element and generate invalidation message
r rpush list1{t} foo
assert_equal [$rd read] {foo}
assert_equal [$rd read] {invalidate list2{t}}
}

test {Tracking gets notification on tracking table key eviction} {
r CLIENT TRACKING off
r CLIENT TRACKING on REDIRECT $redir_id NOLOOP
Expand Down