New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race condition issues between the main thread and module threads #12817
Changes from 6 commits
ecdf99c
716ef44
6e4de39
c09ff05
152e7f2
e6a08af
49937df
761f5dd
15db3fc
14c0aa9
1bad868
4b48f42
dfb5665
f9c27e9
0f1e787
d5ded67
07f8de1
0741699
1ed674f
eeda5e6
eaa21da
e26eeb9
c660f6e
d272368
70b78ae
8bc235f
496d0db
a47cba0
7be865f
54524ca
17efc14
54cdf01
588eff4
cd8b6a3
0bc0796
a06d7dd
d9c18af
42de713
393af9e
70d5348
1359fad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -304,8 +304,11 @@ static size_t moduleTempClientMinCount = 0; /* Min client count in pool since | |||||||||||||||||
|
||||||||||||||||||
/* We need a mutex that is unlocked / relocked in beforeSleep() in order to | ||||||||||||||||||
* allow thread safe contexts to execute commands at a safe moment. */ | ||||||||||||||||||
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; | ||||||||||||||||||
|
||||||||||||||||||
typedef struct { | ||||||||||||||||||
pthread_mutex_t mutex; | ||||||||||||||||||
redisAtomic pthread_t owner; /* The thread that currently holds the lock */ | ||||||||||||||||||
} ModuleGIL; | ||||||||||||||||||
ModuleGIL moduleGIL = { PTHREAD_MUTEX_INITIALIZER, (pthread_t)0 }; | ||||||||||||||||||
|
||||||||||||||||||
/* Function pointer type for keyspace event notification subscriptions from modules. */ | ||||||||||||||||||
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); | ||||||||||||||||||
|
@@ -8202,7 +8205,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { | |||||||||||||||||
* argument, but better to be safe than sorry. */ | ||||||||||||||||||
if (bc->timeout_callback == NULL) return REDISMODULE_ERR; | ||||||||||||||||||
if (bc->unblocked) return REDISMODULE_OK; | ||||||||||||||||||
if (bc->client) moduleBlockedClientTimedOut(bc->client); | ||||||||||||||||||
if (bc->client) moduleBlockedClientTimedOut(bc->client, 1); | ||||||||||||||||||
} | ||||||||||||||||||
moduleUnblockClientByHandle(bc,privdata); | ||||||||||||||||||
return REDISMODULE_OK; | ||||||||||||||||||
|
@@ -8301,8 +8304,10 @@ void moduleHandleBlockedClients(void) { | |||||||||||||||||
* This needs to be out of the reply callback above given that a | ||||||||||||||||||
* module might not define any callback and still do blocking ops. | ||||||||||||||||||
*/ | ||||||||||||||||||
if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) { | ||||||||||||||||||
updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies); | ||||||||||||||||||
if (c && !clientHasModuleAuthInProgress(c)) { | ||||||||||||||||||
int had_errors = c->deferred_reply_errors ? !!listLength(c->deferred_reply_errors) : | ||||||||||||||||||
(server.stat_total_error_replies != prev_error_replies); | ||||||||||||||||||
updateStatsOnUnblock(c, bc->background_duration, reply_us, had_errors); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
if (c != NULL) { | ||||||||||||||||||
|
@@ -8355,8 +8360,15 @@ int moduleBlockedClientMayTimeout(client *c) { | |||||||||||||||||
/* Called when our client timed out. After this function unblockClient() | ||||||||||||||||||
* is called, and it will invalidate the blocked client. So this function | ||||||||||||||||||
* does not need to do any cleanup. Eventually the module will call the | ||||||||||||||||||
* API to unblock the client and the memory will be released. */ | ||||||||||||||||||
void moduleBlockedClientTimedOut(client *c) { | ||||||||||||||||||
* API to unblock the client and the memory will be released. | ||||||||||||||||||
* | ||||||||||||||||||
* If this function is called from a module, we handle the timeout callback | ||||||||||||||||||
* and the update of the unblock status in a thread-safe manner to avoid race | ||||||||||||||||||
* conditions with the main thread. | ||||||||||||||||||
* If this function is called from the main thread, we must handle the unblocking | ||||||||||||||||||
* of the client synchronously. This ensures that we can reply to the client before | ||||||||||||||||||
* resetClient() is called. */ | ||||||||||||||||||
void moduleBlockedClientTimedOut(client *c, int from_module) { | ||||||||||||||||||
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; | ||||||||||||||||||
|
||||||||||||||||||
/* Protect against re-processing: don't serve clients that are already | ||||||||||||||||||
|
@@ -8365,14 +8377,22 @@ void moduleBlockedClientTimedOut(client *c) { | |||||||||||||||||
if (bc->unblocked) return; | ||||||||||||||||||
|
||||||||||||||||||
RedisModuleCtx ctx; | ||||||||||||||||||
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT); | ||||||||||||||||||
int flags = REDISMODULE_CTX_BLOCKED_TIMEOUT; | ||||||||||||||||||
if (from_module) flags |= REDISMODULE_CTX_THREAD_SAFE; | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'm trying to figure out this change.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is because both of them are fixed to ensure that The reason is:
If the ctx is not to be Lines 501 to 508 in 27a8e3b
|
||||||||||||||||||
moduleCreateContext(&ctx, bc->module, flags); | ||||||||||||||||||
ctx.client = bc->client; | ||||||||||||||||||
ctx.blocked_client = bc; | ||||||||||||||||||
ctx.blocked_privdata = bc->privdata; | ||||||||||||||||||
long long prev_error_replies = server.stat_total_error_replies; | ||||||||||||||||||
|
||||||||||||||||||
long long prev_error_replies; | ||||||||||||||||||
if (!from_module) | ||||||||||||||||||
prev_error_replies = server.stat_total_error_replies; | ||||||||||||||||||
|
||||||||||||||||||
bc->timeout_callback(&ctx,(void**)c->argv,c->argc); | ||||||||||||||||||
moduleFreeContext(&ctx); | ||||||||||||||||||
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); | ||||||||||||||||||
|
||||||||||||||||||
if (!from_module) | ||||||||||||||||||
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); | ||||||||||||||||||
|
||||||||||||||||||
/* For timeout events, we do not want to call the disconnect callback, | ||||||||||||||||||
* because the blocked client will be automatically disconnected in | ||||||||||||||||||
|
@@ -8551,17 +8571,26 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { | |||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
void moduleAcquireGIL(void) { | ||||||||||||||||||
pthread_mutex_lock(&moduleGIL); | ||||||||||||||||||
pthread_mutex_lock(&moduleGIL.mutex); | ||||||||||||||||||
atomicSet(moduleGIL.owner, pthread_self()); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
int moduleTryAcquireGIL(void) { | ||||||||||||||||||
return pthread_mutex_trylock(&moduleGIL); | ||||||||||||||||||
int ret = pthread_mutex_trylock(&moduleGIL.mutex); | ||||||||||||||||||
if (ret == 0) atomicSet(moduleGIL.owner, pthread_self()); | ||||||||||||||||||
return ret; | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
void moduleReleaseGIL(void) { | ||||||||||||||||||
pthread_mutex_unlock(&moduleGIL); | ||||||||||||||||||
atomicSet(moduleGIL.owner, (pthread_t)0); | ||||||||||||||||||
oranagra marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
pthread_mutex_unlock(&moduleGIL.mutex); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
int moduleOwnsGIL(void) { | ||||||||||||||||||
pthread_t owner_thread; | ||||||||||||||||||
atomicGet(moduleGIL.owner, owner_thread); | ||||||||||||||||||
return pthread_equal(pthread_self(), owner_thread); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
/* -------------------------------------------------------------------------- | ||||||||||||||||||
* ## Module Keyspace Notifications API | ||||||||||||||||||
|
@@ -11928,7 +11957,7 @@ void moduleInitModulesSystem(void) { | |||||||||||||||||
|
||||||||||||||||||
/* Our thread-safe contexts GIL must start with already locked: | ||||||||||||||||||
* it is just unlocked when it's safe. */ | ||||||||||||||||||
pthread_mutex_lock(&moduleGIL); | ||||||||||||||||||
moduleAcquireGIL(); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
void modulesCron(void) { | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -415,7 +415,8 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { | |
* after the command's reply (specifically important during multi-exec). the exception is | ||
* the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply. | ||
* The check for executing_client also avoids affecting push messages that are part of eviction. */ | ||
if (c == server.current_client && (c->flags & CLIENT_PUSHING) && | ||
// TODO: should forbid the ReplyWith* module family api from being called outside the lock? | ||
if ((c->flags & CLIENT_PUSHING) && c == server.current_client && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. start version: 7.2.0
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. like #12817 (comment), this is not about the use of RM_AddReply, it's about using the argv strings (changing their refcount). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. they're not related. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sundb this one at the top comment (number 2), says:
but if that's just an access to a variable and then ignoring what we read from it, isn't that "Harm Level: None"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, it should be |
||
server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd)) | ||
{ | ||
_addReplyProtoToList(c,server.pending_push_messages,s,len); | ||
|
@@ -1434,7 +1435,7 @@ void unlinkClient(client *c) { | |
listNode *ln; | ||
|
||
/* If this is marked as current client unset it. */ | ||
if (server.current_client == c) server.current_client = NULL; | ||
if (c->conn && server.current_client == c) server.current_client = NULL; | ||
|
||
/* Certain operations must be done only if the client has an active connection. | ||
* If the client was already unlinked or if it's a "fake client" the | ||
|
@@ -1478,7 +1479,7 @@ void unlinkClient(client *c) { | |
} | ||
|
||
/* Remove from the list of pending reads if needed. */ | ||
serverAssert(io_threads_op == IO_THREADS_OP_IDLE); | ||
serverAssert(!c->conn || io_threads_op == IO_THREADS_OP_IDLE); | ||
if (c->pending_read_list_node != NULL) { | ||
listDelNode(server.clients_pending_read,c->pending_read_list_node); | ||
c->pending_read_list_node = NULL; | ||
|
@@ -1631,6 +1632,12 @@ void freeClient(client *c) { | |
reqresReset(c, 1); | ||
#endif | ||
|
||
/* Remove the contribution that this client gave to our | ||
* incrementally computed memory usage. */ | ||
if (c->conn) | ||
server.stat_clients_type_memory[c->last_memory_type] -= | ||
c->last_memory_usage; | ||
Comment on lines
+1653
to
+1657
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. start version: 7.0.0
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this one is resolved by modifying the freeClient code, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, and make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can't / shouldn't evict them anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've never been able to remember how I reproduced it. Config:
Command: RedisModule_Call(ctx,"client","cc","no-evict","off"); Patch: int clientEvictionAllowed(client *c) {
serverAssert(c->conn);
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
return 0;
}
int type = getClientType(c);
return (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB);
}
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please als have a look at #12817 (comment) and top comment(7). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i don't think using RM_Call to call the CLIENT command is valid. |
||
|
||
/* Unlink the client: this will close the socket, remove the I/O | ||
* handlers, and remove references of the client from different | ||
* places where active clients may be referenced. */ | ||
|
@@ -1679,10 +1686,6 @@ void freeClient(client *c) { | |
* we lost the connection with the master. */ | ||
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); | ||
|
||
/* Remove the contribution that this client gave to our | ||
* incrementally computed memory usage. */ | ||
server.stat_clients_type_memory[c->last_memory_type] -= | ||
c->last_memory_usage; | ||
/* Remove client from memory usage buckets */ | ||
if (c->mem_usage_bucket) { | ||
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -135,8 +135,6 @@ void *bg_call_worker(void *arg) { | |
RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + cmd_pos + 1, bg->argc - cmd_pos - 1); | ||
RedisModule_FreeString(NULL, format_redis_str); | ||
|
||
// Release GIL | ||
RedisModule_ThreadSafeContextUnlock(ctx); | ||
|
||
// Reply to client | ||
if (!rep) { | ||
|
@@ -155,6 +153,9 @@ void *bg_call_worker(void *arg) { | |
RedisModule_Free(bg->argv); | ||
RedisModule_Free(bg); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. start version: 6.2.0
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it because of auto-memory? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I update the stack trace. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so what do we conclude here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually the order doesn't matter, if they both decr at the same time it can mess up the refcount. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should mention it on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. p.s. for some reason i thought that RM_ReplyWithString will also be an issue, but now i see it doesn't touch the refcount. (at least not in the current implementation) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sundb this one is linked to number 4 at the top, which says:
first of all, an assertion isn't "None" it terminates the process and can cause data loss. maybe change it to "Low"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my intention was that it was only caused by the module's use of API, not by the internal implement, so I chose There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ohh, ok, maybe in addition to the "Harm Level", we can add some explicit "Trigger", to specify that it depends on some rare case, and people can easily rule it out and know they're safe. |
||
// Release GIL | ||
RedisModule_ThreadSafeContextUnlock(ctx); | ||
|
||
// Free the Redis module context | ||
RedisModule_FreeThreadSafeContext(ctx); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following #12817 (comment)
we call
updateStatsOnUnblock()
here when from RM_UnblockClient().There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it mean that before this PR it was called twice?
maybe we should add a comment in moduleBlockedClientTimedOut, explaining the
if
statement by referring to this call.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this PR,
moduleHandleBlockedClients()
ignored updating the block status when the client was blocked on keys.origin code: