Skip to content

Commit

Permalink
Module API to allow writes after key space notification hooks (redis#…
Browse files Browse the repository at this point in the history
…11199)

### Summary of API additions

* `RedisModule_AddPostNotificationJob` - new API to call inside a key space
  notification (and on more locations in the future) and allow to add a post job as describe above.
* New module option, `REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS`,
  allows to disable Redis protection of nested key-space notifications.
* `RedisModule_GetModuleOptionsAll` - gets the mask of all supported module options so a module
  will be able to check if a given option is supported by the current running Redis instance.

### Background

The following PR is a proposal of handling write operations inside module key space notifications.
After a lot of discussions we came to a conclusion that module should not perform any write
operations on key space notification.

Some examples of issues that such write operation can cause are describe on the following links:

* Bad replication oreder - redis#10969
* Used after free - redis#10969 (comment)
* Used after free - redis#9406 (comment)

There are probably more issues that are yet to be discovered. The underline problem with writing
inside key space notification is that the notification runs synchronously, this means that the notification
code will be executed in the middle on Redis logic (commands logic, eviction, expire).
Redis **do not assume** that the data might change while running the logic and such changes
can crash Redis or cause unexpected behaviour.

The solution is to state that modules **should not** perform any write command inside key space
notification (we can chose whether or not we want to force it). To still cover the use-case where
module wants to perform a write operation as a reaction to key space notifications, we introduce
a new API , `RedisModule_AddPostNotificationJob`, that allows to register a callback that will be
called by Redis when the following conditions hold:

* It is safe to perform any write operation.
* The job will be called atomically along side the operation that triggers it (in our case, key
  space notification).

Module can use this new API to safely perform any write operation and still achieve atomicity
between the notification and the write.

Although currently the API is supported on key space notifications, the API is written in a generic
way so that in the future we will be able to use it on other places (server events for example).

### Technical Details

Whenever a module uses `RedisModule_AddPostNotificationJob` the callback is added to a list
of callbacks (called `modulePostExecUnitJobs`) that need to be invoke after the current execution
unit ends (whether its a command, eviction, or active expire). In order to trigger those callback
atomically with the notification effect, we call those callbacks on `postExecutionUnitOperations`
(which was `propagatePendingCommands` before this PR). The new function fires the post jobs
and then calls `propagatePendingCommands`.

If the callback perform more operations that triggers more key space notifications. Those keys
space notifications might register more callbacks. Those callbacks will be added to the end
of `modulePostExecUnitJobs` list and will be invoke atomically after the current callback ends.
This raises a concerns of entering an infinite loops, we consider infinite loops as a logical bug
that need to be fixed in the module, an attempt to protect against infinite loops by halting the
execution could result in violation of the feature correctness and so **Redis will make no attempt
to protect the module from infinite loops**

In addition, currently key space notifications are not nested. Some modules might want to allow
nesting key-space notifications. To allow that and keep backward compatibility, we introduce a
new module option called `REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS`.
Setting this option will disable the Redis key-space notifications nesting protection and will
pass this responsibility to the module.

### Redis infrastructure

This PR promotes the existing `propagatePendingCommands` to an "Execution Unit" concept,
which is called after each atomic unit of execution,

Co-authored-by: Oran Agra <oran@redislabs.com>
Co-authored-by: Yossi Gottlieb <yossigo@gmail.com>
Co-authored-by: Madelyn Olson <34459052+madolson@users.noreply.github.com>
  • Loading branch information
4 people committed Apr 19, 2023
1 parent 53e2e2a commit 05acfcf
Show file tree
Hide file tree
Showing 14 changed files with 651 additions and 14 deletions.
1 change: 1 addition & 0 deletions runtest-moduleapi
Expand Up @@ -51,4 +51,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/timer \
--single unit/moduleapi/publish \
--single unit/moduleapi/usercall \
--single unit/moduleapi/postnotifications \
"${@}"
2 changes: 1 addition & 1 deletion src/blocked.c
Expand Up @@ -635,7 +635,7 @@ void handleClientsBlockedOnKeys(void) {
if (!o) {
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
* take care of the propagation here, because afterCommand wasn't called */
propagatePendingCommands();
postExecutionUnitOperations();
} else {
if (o->type == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
Expand Down
3 changes: 2 additions & 1 deletion src/cluster.c
Expand Up @@ -7361,8 +7361,9 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, &server.db[0], key);
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
postExecutionUnitOperations();
decrRefCount(key);
propagatePendingCommands();
postExecutionUnitOperations();
j++;
server.dirty++;
}
Expand Down
2 changes: 1 addition & 1 deletion src/evict.c
Expand Up @@ -691,7 +691,7 @@ int performEvictions(void) {
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
propagatePendingCommands();
postExecutionUnitOperations();
decrRefCount(keyobj);
keys_freed++;

Expand Down
2 changes: 1 addition & 1 deletion src/expire.c
Expand Up @@ -267,7 +267,7 @@ void activeExpireCycle(int type) {
if (activeExpireCycleTryExpire(db,e,now)) {
expired++;
/* Propagate the DEL command */
propagatePendingCommands();
postExecutionUnitOperations();
}
if (ttl > 0) {
/* We want the average TTL of keys yet
Expand Down
107 changes: 102 additions & 5 deletions src/module.c
Expand Up @@ -294,6 +294,9 @@ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
/* Function pointer type for keyspace event notification subscriptions from modules. */
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);

/* Function pointer type for post jobs */
typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);

/* Keyspace notification subscriber information.
* See RM_SubscribeToKeyspaceEvents() for more information. */
typedef struct RedisModuleKeyspaceSubscriber {
Expand All @@ -308,9 +311,21 @@ typedef struct RedisModuleKeyspaceSubscriber {
int active;
} RedisModuleKeyspaceSubscriber;

typedef struct RedisModulePostExecUnitJob {
/* The module subscribed to the event */
RedisModule *module;
RedisModulePostNotificationJobFunc callback;
void *pd;
void (*free_pd)(void*);
int dbid;
} RedisModulePostExecUnitJob;

/* The module keyspace notification subscribers list */
static list *moduleKeyspaceSubscribers;

/* The module post keyspace jobs list */
static list *modulePostExecUnitJobs;

/* Data structures related to the exported dictionary data structure. */
typedef struct RedisModuleDict {
rax *rax; /* The radix tree. */
Expand Down Expand Up @@ -729,8 +744,9 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
/* Modules take care of their own propagation, when we are
* outside of call() context (timers, events, etc.). */
if (--server.module_ctx_nesting == 0) {
if (!server.core_propagates)
propagatePendingCommands();
if (!server.core_propagates) {
postExecutionUnitOperations();
}
if (server.busy_module_yield_flags) {
blockingOperationEnds();
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
Expand Down Expand Up @@ -2207,7 +2223,13 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
* REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD:
* Setting this flag indicates module awareness of diskless async replication (repl-diskless-load=swapdb)
* and that redis could be serving reads during replication instead of blocking with LOADING status.
*/
*
* REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS:
* Declare that the module wants to get nested key-space notifications.
* By default, Redis will not fire key-space notifications that happened inside
* a key-space notification callback. This flag allows to change this behavior
* and fire nested key-space notifications. Notice: if enabled, the module
* should protected itself from infinite recursion. */
void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
ctx->module->options = options;
}
Expand Down Expand Up @@ -7905,7 +7927,7 @@ void moduleGILBeforeUnlock() {
* (because it's u clear when thread safe contexts are
* released we have to propagate here). */
server.module_ctx_nesting--;
propagatePendingCommands();
postExecutionUnitOperations();

if (server.busy_module_yield_flags) {
blockingOperationEnds();
Expand Down Expand Up @@ -8000,6 +8022,12 @@ void moduleReleaseGIL(void) {
* so notification callbacks must to be fast, or they would slow Redis down.
* If you need to take long actions, use threads to offload them.
*
* Moreover, the fact that the notification is executed synchronously means
* that the notification code will be executed in the middle on Redis logic
* (commands logic, eviction, expire). Changing the key space while the logic
* runs is dangerous and discouraged. In order to react to key space events with
* write actions, please refer to `RM_AddPostExecutionUnitJob`.
*
* See https://redis.io/topics/notifications for more information.
*/
int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
Expand All @@ -8013,6 +8041,53 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
return REDISMODULE_OK;
}

void firePostExecutionUnitJobs() {
/* Avoid propagation of commands. */
server.in_nested_call++;
while (listLength(modulePostExecUnitJobs) > 0) {
listNode *ln = listFirst(modulePostExecUnitJobs);
RedisModulePostExecUnitJob *job = listNodeValue(ln);
listDelNode(modulePostExecUnitJobs, ln);

RedisModuleCtx ctx;
moduleCreateContext(&ctx, job->module, REDISMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, job->dbid);

job->callback(&ctx, job->pd);
if (job->free_pd) job->free_pd(job->pd);

moduleFreeContext(&ctx);
zfree(job);
}
server.in_nested_call--;
}

/* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write
* operation (See `RM_SubscribeToKeyspaceEvents`). In order to still perform write actions in this scenario,
* Redis provides `RM_AddPostNotificationJob` API. The API allows to register a job callback which Redis will call
* when the following condition are promised to be fulfilled:
* 1. It is safe to perform any write operation.
* 2. The job will be called atomically along side the key space notification.
*
* Notice, one job might trigger key space notifications that will trigger more jobs.
* This raises a concerns of entering an infinite loops, we consider infinite loops
* as a logical bug that need to be fixed in the module, an attempt to protect against
* infinite loops by halting the execution could result in violation of the feature correctness
* and so Redis will make no attempt to protect the module from infinite loops.
*
* 'free_pd' can be NULL and in such case will not be used. */
int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *privdata, void (*free_privdata)(void*)) {
RedisModulePostExecUnitJob *job = zmalloc(sizeof(*job));
job->module = ctx->module;
job->callback = callback;
job->pd = privdata;
job->free_pd = free_privdata;
job->dbid = ctx->client->db->id;

listAddNodeTail(modulePostExecUnitJobs, job);
return REDISMODULE_OK;
}

/* Get the configured bitmap of notify-keyspace-events (Could be used
* for additional filtering in RedisModuleNotificationFunc) */
int RM_GetNotifyKeyspaceEvents() {
Expand Down Expand Up @@ -8045,7 +8120,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
RedisModuleKeyspaceSubscriber *sub = ln->value;
/* Only notify subscribers on events matching the registration,
* and avoid subscribers triggering themselves */
if ((sub->event_mask & type) && sub->active == 0) {
if ((sub->event_mask & type) &&
(sub->active == 0 || (sub->module->options & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS))) {
RedisModuleCtx ctx;
moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, dbid);
Expand Down Expand Up @@ -11116,6 +11192,8 @@ void moduleInitModulesSystem(void) {
/* Set up the keyspace notification subscriber list and static client */
moduleKeyspaceSubscribers = listCreate();

modulePostExecUnitJobs = listCreate();

/* Set up filter list */
moduleCommandFilters = listCreate();

Expand Down Expand Up @@ -12234,6 +12312,23 @@ int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) {
* ## Miscellaneous APIs
* -------------------------------------------------------------------------- */

/**
* Returns the full module options flags mask, using the return value
* the module can check if a certain set of module options are supported
* by the redis server version in use.
* Example:
*
* int supportedFlags = RM_GetModuleOptionsAll();
* if (supportedFlags & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS) {
* // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is supported
* } else{
* // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is not supported
* }
*/
int RM_GetModuleOptionsAll() {
return _REDISMODULE_OPTIONS_FLAGS_NEXT - 1;
}

/**
* Returns the full ContextFlags mask, using the return value
* the module can check if a certain set of flags are supported
Expand Down Expand Up @@ -12825,6 +12920,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(NotifyKeyspaceEvent);
REGISTER_API(GetNotifyKeyspaceEvents);
REGISTER_API(SubscribeToKeyspaceEvents);
REGISTER_API(AddPostNotificationJob);
REGISTER_API(RegisterClusterMessageReceiver);
REGISTER_API(SendClusterMessage);
REGISTER_API(GetClusterNodeInfo);
Expand Down Expand Up @@ -12932,6 +13028,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(AuthenticateClientWithACLUser);
REGISTER_API(AuthenticateClientWithUser);
REGISTER_API(GetContextFlagsAll);
REGISTER_API(GetModuleOptionsAll);
REGISTER_API(GetKeyspaceNotificationFlagsAll);
REGISTER_API(IsSubEventSupported);
REGISTER_API(GetServerVersion);
Expand Down
14 changes: 14 additions & 0 deletions src/redismodule.h
Expand Up @@ -261,6 +261,15 @@ typedef uint64_t RedisModuleTimerID;
/* Declare that the module can handle diskless async replication with RedisModule_SetModuleOptions. */
#define REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD (1<<2)

/* Declare that the module want to get nested key space notifications.
* If enabled, the module is responsible to break endless loop. */
#define REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS (1<<3)

/* Next option flag, must be updated when adding new module flags above!
* This flag should not be used directly by the module.
* Use RedisModule_GetModuleOptionsAll instead. */
#define _REDISMODULE_OPTIONS_FLAGS_NEXT (1<<4)

/* Definitions for RedisModule_SetCommandInfo. */

typedef enum {
Expand Down Expand Up @@ -834,6 +843,7 @@ typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
Expand Down Expand Up @@ -1146,6 +1156,7 @@ REDISMODULE_API void (*RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cur
REDISMODULE_API int (*RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR;
Expand All @@ -1167,6 +1178,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) R
REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
Expand Down Expand Up @@ -1490,6 +1502,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(Scan);
REDISMODULE_GET_API(ScanKey);
REDISMODULE_GET_API(GetContextFlagsAll);
REDISMODULE_GET_API(GetModuleOptionsAll);
REDISMODULE_GET_API(GetKeyspaceNotificationFlagsAll);
REDISMODULE_GET_API(IsSubEventSupported);
REDISMODULE_GET_API(GetServerVersion);
Expand All @@ -1512,6 +1525,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(BlockedClientMeasureTimeEnd);
REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(AddPostNotificationJob);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
REDISMODULE_GET_API(GetNotifyKeyspaceEvents);
REDISMODULE_GET_API(BlockedClientDisconnected);
Expand Down
32 changes: 29 additions & 3 deletions src/server.c
Expand Up @@ -3226,7 +3226,7 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int
/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
void propagatePendingCommands() {
static void propagatePendingCommands() {
if (server.also_propagate.numops == 0)
return;

Expand Down Expand Up @@ -3262,6 +3262,31 @@ void propagatePendingCommands() {
redisOpArrayFree(&server.also_propagate);
}

/* Performs operations that should be performed after an execution unit ends.
* Execution unit is a code that should be done atomically.
* Execution units can be nested and are not necessarily starts with Redis command.
*
* For example the following is a logical unit:
* active expire ->
* trigger del notification of some module ->
* accessing a key ->
* trigger key miss notification of some other module
*
* What we want to achieve is that the entire execution unit will be done atomically,
* currently with respect to replication and post jobs, but in the future there might
* be other considerations. So we basically want the `postUnitOperations` to trigger
* after the entire chain finished.
*
* Current, in order to avoid massive code changes that could be risky to cherry-pick,
* we count on the mechanism we already have such as `server.core_propagation`,
* `server.module_ctx_nesting`, and `server.in_nested_call`. We understand that we probably
* do not need all of those variable and we will make an attempt to re-arrange it on unstable
* branch. */
void postExecutionUnitOperations() {
firePostExecutionUnitJobs();
propagatePendingCommands();
}

/* Increment the command failure counters (either rejected_calls or failed_calls).
* The decision which counter to increment is done using the flags argument, options are:
* * ERROR_COMMAND_REJECTED - update rejected_calls
Expand Down Expand Up @@ -3576,8 +3601,9 @@ void afterCommand(client *c) {
/* If we are at the top-most call() we can propagate what we accumulated.
* Should be done before trackingHandlePendingKeyInvalidations so that we
* reply to client before invalidating cache (makes more sense) */
if (server.core_propagates)
propagatePendingCommands();
if (server.core_propagates) {
postExecutionUnitOperations();
}
/* Flush pending invalidation messages only when we are not in nested call.
* So the messages are not interleaved with transaction response. */
trackingHandlePendingKeyInvalidations();
Expand Down
3 changes: 2 additions & 1 deletion src/server.h
Expand Up @@ -2424,6 +2424,7 @@ void moduleAcquireGIL(void);
int moduleTryAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void firePostExecutionUnitJobs();
void moduleCallCommandFilters(client *c);
void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);
Expand Down Expand Up @@ -2946,7 +2947,7 @@ void startCommandExecution();
int incrCommandStatsOnError(struct redisCommand *cmd, int flags);
void call(client *c, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
void propagatePendingCommands();
void postExecutionUnitOperations();
void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);
void preventCommandPropagation(client *c);
Expand Down
3 changes: 2 additions & 1 deletion tests/modules/Makefile
Expand Up @@ -59,7 +59,8 @@ TEST_MODULES = \
moduleconfigs.so \
moduleconfigstwo.so \
publish.so \
usercall.so
usercall.so \
postnotifications.so

.PHONY: all

Expand Down

0 comments on commit 05acfcf

Please sign in to comment.