New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race condition issues between the main thread and module threads #12817
Changes from all commits
ecdf99c
716ef44
6e4de39
c09ff05
152e7f2
e6a08af
49937df
761f5dd
15db3fc
14c0aa9
1bad868
4b48f42
dfb5665
f9c27e9
0f1e787
d5ded67
07f8de1
0741699
1ed674f
eeda5e6
eaa21da
e26eeb9
c660f6e
d272368
70b78ae
8bc235f
496d0db
a47cba0
7be865f
54524ca
17efc14
54cdf01
588eff4
cd8b6a3
0bc0796
a06d7dd
d9c18af
42de713
393af9e
70d5348
1359fad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -306,7 +306,6 @@ static size_t moduleTempClientMinCount = 0; /* Min client count in pool since | |||||||||||||||||
* allow thread safe contexts to execute commands at a safe moment. */ | ||||||||||||||||||
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
/* Function pointer type for keyspace event notification subscriptions from modules. */ | ||||||||||||||||||
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); | ||||||||||||||||||
|
||||||||||||||||||
|
@@ -2338,7 +2337,10 @@ ustime_t RM_CachedMicroseconds(void) { | |||||||||||||||||
* Within the same command, you can call multiple times | ||||||||||||||||||
* RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() | ||||||||||||||||||
* to accumulate independent time intervals to the background duration. | ||||||||||||||||||
* This method always return REDISMODULE_OK. */ | ||||||||||||||||||
* This method always return REDISMODULE_OK. | ||||||||||||||||||
* | ||||||||||||||||||
* This function is not thread safe, If used in module thread and blocked callback (possibly main thread) | ||||||||||||||||||
* simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */ | ||||||||||||||||||
int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { | ||||||||||||||||||
elapsedStart(&(bc->background_timer)); | ||||||||||||||||||
return REDISMODULE_OK; | ||||||||||||||||||
|
@@ -2348,7 +2350,10 @@ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { | |||||||||||||||||
* to calculate the elapsed execution time. | ||||||||||||||||||
* On success REDISMODULE_OK is returned. | ||||||||||||||||||
* This method only returns REDISMODULE_ERR if no start time was | ||||||||||||||||||
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */ | ||||||||||||||||||
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). | ||||||||||||||||||
* | ||||||||||||||||||
* This function is not thread safe, If used in module thread and blocked callback (possibly main thread) | ||||||||||||||||||
* simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */ | ||||||||||||||||||
int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) { | ||||||||||||||||||
// If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart | ||||||||||||||||||
if (!bc->background_timer) | ||||||||||||||||||
|
@@ -2717,7 +2722,10 @@ RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisM | |||||||||||||||||
* pass ctx as NULL when releasing the string (but passing a context will not | ||||||||||||||||||
* create any issue). Strings created with a context should be freed also passing | ||||||||||||||||||
* the context, so if you want to free a string out of context later, make sure | ||||||||||||||||||
* to create it using a NULL context. */ | ||||||||||||||||||
* to create it using a NULL context. | ||||||||||||||||||
* | ||||||||||||||||||
* This API is not thread safe, access to these retained strings (if they originated | ||||||||||||||||||
* from a client command arguments) must be done with GIL locked. */ | ||||||||||||||||||
void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) { | ||||||||||||||||||
decrRefCount(str); | ||||||||||||||||||
if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str); | ||||||||||||||||||
|
@@ -2754,7 +2762,10 @@ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) { | |||||||||||||||||
* | ||||||||||||||||||
* Threaded modules that reference retained strings from other threads *must* | ||||||||||||||||||
* explicitly trim the allocation as soon as the string is retained. Not doing | ||||||||||||||||||
* so may result with automatic trimming which is not thread safe. */ | ||||||||||||||||||
* so may result with automatic trimming which is not thread safe. | ||||||||||||||||||
* | ||||||||||||||||||
* This API is not thread safe, access to these retained strings (if they originated | ||||||||||||||||||
* from a client command arguments) must be done with GIL locked. */ | ||||||||||||||||||
void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { | ||||||||||||||||||
if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) { | ||||||||||||||||||
/* Increment the string reference counting only if we can't | ||||||||||||||||||
|
@@ -2796,7 +2807,10 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { | |||||||||||||||||
* | ||||||||||||||||||
* Threaded modules that reference held strings from other threads *must* | ||||||||||||||||||
* explicitly trim the allocation as soon as the string is held. Not doing | ||||||||||||||||||
* so may result with automatic trimming which is not thread safe. */ | ||||||||||||||||||
* so may result with automatic trimming which is not thread safe. | ||||||||||||||||||
* | ||||||||||||||||||
* This API is not thread safe, access to these retained strings (if they originated | ||||||||||||||||||
* from a client command arguments) must be done with GIL locked. */ | ||||||||||||||||||
RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) { | ||||||||||||||||||
if (str->refcount == OBJ_STATIC_REFCOUNT) { | ||||||||||||||||||
return RM_CreateStringFromString(ctx, str); | ||||||||||||||||||
|
@@ -8228,7 +8242,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { | |||||||||||||||||
* argument, but better to be safe than sorry. */ | ||||||||||||||||||
if (bc->timeout_callback == NULL) return REDISMODULE_ERR; | ||||||||||||||||||
if (bc->unblocked) return REDISMODULE_OK; | ||||||||||||||||||
if (bc->client) moduleBlockedClientTimedOut(bc->client); | ||||||||||||||||||
if (bc->client) moduleBlockedClientTimedOut(bc->client, 1); | ||||||||||||||||||
} | ||||||||||||||||||
moduleUnblockClientByHandle(bc,privdata); | ||||||||||||||||||
return REDISMODULE_OK; | ||||||||||||||||||
|
@@ -8327,8 +8341,10 @@ void moduleHandleBlockedClients(void) { | |||||||||||||||||
* This needs to be out of the reply callback above given that a | ||||||||||||||||||
* module might not define any callback and still do blocking ops. | ||||||||||||||||||
*/ | ||||||||||||||||||
if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) { | ||||||||||||||||||
updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies); | ||||||||||||||||||
if (c && !clientHasModuleAuthInProgress(c)) { | ||||||||||||||||||
int had_errors = c->deferred_reply_errors ? !!listLength(c->deferred_reply_errors) : | ||||||||||||||||||
(server.stat_total_error_replies != prev_error_replies); | ||||||||||||||||||
updateStatsOnUnblock(c, bc->background_duration, reply_us, had_errors); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
if (c != NULL) { | ||||||||||||||||||
|
@@ -8346,7 +8362,7 @@ void moduleHandleBlockedClients(void) { | |||||||||||||||||
* if there are pending replies here. This is needed since | ||||||||||||||||||
* during a non blocking command the client may receive output. */ | ||||||||||||||||||
if (!clientHasModuleAuthInProgress(c) && clientHasPendingReplies(c) && | ||||||||||||||||||
!(c->flags & CLIENT_PENDING_WRITE)) | ||||||||||||||||||
!(c->flags & CLIENT_PENDING_WRITE) && c->conn) | ||||||||||||||||||
{ | ||||||||||||||||||
c->flags |= CLIENT_PENDING_WRITE; | ||||||||||||||||||
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node); | ||||||||||||||||||
|
@@ -8381,8 +8397,15 @@ int moduleBlockedClientMayTimeout(client *c) { | |||||||||||||||||
/* Called when our client timed out. After this function unblockClient() | ||||||||||||||||||
* is called, and it will invalidate the blocked client. So this function | ||||||||||||||||||
* does not need to do any cleanup. Eventually the module will call the | ||||||||||||||||||
* API to unblock the client and the memory will be released. */ | ||||||||||||||||||
void moduleBlockedClientTimedOut(client *c) { | ||||||||||||||||||
* API to unblock the client and the memory will be released. | ||||||||||||||||||
* | ||||||||||||||||||
* If this function is called from a module, we handle the timeout callback | ||||||||||||||||||
* and the update of the unblock status in a thread-safe manner to avoid race | ||||||||||||||||||
* conditions with the main thread. | ||||||||||||||||||
* If this function is called from the main thread, we must handle the unblocking | ||||||||||||||||||
* of the client synchronously. This ensures that we can reply to the client before | ||||||||||||||||||
* resetClient() is called. */ | ||||||||||||||||||
void moduleBlockedClientTimedOut(client *c, int from_module) { | ||||||||||||||||||
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; | ||||||||||||||||||
|
||||||||||||||||||
/* Protect against re-processing: don't serve clients that are already | ||||||||||||||||||
|
@@ -8391,14 +8414,22 @@ void moduleBlockedClientTimedOut(client *c) { | |||||||||||||||||
if (bc->unblocked) return; | ||||||||||||||||||
|
||||||||||||||||||
RedisModuleCtx ctx; | ||||||||||||||||||
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT); | ||||||||||||||||||
int flags = REDISMODULE_CTX_BLOCKED_TIMEOUT; | ||||||||||||||||||
if (from_module) flags |= REDISMODULE_CTX_THREAD_SAFE; | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'm trying to figure out this change.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is because both of them are fixed to ensure that The reason is:
If the ctx is not to be Lines 501 to 508 in 27a8e3b
|
||||||||||||||||||
moduleCreateContext(&ctx, bc->module, flags); | ||||||||||||||||||
ctx.client = bc->client; | ||||||||||||||||||
ctx.blocked_client = bc; | ||||||||||||||||||
ctx.blocked_privdata = bc->privdata; | ||||||||||||||||||
long long prev_error_replies = server.stat_total_error_replies; | ||||||||||||||||||
|
||||||||||||||||||
long long prev_error_replies; | ||||||||||||||||||
if (!from_module) | ||||||||||||||||||
prev_error_replies = server.stat_total_error_replies; | ||||||||||||||||||
|
||||||||||||||||||
bc->timeout_callback(&ctx,(void**)c->argv,c->argc); | ||||||||||||||||||
moduleFreeContext(&ctx); | ||||||||||||||||||
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); | ||||||||||||||||||
|
||||||||||||||||||
if (!from_module) | ||||||||||||||||||
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); | ||||||||||||||||||
|
||||||||||||||||||
/* For timeout events, we do not want to call the disconnect callback, | ||||||||||||||||||
* because the blocked client will be automatically disconnected in | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -414,8 +414,9 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { | |
* to a channel which we are subscribed to, then we wanna postpone that message to be added | ||
* after the command's reply (specifically important during multi-exec). the exception is | ||
* the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply. | ||
* The check for executing_client also avoids affecting push messages that are part of eviction. */ | ||
if (c == server.current_client && (c->flags & CLIENT_PUSHING) && | ||
* The check for executing_client also avoids affecting push messages that are part of eviction. | ||
* Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */ | ||
if ((c->flags & CLIENT_PUSHING) && c == server.current_client && | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. start version: 7.2.0
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. like #12817 (comment), this is not about the use of RM_AddReply, it's about using the argv strings (changing their refcount). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. they're not related. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @sundb this one at the top comment (number 2), says:
but if that's just an access to a variable and then ignoring what we read from it, isn't that "Harm Level: None"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right, it should be |
||
server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd)) | ||
{ | ||
_addReplyProtoToList(c,server.pending_push_messages,s,len); | ||
|
@@ -1450,7 +1451,7 @@ void unlinkClient(client *c) { | |
listNode *ln; | ||
|
||
/* If this is marked as current client unset it. */ | ||
if (server.current_client == c) server.current_client = NULL; | ||
if (c->conn && server.current_client == c) server.current_client = NULL; | ||
|
||
/* Certain operations must be done only if the client has an active connection. | ||
* If the client was already unlinked or if it's a "fake client" the | ||
|
@@ -1494,7 +1495,7 @@ void unlinkClient(client *c) { | |
} | ||
|
||
/* Remove from the list of pending reads if needed. */ | ||
serverAssert(io_threads_op == IO_THREADS_OP_IDLE); | ||
serverAssert(!c->conn || io_threads_op == IO_THREADS_OP_IDLE); | ||
if (c->pending_read_list_node != NULL) { | ||
listDelNode(server.clients_pending_read,c->pending_read_list_node); | ||
c->pending_read_list_node = NULL; | ||
|
@@ -1649,6 +1650,12 @@ void freeClient(client *c) { | |
reqresReset(c, 1); | ||
#endif | ||
|
||
/* Remove the contribution that this client gave to our | ||
* incrementally computed memory usage. */ | ||
if (c->conn) | ||
server.stat_clients_type_memory[c->last_memory_type] -= | ||
c->last_memory_usage; | ||
Comment on lines
+1653
to
+1657
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. start version: 7.0.0
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this one is resolved by modifying the freeClient code, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, and make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can't / shouldn't evict them anyway. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've never been able to remember how I reproduced it. Config:
Command: RedisModule_Call(ctx,"client","cc","no-evict","off"); Patch: int clientEvictionAllowed(client *c) {
serverAssert(c->conn);
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
return 0;
}
int type = getClientType(c);
return (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB);
}
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please als have a look at #12817 (comment) and top comment(7). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i don't think using RM_Call to call the CLIENT command is valid. |
||
|
||
/* Unlink the client: this will close the socket, remove the I/O | ||
* handlers, and remove references of the client from different | ||
* places where active clients may be referenced. */ | ||
|
@@ -1697,10 +1704,6 @@ void freeClient(client *c) { | |
* we lost the connection with the master. */ | ||
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); | ||
|
||
/* Remove the contribution that this client gave to our | ||
* incrementally computed memory usage. */ | ||
server.stat_clients_type_memory[c->last_memory_type] -= | ||
c->last_memory_usage; | ||
/* Remove client from memory usage buckets */ | ||
if (c->mem_usage_bucket) { | ||
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; | ||
|
@@ -2487,7 +2490,7 @@ int processCommandAndResetClient(client *c) { | |
commandProcessed(c); | ||
/* Update the client's memory to include output buffer growth following the | ||
* processed command. */ | ||
updateClientMemUsageAndBucket(c); | ||
if (c->conn) updateClientMemUsageAndBucket(c); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @oranagra this is another place to record the memory usage of fake clients.
failed test:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think that's a bug, they're not "evictable", and the user (application) doesn't control them (and their amount). |
||
} | ||
|
||
if (server.current_client == NULL) deadclient = 1; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
following #12817 (comment)
we call
updateStatsOnUnblock()
here when from RM_UnblockClient().There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it mean that before this PR it was called twice?
maybe we should add a comment in moduleBlockedClientTimedOut, explaining the
if
statement by referring to this call.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this PR,
moduleHandleBlockedClients()
ignored updating the block status when the client was blocked on keys.origin code: