Skip to content

Commit

Permalink
Fix race condition issues between the main thread and module threads (#…
Browse files Browse the repository at this point in the history
…12817)

Fix #12785 and other race condition issues.
See the following isolated comments.

The following report was obtained using SANITIZER thread.
```sh
make SANITIZER=thread
./runtest-moduleapi --config io-threads 4 --config io-threads-do-reads yes --accurate
```

1. Fixed thread-safe issue in RM_UnblockClient()
Related discussion:
#12817 (comment)
* When blocking a client in a module using `RM_BlockClientOnKeys()` or
`RM_BlockClientOnKeysWithFlags()`
with a timeout_callback, calling RM_UnblockClient() in module threads
can lead to race conditions
     in `updateStatsOnUnblock()`.

     - Introduced: 
        Version: 6.2
        PR: #7491

     - Touch:
`server.stat_numcommands`, `cmd->latency_histogram`, `server.slowlog`,
and `server.latency_events`
     
     - Harm Level: High
Potentially corrupts the memory data of `cmd->latency_histogram`,
`server.slowlog`, and `server.latency_events`

     - Solution:
Differentiate whether the call to moduleBlockedClientTimedOut() comes
from the module or the main thread.
Since we can't know if RM_UnblockClient() comes from module threads, we
always assume it does and
let `updateStatsOnUnblock()` asynchronously update the unblock status.
     
* When error reply is called in timeout_callback(), ctx is not
thread-safe, eventually lead to race conditions in `afterErrorReply`.

     - Introduced: 
        Version: 6.2
        PR: #8217

     - Touch
       `server.stat_total_error_replies`, `server.errors`, 

     - Harm Level: High
       Potentially corrupts the memory data of `server.errors`
   
      - Solution: 
Make the ctx in `timeout_callback()` with `REDISMODULE_CTX_THREAD_SAFE`,
and asynchronously reply errors to the client.

2. Made RM_Reply*() family API thread-safe
Related discussion:
#12817 (comment)
Call chain: `RM_Reply*()` -> `_addReplyToBufferOrList()` -> touch
server.current_client

    - Introduced: 
       Version: 7.2.0
       PR: #12326

   - Harm Level: None
Since the module fake client won't have the `CLIENT_PUSHING` flag, even
if we touch server.current_client,
     we can still exit after `c->flags & CLIENT_PUSHING`.

   - Solution
      Checking `c->flags & CLIENT_PUSHING` earlier.

3. Made freeClient() thread-safe
    Fix #12785

    - Introduced: 
       Version: 4.0
Commit:
3fcf959

    - Harm Level: Moderate
       * Trigger assertion
It happens when the module thread calls freeClient while the io-thread
is in progress,
which just triggers an assertion, and doesn't make any race condiaions.

* Touch `server.current_client`, `server.stat_clients_type_memory`, and
`clientMemUsageBucket->clients`.
It happens between the main thread and the module threads, may cause
data corruption.
1. Error reset `server.current_client` to NULL, but theoretically this
won't happen,
because the module has already reset `server.current_client` to old
value before entering freeClient.
2. corrupts `clientMemUsageBucket->clients` in
updateClientMemUsageAndBucket().
3. Causes server.stat_clients_type_memory memory statistics to be
inaccurate.
    
    - Solution:
* No longer counts memory usage on fake clients, to avoid updating
`server.stat_clients_type_memory` in freeClient.
* No longer resetting `server.current_client` in unlinkClient, because
the fake client won't be evicted or disconnected in the mid of the
process.
* Judgment assertion `io_threads_op == IO_THREADS_OP_IDLE` only if c is
not a fake client.

4. Fixed free client args without GIL
Related discussion:
#12817 (comment)
When freeing retained strings in the module thread (refcount decr), or
using them in some way (refcount incr), we should do so while holding
the GIL,
otherwise, they might be simultaneously freed while the main thread is
processing the unblock client state.

    - Introduced: 
       Version: 6.2.0
       PR: #8141

   - Harm Level: Low
     Trigger assertion or double free or memory leak. 

   - Solution:
Documenting that module API users need to ensure any access to these
retained strings is done with the GIL locked

5. Fix adding fake client to server.clients_pending_write
    It will incorrectly log the memory usage for the fake client.
Related discussion:
#12817 (comment)

    - Introduced: 
       Version: 4.0
Commit:
9b01b64

    - Harm Level: None
      Only result in NOP

    - Solution:
       * Don't add fake client into server.clients_pending_write
* Add c->conn assertion for updateClientMemUsageAndBucket() and
updateClientMemoryUsage() to avoid same
         issue in the future.
So now it will be the responsibility of the caller of both of them to
avoid passing in fake client.

6. Fix calling RM_BlockedClientMeasureTimeStart() and
RM_BlockedClientMeasureTimeEnd() without GIL
    - Introduced: 
       Version: 6.2
       PR: #7491

   - Harm Level: Low
Causes inaccuracies in command latency histogram and slow logs, but does
not corrupt memory.

   - Solution:
Module API users, if know that non-thread-safe APIs will be used in
multi-threading, need to take responsibility for protecting them with
their own locks instead of the GIL, as using the GIL is too expensive.

### Other issue
1. RM_Yield is not thread-safe, fixed via #12905.

### Summarize
1. Fix thread-safe issues for `RM_UnblockClient()`, `freeClient()` and
`RM_Yield`, potentially preventing memory corruption, data disorder, or
assertion.
2. Updated docs and module test to clarify module API users'
responsibility for locking non-thread-safe APIs in multi-threading, such
as RM_BlockedClientMeasureTimeStart/End(), RM_FreeString(),
RM_RetainString(), and RM_HoldString().

### About backpot to 7.2
1. The implement of (1) is not too satisfying, would like to get more
eyes.
2. (2), (3) can be safely for backport
3. (4), (6) just modifying the module tests and updating the
documentation, no need for a backpot.
4. (5) is harmless, no need for a backpot.

---------

Co-authored-by: Oran Agra <oran@redislabs.com>
  • Loading branch information
sundb and oranagra committed Jan 19, 2024
1 parent f81c3fd commit d064002
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 58 deletions.
2 changes: 1 addition & 1 deletion src/blocked.c
Expand Up @@ -239,7 +239,7 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
moduleBlockedClientTimedOut(c, 0);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down
61 changes: 46 additions & 15 deletions src/module.c
Expand Up @@ -306,7 +306,6 @@ static size_t moduleTempClientMinCount = 0; /* Min client count in pool since
* allow thread safe contexts to execute commands at a safe moment. */
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;


/* Function pointer type for keyspace event notification subscriptions from modules. */
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);

Expand Down Expand Up @@ -2338,7 +2337,10 @@ ustime_t RM_CachedMicroseconds(void) {
* Within the same command, you can call multiple times
* RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd()
* to accumulate independent time intervals to the background duration.
* This method always return REDISMODULE_OK. */
* This method always return REDISMODULE_OK.
*
* This function is not thread safe, If used in module thread and blocked callback (possibly main thread)
* simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */
int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
elapsedStart(&(bc->background_timer));
return REDISMODULE_OK;
Expand All @@ -2348,7 +2350,10 @@ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
* to calculate the elapsed execution time.
* On success REDISMODULE_OK is returned.
* This method only returns REDISMODULE_ERR if no start time was
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ).
*
* This function is not thread safe, If used in module thread and blocked callback (possibly main thread)
* simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */
int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
// If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart
if (!bc->background_timer)
Expand Down Expand Up @@ -2717,7 +2722,10 @@ RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisM
* pass ctx as NULL when releasing the string (but passing a context will not
* create any issue). Strings created with a context should be freed also passing
* the context, so if you want to free a string out of context later, make sure
* to create it using a NULL context. */
* to create it using a NULL context.
*
* This API is not thread safe, access to these retained strings (if they originated
* from a client command arguments) must be done with GIL locked. */
void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
decrRefCount(str);
if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str);
Expand Down Expand Up @@ -2754,7 +2762,10 @@ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
*
* Threaded modules that reference retained strings from other threads *must*
* explicitly trim the allocation as soon as the string is retained. Not doing
* so may result with automatic trimming which is not thread safe. */
* so may result with automatic trimming which is not thread safe.
*
* This API is not thread safe, access to these retained strings (if they originated
* from a client command arguments) must be done with GIL locked. */
void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) {
/* Increment the string reference counting only if we can't
Expand Down Expand Up @@ -2796,7 +2807,10 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
*
* Threaded modules that reference held strings from other threads *must*
* explicitly trim the allocation as soon as the string is held. Not doing
* so may result with automatic trimming which is not thread safe. */
* so may result with automatic trimming which is not thread safe.
*
* This API is not thread safe, access to these retained strings (if they originated
* from a client command arguments) must be done with GIL locked. */
RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) {
if (str->refcount == OBJ_STATIC_REFCOUNT) {
return RM_CreateStringFromString(ctx, str);
Expand Down Expand Up @@ -8228,7 +8242,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
* argument, but better to be safe than sorry. */
if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
if (bc->unblocked) return REDISMODULE_OK;
if (bc->client) moduleBlockedClientTimedOut(bc->client);
if (bc->client) moduleBlockedClientTimedOut(bc->client, 1);
}
moduleUnblockClientByHandle(bc,privdata);
return REDISMODULE_OK;
Expand Down Expand Up @@ -8327,8 +8341,10 @@ void moduleHandleBlockedClients(void) {
* This needs to be out of the reply callback above given that a
* module might not define any callback and still do blocking ops.
*/
if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) {
updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies);
if (c && !clientHasModuleAuthInProgress(c)) {
int had_errors = c->deferred_reply_errors ? !!listLength(c->deferred_reply_errors) :
(server.stat_total_error_replies != prev_error_replies);
updateStatsOnUnblock(c, bc->background_duration, reply_us, had_errors);
}

if (c != NULL) {
Expand All @@ -8346,7 +8362,7 @@ void moduleHandleBlockedClients(void) {
* if there are pending replies here. This is needed since
* during a non blocking command the client may receive output. */
if (!clientHasModuleAuthInProgress(c) && clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE))
!(c->flags & CLIENT_PENDING_WRITE) && c->conn)
{
c->flags |= CLIENT_PENDING_WRITE;
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
Expand Down Expand Up @@ -8381,8 +8397,15 @@ int moduleBlockedClientMayTimeout(client *c) {
/* Called when our client timed out. After this function unblockClient()
* is called, and it will invalidate the blocked client. So this function
* does not need to do any cleanup. Eventually the module will call the
* API to unblock the client and the memory will be released. */
void moduleBlockedClientTimedOut(client *c) {
* API to unblock the client and the memory will be released.
*
* If this function is called from a module, we handle the timeout callback
* and the update of the unblock status in a thread-safe manner to avoid race
* conditions with the main thread.
* If this function is called from the main thread, we must handle the unblocking
* of the client synchronously. This ensures that we can reply to the client before
* resetClient() is called. */
void moduleBlockedClientTimedOut(client *c, int from_module) {
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;

/* Protect against re-processing: don't serve clients that are already
Expand All @@ -8391,14 +8414,22 @@ void moduleBlockedClientTimedOut(client *c) {
if (bc->unblocked) return;

RedisModuleCtx ctx;
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT);
int flags = REDISMODULE_CTX_BLOCKED_TIMEOUT;
if (from_module) flags |= REDISMODULE_CTX_THREAD_SAFE;
moduleCreateContext(&ctx, bc->module, flags);
ctx.client = bc->client;
ctx.blocked_client = bc;
ctx.blocked_privdata = bc->privdata;
long long prev_error_replies = server.stat_total_error_replies;

long long prev_error_replies;
if (!from_module)
prev_error_replies = server.stat_total_error_replies;

bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);

if (!from_module)
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);

/* For timeout events, we do not want to call the disconnect callback,
* because the blocked client will be automatically disconnected in
Expand Down
21 changes: 12 additions & 9 deletions src/networking.c
Expand Up @@ -414,8 +414,9 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
* to a channel which we are subscribed to, then we wanna postpone that message to be added
* after the command's reply (specifically important during multi-exec). the exception is
* the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
* The check for executing_client also avoids affecting push messages that are part of eviction. */
if (c == server.current_client && (c->flags & CLIENT_PUSHING) &&
* The check for executing_client also avoids affecting push messages that are part of eviction.
* Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */
if ((c->flags & CLIENT_PUSHING) && c == server.current_client &&
server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd))
{
_addReplyProtoToList(c,server.pending_push_messages,s,len);
Expand Down Expand Up @@ -1450,7 +1451,7 @@ void unlinkClient(client *c) {
listNode *ln;

/* If this is marked as current client unset it. */
if (server.current_client == c) server.current_client = NULL;
if (c->conn && server.current_client == c) server.current_client = NULL;

/* Certain operations must be done only if the client has an active connection.
* If the client was already unlinked or if it's a "fake client" the
Expand Down Expand Up @@ -1494,7 +1495,7 @@ void unlinkClient(client *c) {
}

/* Remove from the list of pending reads if needed. */
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
serverAssert(!c->conn || io_threads_op == IO_THREADS_OP_IDLE);
if (c->pending_read_list_node != NULL) {
listDelNode(server.clients_pending_read,c->pending_read_list_node);
c->pending_read_list_node = NULL;
Expand Down Expand Up @@ -1649,6 +1650,12 @@ void freeClient(client *c) {
reqresReset(c, 1);
#endif

/* Remove the contribution that this client gave to our
* incrementally computed memory usage. */
if (c->conn)
server.stat_clients_type_memory[c->last_memory_type] -=
c->last_memory_usage;

/* Unlink the client: this will close the socket, remove the I/O
* handlers, and remove references of the client from different
* places where active clients may be referenced. */
Expand Down Expand Up @@ -1697,10 +1704,6 @@ void freeClient(client *c) {
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

/* Remove the contribution that this client gave to our
* incrementally computed memory usage. */
server.stat_clients_type_memory[c->last_memory_type] -=
c->last_memory_usage;
/* Remove client from memory usage buckets */
if (c->mem_usage_bucket) {
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
Expand Down Expand Up @@ -2487,7 +2490,7 @@ int processCommandAndResetClient(client *c) {
commandProcessed(c);
/* Update the client's memory to include output buffer growth following the
* processed command. */
updateClientMemUsageAndBucket(c);
if (c->conn) updateClientMemUsageAndBucket(c);
}

if (server.current_client == NULL) deadclient = 1;
Expand Down
5 changes: 3 additions & 2 deletions src/server.c
Expand Up @@ -994,6 +994,7 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) {
* usage bucket.
*/
void updateClientMemoryUsage(client *c) {
serverAssert(c->conn);
size_t mem = getClientMemoryUsage(c, NULL);
int type = getClientType(c);
/* Now that we have the memory used by the client, remove the old
Expand All @@ -1006,7 +1007,7 @@ void updateClientMemoryUsage(client *c) {
}

int clientEvictionAllowed(client *c) {
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT || !c->conn) {
return 0;
}
int type = getClientType(c);
Expand Down Expand Up @@ -1046,7 +1047,7 @@ void removeClientFromMemUsageBucket(client *c, int allow_eviction) {
* returns 1 if client eviction for this client is allowed, 0 otherwise.
*/
int updateClientMemUsageAndBucket(client *c) {
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
serverAssert(io_threads_op == IO_THREADS_OP_IDLE && c->conn);
int allow_eviction = clientEvictionAllowed(c);
removeClientFromMemUsageBucket(c, allow_eviction);

Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Expand Up @@ -2532,7 +2532,7 @@ void moduleFreeContext(struct RedisModuleCtx *ctx);
void moduleCallCommandUnblockedHandler(client *c);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientTimedOut(client *c, int from_module);
void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void);
void moduleAcquireGIL(void);
Expand Down
15 changes: 8 additions & 7 deletions tests/modules/blockedclient.c
Expand Up @@ -102,6 +102,7 @@ typedef struct {

void *bg_call_worker(void *arg) {
bg_call_data *bg = arg;
RedisModuleBlockedClient *bc = bg->bc;

// Get Redis module context
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
Expand Down Expand Up @@ -135,6 +136,12 @@ void *bg_call_worker(void *arg) {
RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + cmd_pos + 1, bg->argc - cmd_pos - 1);
RedisModule_FreeString(NULL, format_redis_str);

/* Free the arguments within GIL to prevent simultaneous freeing in main thread. */
for (int i=0; i<bg->argc; i++)
RedisModule_FreeString(ctx, bg->argv[i]);
RedisModule_Free(bg->argv);
RedisModule_Free(bg);

// Release GIL
RedisModule_ThreadSafeContextUnlock(ctx);

Expand All @@ -147,13 +154,7 @@ void *bg_call_worker(void *arg) {
}

// Unblock client
RedisModule_UnblockClient(bg->bc, NULL);

/* Free the arguments */
for (int i=0; i<bg->argc; i++)
RedisModule_FreeString(ctx, bg->argv[i]);
RedisModule_Free(bg->argv);
RedisModule_Free(bg);
RedisModule_UnblockClient(bc, NULL);

// Free the Redis module context
RedisModule_FreeThreadSafeContext(ctx);
Expand Down

0 comments on commit d064002

Please sign in to comment.