diff --git a/src/evict.c b/src/evict.c index 909714b4304f..96a0fef5e40c 100644 --- a/src/evict.c +++ b/src/evict.c @@ -667,6 +667,7 @@ int performEvictions(void) { * * AOF and Output buffer memory will be freed eventually so * we only care about memory used by the key space. */ + enterExecutionUnit(1, 0); delta = (long long) zmalloc_used_memory(); latencyStartMonitor(eviction_latency); dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED); @@ -679,6 +680,7 @@ int performEvictions(void) { notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", keyobj, db->id); propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction); + exitExecutionUnit(); postExecutionUnitOperations(); decrRefCount(keyobj); keys_freed++; diff --git a/src/expire.c b/src/expire.c index 425491af6bcf..33c21c3ae533 100644 --- a/src/expire.c +++ b/src/expire.c @@ -54,10 +54,12 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { long long t = dictGetSignedIntegerVal(de); if (now > t) { + enterExecutionUnit(1, 0); sds key = dictGetKey(de); robj *keyobj = createStringObject(key,sdslen(key)); deleteExpiredKeyAndPropagate(db,keyobj); decrRefCount(keyobj); + exitExecutionUnit(); return 1; } else { return 0; diff --git a/tests/modules/postnotifications.c b/tests/modules/postnotifications.c index ca3a15b43775..b4a97cbd5cf2 100644 --- a/tests/modules/postnotifications.c +++ b/tests/modules/postnotifications.c @@ -90,6 +90,10 @@ static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const cha return REDISMODULE_OK; /* do not count the evicted key */ } + if (strncmp(key_str, "before_evicted", 14) == 0) { + return REDISMODULE_OK; /* do not count the before_evicted key */ + } + RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7); RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); return REDISMODULE_OK; @@ -186,6 +190,55 @@ static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleSt return REDISMODULE_OK; } +typedef struct KeySpace_EventPostNotificationCtx { + RedisModuleString *triggered_on; + RedisModuleString *new_key; +} KeySpace_EventPostNotificationCtx; + +static void KeySpace_ServerEventPostNotificationFree(void *pd) { + KeySpace_EventPostNotificationCtx *pn_ctx = pd; + RedisModule_FreeString(NULL, pn_ctx->new_key); + RedisModule_FreeString(NULL, pn_ctx->triggered_on); + RedisModule_Free(pn_ctx); +} + +static void KeySpace_ServerEventPostNotification(RedisModuleCtx *ctx, void *pd) { + REDISMODULE_NOT_USED(ctx); + KeySpace_EventPostNotificationCtx *pn_ctx = pd; + RedisModuleCallReply* rep = RedisModule_Call(ctx, "lpush", "!ss", pn_ctx->new_key, pn_ctx->triggered_on); + RedisModule_FreeCallReply(rep); +} + +static void KeySpace_ServerEventCallback(RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data) { + REDISMODULE_NOT_USED(eid); + REDISMODULE_NOT_USED(data); + if (subevent > 3) { + RedisModule_Log(ctx, "warning", "Got an unexpected subevent '%ld'", subevent); + return; + } + static const char* events[] = { + "before_deleted", + "before_expired", + "before_evicted", + "before_overwritten", + }; + + const RedisModuleString *key_name = RedisModule_GetKeyNameFromModuleKey(((RedisModuleKeyInfo*)data)->key); + const char *key_str = RedisModule_StringPtrLen(key_name, NULL); + + for (int i = 0 ; i < 4 ; ++i) { + const char *event = events[i]; + if (strncmp(key_str, event , strlen(event)) == 0) { + return; /* don't log any event on our tracking keys */ + } + } + + KeySpace_EventPostNotificationCtx *pn_ctx = RedisModule_Alloc(sizeof(*pn_ctx)); + pn_ctx->triggered_on = RedisModule_HoldString(NULL, (RedisModuleString*)key_name); + pn_ctx->new_key = RedisModule_CreateString(NULL, events[subevent], strlen(events[subevent])); + RedisModule_AddPostNotificationJob(ctx, KeySpace_ServerEventPostNotification, pn_ctx, KeySpace_ServerEventPostNotificationFree); +} + /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { @@ -200,6 +253,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + int with_key_events = 0; + if (argc >= 1) { + const char *arg = RedisModule_StringPtrLen(argv[0], 0); + if (strcmp(arg, "with_key_events") == 0) { + with_key_events = 1; + } + } + RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS); if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationString) != REDISMODULE_OK){ @@ -222,6 +283,12 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if (with_key_events) { + if(RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, KeySpace_ServerEventCallback) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + } + if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet, "write", 0, 0, 0) == REDISMODULE_ERR){ return REDISMODULE_ERR; diff --git a/tests/unit/moduleapi/async_rm_call.tcl b/tests/unit/moduleapi/async_rm_call.tcl index 4799ea124e99..1bf12de2377c 100644 --- a/tests/unit/moduleapi/async_rm_call.tcl +++ b/tests/unit/moduleapi/async_rm_call.tcl @@ -314,6 +314,14 @@ start_server {tags {"modules"}} { r lpush l a assert_equal [$rd read] {OK} + # Explanation of the first multi exec block: + # {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys' + # {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys' + # {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys' + # {incr string_changed{string_foo}} - post notification job that was registered when 'string_foo' changed + # {incr string_changed{string_bar}} - post notification job that was registered when 'string_bar' changed + # {incr string_total} - post notification job that was registered when string_changed{string_foo} changed + # {incr string_total} - post notification job that was registered when string_changed{string_bar} changed assert_replication_stream $repl { {select *} {lpush l a} @@ -355,6 +363,25 @@ start_server {tags {"modules"}} { r lpush l a assert_equal [$rd read] {OK} + # Explanation of the first multi exec block: + # {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys' + # {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys' + # {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys' + # {incr string_changed{string_foo}} - post notification job that was registered when 'string_foo' changed + # {incr string_changed{string_bar}} - post notification job that was registered when 'string_bar' changed + # {incr string_total} - post notification job that was registered when string_changed{string_foo} changed + # {incr string_total} - post notification job that was registered when string_changed{string_bar} changed + # + # Explanation of the second multi exec block: + # {lpop l} - pop the value by our blocking command 'blpop_and_set_multiple_keys' + # {del string_foo} - lazy expiration of string_foo when 'blpop_and_set_multiple_keys' tries to write to it. + # {set string_foo 1} - the action of our blocking command 'blpop_and_set_multiple_keys' + # {set string_bar 2} - the action of our blocking command 'blpop_and_set_multiple_keys' + # {incr expired} - the post notification job, registered after string_foo got expired + # {incr string_changed{string_foo}} - post notification job triggered when we set string_foo + # {incr string_changed{string_bar}} - post notification job triggered when we set string_bar + # {incr string_total} - post notification job triggered when we incr 'string_changed{string_foo}' + # {incr string_total} - post notification job triggered when we incr 'string_changed{string_bar}' assert_replication_stream $repl { {select *} {lpush l a} diff --git a/tests/unit/moduleapi/postnotifications.tcl b/tests/unit/moduleapi/postnotifications.tcl index 11b003a15d06..7e48c7bc339a 100644 --- a/tests/unit/moduleapi/postnotifications.tcl +++ b/tests/unit/moduleapi/postnotifications.tcl @@ -1,7 +1,8 @@ set testmodule [file normalize tests/modules/postnotifications.so] tags "modules" { - start_server [list overrides [list loadmodule "$testmodule"]] { + start_server {} { + r module load $testmodule with_key_events test {Test write on post notification callback} { set repl [attach_to_replication_stream] @@ -9,11 +10,12 @@ tags "modules" { r set string_x 1 assert_equal {1} [r get string_changed{string_x}] assert_equal {1} [r get string_total] - + r set string_x 2 assert_equal {2} [r get string_changed{string_x}] assert_equal {2} [r get string_total] + # the {lpush before_overwritten string_x} is a post notification job registered when 'string_x' was overwritten assert_replication_stream $repl { {multi} {select *} @@ -23,6 +25,7 @@ tags "modules" { {exec} {multi} {set string_x 2} + {lpush before_overwritten string_x} {incr string_changed{string_x}} {incr string_total} {exec} @@ -37,7 +40,7 @@ tags "modules" { assert_equal {OK} [r postnotification.async_set] assert_equal {1} [r get string_changed{string_x}] assert_equal {1} [r get string_total] - + assert_replication_stream $repl { {multi} {select *} @@ -63,12 +66,14 @@ tags "modules" { fail "Failed waiting for x to expired" } + # the {lpush before_expired x} is a post notification job registered before 'x' got expired assert_replication_stream $repl { {select *} {set x 1} {pexpireat x *} {multi} {del x} + {lpush before_expired x} {incr expired} {exec} } @@ -85,12 +90,14 @@ tags "modules" { after 10 assert_equal {} [r get x] + # the {lpush before_expired x} is a post notification job registered before 'x' got expired assert_replication_stream $repl { {select *} {set x 1} {pexpireat x *} {multi} {del x} + {lpush before_expired x} {incr expired} {exec} } @@ -108,6 +115,7 @@ tags "modules" { after 10 assert_equal {OK} [r set read_x 1] + # the {lpush before_expired x} is a post notification job registered before 'x' got expired assert_replication_stream $repl { {select *} {set x 1} @@ -115,6 +123,7 @@ tags "modules" { {multi} {set read_x 1} {del x} + {lpush before_expired x} {incr expired} {exec} } @@ -143,16 +152,18 @@ tags "modules" { r flushall set repl [attach_to_replication_stream] r set x 1 - r config set maxmemory-policy allkeys-random + r config set maxmemory-policy allkeys-random r config set maxmemory 1 assert_error {OOM *} {r set y 1} + # the {lpush before_evicted x} is a post notification job registered before 'x' got evicted assert_replication_stream $repl { {select *} {set x 1} {multi} {del x} + {lpush before_evicted x} {incr evicted} {exec} } @@ -164,7 +175,8 @@ tags "modules" { set testmodule2 [file normalize tests/modules/keyspace_events.so] tags "modules" { - start_server [list overrides [list loadmodule "$testmodule"]] { + start_server {} { + r module load $testmodule with_key_events r module load $testmodule2 test {Test write on post notification callback} { set repl [attach_to_replication_stream] @@ -172,7 +184,7 @@ tags "modules" { r set string_x 1 assert_equal {1} [r get string_changed{string_x}] assert_equal {1} [r get string_total] - + r set string_x 2 assert_equal {2} [r get string_changed{string_x}] assert_equal {2} [r get string_total] @@ -181,6 +193,7 @@ tags "modules" { assert_equal {1} [r get string_changed{string1_x}] assert_equal {3} [r get string_total] + # the {lpush before_overwritten string_x} is a post notification job registered before 'string_x' got overwritten assert_replication_stream $repl { {multi} {select *} @@ -190,6 +203,7 @@ tags "modules" { {exec} {multi} {set string_x 2} + {lpush before_overwritten string_x} {incr string_changed{string_x}} {incr string_total} {exec} @@ -202,4 +216,4 @@ tags "modules" { close_replication_stream $repl } } -} \ No newline at end of file +}