Skip to content

Commit

Permalink
Don't execute commands for clients when they are unblocked
Browse files Browse the repository at this point in the history
  • Loading branch information
pietern committed Dec 6, 2010
1 parent ecf9401 commit a4ce758
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
19 changes: 17 additions & 2 deletions src/redis.c
Expand Up @@ -646,15 +646,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
listNode *ln;
redisClient *c;

/* Awake clients that got all the swapped keys they requested */
if (server.vm_enabled && listLength(server.io_ready_clients)) {
listIter li;
listNode *ln;

listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) {
redisClient *c = ln->value;
c = ln->value;
struct redisCommand *cmd;

/* Resume the client. */
Expand All @@ -672,6 +673,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
processInputBuffer(c);
}
}

/* Try to process pending commands for clients that were just unblocked. */
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
redisAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients,ln);

/* Process remaining data in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}

/* Write the AOF buffer on disk */
flushAppendOnlyFile();
}
Expand Down Expand Up @@ -818,6 +832,7 @@ void initServer() {
server.clients = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
createSharedObjects();
server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
Expand Down
1 change: 1 addition & 0 deletions src/redis.h
Expand Up @@ -435,6 +435,7 @@ struct redisServer {
/* Blocked clients */
unsigned int bpop_blocked_clients;
unsigned int vm_blocked_clients;
list *unblocked_clients;
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
Expand Down
7 changes: 1 addition & 6 deletions src/t_list.c
Expand Up @@ -759,12 +759,7 @@ void unblockClientWaitingData(redisClient *c) {
c->bpop.target = NULL;
c->flags &= (~REDIS_BLOCKED);
server.bpop_blocked_clients--;
/* We want to process data if there is some command waiting
* in the input buffer. Note that this is safe even if
* unblockClientWaitingData() gets called from freeClient() because
* freeClient() will be smart enough to call this function
* *after* c->querybuf was set to NULL. */
if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
listAddNodeTail(server.unblocked_clients,c);
}

/* This should be called from any function PUSHing into lists.
Expand Down

0 comments on commit a4ce758

Please sign in to comment.