diff --git a/src/networking.c b/src/networking.c index f1665825c860..df30206a320c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -270,6 +270,9 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ +/* Attempts to add the reply to the static buffer in the client struct. + * Returns C_ERR if the buffer is full, or the reply list is not empty, + * in which case the reply must be added to the reply list. */ int _addReplyToBuffer(client *c, const char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos; @@ -287,6 +290,8 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) { return C_OK; } +/* Adds the reply to the reply linked list. + * Note: some edits to this function need to be relayed to AddReplyFromClient. */ void _addReplyProtoToList(client *c, const char *s, size_t len) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; @@ -848,14 +853,40 @@ void addReplySubcommandSyntaxError(client *c) { /* Append 'src' client output buffers into 'dst' client output buffers. * This function clears the output buffers of 'src' */ void AddReplyFromClient(client *dst, client *src) { - if (prepareClientToWrite(dst) != C_OK) + /* If the source client contains a partial response due to client output + * buffer limits, propagate that to the dest rather than copy a partial + * reply. We don't wanna run the risk of copying partial response in case + * for some reason the output limits don't reach the same decision (maybe + * they changed) */ + if (src->flags & CLIENT_CLOSE_ASAP) { + sds client = catClientInfoString(sdsempty(),dst); + freeClientAsync(dst); + serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); + sdsfree(client); return; + } + + /* First add the static buffer (either into the static buffer or reply list) */ addReplyProto(dst,src->buf, src->bufpos); + + /* We need to check with prepareClientToWrite again (after addReplyProto) + * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */ + if (prepareClientToWrite(dst) != C_OK) + return; + + /* We're bypassing _addReplyProtoToList, so we need to add the pre/post + * checks in it. */ + if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return; + + /* Concatenate the reply list into the dest */ if (listLength(src->reply)) listJoin(dst->reply,src->reply); dst->reply_bytes += src->reply_bytes; src->reply_bytes = 0; src->bufpos = 0; + + /* Check output buffer limits */ + asyncCloseClientOnOutputBufferLimitReached(dst); } /* Copy 'src' client output buffers into 'dst' client output buffers. diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index cfa8319d8a06..c3b0d5eb4854 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -85,6 +85,88 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +typedef struct { + RedisModuleString **argv; + int argc; + RedisModuleBlockedClient *bc; +} bg_call_data; + +void *bg_call_worker(void *arg) { + bg_call_data *bg = arg; + + // Get Redis module context + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc); + + // Acquire GIL + RedisModule_ThreadSafeContextLock(ctx); + + // Call the command + const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL); + RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2); + + // Release GIL + RedisModule_ThreadSafeContextUnlock(ctx); + + // Reply to client + if (!rep) { + RedisModule_ReplyWithError(ctx, "NULL reply returned"); + } else { + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + } + + // Unblock client + RedisModule_UnblockClient(bg->bc, NULL); + + /* Free the arguments */ + for (int i=0; iargc; i++) + RedisModule_FreeString(ctx, bg->argv[i]); + RedisModule_Free(bg->argv); + RedisModule_Free(bg); + + // Free the Redis module context + RedisModule_FreeThreadSafeContext(ctx); + + return NULL; +} + +int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + UNUSED(argv); + UNUSED(argc); + + /* Make sure we're not trying to block a client when we shouldn't */ + int flags = RedisModule_GetContextFlags(ctx); + int allFlags = RedisModule_GetContextFlagsAll(); + if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) && + (flags & REDISMODULE_CTX_FLAGS_MULTI)) { + RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi"); + return REDISMODULE_OK; + } + if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) && + (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) { + RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed"); + return REDISMODULE_OK; + } + + /* Make a copy of the arguments and pass them to the thread. */ + bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data)); + bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc); + bg->argc = argc; + for (int i=0; iargv[i] = RedisModule_HoldString(ctx, argv[i]); + + /* Block the client */ + bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0); + + /* Start a thread to handle the request */ + pthread_t tid; + int res = pthread_create(&tid, NULL, bg_call_worker, bg); + assert(res == 0); + + return REDISMODULE_OK; +} + int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ UNUSED(argv); UNUSED(argc); @@ -120,5 +202,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index 60039659eb31..0598a84d69de 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -72,4 +72,22 @@ start_server {tags {"modules"}} { } e set e } {*ERR*DENY BLOCKING*} + + test {RM_Call from blocked client} { + r hset hash foo bar + r do_bg_rm_call hgetall hash + } {foo bar} + + test {blocked client reaches client output buffer limit} { + r hset hash big [string repeat x 50000] + r hset hash bada [string repeat x 50000] + r hset hash boom [string repeat x 50000] + r config set client-output-buffer-limit {normal 100000 0 0} + r client setname myclient + catch {r do_bg_rm_call hgetall hash} e + assert_match "*I/O error*" $e + reconnect + set clients [r client list] + assert_no_match "*name=myclient*" $clients + } }