diff --git a/runtest-moduleapi b/runtest-moduleapi index e58c5809998d..4b4f72d10cfc 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -51,4 +51,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/timer \ --single unit/moduleapi/publish \ --single unit/moduleapi/usercall \ +--single unit/moduleapi/postnotifications \ "${@}" diff --git a/src/blocked.c b/src/blocked.c index 497ffe4ce446..069d3ba95e43 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -635,7 +635,7 @@ void handleClientsBlockedOnKeys(void) { if (!o) { /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to * take care of the propagation here, because afterCommand wasn't called */ - propagatePendingCommands(); + postExecutionUnitOperations(); } else { if (o->type == OBJ_LIST) serveClientsBlockedOnListKey(o,rl); diff --git a/src/cluster.c b/src/cluster.c index 436ed014c8c9..d9acb1bdce05 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -7361,8 +7361,9 @@ unsigned int delKeysInSlot(unsigned int hashslot) { propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del); signalModifiedKey(NULL, &server.db[0], key); moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); + postExecutionUnitOperations(); decrRefCount(key); - propagatePendingCommands(); + postExecutionUnitOperations(); j++; server.dirty++; } diff --git a/src/evict.c b/src/evict.c index 637a8b6c7824..f97285f2ad51 100644 --- a/src/evict.c +++ b/src/evict.c @@ -691,7 +691,7 @@ int performEvictions(void) { notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", keyobj, db->id); propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction); - propagatePendingCommands(); + postExecutionUnitOperations(); decrRefCount(keyobj); keys_freed++; diff --git a/src/expire.c b/src/expire.c index 8220f80cfea5..a106b0839600 100644 --- a/src/expire.c +++ b/src/expire.c @@ -267,7 +267,7 @@ void activeExpireCycle(int type) { if (activeExpireCycleTryExpire(db,e,now)) { expired++; /* Propagate the DEL command */ - propagatePendingCommands(); + postExecutionUnitOperations(); } if (ttl > 0) { /* We want the average TTL of keys yet diff --git a/src/module.c b/src/module.c index e938b98d7a79..e336ebd1980c 100644 --- a/src/module.c +++ b/src/module.c @@ -294,6 +294,9 @@ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; /* Function pointer type for keyspace event notification subscriptions from modules. */ typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); +/* Function pointer type for post jobs */ +typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd); + /* Keyspace notification subscriber information. * See RM_SubscribeToKeyspaceEvents() for more information. */ typedef struct RedisModuleKeyspaceSubscriber { @@ -308,9 +311,21 @@ typedef struct RedisModuleKeyspaceSubscriber { int active; } RedisModuleKeyspaceSubscriber; +typedef struct RedisModulePostExecUnitJob { + /* The module subscribed to the event */ + RedisModule *module; + RedisModulePostNotificationJobFunc callback; + void *pd; + void (*free_pd)(void*); + int dbid; +} RedisModulePostExecUnitJob; + /* The module keyspace notification subscribers list */ static list *moduleKeyspaceSubscribers; +/* The module post keyspace jobs list */ +static list *modulePostExecUnitJobs; + /* Data structures related to the exported dictionary data structure. */ typedef struct RedisModuleDict { rax *rax; /* The radix tree. */ @@ -729,8 +744,9 @@ void moduleFreeContext(RedisModuleCtx *ctx) { /* Modules take care of their own propagation, when we are * outside of call() context (timers, events, etc.). */ if (--server.module_ctx_nesting == 0) { - if (!server.core_propagates) - propagatePendingCommands(); + if (!server.core_propagates) { + postExecutionUnitOperations(); + } if (server.busy_module_yield_flags) { blockingOperationEnds(); server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; @@ -2207,7 +2223,13 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) { * REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD: * Setting this flag indicates module awareness of diskless async replication (repl-diskless-load=swapdb) * and that redis could be serving reads during replication instead of blocking with LOADING status. - */ + * + * REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS: + * Declare that the module wants to get nested key-space notifications. + * By default, Redis will not fire key-space notifications that happened inside + * a key-space notification callback. This flag allows to change this behavior + * and fire nested key-space notifications. Notice: if enabled, the module + * should protected itself from infinite recursion. */ void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) { ctx->module->options = options; } @@ -7905,7 +7927,7 @@ void moduleGILBeforeUnlock() { * (because it's u clear when thread safe contexts are * released we have to propagate here). */ server.module_ctx_nesting--; - propagatePendingCommands(); + postExecutionUnitOperations(); if (server.busy_module_yield_flags) { blockingOperationEnds(); @@ -8000,6 +8022,12 @@ void moduleReleaseGIL(void) { * so notification callbacks must to be fast, or they would slow Redis down. * If you need to take long actions, use threads to offload them. * + * Moreover, the fact that the notification is executed synchronously means + * that the notification code will be executed in the middle on Redis logic + * (commands logic, eviction, expire). Changing the key space while the logic + * runs is dangerous and discouraged. In order to react to key space events with + * write actions, please refer to `RM_AddPostExecutionUnitJob`. + * * See https://redis.io/topics/notifications for more information. */ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) { @@ -8013,6 +8041,53 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti return REDISMODULE_OK; } +void firePostExecutionUnitJobs() { + /* Avoid propagation of commands. */ + server.in_nested_call++; + while (listLength(modulePostExecUnitJobs) > 0) { + listNode *ln = listFirst(modulePostExecUnitJobs); + RedisModulePostExecUnitJob *job = listNodeValue(ln); + listDelNode(modulePostExecUnitJobs, ln); + + RedisModuleCtx ctx; + moduleCreateContext(&ctx, job->module, REDISMODULE_CTX_TEMP_CLIENT); + selectDb(ctx.client, job->dbid); + + job->callback(&ctx, job->pd); + if (job->free_pd) job->free_pd(job->pd); + + moduleFreeContext(&ctx); + zfree(job); + } + server.in_nested_call--; +} + +/* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write + * operation (See `RM_SubscribeToKeyspaceEvents`). In order to still perform write actions in this scenario, + * Redis provides `RM_AddPostNotificationJob` API. The API allows to register a job callback which Redis will call + * when the following condition are promised to be fulfilled: + * 1. It is safe to perform any write operation. + * 2. The job will be called atomically along side the key space notification. + * + * Notice, one job might trigger key space notifications that will trigger more jobs. + * This raises a concerns of entering an infinite loops, we consider infinite loops + * as a logical bug that need to be fixed in the module, an attempt to protect against + * infinite loops by halting the execution could result in violation of the feature correctness + * and so Redis will make no attempt to protect the module from infinite loops. + * + * 'free_pd' can be NULL and in such case will not be used. */ +int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *privdata, void (*free_privdata)(void*)) { + RedisModulePostExecUnitJob *job = zmalloc(sizeof(*job)); + job->module = ctx->module; + job->callback = callback; + job->pd = privdata; + job->free_pd = free_privdata; + job->dbid = ctx->client->db->id; + + listAddNodeTail(modulePostExecUnitJobs, job); + return REDISMODULE_OK; +} + /* Get the configured bitmap of notify-keyspace-events (Could be used * for additional filtering in RedisModuleNotificationFunc) */ int RM_GetNotifyKeyspaceEvents() { @@ -8045,7 +8120,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) RedisModuleKeyspaceSubscriber *sub = ln->value; /* Only notify subscribers on events matching the registration, * and avoid subscribers triggering themselves */ - if ((sub->event_mask & type) && sub->active == 0) { + if ((sub->event_mask & type) && + (sub->active == 0 || (sub->module->options & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS))) { RedisModuleCtx ctx; moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_TEMP_CLIENT); selectDb(ctx.client, dbid); @@ -11116,6 +11192,8 @@ void moduleInitModulesSystem(void) { /* Set up the keyspace notification subscriber list and static client */ moduleKeyspaceSubscribers = listCreate(); + modulePostExecUnitJobs = listCreate(); + /* Set up filter list */ moduleCommandFilters = listCreate(); @@ -12234,6 +12312,23 @@ int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) { * ## Miscellaneous APIs * -------------------------------------------------------------------------- */ +/** + * Returns the full module options flags mask, using the return value + * the module can check if a certain set of module options are supported + * by the redis server version in use. + * Example: + * + * int supportedFlags = RM_GetModuleOptionsAll(); + * if (supportedFlags & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS) { + * // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is supported + * } else{ + * // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is not supported + * } + */ +int RM_GetModuleOptionsAll() { + return _REDISMODULE_OPTIONS_FLAGS_NEXT - 1; +} + /** * Returns the full ContextFlags mask, using the return value * the module can check if a certain set of flags are supported @@ -12825,6 +12920,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(NotifyKeyspaceEvent); REGISTER_API(GetNotifyKeyspaceEvents); REGISTER_API(SubscribeToKeyspaceEvents); + REGISTER_API(AddPostNotificationJob); REGISTER_API(RegisterClusterMessageReceiver); REGISTER_API(SendClusterMessage); REGISTER_API(GetClusterNodeInfo); @@ -12932,6 +13028,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(AuthenticateClientWithACLUser); REGISTER_API(AuthenticateClientWithUser); REGISTER_API(GetContextFlagsAll); + REGISTER_API(GetModuleOptionsAll); REGISTER_API(GetKeyspaceNotificationFlagsAll); REGISTER_API(IsSubEventSupported); REGISTER_API(GetServerVersion); diff --git a/src/redismodule.h b/src/redismodule.h index 0515af61efde..7f04f7ba44d7 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -261,6 +261,15 @@ typedef uint64_t RedisModuleTimerID; /* Declare that the module can handle diskless async replication with RedisModule_SetModuleOptions. */ #define REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD (1<<2) +/* Declare that the module want to get nested key space notifications. + * If enabled, the module is responsible to break endless loop. */ +#define REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS (1<<3) + +/* Next option flag, must be updated when adding new module flags above! + * This flag should not be used directly by the module. + * Use RedisModule_GetModuleOptionsAll instead. */ +#define _REDISMODULE_OPTIONS_FLAGS_NEXT (1<<4) + /* Definitions for RedisModule_SetCommandInfo. */ typedef enum { @@ -834,6 +843,7 @@ typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx; typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc); typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc); typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); +typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd); typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver); typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value); typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when); @@ -1146,6 +1156,7 @@ REDISMODULE_API void (*RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cur REDISMODULE_API int (*RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)() REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR; @@ -1167,6 +1178,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) R REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)() REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx) REDISMODULE_ATTR; @@ -1490,6 +1502,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(Scan); REDISMODULE_GET_API(ScanKey); REDISMODULE_GET_API(GetContextFlagsAll); + REDISMODULE_GET_API(GetModuleOptionsAll); REDISMODULE_GET_API(GetKeyspaceNotificationFlagsAll); REDISMODULE_GET_API(IsSubEventSupported); REDISMODULE_GET_API(GetServerVersion); @@ -1512,6 +1525,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(BlockedClientMeasureTimeEnd); REDISMODULE_GET_API(SetDisconnectCallback); REDISMODULE_GET_API(SubscribeToKeyspaceEvents); + REDISMODULE_GET_API(AddPostNotificationJob); REDISMODULE_GET_API(NotifyKeyspaceEvent); REDISMODULE_GET_API(GetNotifyKeyspaceEvents); REDISMODULE_GET_API(BlockedClientDisconnected); diff --git a/src/server.c b/src/server.c index 149b675eb701..bc85a3778eb6 100644 --- a/src/server.c +++ b/src/server.c @@ -3226,7 +3226,7 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int /* Handle the alsoPropagate() API to handle commands that want to propagate * multiple separated commands. Note that alsoPropagate() is not affected * by CLIENT_PREVENT_PROP flag. */ -void propagatePendingCommands() { +static void propagatePendingCommands() { if (server.also_propagate.numops == 0) return; @@ -3262,6 +3262,31 @@ void propagatePendingCommands() { redisOpArrayFree(&server.also_propagate); } +/* Performs operations that should be performed after an execution unit ends. + * Execution unit is a code that should be done atomically. + * Execution units can be nested and are not necessarily starts with Redis command. + * + * For example the following is a logical unit: + * active expire -> + * trigger del notification of some module -> + * accessing a key -> + * trigger key miss notification of some other module + * + * What we want to achieve is that the entire execution unit will be done atomically, + * currently with respect to replication and post jobs, but in the future there might + * be other considerations. So we basically want the `postUnitOperations` to trigger + * after the entire chain finished. + * + * Current, in order to avoid massive code changes that could be risky to cherry-pick, + * we count on the mechanism we already have such as `server.core_propagation`, + * `server.module_ctx_nesting`, and `server.in_nested_call`. We understand that we probably + * do not need all of those variable and we will make an attempt to re-arrange it on unstable + * branch. */ +void postExecutionUnitOperations() { + firePostExecutionUnitJobs(); + propagatePendingCommands(); +} + /* Increment the command failure counters (either rejected_calls or failed_calls). * The decision which counter to increment is done using the flags argument, options are: * * ERROR_COMMAND_REJECTED - update rejected_calls @@ -3576,8 +3601,9 @@ void afterCommand(client *c) { /* If we are at the top-most call() we can propagate what we accumulated. * Should be done before trackingHandlePendingKeyInvalidations so that we * reply to client before invalidating cache (makes more sense) */ - if (server.core_propagates) - propagatePendingCommands(); + if (server.core_propagates) { + postExecutionUnitOperations(); + } /* Flush pending invalidation messages only when we are not in nested call. * So the messages are not interleaved with transaction response. */ trackingHandlePendingKeyInvalidations(); diff --git a/src/server.h b/src/server.h index 0faf80c0ff1a..3c2a6a045bd3 100644 --- a/src/server.h +++ b/src/server.h @@ -2424,6 +2424,7 @@ void moduleAcquireGIL(void); int moduleTryAcquireGIL(void); void moduleReleaseGIL(void); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); +void firePostExecutionUnitJobs(); void moduleCallCommandFilters(client *c); void ModuleForkDoneHandler(int exitcode, int bysignal); int TerminateModuleForkChild(int child_pid, int wait); @@ -2946,7 +2947,7 @@ void startCommandExecution(); int incrCommandStatsOnError(struct redisCommand *cmd, int flags); void call(client *c, int flags); void alsoPropagate(int dbid, robj **argv, int argc, int target); -void propagatePendingCommands(); +void postExecutionUnitOperations(); void redisOpArrayFree(redisOpArray *oa); void forceCommandPropagation(client *c, int flags); void preventCommandPropagation(client *c); diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 56069406b8ac..5c73ed0e321c 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -59,7 +59,8 @@ TEST_MODULES = \ moduleconfigs.so \ moduleconfigstwo.so \ publish.so \ - usercall.so + usercall.so \ + postnotifications.so .PHONY: all diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c index bb998788321c..46eb688a5759 100644 --- a/tests/modules/keyspace_events.c +++ b/tests/modules/keyspace_events.c @@ -131,6 +131,49 @@ static int KeySpace_NotificationModuleKeyMiss(RedisModuleCtx *ctx, int type, con return REDISMODULE_OK; } +static int KeySpace_NotificationModuleString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + RedisModuleKey *redis_key = RedisModule_OpenKey(ctx, key, REDISMODULE_READ); + + size_t len = 0; + /* RedisModule_StringDMA could change the data format and cause the old robj to be freed. + * This code verifies that such format change will not cause any crashes.*/ + char *data = RedisModule_StringDMA(redis_key, &len, REDISMODULE_READ); + int res = strncmp(data, "dummy", 5); + REDISMODULE_NOT_USED(res); + + RedisModule_CloseKey(redis_key); + + return REDISMODULE_OK; +} + +static void KeySpace_PostNotificationStringFreePD(void *pd) { + RedisModule_FreeString(NULL, pd); +} + +static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) { + REDISMODULE_NOT_USED(ctx); + RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!s", pd); + RedisModule_FreeCallReply(rep); +} + +static int KeySpace_NotificationModuleStringPostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + + if (strncmp(key_str, "string1_", 8) != 0) { + return REDISMODULE_OK; + } + + RedisModuleString *new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str); + RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); + return REDISMODULE_OK; +} + static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(type); @@ -312,6 +355,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationModuleString) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationModuleStringPostNotificationJob) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + if (RedisModule_CreateCommand(ctx,"keyspace.notify", cmdNotify,"",0,0,0) == REDISMODULE_ERR){ return REDISMODULE_ERR; } diff --git a/tests/modules/postnotifications.c b/tests/modules/postnotifications.c new file mode 100644 index 000000000000..ca3a15b43775 --- /dev/null +++ b/tests/modules/postnotifications.c @@ -0,0 +1,236 @@ +/* This module is used to test the server post keyspace jobs API. + * + * ----------------------------------------------------------------------------- + * + * Copyright (c) 2020, Meir Shpilraien + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/* This module allow to verify 'RedisModule_AddPostNotificationJob' by registering to 3 + * key space event: + * * STRINGS - the module register to all strings notifications and set post notification job + * that increase a counter indicating how many times the string key was changed. + * In addition, it increase another counter that counts the total changes that + * was made on all strings keys. + * * EXPIRED - the module register to expired event and set post notification job that that + * counts the total number of expired events. + * * EVICTED - the module register to evicted event and set post notification job that that + * counts the total number of evicted events. + * + * In addition, the module register a new command, 'postnotification.async_set', that performs a set + * command from a background thread. This allows to check the 'RedisModule_AddPostNotificationJob' on + * notifications that was triggered on a background thread. */ + +#define _BSD_SOURCE +#define _DEFAULT_SOURCE /* For usleep */ + +#include "redismodule.h" +#include +#include +#include +#include + +static void KeySpace_PostNotificationStringFreePD(void *pd) { + RedisModule_FreeString(NULL, pd); +} + +static void KeySpace_PostNotificationReadKey(RedisModuleCtx *ctx, void *pd) { + RedisModuleCallReply* rep = RedisModule_Call(ctx, "get", "!s", pd); + RedisModule_FreeCallReply(rep); +} + +static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) { + REDISMODULE_NOT_USED(ctx); + RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!s", pd); + RedisModule_FreeCallReply(rep); +} + +static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + REDISMODULE_NOT_USED(key); + + RedisModuleString *new_key = RedisModule_CreateString(NULL, "expired", 7); + RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); + return REDISMODULE_OK; +} + +static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + REDISMODULE_NOT_USED(key); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + + if (strncmp(key_str, "evicted", 7) == 0) { + return REDISMODULE_OK; /* do not count the evicted key */ + } + + RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7); + RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); + return REDISMODULE_OK; +} + +static int KeySpace_NotificationString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + + if (strncmp(key_str, "string_", 7) != 0) { + return REDISMODULE_OK; + } + + if (strcmp(key_str, "string_total") == 0) { + return REDISMODULE_OK; + } + + RedisModuleString *new_key; + if (strncmp(key_str, "string_changed{", 15) == 0) { + new_key = RedisModule_CreateString(NULL, "string_total", 12); + } else { + new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str); + } + + RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); + return REDISMODULE_OK; +} + +static int KeySpace_LazyExpireInsidePostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + + if (strncmp(key_str, "read_", 5) != 0) { + return REDISMODULE_OK; + } + + RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);; + RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD); + return REDISMODULE_OK; +} + +static int KeySpace_NestedNotification(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(type); + REDISMODULE_NOT_USED(event); + + const char *key_str = RedisModule_StringPtrLen(key, NULL); + + if (strncmp(key_str, "write_sync_", 11) != 0) { + return REDISMODULE_OK; + } + + /* This test was only meant to check REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS. + * In general it is wrong and discourage to perform any writes inside a notification callback. */ + RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 11, strlen(key_str) - 11);; + RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!sc", new_key, "1"); + RedisModule_FreeCallReply(rep); + RedisModule_FreeString(NULL, new_key); + return REDISMODULE_OK; +} + +static void *KeySpace_PostNotificationsAsyncSetInner(void *arg) { + RedisModuleBlockedClient *bc = arg; + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc); + RedisModule_ThreadSafeContextLock(ctx); + RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!cc", "string_x", "1"); + RedisModule_ThreadSafeContextUnlock(ctx); + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + + RedisModule_UnblockClient(bc, NULL); + RedisModule_FreeThreadSafeContext(ctx); + return NULL; +} + +static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + if (argc != 1) + return RedisModule_WrongArity(ctx); + + pthread_t tid; + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0); + + if (pthread_create(&tid,NULL,KeySpace_PostNotificationsAsyncSetInner,bc) != 0) { + RedisModule_AbortBlock(bc); + return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); + } + return REDISMODULE_OK; +} + +/* This function must be present on each Redis module. It is used in order to + * register the commands into the Redis server. */ +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx,"postnotifications",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + if (!(RedisModule_GetModuleOptionsAll() & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS)) { + return REDISMODULE_ERR; + } + + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS); + + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationString) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_LazyExpireInsidePostNotificationJob) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NestedNotification) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EXPIRED, KeySpace_NotificationExpired) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EVICTED, KeySpace_NotificationEvicted) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet, + "write", 0, 0, 0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + return REDISMODULE_OK; +} + +int RedisModule_OnUnload(RedisModuleCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/keyspace_events.tcl b/tests/unit/moduleapi/keyspace_events.tcl index ceec6fdf3f3f..19c712052e39 100644 --- a/tests/unit/moduleapi/keyspace_events.tcl +++ b/tests/unit/moduleapi/keyspace_events.tcl @@ -97,5 +97,9 @@ tags "modules" { test "Unload the module - testkeyspace" { assert_equal {OK} [r module unload testkeyspace] } + + test "Verify RM_StringDMA with expiration are not causing invalid memory access" { + assert_equal {OK} [r set x 1 EX 1] + } } } diff --git a/tests/unit/moduleapi/postnotifications.tcl b/tests/unit/moduleapi/postnotifications.tcl new file mode 100644 index 000000000000..11b003a15d06 --- /dev/null +++ b/tests/unit/moduleapi/postnotifications.tcl @@ -0,0 +1,205 @@ +set testmodule [file normalize tests/modules/postnotifications.so] + +tags "modules" { + start_server [list overrides [list loadmodule "$testmodule"]] { + + test {Test write on post notification callback} { + set repl [attach_to_replication_stream] + + r set string_x 1 + assert_equal {1} [r get string_changed{string_x}] + assert_equal {1} [r get string_total] + + r set string_x 2 + assert_equal {2} [r get string_changed{string_x}] + assert_equal {2} [r get string_total] + + assert_replication_stream $repl { + {multi} + {select *} + {set string_x 1} + {incr string_changed{string_x}} + {incr string_total} + {exec} + {multi} + {set string_x 2} + {incr string_changed{string_x}} + {incr string_total} + {exec} + } + close_replication_stream $repl + } + + test {Test write on post notification callback from module thread} { + r flushall + set repl [attach_to_replication_stream] + + assert_equal {OK} [r postnotification.async_set] + assert_equal {1} [r get string_changed{string_x}] + assert_equal {1} [r get string_total] + + assert_replication_stream $repl { + {multi} + {select *} + {set string_x 1} + {incr string_changed{string_x}} + {incr string_total} + {exec} + } + close_replication_stream $repl + } + + test {Test active expire} { + r flushall + set repl [attach_to_replication_stream] + + r set x 1 + r pexpire x 10 + + wait_for_condition 100 50 { + [r keys expired] == {expired} + } else { + puts [r keys *] + fail "Failed waiting for x to expired" + } + + assert_replication_stream $repl { + {select *} + {set x 1} + {pexpireat x *} + {multi} + {del x} + {incr expired} + {exec} + } + close_replication_stream $repl + } + + test {Test lazy expire} { + r flushall + r DEBUG SET-ACTIVE-EXPIRE 0 + set repl [attach_to_replication_stream] + + r set x 1 + r pexpire x 1 + after 10 + assert_equal {} [r get x] + + assert_replication_stream $repl { + {select *} + {set x 1} + {pexpireat x *} + {multi} + {del x} + {incr expired} + {exec} + } + close_replication_stream $repl + r DEBUG SET-ACTIVE-EXPIRE 1 + } {OK} {needs:debug} + + test {Test lazy expire inside post job notification} { + r flushall + r DEBUG SET-ACTIVE-EXPIRE 0 + set repl [attach_to_replication_stream] + + r set x 1 + r pexpire x 1 + after 10 + assert_equal {OK} [r set read_x 1] + + assert_replication_stream $repl { + {select *} + {set x 1} + {pexpireat x *} + {multi} + {set read_x 1} + {del x} + {incr expired} + {exec} + } + close_replication_stream $repl + r DEBUG SET-ACTIVE-EXPIRE 1 + } {OK} {needs:debug} + + test {Test nested keyspace notification} { + r flushall + set repl [attach_to_replication_stream] + + assert_equal {OK} [r set write_sync_write_sync_x 1] + + assert_replication_stream $repl { + {multi} + {select *} + {set x 1} + {set write_sync_x 1} + {set write_sync_write_sync_x 1} + {exec} + } + close_replication_stream $repl + } + + test {Test eviction} { + r flushall + set repl [attach_to_replication_stream] + r set x 1 + r config set maxmemory-policy allkeys-random + r config set maxmemory 1 + + assert_error {OOM *} {r set y 1} + + assert_replication_stream $repl { + {select *} + {set x 1} + {multi} + {del x} + {incr evicted} + {exec} + } + close_replication_stream $repl + } {} {needs:config-maxmemory} + } +} + +set testmodule2 [file normalize tests/modules/keyspace_events.so] + +tags "modules" { + start_server [list overrides [list loadmodule "$testmodule"]] { + r module load $testmodule2 + test {Test write on post notification callback} { + set repl [attach_to_replication_stream] + + r set string_x 1 + assert_equal {1} [r get string_changed{string_x}] + assert_equal {1} [r get string_total] + + r set string_x 2 + assert_equal {2} [r get string_changed{string_x}] + assert_equal {2} [r get string_total] + + r set string1_x 1 + assert_equal {1} [r get string_changed{string1_x}] + assert_equal {3} [r get string_total] + + assert_replication_stream $repl { + {multi} + {select *} + {set string_x 1} + {incr string_changed{string_x}} + {incr string_total} + {exec} + {multi} + {set string_x 2} + {incr string_changed{string_x}} + {incr string_total} + {exec} + {multi} + {set string1_x 1} + {incr string_changed{string1_x}} + {incr string_total} + {exec} + } + close_replication_stream $repl + } + } +} \ No newline at end of file