Skip to content

Commit

Permalink
Handle output buffer limits for Module blocked clients (#8141)
Browse files Browse the repository at this point in the history
Module blocked clients cache the response in a temporary client,
the reply list in this client would be affected by the recent fix
in #7202, but when the reply is later copied into the real client,
it would have bypassed all the checks for output buffer limit, which
would have resulted in both: responding with a partial response to
the client, and also not disconnecting it at all.

(cherry picked from commit 48efc25)
  • Loading branch information
oranagra committed Jan 12, 2021
1 parent 785851a commit ec74ae7
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 1 deletion.
33 changes: 32 additions & 1 deletion src/networking.c
Expand Up @@ -260,6 +260,9 @@ int prepareClientToWrite(client *c) {
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */

/* Attempts to add the reply to the static buffer in the client struct.
* Returns C_ERR if the buffer is full, or the reply list is not empty,
* in which case the reply must be added to the reply list. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;

Expand All @@ -277,6 +280,8 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
return C_OK;
}

/* Adds the reply to the reply linked list.
* Note: some edits to this function need to be relayed to AddReplyFromClient. */
void _addReplyProtoToList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

Expand Down Expand Up @@ -837,14 +842,40 @@ void addReplySubcommandSyntaxError(client *c) {
/* Append 'src' client output buffers into 'dst' client output buffers.
* This function clears the output buffers of 'src' */
void AddReplyFromClient(client *dst, client *src) {
if (prepareClientToWrite(dst) != C_OK)
/* If the source client contains a partial response due to client output
* buffer limits, propagate that to the dest rather than copy a partial
* reply. We don't wanna run the risk of copying partial response in case
* for some reason the output limits don't reach the same decision (maybe
* they changed) */
if (src->flags & CLIENT_CLOSE_ASAP) {
sds client = catClientInfoString(sdsempty(),dst);
freeClientAsync(dst);
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
sdsfree(client);
return;
}

/* First add the static buffer (either into the static buffer or reply list) */
addReplyProto(dst,src->buf, src->bufpos);

/* We need to check with prepareClientToWrite again (after addReplyProto)
* since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
if (prepareClientToWrite(dst) != C_OK)
return;

/* We're bypassing _addReplyProtoToList, so we need to add the pre/post
* checks in it. */
if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;

/* Concatenate the reply list into the dest */
if (listLength(src->reply))
listJoin(dst->reply,src->reply);
dst->reply_bytes += src->reply_bytes;
src->reply_bytes = 0;
src->bufpos = 0;

/* Check output buffer limits */
asyncCloseClientOnOutputBufferLimitReached(dst);
}

/* Copy 'src' client output buffers into 'dst' client output buffers.
Expand Down
105 changes: 105 additions & 0 deletions tests/modules/blockedclient.c
Expand Up @@ -79,6 +79,105 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}

typedef struct {
RedisModuleString **argv;
int argc;
RedisModuleBlockedClient *bc;
} bg_call_data;

void *bg_call_worker(void *arg) {
bg_call_data *bg = arg;

// Get Redis module context
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);

// Acquire GIL
RedisModule_ThreadSafeContextLock(ctx);

// Call the command
const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);

// Release GIL
RedisModule_ThreadSafeContextUnlock(ctx);

// Reply to client
if (!rep) {
RedisModule_ReplyWithError(ctx, "NULL reply returned");
} else {
RedisModule_ReplyWithCallReply(ctx, rep);
RedisModule_FreeCallReply(rep);
}

// Unblock client
RedisModule_UnblockClient(bg->bc, NULL);

/* Free the arguments */
for (int i=0; i<bg->argc; i++)
RedisModule_FreeString(ctx, bg->argv[i]);
RedisModule_Free(bg->argv);
RedisModule_Free(bg);

// Free the Redis module context
RedisModule_FreeThreadSafeContext(ctx);

return NULL;
}

int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
UNUSED(argv);
UNUSED(argc);

/* Make sure we're not trying to block a client when we shouldn't */
int flags = RedisModule_GetContextFlags(ctx);
int allFlags = RedisModule_GetContextFlagsAll();
if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
(flags & REDISMODULE_CTX_FLAGS_MULTI)) {
RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
return REDISMODULE_OK;
}

/* Make a copy of the arguments and pass them to the thread. */
bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));
bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);
bg->argc = argc;
for (int i=0; i<argc; i++)
bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);

/* Block the client */
bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);

/* Start a thread to handle the request */
pthread_t tid;
int res = pthread_create(&tid, NULL, bg_call_worker, bg);
assert(res == 0);

return REDISMODULE_OK;
}

int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
UNUSED(argv);
UNUSED(argc);

if(argc < 2){
return RedisModule_WrongArity(ctx);
}

const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);

RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2);
if(!rep){
RedisModule_ReplyWithError(ctx, "NULL reply returned");
}else{
RedisModule_ReplyWithCallReply(ctx, rep);
RedisModule_FreeCallReply(rep);
}

return REDISMODULE_OK;
}


int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
Expand All @@ -89,5 +188,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;

return REDISMODULE_OK;
}
13 changes: 13 additions & 0 deletions tests/unit/moduleapi/blockedclient.tcl
Expand Up @@ -14,4 +14,17 @@ start_server {tags {"modules"}} {
r acquire_gil
assert_equal {{Blocked client is not supported inside multi}} [r exec]
}

test {blocked client reaches client output buffer limit} {
r hset hash big [string repeat x 50000]
r hset hash bada [string repeat x 50000]
r hset hash boom [string repeat x 50000]
r config set client-output-buffer-limit {normal 100000 0 0}
r client setname myclient
catch {r do_bg_rm_call hgetall hash} e
assert_match "*I/O error*" $e
reconnect
set clients [r client list]
assert_no_match "*name=myclient*" $clients
}
}

0 comments on commit ec74ae7

Please sign in to comment.