Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge remote branch 'pietern/brpoplpush'
  • Loading branch information
antirez committed Dec 14, 2010
2 parents 8c304be + a4ce758 commit f858c11
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 80 deletions.
8 changes: 5 additions & 3 deletions src/networking.c
Expand Up @@ -41,8 +41,10 @@ redisClient *createClient(int fd) {
c->reply = listCreate();
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
c->blocking_keys = NULL;
c->blocking_keys_num = 0;
c->bpop.keys = NULL;
c->bpop.count = 0;
c->bpop.timeout = 0;
c->bpop.target = NULL;
c->io_keys = listCreate();
c->watched_keys = listCreate();
listSetFreeMethod(c->io_keys,decrRefCount);
Expand Down Expand Up @@ -699,7 +701,7 @@ void closeTimedoutClients(void) {
redisLog(REDIS_VERBOSE,"Closing idle client");
freeClient(c);
} else if (c->flags & REDIS_BLOCKED) {
if (c->blockingto != 0 && c->blockingto < now) {
if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
addReply(c,shared.nullmultibulk);
unblockClientWaitingData(c);
}
Expand Down
28 changes: 22 additions & 6 deletions src/redis.c
Expand Up @@ -89,14 +89,15 @@ struct redisCommand readonlyCommandTable[] = {
{"rpop",rpopCommand,2,0,NULL,1,1,1},
{"lpop",lpopCommand,2,0,NULL,1,1,1},
{"brpop",brpopCommand,-3,0,NULL,1,1,1},
{"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1},
{"blpop",blpopCommand,-3,0,NULL,1,1,1},
{"llen",llenCommand,2,0,NULL,1,1,1},
{"lindex",lindexCommand,3,0,NULL,1,1,1},
{"lset",lsetCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1},
{"lrange",lrangeCommand,4,0,NULL,1,1,1},
{"ltrim",ltrimCommand,4,0,NULL,1,1,1},
{"lrem",lremCommand,4,0,NULL,1,1,1},
{"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
{"rpoplpush",rpoplpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
{"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
{"srem",sremCommand,3,0,NULL,1,1,1},
{"smove",smoveCommand,4,0,NULL,1,2,1},
Expand Down Expand Up @@ -572,7 +573,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
}

/* Close connections of timedout clients */
if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients)
if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
closeTimedoutClients();

/* Check if a background saving or AOF rewrite in progress terminated */
Expand Down Expand Up @@ -645,15 +646,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
listNode *ln;
redisClient *c;

/* Awake clients that got all the swapped keys they requested */
if (server.vm_enabled && listLength(server.io_ready_clients)) {
listIter li;
listNode *ln;

listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) {
redisClient *c = ln->value;
c = ln->value;
struct redisCommand *cmd;

/* Resume the client. */
Expand All @@ -671,6 +673,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
processInputBuffer(c);
}
}

/* Try to process pending commands for clients that were just unblocked. */
while (listLength(server.unblocked_clients)) {
ln = listFirst(server.unblocked_clients);
redisAssert(ln != NULL);
c = ln->value;
listDelNode(server.unblocked_clients,ln);

/* Process remaining data in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}

/* Write the AOF buffer on disk */
flushAppendOnlyFile();
}
Expand Down Expand Up @@ -758,7 +773,7 @@ void initServerConfig() {
server.rdbcompression = 1;
server.activerehashing = 1;
server.maxclients = 0;
server.blpop_blocked_clients = 0;
server.bpop_blocked_clients = 0;
server.maxmemory = 0;
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
server.maxmemory_samples = 3;
Expand Down Expand Up @@ -817,6 +832,7 @@ void initServer() {
server.clients = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
createSharedObjects();
server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
Expand Down Expand Up @@ -1169,7 +1185,7 @@ sds genRedisInfoString(void) {
(float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
server.blpop_blocked_clients,
server.bpop_blocked_clients,
zmalloc_used_memory(),
hmem,
zmalloc_get_rss(),
Expand Down
22 changes: 15 additions & 7 deletions src/redis.h
Expand Up @@ -293,6 +293,16 @@ typedef struct multiState {
int count; /* Total number of MULTI commands */
} multiState;

typedef struct blockingState {
robj **keys; /* The key we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
int count; /* Number of blocking keys */
time_t timeout; /* Blocking operation timeout. If UNIX current time
* is >= timeout then the operation timed out. */
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */
} blockingState;

/* With multiplexing we need to take per-clinet state.
* Clients are taken in a liked list. */
typedef struct redisClient {
Expand All @@ -316,11 +326,7 @@ typedef struct redisClient {
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
robj **blocking_keys; /* The key we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
int blocking_keys_num; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
blockingState bpop; /* blocking state */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
Expand Down Expand Up @@ -427,8 +433,9 @@ struct redisServer {
int maxmemory_policy;
int maxmemory_samples;
/* Blocked clients */
unsigned int blpop_blocked_clients;
unsigned int bpop_blocked_clients;
unsigned int vm_blocked_clients;
list *unblocked_clients;
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
Expand Down Expand Up @@ -937,7 +944,7 @@ void flushdbCommand(redisClient *c);
void flushallCommand(redisClient *c);
void sortCommand(redisClient *c);
void lremCommand(redisClient *c);
void rpoplpushcommand(redisClient *c);
void rpoplpushCommand(redisClient *c);
void infoCommand(redisClient *c);
void mgetCommand(redisClient *c);
void monitorCommand(redisClient *c);
Expand Down Expand Up @@ -966,6 +973,7 @@ void execCommand(redisClient *c);
void discardCommand(redisClient *c);
void blpopCommand(redisClient *c);
void brpopCommand(redisClient *c);
void brpoplpushCommand(redisClient *c);
void appendCommand(redisClient *c);
void substrCommand(redisClient *c);
void strlenCommand(redisClient *c);
Expand Down

0 comments on commit f858c11

Please sign in to comment.