diff --git a/src/redis.c b/src/redis.c index 8a5f9632aec7..a1653c36e458 100644 --- a/src/redis.c +++ b/src/redis.c @@ -646,15 +646,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + listNode *ln; + redisClient *c; /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; - listNode *ln; listRewind(server.io_ready_clients,&li); while((ln = listNext(&li))) { - redisClient *c = ln->value; + c = ln->value; struct redisCommand *cmd; /* Resume the client. */ @@ -672,6 +673,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + + /* Try to process pending commands for clients that were just unblocked. */ + while (listLength(server.unblocked_clients)) { + ln = listFirst(server.unblocked_clients); + redisAssert(ln != NULL); + c = ln->value; + listDelNode(server.unblocked_clients,ln); + + /* Process remaining data in the input buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) + processInputBuffer(c); + } + /* Write the AOF buffer on disk */ flushAppendOnlyFile(); } @@ -818,6 +832,7 @@ void initServer() { server.clients = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); + server.unblocked_clients = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); diff --git a/src/redis.h b/src/redis.h index 27cb82598ede..3639f0623d5d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -435,6 +435,7 @@ struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; unsigned int vm_blocked_clients; + list *unblocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; diff --git a/src/t_list.c b/src/t_list.c index 867e258a1e22..7dc3f13934aa 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -759,12 +759,7 @@ void unblockClientWaitingData(redisClient *c) { c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); server.bpop_blocked_clients--; - /* We want to process data if there is some command waiting - * in the input buffer. Note that this is safe even if - * unblockClientWaitingData() gets called from freeClient() because - * freeClient() will be smart enough to call this function - * *after* c->querybuf was set to NULL. */ - if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); + listAddNodeTail(server.unblocked_clients,c); } /* This should be called from any function PUSHing into lists.