Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort out the mess around writable replicas and lookupKeyRead/Write #9572

Merged
merged 24 commits into from
Nov 28, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c09c811
Sort out the mess around writable replicas and lookupKeyRead/Write
zuiderkwast Sep 30, 2021
c002931
Apply suggestions from code review
zuiderkwast Oct 7, 2021
49b2c48
Fixup: Review suggestions
zuiderkwast Oct 11, 2021
710e9d9
Fixup: Add assert in expireIfNeeded
zuiderkwast Oct 11, 2021
ef55207
Fixup: spelling typo
zuiderkwast Oct 11, 2021
068e7a1
Fixup: Modules test commands marked as readonly
zuiderkwast Oct 11, 2021
562b96f
Fixup: Make COPY use plain lookupKeyRead()
zuiderkwast Oct 11, 2021
3b1e307
Fix the assert in expireIfNeeded for eval and debug loadaof + stuff
zuiderkwast Oct 14, 2021
6f01fb0
Add test cases
zuiderkwast Oct 14, 2021
32a7045
Add WRITE flag to all test module write commands
zuiderkwast Oct 14, 2021
cca77ac
Fix review comments
zuiderkwast Oct 21, 2021
a439aa9
Merge remote-tracking branch 'redis/unstable' into writable-replicas
zuiderkwast Oct 21, 2021
aecc32c
Attemt to fix failing test
zuiderkwast Oct 22, 2021
34d1119
Review comments
zuiderkwast Oct 25, 2021
260c454
Fixup: Update test wait timeout messages
zuiderkwast Oct 26, 2021
c78d43e
Delete the LOOKUP_NOEXPIRE flag
zuiderkwast Nov 3, 2021
0ee4d68
Never consider keys expired when client is the master
zuiderkwast Nov 3, 2021
623ca03
Use dictFind instead of lookupKey in scanDatabaseForReadyLists (db.c)
zuiderkwast Nov 4, 2021
323dffc
Add test cases for SWAPDB with blocked client and replication of expi…
zuiderkwast Nov 4, 2021
f5def78
Fix review comments on test cases
zuiderkwast Nov 4, 2021
ae46106
Rename expire_on_replica to force_delete_expired + fix another comment
zuiderkwast Nov 24, 2021
d091ef9
Merge remote-tracking branch 'redis/unstable' into writable-replicas
zuiderkwast Nov 24, 2021
ef10a77
Restore deleted parts of expireIfNeeded comment
zuiderkwast Nov 24, 2021
34676f9
Sleep one extra millisecond in test case
zuiderkwast Nov 25, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ int loadAppendOnlyFile(char *filename) {
* to the same file we're about to read. */
server.aof_state = AOF_OFF;

fakeClient = createAOFClient();
client *old_client = server.current_client;
fakeClient = server.current_client = createAOFClient();
startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);

/* Check if this AOF file has an RDB preamble. In that case we need to
Expand Down Expand Up @@ -892,6 +893,7 @@ int loadAppendOnlyFile(char *filename) {

cleanup:
if (fakeClient) freeClient(fakeClient);
server.current_client = old_client;
fclose(fp);
stopLoading(ret == AOF_OK);
return ret;
Expand Down
4 changes: 2 additions & 2 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,8 @@ void handleClientsBlockedOnKeys(void) {
updateCachedTime(0);

/* Serve clients blocked on the key. */
robj *o = lookupKeyWrite(rl->db,rl->key);

robj *o = lookupKeyReadWithFlags(rl->db, rl->key, (LOOKUP_NONOTIFY |
LOOKUP_NOSTATS));
oranagra marked this conversation as resolved.
Show resolved Hide resolved
if (o != NULL) {
if (o->type == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
Expand Down
3 changes: 2 additions & 1 deletion src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -5940,8 +5940,9 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
}

/* Migrating / Importing slot? Count keys we don't have. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY;
if ((migrating_slot || importing_slot) &&
lookupKeyRead(&server.db[0],thiskey) == NULL)
lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
{
missing_keys++;
}
Expand Down
167 changes: 86 additions & 81 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct dbBackup {
* C-level DB API
*----------------------------------------------------------------------------*/

int expireIfNeeded(redisDb *db, robj *key, int expire_on_replica);
int keyIsExpired(redisDb *db, robj *key);

/* Update LFU when an object is accessed.
Expand All @@ -56,14 +57,46 @@ void updateLFU(robj *val) {
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}

/* Low level key lookup API, not actually called directly from commands
* implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
/* Lookup a key for read or write operations, or return NULL if the key is not
* found in the specified DB. This function implements the functionality of
* lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants.
*
* Side-effects of calling this function:
*
* 1. A key gets expired if it reached it's TTL.
* 2. The key's last access time is updated.
* 3. The global keys hits/misses stats are updated (reported in INFO).
* 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
*
* Flags change the behavior of this command:
*
* LOOKUP_NONE (or zero): No special flags are passed.
* LOOKUP_NOTOUCH: Don't alter the last access time of the key.
* LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss.
* LOOKUP_NOSTATS: Don't increment key hits/misses couters.
* LOOKUP_NOEXPIRE: Don't check if the key has expired.
* LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
* replicas, use separate keyspace stats and events (TODO)).
*
* Note: this function also returns NULL if the key is logically expired but
* still existing, in case this is a replica and the LOOKUP_WRITE is not set.
* Even if the key expiry is master-driven, we can correctly report a key is
* expired on replicas even if the master is lagging expiring our key via DELs
* in the replication link. */
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->dict,key->ptr);
robj *val = NULL;
if (de) {
robj *val = dictGetVal(de);
val = dictGetVal(de);
if (!(flags & LOOKUP_NOEXPIRE) &&
expireIfNeeded(db, key, flags & LOOKUP_WRITE))
{
/* The key is no longer valid. */
val = NULL;
}
}

if (val) {
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
Expand All @@ -74,75 +107,33 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
val->lru = LRU_CLOCK();
}
}
return val;

if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
server.stat_keyspace_hits++;
/* TODO: Use separate hits stats for WRITE */
} else {
return NULL;
if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
server.stat_keyspace_misses++;
/* TODO: Use separate misses stats and notify event for WRITE */
}

return val;
}

/* Lookup a key for read operations, or return NULL if the key is not found
* in the specified DB.
*
* As a side effect of calling this function:
* 1. A key gets expired if it reached it's TTL.
* 2. The key last access time is updated.
* 3. The global keys hits/misses stats are updated (reported in INFO).
* 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
*
* This API should not be used when we write to the key after obtaining
* the object linked to the key, but only for read only operations.
*
* Flags change the behavior of this command:
*
* LOOKUP_NONE (or zero): no special flags are passed.
* LOOKUP_NOTOUCH: don't alter the last access time of the key.
*
* Note: this function also returns NULL if the key is logically expired
* but still existing, in case this is a slave, since this API is called only
* for read operations. Even if the key expiry is master-driven, we can
* correctly report a key is expired on slaves even if the master is lagging
* expiring our key via DELs in the replication link. */
* This function is equivalent to lookupKey(). The point of using this function
* rather than lookupKey() directly is to indicate that the purpose is to read
* the key. */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;

if (expireIfNeeded(db,key) == 1) {
/* If we are in the context of a master, expireIfNeeded() returns 1
* when the key is no longer valid, so we can return NULL ASAP. */
if (server.masterhost == NULL)
goto keymiss;

/* However if we are in the context of a slave, expireIfNeeded() will
* not really try to expire the key, it only returns information
* about the "logical" status of the key: key expiring is up to the
* master in order to have a consistent view of master's data set.
*
* However, if the command caller is not the master, and as additional
* safety measure, the command invoked is a read-only command, we can
* safely return NULL here, and provide a more consistent behavior
* to clients accessing expired values in a read-only fashion, that
* will say the key as non existing.
*
* Notably this covers GETs when slaves are used to scale reads. */
if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
goto keymiss;
}
}
val = lookupKey(db,key,flags);
if (val == NULL)
goto keymiss;
server.stat_keyspace_hits++;
return val;

keymiss:
if (!(flags & LOOKUP_NONOTIFY)) {
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
}
server.stat_keyspace_misses++;
return NULL;
serverAssert(!(flags & LOOKUP_WRITE));
return lookupKey(db, key, flags);
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
}

/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
Expand All @@ -152,13 +143,13 @@ robj *lookupKeyRead(redisDb *db, robj *key) {
}

/* Lookup a key for write operations, and as a side effect, if needed, expires
* the key if its TTL is reached.
* the key if its TTL is reached. It's equivalent to lookupKey() with the
* LOOKUP_WRITE flag added.
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
expireIfNeeded(db,key);
return lookupKey(db,key,flags);
return lookupKey(db, key, flags | LOOKUP_WRITE);
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
}

robj *lookupKeyWrite(redisDb *db, robj *key) {
Expand Down Expand Up @@ -294,7 +285,7 @@ robj *dbRandomKey(redisDb *db) {
* return a key name that may be already expired. */
return keyobj;
}
if (expireIfNeeded(db,keyobj)) {
if (expireIfNeeded(db,keyobj,0)) {
decrRefCount(keyobj);
continue; /* search for another key. This expired. */
}
Expand Down Expand Up @@ -676,7 +667,7 @@ void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;

for (j = 1; j < c->argc; j++) {
expireIfNeeded(c->db,c->argv[j]);
expireIfNeeded(c->db,c->argv[j],0);
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
Expand Down Expand Up @@ -974,7 +965,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
}

/* Filter element if it is an expired key. */
if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;
if (!filter && o == NULL && expireIfNeeded(c->db, kobj, 0)) filter = 1;

/* Remove the element and its associated value if needed. */
if (filter) {
Expand Down Expand Up @@ -1241,7 +1232,7 @@ void copyCommand(client *c) {
}

/* Check if the element exists and get a reference */
o = lookupKeyWrite(c->db, key);
o = lookupKeyRead(c->db, key);
if (!o) {
addReply(c,shared.czero);
return;
Expand Down Expand Up @@ -1301,7 +1292,8 @@ void scanDatabaseForReadyLists(redisDb *db) {
dictIterator *di = dictGetSafeIterator(db->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
robj *value = lookupKey(db,key,LOOKUP_NOTOUCH);
robj *value = lookupKey(db, key, (LOOKUP_NOTOUCH | LOOKUP_NOSTATS |
LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE));
if (value) signalKeyAsReady(db, key, value->type);
}
dictReleaseIterator(di);
Expand Down Expand Up @@ -1516,31 +1508,44 @@ int keyIsExpired(redisDb *db, robj *key) {
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because slave instances do not expire keys, they wait
* for DELs from the master for consistency matters. However even
* slaves will try to have a coherent return value for the function,
* so that read commands executed in the slave side will be able to
* behave like if the key is expired even if still present (because the
* master has yet to propagate the DEL).
oranagra marked this conversation as resolved.
Show resolved Hide resolved
* instance, because replicas do not expire keys. They wait for DELs
* from the master for consistency matters. The exception is write
* operations to writable replicas. However, even read-only replicas
* will try to have a coherent return value of the function, so that
* read commands executed on the replica will be able to behave as if
* the key is expired even if still present (because the master has yet
* to propagate the DEL).
*
* In masters as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* The return value of the function is 0 if the key is still valid,
* otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key) {
int expireIfNeeded(redisDb *db, robj *key, int expire_on_replica) {
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved

/* Deleting expired keys on a replica makes the replica inconsistent with
* the master. The reason it's allowed for write commands is to make
* writable replicas behave consistently. It shall not be used in readonly
* commands. */
if (expire_on_replica) {
oranagra marked this conversation as resolved.
Show resolved Hide resolved
client *c = server.in_eval ? server.lua_client : server.current_client;
serverAssert(!c || !c->cmd || (c->cmd->flags & (CMD_WRITE|CMD_MODULE)));
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
}

if (!keyIsExpired(db,key)) return 0;

/* If we are running in the context of a slave, instead of
/* If we are running in the context of a replica, instead of
* evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
* the replica key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys. The
* exception is when write operations are performed on writable
* replicas.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return 1;
if (server.masterhost != NULL && !expire_on_replica) return 1;

/* If clients are paused, we keep the current dataset constant,
* but return to the client what we believe is the right state. Typically,
Expand Down
14 changes: 10 additions & 4 deletions src/hyperloglog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1257,8 +1257,15 @@ void pfcountCommand(client *c) {
/* Case 2: cardinality of the single HLL.
*
* The user specified a single key. Either return the cached value
* or compute one and update the cache. */
o = lookupKeyWrite(c->db,c->argv[1]);
* or compute one and update the cache.
*
* Since a HLL is a regular Redis string type value, updating the cache does
* modify the value. We do a lookupKeyRead anyway since this is flagged as a
* read-only command. The difference is that with lookupKeyWrite, a
* logically expired key on a replica is deleted, while with lookupKeyRead
* it isn't, but the lookup returns NULL either way if the key is logically
* expired, which is what matters here. */
o = lookupKeyRead(c->db,c->argv[1]);
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
if (o == NULL) {
/* No key? Cardinality is zero since no element was added, otherwise
* we would have a key as HLLADD creates it as a side effect. */
Expand Down Expand Up @@ -1295,8 +1302,7 @@ void pfcountCommand(client *c) {
hdr->card[5] = (card >> 40) & 0xff;
hdr->card[6] = (card >> 48) & 0xff;
hdr->card[7] = (card >> 56) & 0xff;
/* This is not considered a read-only command even if the
* data structure is not modified, since the cached value
/* This is considered a read-only command even if the cached value
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
* may be modified and given that the HLL is a Redis string
* we need to propagate the change. */
signalModifiedKey(c,c->db,c->argv[1]);
Expand Down
10 changes: 6 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2542,11 +2542,9 @@ int removeExpire(redisDb *db, robj *key);
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj);
void propagateExpire(redisDb *db, robj *key, int lazy);
int keyIsExpired(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key);
long long getExpire(redisDb *db, robj *key);
void setExpire(client *c, redisDb *db, robj *key, long long when);
int checkAlreadyExpired(long long when);
robj *lookupKey(redisDb *db, robj *key, int flags);
robj *lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key);
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply);
Expand All @@ -2558,8 +2556,12 @@ robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply);
int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
long long lru_clock, int lru_multiplier);
#define LOOKUP_NONE 0
#define LOOKUP_NOTOUCH (1<<0)
#define LOOKUP_NONOTIFY (1<<1)
#define LOOKUP_NOTOUCH (1<<0) /* Don't update LRU. */
#define LOOKUP_NONOTIFY (1<<1) /* Don't trigger keyspace event on key misses. */
#define LOOKUP_NOSTATS (1<<2) /* Don't update keyspace hits/misses couters. */
#define LOOKUP_NOEXPIRE (1<<3) /* Don't check if the key is expired. */
#define LOOKUP_WRITE (1<<4) /* Delete expired keys even in replicas. */
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved

void dbAdd(redisDb *db, robj *key, robj *val);
int dbAddRDBLoad(redisDb *db, sds key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val);
Expand Down