diff --git a/src/module.c b/src/module.c index b966998c6711..a60a345ae382 100644 --- a/src/module.c +++ b/src/module.c @@ -2407,7 +2407,33 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) { server.busy_module_yield_flags |= BUSY_MODULE_YIELD_CLIENTS; /* Let redis process events */ - processEventsWhileBlocked(); + if (!pthread_equal(server.main_thread_id, pthread_self())) { + /* If we are not in the main thread, we defer event loop processing to the main thread + * after the main thread enters acquiring GIL state in order to protect the event + * loop (ae.c) and avoid potential race conditions. */ + + int acquiring; + atomicGet(server.module_gil_acquring, acquiring); + if (!acquiring) { + /* If the main thread has not yet entered the acquiring GIL state, + * we attempt to wake it up and exit without waiting for it to + * acquire the GIL. This avoids blocking the caller, allowing them to + * continue with unfinished tasks before the next yield. + * We assume the caller keeps the GIL locked. */ + if (write(server.module_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ + } + } else { + /* Release the GIL, yielding CPU to give the main thread an opportunity to start + * event processing, and then acquire the GIL again until the main thread releases it. */ + moduleReleaseGIL(); + sched_yield(); + moduleAcquireGIL(); + } + } else { + /* If we are in the main thread, we can safely process events. */ + processEventsWhileBlocked(); + } server.busy_module_yield_reply = prev_busy_module_yield_reply; /* Possibly restore the previous flags in case of two nested contexts @@ -11888,6 +11914,7 @@ void moduleInitModulesSystem(void) { moduleUnblockedClients = listCreate(); server.loadmodule_queue = listCreate(); server.module_configs_queue = dictCreate(&sdsKeyValueHashDictType); + server.module_gil_acquring = 0; modules = dictCreate(&modulesDictType); moduleAuthCallbacks = listCreate(); diff --git a/src/server.c b/src/server.c index 5a17446dc2c9..4fd4a993c3ef 100644 --- a/src/server.c +++ b/src/server.c @@ -1875,7 +1875,9 @@ void afterSleep(struct aeEventLoop *eventLoop) { mstime_t latency; latencyStartMonitor(latency); + atomicSet(server.module_gil_acquring, 1); moduleAcquireGIL(); + atomicSet(server.module_gil_acquring, 0); moduleFireServerEvent(REDISMODULE_EVENT_EVENTLOOP, REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP, NULL); diff --git a/src/server.h b/src/server.h index 437793b0e22f..b398d8ae93e6 100644 --- a/src/server.h +++ b/src/server.h @@ -1608,6 +1608,7 @@ struct redisServer { int module_pipe[2]; /* Pipe used to awake the event loop by module threads. */ pid_t child_pid; /* PID of current child */ int child_type; /* Type of current child */ + redisAtomic int module_gil_acquring; /* Indicates whether the GIL is being acquiring by the main thread. */ /* Networking */ int port; /* TCP listening port */ int tls_port; /* TLS listening port */