Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make tracking invalidation messages always after command's reply #9422

Merged
merged 12 commits into from Oct 7, 2021
4 changes: 4 additions & 0 deletions src/blocked.c
Expand Up @@ -303,6 +303,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
&deleted);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
unblockClient(receiver);
afterCommand(receiver);

if (dstkey) decrRefCount(dstkey);

Expand Down Expand Up @@ -348,6 +349,7 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
genericZpopCommand(receiver, &rl->key, 1, where, 1, count, use_nested_array, reply_nil_when_empty, &deleted);
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
unblockClient(receiver);
afterCommand(receiver);

/* Replicate the command. */
int argc = 2;
Expand Down Expand Up @@ -470,6 +472,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
* valid, so we must do the setup above before
* this call. */
unblockClient(receiver);
afterCommand(receiver);
}
}
}
Expand Down Expand Up @@ -520,6 +523,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));

moduleUnblockClient(receiver);
afterCommand(receiver);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/db.c
Expand Up @@ -556,7 +556,7 @@ long long dbTotalServerKeyCount() {
* a context of a client. */
void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key);
trackingInvalidateKey(c,key);
trackingInvalidateKey(c,key,1);
}

void signalFlushedDb(int dbid, int async) {
Expand Down
28 changes: 28 additions & 0 deletions src/server.c
Expand Up @@ -2933,6 +2933,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* our clients. */
updateFailoverStatus();

/* Flush the pending invalidation messages to clients participating to the
* client side caching protocol in general mode */
trackingHandlePendingKeyInvalidations();

/* Send the invalidation messages to clients participating to the
* client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
Expand Down Expand Up @@ -3667,6 +3671,7 @@ void initServer(void) {
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.tracking_pending_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.client_pause_type = CLIENT_PAUSE_OFF;
Expand Down Expand Up @@ -4496,6 +4501,9 @@ void call(client *c, int flags) {
size_t zmalloc_used = zmalloc_used_memory();
if (zmalloc_used > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used;

/* Do some maintenance job and cleanup */
afterCommand(c);
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
}

/* Used when a command that is ready for execution needs to be rejected, due to
Expand Down Expand Up @@ -4533,6 +4541,19 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
}
}

/* This is called after a command in call, we can do some maintenance job in it. */
void afterCommand(client *c) {
UNUSED(c);
/* Flush pending invalidation messages only when we are not in nested call.
* So the messages are not interleaved with transaction response. */
if (!server.fixed_time_expire) trackingHandlePendingKeyInvalidations();
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
}

/* This means we are in a nested call of a call*/
int inNestedCall(void) {
return !server.fixed_time_expire;
}

/* Returns 1 for commands that may have key names in their arguments, but the legacy range
* spec doesn't cover all of them. */
void populateCommandMovableKeys(struct redisCommand *cmd) {
Expand Down Expand Up @@ -4710,6 +4731,13 @@ int processCommand(client *c) {
* propagation of DELs due to eviction. */
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = (performEvictions() == EVICT_FAIL);

/* performEvictions may evict keys, so we need flush pending tracking
* invalidation keys. If we don't do this, we may get an invalidation
* message after we perform operation on the key, where in fact this
* message belongs to the old value of the key before it gets evicted.*/
trackingHandlePendingKeyInvalidations();

/* performEvictions may flush slave output buffers. This may result
* in a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;
Expand Down
7 changes: 6 additions & 1 deletion src/server.h
Expand Up @@ -1596,6 +1596,7 @@ struct redisServer {
/* Client side caching. */
unsigned int tracking_clients; /* # of clients with tracking enabled.*/
size_t tracking_table_max_keys; /* Max number of keys in tracking table. */
list *tracking_pending_keys; /* tracking invalidation keys pending to flush */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
Expand Down Expand Up @@ -2105,7 +2106,9 @@ void addReplyStatusFormat(client *c, const char *fmt, ...);
void enableTracking(client *c, uint64_t redirect_to, uint64_t options, robj **prefix, size_t numprefix);
void disableTracking(client *c);
void trackingRememberKeys(client *c);
void trackingInvalidateKey(client *c, robj *keyobj);
void trackingInvalidateKey(client *c, robj *keyobj, int bcast);
void trackingScheduleKeyInvalidation(uint64_t client_id, robj *keyobj);
void trackingHandlePendingKeyInvalidations(void);
void trackingInvalidateKeysOnFlush(int async);
void freeTrackingRadixTree(rax *rt);
void freeTrackingRadixTreeAsync(rax *rt);
Expand Down Expand Up @@ -2429,6 +2432,8 @@ void preventCommandAOF(client *c);
void preventCommandReplication(client *c);
void slowlogPushCurrentCommand(client *c, struct redisCommand *cmd, ustime_t duration);
int prepareForShutdown(int flags);
void afterCommand(client *c);
int inNestedCall(void);
#ifdef __GNUC__
void _serverLog(int level, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
Expand Down
51 changes: 42 additions & 9 deletions src/tracking.c
Expand Up @@ -58,6 +58,12 @@ typedef struct bcastState {
prefix. */
} bcastState;

/* This is used to store an invalidation key pending to flush */
typedef struct invalidatedKey {
uint64_t client_id;
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
robj *key;
} invalidatedKey;

/* Remove the tracking state from the client 'c'. Note that there is not much
* to do for us here, if not to decrement the counter of the clients in
* tracking mode, because we just store the ID of the client in the tracking
Expand Down Expand Up @@ -348,13 +354,16 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
* of memory pressure: in that case the key didn't really change, so we want
* just to notify the clients that are in the table for this key, that would
* otherwise miss the fact we are no longer tracking the key for them. */
void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {
if (TrackingTable == NULL) return;

unsigned char *key = (unsigned char*)keyobj->ptr;
size_t keylen = sdslen(keyobj->ptr);

if (bcast && raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(c,key,keylen);
trackingRememberKeyToBroadcast(c,(char *)key,keylen);

rax *ids = raxFind(TrackingTable,(unsigned char*)key,keylen);
rax *ids = raxFind(TrackingTable,key,keylen);
if (ids == raxNotFound) return;

raxIterator ri;
Expand Down Expand Up @@ -384,7 +393,8 @@ void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
continue;
}

sendTrackingMessage(target,key,keylen,0);
/* We need to use id as client may be freed */
trackingScheduleKeyInvalidation(id,keyobj);
}
raxStop(&ri);

Expand All @@ -395,10 +405,31 @@ void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
}

/* Wrapper (the one actually called across the core) to pass the key
* as object. */
void trackingInvalidateKey(client *c, robj *keyobj) {
trackingInvalidateKeyRaw(c,keyobj->ptr,sdslen(keyobj->ptr),1);
void trackingScheduleKeyInvalidation(uint64_t client_id, robj *keyobj) {
invalidatedKey *ik = zmalloc(sizeof(invalidatedKey));

ik->client_id = client_id;
ik->key = keyobj;
incrRefCount(keyobj);
listAddNodeTail(server.tracking_pending_keys, ik);
}

void trackingHandlePendingKeyInvalidations() {
if (!listLength(server.tracking_pending_keys)) return;

listNode *ln;
listIter li;

listRewind(server.tracking_pending_keys,&li);
while ((ln = listNext(&li)) != NULL) {
invalidatedKey *ik = listNodeValue(ln);
client *target = lookupClientByID(ik->client_id);
if (target != NULL)
sendTrackingMessage(target,(char *)ik->key->ptr,sdslen(ik->key->ptr),0);
decrRefCount(ik->key);
zfree(ik);
}
listEmpty(server.tracking_pending_keys);
}

/* This function is called when one or all the Redis databases are
Expand Down Expand Up @@ -475,7 +506,9 @@ void trackingLimitUsedSlots(void) {
raxSeek(&ri,"^",NULL,0);
raxRandomWalk(&ri,0);
if (raxEOF(&ri)) break;
trackingInvalidateKeyRaw(NULL,(char*)ri.key,ri.key_len,0);
robj *keyobj = createStringObject((char*)ri.key,ri.key_len);
trackingInvalidateKey(NULL,keyobj,0);
decrRefCount(keyobj);
if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0;
raxStop(&ri);
Expand Down
50 changes: 50 additions & 0 deletions tests/unit/tracking.tcl
Expand Up @@ -369,6 +369,56 @@ start_server {tags {"tracking network"}} {
$r CLIENT TRACKING OFF
}

test {hdel deliver invlidate message after response in the same connection} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
r HSET myhash f 1
r HGET myhash f
set res [r HDEL myhash f]
assert_equal $res 1
set res [r read]
assert_equal $res {invalidate myhash}
}

test {Tracking invalidation message is not interleaved with multiple keys response} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
# We need disable active expire, so we can trigger lazy expire
r DEBUG SET-ACTIVE-EXPIRE 0
r MULTI
r MSET x{t} 1 y{t} 2
r PEXPIRE y{t} 100
r GET y{t}
r EXEC
after 110
# Read expired key y{t}, generate invalidate message about this key
set res [r MGET x{t} y{t}]
assert_equal $res {1 {}}
# Consume the invalidate message which is after command response
set res [r read]
assert_equal $res {invalidate y{t}}
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}

test {Tracking invalidation message is not interleaved with transaction response} {
r CLIENT TRACKING off
r HELLO 3
r CLIENT TRACKING on
r MSET a{t} 1 b{t} 2
r GET a{t}
# Start a transaction, make a{t} generate an invalidate message
r MULTI
r INCR a{t}
r GET b{t}
set res [r EXEC]
assert_equal $res {2 2}
set res [r read]
# Consume the invalidate message which is after command response
assert_equal $res {invalidate a{t}}
}

test {Tracking gets notification on tracking table key eviction} {
r CLIENT TRACKING off
r CLIENT TRACKING on REDIRECT $redir_id NOLOOP
Expand Down