Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a special notification unlink available only for modules #9406

Merged
merged 38 commits into from Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
884d8c3
Add a specail notification available only for modules, indicates that
huangzhw Aug 23, 2021
b26abc3
remove unlink to removed
huangzhw Aug 23, 2021
dc4694e
Don't unlink key when module removed notify
huangzhw Aug 28, 2021
9226d4e
attempt to simplify the plink ref
oranagra Sep 5, 2021
747cc43
fix comments, delete expire
huangzhw Sep 6, 2021
d4fa9a4
dbUnshareStringValue should not call dbOverwrite.
huangzhw Sep 12, 2021
c7c39ec
Stop lazy expire when notify module removed
huangzhw Sep 14, 2021
7f96495
rename and change test
huangzhw Sep 15, 2021
37617b8
Rename functions and move dictPauseRehashing
huangzhw Sep 26, 2021
c4781f2
Merge branch 'unstable' into module-notify
huangzhw Oct 3, 2021
5379e4c
Merge branch 'unstable' into module-notify
huangzhw Aug 21, 2022
b01ea9d
add key unlink event
huangzhw Sep 25, 2022
5c45826
move key event to moduleNotifyKeyUnlink
huangzhw Sep 28, 2022
1053be6
code review
huangzhw Oct 1, 2022
b01d982
type conv
huangzhw Oct 1, 2022
349bebf
code review add test and handle StringDMA
huangzhw Oct 3, 2022
e225b5c
Merge branch 'unstable' into module-notify
huangzhw Oct 3, 2022
d2a56b9
Move special definisions to the top of redismodule.h
oranagra Oct 3, 2022
48d425a
change test
huangzhw Oct 4, 2022
b7154a7
add subevent expired and evicted
huangzhw Oct 17, 2022
5237912
add doc
huangzhw Oct 17, 2022
dff15b9
clean
huangzhw Oct 17, 2022
8f0f770
fix setKey and add overwrite event
huangzhw Oct 27, 2022
8280c4a
Merge remote-tracking branch 'official/unstable' into module-notify
huangzhw Oct 27, 2022
610fe00
code review
huangzhw Oct 27, 2022
21dbaa4
typo
oranagra Oct 30, 2022
c52a357
fix 32bit platform, mstime_t size is bigger than pointer.
huangzhw Oct 30, 2022
8b7ef84
fix memory
huangzhw Oct 30, 2022
add7077
temporarily disable lazy expiry when we are in unlink/unlink2 callback
huangzhw Nov 1, 2022
147aba5
code review fix
huangzhw Nov 27, 2022
ea0e40c
use RedisModuleKey instead of RedisModuleString
huangzhw Nov 29, 2022
43e1e1e
fix
huangzhw Nov 29, 2022
9c01b1f
Merge remote-tracking branch 'official/unstable' into module-notify
huangzhw Nov 29, 2022
30869eb
fix
huangzhw Nov 29, 2022
0d854b3
fix access local variables
huangzhw Nov 29, 2022
abc06fc
move version
huangzhw Nov 30, 2022
cf8816e
add comments to test
oranagra Nov 30, 2022
b2b3c04
typo
oranagra Nov 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 48 additions & 24 deletions src/db.c
Expand Up @@ -219,7 +219,7 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
* This function does not modify the expire time of the existing key.
*
* The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
static void dbOverwrite(redisDb *db, robj *key, robj *val, int signal) {
oranagra marked this conversation as resolved.
Show resolved Hide resolved
dictEntry *de = dictFind(db->dict,key->ptr);

serverAssertWithInfo(NULL,key,de != NULL);
Expand All @@ -228,12 +228,22 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru;
}
/* Although the key is not really deleted from the database, we regard
* overwrite as two steps of unlink+add, so we still need to call the unlink
* callback of the module. */
moduleNotifyKeyUnlink(key,old,db->id);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,old->type);
if (signal) {
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
* need to incr to retain old */
incrRefCount(old);
/* Although the key is not really deleted from the database, we regard
* overwrite as two steps of unlink+add, so we still need to call the unlink
* callback of the module. */
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,old->type);
decrRefCount(old);
/* Because of RM_StringDMA, old may be changed, so we need get old again */
old = dictGetVal(de);
/* Entry in auxentry may be changed, so we need update auxentry */
auxentry = *de;
}
dictSetVal(db->dict, de, val);

if (server.lazyfree_lazy_server_del) {
Expand All @@ -244,6 +254,12 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictFreeVal(db->dict, &auxentry);
}

/* Replace an existing key with a new value, we just replace value and don't
* emit any events */
void dbReplaceValue(redisDb *db, robj *key, robj *val) {
dbOverwrite(db, key, val, 0);
}

/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
*
Expand All @@ -268,7 +284,7 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) {
if (!keyfound) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
dbOverwrite(db,key,val,1);
}
incrRefCount(val);
if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key);
Expand Down Expand Up @@ -315,23 +331,33 @@ robj *dbRandomKey(redisDb *db) {
}

/* Helper for sync and async delete. */
static int dbGenericDelete(redisDb *db, robj *key, int async) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
dictEntry *de = dictUnlink(db->dict,key->ptr);
int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
oranagra marked this conversation as resolved.
Show resolved Hide resolved
dictEntry **plink;
int table;
dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink,&table);
if (de) {
robj *val = dictGetVal(de);
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
* need to incr to retain val */
incrRefCount(val);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val,db->id);
moduleNotifyKeyUnlink(key,val,db->id,flags);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,val->type);
/* We should call decr before freeObjAsync. If not, the refcount may be
* greater than 1, so freeObjAsync doesn't work */
decrRefCount(val);
if (async) {
freeObjAsync(key, val, db->id);
/* Because of dbUnshareStringValue, the val in de may change. */
freeObjAsync(key, dictGetVal(de), db->id);
dictSetVal(db->dict, de, NULL);
}
if (server.cluster_enabled) slotToKeyDelEntry(de, db);
dictFreeUnlinkedEntry(db->dict,de);

/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
dictTwoPhaseUnlinkFree(db->dict,de,plink,table);
return 1;
} else {
return 0;
Expand All @@ -340,19 +366,19 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) {

/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, 0);
return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
}

/* Delete a key, value, and associated expiration entry if any, from the DB. If
* the value consists of many allocations, it may be freed asynchronously. */
int dbAsyncDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, 1);
return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED);
}

/* This is a wrapper whose behavior depends on the Redis lazy free
* configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, server.lazyfree_lazy_server_del);
return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED);
}

/* Prepare the string object stored at 'key' to be modified destructively
Expand Down Expand Up @@ -388,7 +414,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
robj *decoded = getDecodedObject(o);
o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
dbOverwrite(db,key,o);
dbReplaceValue(db,key,o);
}
return o;
}
Expand Down Expand Up @@ -1559,10 +1585,7 @@ long long getExpire(redisDb *db, robj *key) {
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
mstime_t expire_latency;
latencyStartMonitor(expire_latency);
if (server.lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
Expand Down Expand Up @@ -1655,6 +1678,7 @@ int keyIsExpired(redisDb *db, robj *key) {
* The return value of the function is 0 if the key is still valid,
* otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key, int flags) {
if (server.lazy_expire_disabled) return 0;
if (!keyIsExpired(db,key)) return 0;

/* If we are running in the context of a replica, instead of
Expand Down
48 changes: 48 additions & 0 deletions src/dict.c
Expand Up @@ -534,6 +534,54 @@ void *dictFetchValue(dict *d, const void *key) {
return he ? dictGetVal(he) : NULL;
}

/* Find an element from the table, also get the plink of the entry. The entry
madolson marked this conversation as resolved.
Show resolved Hide resolved
* is returned if the element is found, and the user should later call
* `dictTwoPhaseUnlinkFree` with it in order to unlink and release it. Otherwise if
* the key is not found, NULL is returned. These two functions should be used in pair.
* `dictTwoPhaseUnlinkFind` pauses rehash and `dictTwoPhaseUnlinkFree` resumes rehash.
*
* We can use like this:
*
* dictEntry *de = dictTwoPhaseUnlinkFind(db->dict,key->ptr,&plink, &table);
* // Do something, but we can't modify the dict
* dictTwoPhaseUnlinkFree(db->dict,de,plink,table); // We don't need to lookup again
*/
dictEntry *dictTwoPhaseUnlinkFind(dict *d, const void *key, dictEntry ***plink, int *table_index)
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
{
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
uint64_t h, idx, table;

if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);

for (table = 0; table <= 1; table++) {
idx = h & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
dictEntry **ref = &d->ht_table[table][idx];
while(*ref) {
if (key==(*ref)->key || dictCompareKeys(d, key, (*ref)->key)) {
*table_index = table;
*plink = ref;
dictPauseRehashing(d);
return *ref;
}
ref = &(*ref)->next;
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}

void dictTwoPhaseUnlinkFree(dict *d, dictEntry *he, dictEntry **plink, int table_index)
{
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
if (he == NULL) return;
d->ht_used[table_index]--;
*plink = he->next;
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
dictResumeRehashing(d);
}

/* A fingerprint is a 64 bit number that represents the state of the dictionary
* at a given time, it's just a few dict properties xored together.
* When an unsafe iterator is initialized, we get the dict fingerprint, and check
Expand Down
2 changes: 2 additions & 0 deletions src/dict.h
Expand Up @@ -181,6 +181,8 @@ int dictReplace(dict *d, void *key, void *val);
int dictDelete(dict *d, const void *key);
dictEntry *dictUnlink(dict *d, const void *key);
void dictFreeUnlinkedEntry(dict *d, dictEntry *he);
dictEntry *dictTwoPhaseUnlinkFind(dict *d, const void *key, dictEntry ***plink, int *table_index);
void dictTwoPhaseUnlinkFree(dict *d, dictEntry *he, dictEntry **plink, int table_index);
void dictRelease(dict *d);
dictEntry * dictFind(dict *d, const void *key);
void *dictFetchValue(dict *d, const void *key);
Expand Down
5 changes: 1 addition & 4 deletions src/evict.c
Expand Up @@ -678,10 +678,7 @@ int performEvictions(void) {
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
delta -= (long long) zmalloc_used_memory();
Expand Down
3 changes: 1 addition & 2 deletions src/expire.c
Expand Up @@ -637,8 +637,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
if (checkAlreadyExpired(when)) {
robj *aux;

int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
dbSyncDelete(c->db,key);
int deleted = dbGenericDelete(c->db,key,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
serverAssertWithInfo(c,key,deleted);
server.dirty++;

Expand Down
44 changes: 41 additions & 3 deletions src/module.c
Expand Up @@ -10478,6 +10478,7 @@ static uint64_t moduleEventVersions[] = {
-1, /* REDISMODULE_EVENT_REPL_ASYNC_LOAD */
-1, /* REDISMODULE_EVENT_EVENTLOOP */
-1, /* REDISMODULE_EVENT_CONFIG */
REDISMODULE_KEYINFO_VERSION, /* REDISMODULE_EVENT_KEY */
};

/* Register to be notified, via a callback, when the specified server event
Expand Down Expand Up @@ -10752,6 +10753,23 @@ static uint64_t moduleEventVersions[] = {
* // name of each modified configuration item
* uint32_t num_changes; // The number of elements in the config_names array
*
* * RedisModule_Event_Key
*
* Called when a key is removed from the keyspace. We can't modify any key in
* the event.
* The following sub events are available:
*
* * `REDISMODULE_SUBEVENT_KEY_DELETED`
* * `REDISMODULE_SUBEVENT_KEY_EXPIRED`
* * `REDISMODULE_SUBEVENT_KEY_EVICTED`
* * `REDISMODULE_SUBEVENT_KEY_OVERWRITE`
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
*
* The data pointer can be casted to a RedisModuleKeyInfo
* structure with the following fields:
*
* int32_t dbnum; // Database number of the key
* RedisModuleString *key; // Key name
*
* The function returns REDISMODULE_OK if the module was successfully subscribed
* for the specified event. If the API is called from a wrong context or unsupported event
* is given then REDISMODULE_ERR is returned. */
Expand Down Expand Up @@ -10831,6 +10849,8 @@ int RM_IsSubEventSupported(RedisModuleEvent event, int64_t subevent) {
return subevent < _REDISMODULE_SUBEVENT_EVENTLOOP_NEXT;
case REDISMODULE_EVENT_CONFIG:
return subevent < _REDISMODULE_SUBEVENT_CONFIG_NEXT;
case REDISMODULE_EVENT_KEY:
return subevent < _REDISMODULE_SUBEVENT_KEY_NEXT;
default:
break;
}
Expand Down Expand Up @@ -10909,6 +10929,11 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_CONFIG) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_KEY) {
moduledata = data;
RedisModuleKeyInfoV1 *ki = data;
if (ki->dbnum != -1)
selectDb(ctx.client, ki->dbnum);
huangzhw marked this conversation as resolved.
Show resolved Hide resolved
}

el->module->in_hook++;
Expand Down Expand Up @@ -10958,9 +10983,22 @@ void processModuleLoadingProgressEvent(int is_aof) {
}
}

/* When a module key is deleted (in dbAsyncDelete/dbSyncDelete/dbOverwrite), it
/* When a key is deleted (in dbAsyncDelete/dbSyncDelete/setKey), it
* will be called to tell the module which key is about to be released. */
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid) {
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) {
server.lazy_expire_disabled++;
oranagra marked this conversation as resolved.
Show resolved Hide resolved
int subevent = REDISMODULE_SUBEVENT_KEY_DELETED;
if (flags & DB_FLAG_KEY_EXPIRED) {
subevent = REDISMODULE_SUBEVENT_KEY_EXPIRED;
} else if (flags & DB_FLAG_KEY_EVICTED) {
subevent = REDISMODULE_SUBEVENT_KEY_EVICTED;
} else if (flags & DB_FLAG_KEY_OVERWRITE) {
subevent = REDISMODULE_SUBEVENT_KEY_OVERWRITE;
}
RedisModuleKeyInfoV1 ki = {REDISMODULE_KEYINFO_VERSION, dbid, key};
oranagra marked this conversation as resolved.
Show resolved Hide resolved
moduleFireServerEvent(REDISMODULE_EVENT_KEY, subevent, &ki);
server.lazy_expire_disabled--;

if (val->type == OBJ_MODULE) {
oranagra marked this conversation as resolved.
Show resolved Hide resolved
moduleValue *mv = val->ptr;
moduleType *mt = mv->type;
Expand All @@ -10970,7 +11008,7 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid) {
mt->unlink2(&ctx,mv->value);
} else if (mt->unlink != NULL) {
mt->unlink(key,mv->value);
}
}
}
}

Expand Down