From 8a979f039011a4672b1052ee84ca56f214e6a681 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 10:43:21 -0300 Subject: [PATCH 01/15] Fix case in RPOPLPUSH. --- src/redis.c | 2 +- src/redis.h | 2 +- src/t_list.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/redis.c b/src/redis.c index 5b39c011f537..1f7e7d266284 100644 --- a/src/redis.c +++ b/src/redis.c @@ -96,7 +96,7 @@ struct redisCommand readonlyCommandTable[] = { {"lrange",lrangeCommand,4,0,NULL,1,1,1}, {"ltrim",ltrimCommand,4,0,NULL,1,1,1}, {"lrem",lremCommand,4,0,NULL,1,1,1}, - {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1}, + {"rpoplpush",rpoplpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1}, {"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1}, {"srem",sremCommand,3,0,NULL,1,1,1}, {"smove",smoveCommand,4,0,NULL,1,2,1}, diff --git a/src/redis.h b/src/redis.h index 11e4e557f312..6bd0fd5dfe96 100644 --- a/src/redis.h +++ b/src/redis.h @@ -932,7 +932,7 @@ void flushdbCommand(redisClient *c); void flushallCommand(redisClient *c); void sortCommand(redisClient *c); void lremCommand(redisClient *c); -void rpoplpushcommand(redisClient *c); +void rpoplpushCommand(redisClient *c); void infoCommand(redisClient *c); void mgetCommand(redisClient *c); void monitorCommand(redisClient *c); diff --git a/src/t_list.c b/src/t_list.c index 42e1d5873eb8..10e7f72c7137 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -618,7 +618,7 @@ void lremCommand(redisClient *c) { * since the element is not just returned but pushed against another list * as well. This command was originally proposed by Ezra Zygmuntowicz. */ -void rpoplpushcommand(redisClient *c) { +void rpoplpushCommand(redisClient *c) { robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; From b2a7fd0cf7afef7e7ede9e46a317fcb9ae84768c Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 15:25:59 -0300 Subject: [PATCH 02/15] BRPOPLPUSH. --- src/redis.c | 1 + src/redis.h | 2 + src/t_list.c | 83 +++++++++++++++++++++++++++++----------- tests/unit/type/list.tcl | 49 ++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 23 deletions(-) diff --git a/src/redis.c b/src/redis.c index 1f7e7d266284..58a796c037e1 100644 --- a/src/redis.c +++ b/src/redis.c @@ -89,6 +89,7 @@ struct redisCommand readonlyCommandTable[] = { {"rpop",rpopCommand,2,0,NULL,1,1,1}, {"lpop",lpopCommand,2,0,NULL,1,1,1}, {"brpop",brpopCommand,-3,0,NULL,1,1,1}, + {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1}, {"blpop",blpopCommand,-3,0,NULL,1,1,1}, {"llen",llenCommand,2,0,NULL,1,1,1}, {"lindex",lindexCommand,3,0,NULL,1,1,1}, diff --git a/src/redis.h b/src/redis.h index 6bd0fd5dfe96..83a94483fc7b 100644 --- a/src/redis.h +++ b/src/redis.h @@ -321,6 +321,7 @@ typedef struct redisClient { int blocking_keys_num; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ + robj *blocking_target; list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ @@ -961,6 +962,7 @@ void execCommand(redisClient *c); void discardCommand(redisClient *c); void blpopCommand(redisClient *c); void brpopCommand(redisClient *c); +void brpoplpushCommand(redisClient *c); void appendCommand(redisClient *c); void substrCommand(redisClient *c); void strlenCommand(redisClient *c); diff --git a/src/t_list.c b/src/t_list.c index 10e7f72c7137..437f4004be18 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -777,9 +777,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + if (receiver->blocking_target == NULL) { + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + } + else { + receiver->argc++; + + robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + + addReplyBulk(receiver,ele); + + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db,receiver->blocking_target,dobj); + } + + listTypePush(dobj,ele,REDIS_HEAD); + } + unblockClientWaitingData(receiver); return 1; } @@ -814,26 +833,36 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - /* We need to alter the command arguments before to call - * popGenericCommand() as the command takes a single key. */ - orig_argv = c->argv; - orig_argc = c->argc; - argv[1] = c->argv[j]; - c->argv = argv; - c->argc = 2; - - /* Also the return value is different, we need to output - * the multi bulk reply header and the key name. The - * "real" command will add the last element (the value) - * for us. If this souds like an hack to you it's just - * because it is... */ - addReplyMultiBulkLen(c,2); - addReplyBulk(c,argv[1]); - popGenericCommand(c,where); - - /* Fix the client structure with the original stuff */ - c->argv = orig_argv; - c->argc = orig_argc; + if (c->blocking_target == NULL) { + /* We need to alter the command arguments before to call + * popGenericCommand() as the command takes a single key. */ + orig_argv = c->argv; + orig_argc = c->argc; + argv[1] = c->argv[j]; + c->argv = argv; + c->argc = 2; + + /* Also the return value is different, we need to output + * the multi bulk reply header and the key name. The + * "real" command will add the last element (the value) + * for us. If this souds like an hack to you it's just + * because it is... */ + addReplyMultiBulkLen(c,2); + addReplyBulk(c,argv[1]); + + popGenericCommand(c,where); + + /* Fix the client structure with the original stuff */ + c->argv = orig_argv; + c->argc = orig_argc; + } + else { + c->argv[2] = c->blocking_target; + c->blocking_target = NULL; + + rpoplpushCommand(c); + } + return; } } @@ -860,3 +889,11 @@ void blpopCommand(redisClient *c) { void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL); } + +void brpoplpushCommand(redisClient *c) { + c->blocking_target = c->argv[2]; + c->argv[2] = c->argv[3]; + c->argc--; + + blockingPopGenericCommand(c,REDIS_TAIL); +} diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 4c131fc3774e..85cbe88e7223 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -127,6 +127,55 @@ start_server { assert_equal 0 [r llen blist1] assert_equal 1 [r llen blist2] } + + test "BRPOPLPUSH - $type" { + r del target + + set rd [redis_deferring_client] + create_$type blist "a b $large c d" + + $rd brpoplpush blist target 1 + assert_equal d [$rd read] + + assert_equal d [r rpop target] + assert_equal "a b $large c" [r lrange blist 0 -1] + } + } + + test "BRPOPLPUSH with zero timeout should block indefinitely" { + set rd [redis_deferring_client] + r del blist target + $rd brpoplpush blist target 0 + after 1000 + r rpush blist foo + assert_equal foo [$rd read] + assert_equal {foo} [r lrange target 0 -1] + } + + test "BRPOPLPUSH with wrong source type" { + set rd [redis_deferring_client] + r del blist target + r set blist nolist + $rd brpoplpush blist target 1 + assert_error "ERR*wrong kind*" {$rd read} + } + + test "BRPOPLPUSH with wrong destination type" { + set rd [redis_deferring_client] + r del blist target + r set target nolist + r lpush blist foo + $rd brpoplpush blist target 1 + assert_error "ERR*wrong kind*" {$rd read} + + set rd [redis_deferring_client] + r del blist target + r set target nolist + $rd brpoplpush blist target 0 + after 1000 + r rpush blist foo + assert_error "ERR*wrong kind*" {$rd read} + assert_equal {foo} [r lrange blist 0 -1] } foreach {pop} {BLPOP BRPOP} { From 357a841714d50f3d3c293b9bb03839bac28cb05a Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 19:38:01 -0300 Subject: [PATCH 03/15] Move to struct. --- src/networking.c | 8 +++++--- src/redis.h | 17 +++++++++++------ src/t_list.c | 36 ++++++++++++++++++------------------ 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/src/networking.c b/src/networking.c index 634e2107c0fc..530e2ca87382 100644 --- a/src/networking.c +++ b/src/networking.c @@ -41,8 +41,10 @@ redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blocking_keys = NULL; - c->blocking_keys_num = 0; + c->bstate.keys = NULL; + c->bstate.count = 0; + c->bstate.timeout = 0; + c->bstate.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); @@ -677,7 +679,7 @@ void closeTimedoutClients(void) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { - if (c->blockingto != 0 && c->blockingto < now) { + if (c->bstate.timeout != 0 && c->bstate.timeout < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } diff --git a/src/redis.h b/src/redis.h index 83a94483fc7b..0de94585b89c 100644 --- a/src/redis.h +++ b/src/redis.h @@ -293,6 +293,16 @@ typedef struct multiState { int count; /* Total number of MULTI commands */ } multiState; +typedef struct blockingState { + robj **keys; /* The key we are waiting to terminate a blocking + * operation such as BLPOP. Otherwise NULL. */ + int count; /* Number of blocking keys */ + time_t timeout; /* Blocking operation timeout. If UNIX current time + * is >= timeout then the operation timed out. */ + robj *target; /* The key that should receive the element, + * for BRPOPLPUSH. */ +} blockingState; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { @@ -316,12 +326,7 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blocking_keys; /* The key we are waiting to terminate a blocking - * operation such as BLPOP. Otherwise NULL. */ - int blocking_keys_num; /* Number of blocking keys */ - time_t blockingto; /* Blocking operation timeout. If UNIX current time - * is >= blockingto then the operation timed out. */ - robj *blocking_target; + blockingState bstate; /* blocking state */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ diff --git a/src/t_list.c b/src/t_list.c index 437f4004be18..17015acfe80a 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -694,12 +694,12 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { list *l; int j; - c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); - c->blocking_keys_num = numkeys; - c->blockingto = timeout; + c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); + c->bstate.count = numkeys; + c->bstate.timeout = timeout; for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blocking_keys[j] = keys[j]; + c->bstate.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -728,22 +728,22 @@ void unblockClientWaitingData(redisClient *c) { list *l; int j; - redisAssert(c->blocking_keys != NULL); + redisAssert(c->bstate.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blocking_keys_num; j++) { + for (j = 0; j < c->bstate.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); + de = dictFind(c->db->blocking_keys,c->bstate.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->blocking_keys[j]); - decrRefCount(c->blocking_keys[j]); + dictDelete(c->db->blocking_keys,c->bstate.keys[j]); + decrRefCount(c->bstate.keys[j]); } /* Cleanup the client structure */ - zfree(c->blocking_keys); - c->blocking_keys = NULL; + zfree(c->bstate.keys); + c->bstate.keys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -777,7 +777,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - if (receiver->blocking_target == NULL) { + if (receiver->bstate.target == NULL) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); @@ -785,7 +785,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { else { receiver->argc++; - robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target); + robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; addReplyBulk(receiver,ele); @@ -793,7 +793,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { /* Create the list if the key does not exist */ if (!dobj) { dobj = createZiplistObject(); - dbAdd(receiver->db,receiver->blocking_target,dobj); + dbAdd(receiver->db,receiver->bstate.target,dobj); } listTypePush(dobj,ele,REDIS_HEAD); @@ -833,7 +833,7 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - if (c->blocking_target == NULL) { + if (c->bstate.target == NULL) { /* We need to alter the command arguments before to call * popGenericCommand() as the command takes a single key. */ orig_argv = c->argv; @@ -857,8 +857,8 @@ void blockingPopGenericCommand(redisClient *c, int where) { c->argc = orig_argc; } else { - c->argv[2] = c->blocking_target; - c->blocking_target = NULL; + c->argv[2] = c->bstate.target; + c->bstate.target = NULL; rpoplpushCommand(c); } @@ -891,7 +891,7 @@ void brpopCommand(redisClient *c) { } void brpoplpushCommand(redisClient *c) { - c->blocking_target = c->argv[2]; + c->bstate.target = c->argv[2]; c->argv[2] = c->argv[3]; c->argc--; From ba3b474111a79bfa378a8c77d89fff600cd5b23a Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 20:47:46 -0300 Subject: [PATCH 04/15] Refactor code for BRPOPLPUSH. --- src/t_list.c | 123 ++++++++++++++++++++++++--------------- tests/unit/type/list.tcl | 14 +++++ 2 files changed, 91 insertions(+), 46 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index 17015acfe80a..2dedf4811a4e 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -689,7 +689,7 @@ void rpoplpushCommand(redisClient *c) { /* Set a client in blocking mode for the specified key, with the specified * timeout */ -void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { +void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) { dictEntry *de; list *l; int j; @@ -697,6 +697,12 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); c->bstate.count = numkeys; c->bstate.timeout = timeout; + c->bstate.target = target; + + if (target != NULL) { + incrRefCount(target); + } + for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ c->bstate.keys[j] = keys[j]; @@ -741,9 +747,15 @@ void unblockClientWaitingData(redisClient *c) { dictDelete(c->db->blocking_keys,c->bstate.keys[j]); decrRefCount(c->bstate.keys[j]); } + + if (c->bstate.target != NULL) { + decrRefCount(c->bstate.target); + } + /* Cleanup the client structure */ zfree(c->bstate.keys); c->bstate.keys = NULL; + c->bstate.target = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -783,8 +795,6 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyBulk(receiver,ele); } else { - receiver->argc++; - robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; @@ -806,17 +816,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; - long long lltimeout; time_t timeout; int j; - /* Make sure timeout is an integer value */ - if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout, - "timeout is not an integer") != REDIS_OK) return; - - /* Make sure the timeout is not negative */ - if (lltimeout < 0) { - addReplyError(c,"timeout is negative"); + if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) { return; } @@ -833,35 +836,27 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - if (c->bstate.target == NULL) { - /* We need to alter the command arguments before to call - * popGenericCommand() as the command takes a single key. */ - orig_argv = c->argv; - orig_argc = c->argc; - argv[1] = c->argv[j]; - c->argv = argv; - c->argc = 2; - - /* Also the return value is different, we need to output - * the multi bulk reply header and the key name. The - * "real" command will add the last element (the value) - * for us. If this souds like an hack to you it's just - * because it is... */ - addReplyMultiBulkLen(c,2); - addReplyBulk(c,argv[1]); - - popGenericCommand(c,where); - - /* Fix the client structure with the original stuff */ - c->argv = orig_argv; - c->argc = orig_argc; - } - else { - c->argv[2] = c->bstate.target; - c->bstate.target = NULL; - - rpoplpushCommand(c); - } + /* We need to alter the command arguments before to call + * popGenericCommand() as the command takes a single key. */ + orig_argv = c->argv; + orig_argc = c->argc; + argv[1] = c->argv[j]; + c->argv = argv; + c->argc = 2; + + /* Also the return value is different, we need to output + * the multi bulk reply header and the key name. The + * "real" command will add the last element (the value) + * for us. If this souds like an hack to you it's just + * because it is... */ + addReplyMultiBulkLen(c,2); + addReplyBulk(c,argv[1]); + + popGenericCommand(c,where); + + /* Fix the client structure with the original stuff */ + c->argv = orig_argv; + c->argc = orig_argc; return; } @@ -877,9 +872,26 @@ void blockingPopGenericCommand(redisClient *c, int where) { } /* If the list is empty or the key does not exists we must block */ - timeout = lltimeout; if (timeout > 0) timeout += time(NULL); - blockForKeys(c,c->argv+1,c->argc-2,timeout); + blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); +} + +int checkTimeout(redisClient *c, robj *object, time_t *timeout) { + long long lltimeout; + + if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; + } + + if (lltimeout < 0) { + addReplyError(c, "timeout is negative"); + return REDIS_ERR; + } + + *timeout = lltimeout; + + return REDIS_OK; } void blpopCommand(redisClient *c) { @@ -891,9 +903,28 @@ void brpopCommand(redisClient *c) { } void brpoplpushCommand(redisClient *c) { - c->bstate.target = c->argv[2]; - c->argv[2] = c->argv[3]; - c->argc--; + time_t timeout; - blockingPopGenericCommand(c,REDIS_TAIL); + if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) { + return; + } + + robj *key = lookupKeyWrite(c->db, c->argv[1]); + + + if (key == NULL) { + // block + if (c->flags & REDIS_MULTI) { + addReply(c,shared.nullmultibulk); + } else { + if (timeout > 0) timeout += time(NULL); + blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); + } + } else if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + // The list exists and has elements. + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 85cbe88e7223..a2d0edf6d836 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -178,6 +178,20 @@ start_server { assert_equal {foo} [r lrange blist 0 -1] } + test {BRPOPLPUSH inside a transaction} { + r del xlist target + r lpush xlist foo + r lpush xlist bar + + r multi + r brpoplpush xlist target 0 + r brpoplpush xlist target 0 + r brpoplpush xlist target 0 + r lrange xlist 0 -1 + r lrange target 0 -1 + r exec + } {foo bar {} {} {bar foo}} + foreach {pop} {BLPOP BRPOP} { test "$pop: with single empty list argument" { set rd [redis_deferring_client] From 7c25a43adc67c4a8d08e930aa92f1c5575ec3646 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 10:31:02 -0300 Subject: [PATCH 05/15] Handle BRPOPLPUSH inside a transaction. --- src/t_list.c | 62 ++++++++++++++++++++++++---------------- tests/unit/type/list.tcl | 15 +++++++++- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index 2dedf4811a4e..d14de708f2b5 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -790,23 +790,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { receiver = ln->value; if (receiver->bstate.target == NULL) { - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + /* BRPOP/BLPOP return a multi-bulk with the name + * of the popped list */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); } else { - robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); - if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + /* BRPOPLPUSH */ + robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - addReplyBulk(receiver,ele); + addReplyBulk(receiver,ele); - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(receiver->db,receiver->bstate.target,dobj); - } + if (!handleClientsWaitingListPush(receiver, receiver->bstate.target, ele)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db, receiver->bstate.target, dobj); + } - listTypePush(dobj,ele,REDIS_HEAD); + listTypePush(dobj, ele, REDIS_HEAD); + } } unblockClientWaitingData(receiver); @@ -880,13 +885,13 @@ int checkTimeout(redisClient *c, robj *object, time_t *timeout) { long long lltimeout; if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { - addReplyError(c, "timeout is not an integer"); - return REDIS_ERR; + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; } - + if (lltimeout < 0) { - addReplyError(c, "timeout is negative"); - return REDIS_ERR; + addReplyError(c, "timeout is negative"); + return REDIS_ERR; } *timeout = lltimeout; @@ -911,20 +916,27 @@ void brpoplpushCommand(redisClient *c) { robj *key = lookupKeyWrite(c->db, c->argv[1]); - if (key == NULL) { - // block if (c->flags & REDIS_MULTI) { - addReply(c,shared.nullmultibulk); + + /* Blocking against an empty list in a multi state + * returns immediately. */ + addReply(c, shared.nullmultibulk); } else { if (timeout > 0) timeout += time(NULL); + + /* The list is empty and the client blocks. */ blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); } - } else if (key->type != REDIS_LIST) { - addReply(c, shared.wrongtypeerr); } else { - // The list exists and has elements. - redisAssert(listTypeLength(key) > 0); - rpoplpushCommand(c); + if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + + /* The list exists and has elements, so + * the regular rpoplpushCommand is executed. */ + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } } } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index a2d0edf6d836..62ea159dd456 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -152,6 +152,19 @@ start_server { assert_equal {foo} [r lrange target 0 -1] } + test "BRPOPLPUSH with a client BLPOPing the target list" { + set rd [redis_deferring_client] + set rd2 [redis_deferring_client] + r del blist target + $rd2 blpop target 0 + $rd brpoplpush blist target 0 + after 1000 + r rpush blist foo + assert_equal foo [$rd read] + assert_equal {target foo} [$rd2 read] + assert_equal 0 [r exists target] + } + test "BRPOPLPUSH with wrong source type" { set rd [redis_deferring_client] r del blist target @@ -178,7 +191,7 @@ start_server { assert_equal {foo} [r lrange blist 0 -1] } - test {BRPOPLPUSH inside a transaction} { + test "BRPOPLPUSH inside a transaction" { r del xlist target r lpush xlist foo r lpush xlist bar From 59bd44d1c883b52507358996d1436d466c377e60 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 11:00:54 -0300 Subject: [PATCH 06/15] Remove warning. --- src/t_list.c | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index d14de708f2b5..7f12e4ac009f 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -818,6 +818,24 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { return 1; } +int checkTimeout(redisClient *c, robj *object, time_t *timeout) { + long long lltimeout; + + if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; + } + + if (lltimeout < 0) { + addReplyError(c, "timeout is negative"); + return REDIS_ERR; + } + + *timeout = lltimeout; + + return REDIS_OK; +} + /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; @@ -881,24 +899,6 @@ void blockingPopGenericCommand(redisClient *c, int where) { blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); } -int checkTimeout(redisClient *c, robj *object, time_t *timeout) { - long long lltimeout; - - if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { - addReplyError(c, "timeout is not an integer"); - return REDIS_ERR; - } - - if (lltimeout < 0) { - addReplyError(c, "timeout is negative"); - return REDIS_ERR; - } - - *timeout = lltimeout; - - return REDIS_OK; -} - void blpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_HEAD); } From e3c51c4b1bb60069bbd6552fe9109885b886aa86 Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 15:06:25 -0300 Subject: [PATCH 07/15] Rename bstate to bpop. --- src/networking.c | 10 +++++----- src/redis.h | 2 +- src/t_list.c | 38 +++++++++++++++++++------------------- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/networking.c b/src/networking.c index 530e2ca87382..7d14ac53f18b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -41,10 +41,10 @@ redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->bstate.keys = NULL; - c->bstate.count = 0; - c->bstate.timeout = 0; - c->bstate.target = NULL; + c->bpop.keys = NULL; + c->bpop.count = 0; + c->bpop.timeout = 0; + c->bpop.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); @@ -679,7 +679,7 @@ void closeTimedoutClients(void) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { - if (c->bstate.timeout != 0 && c->bstate.timeout < now) { + if (c->bpop.timeout != 0 && c->bpop.timeout < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } diff --git a/src/redis.h b/src/redis.h index 0de94585b89c..f1142a5b34fc 100644 --- a/src/redis.h +++ b/src/redis.h @@ -326,7 +326,7 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - blockingState bstate; /* blocking state */ + blockingState bpop; /* blocking state */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ diff --git a/src/t_list.c b/src/t_list.c index 7f12e4ac009f..f5739792aabe 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -694,10 +694,10 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj list *l; int j; - c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); - c->bstate.count = numkeys; - c->bstate.timeout = timeout; - c->bstate.target = target; + c->bpop.keys = zmalloc(sizeof(robj*)*numkeys); + c->bpop.count = numkeys; + c->bpop.timeout = timeout; + c->bpop.target = target; if (target != NULL) { incrRefCount(target); @@ -705,7 +705,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->bstate.keys[j] = keys[j]; + c->bpop.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -734,28 +734,28 @@ void unblockClientWaitingData(redisClient *c) { list *l; int j; - redisAssert(c->bstate.keys != NULL); + redisAssert(c->bpop.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->bstate.count; j++) { + for (j = 0; j < c->bpop.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->bstate.keys[j]); + de = dictFind(c->db->blocking_keys,c->bpop.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->bstate.keys[j]); - decrRefCount(c->bstate.keys[j]); + dictDelete(c->db->blocking_keys,c->bpop.keys[j]); + decrRefCount(c->bpop.keys[j]); } - if (c->bstate.target != NULL) { - decrRefCount(c->bstate.target); + if (c->bpop.target != NULL) { + decrRefCount(c->bpop.target); } /* Cleanup the client structure */ - zfree(c->bstate.keys); - c->bstate.keys = NULL; - c->bstate.target = NULL; + zfree(c->bpop.keys); + c->bpop.keys = NULL; + c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -789,7 +789,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - if (receiver->bstate.target == NULL) { + if (receiver->bpop.target == NULL) { /* BRPOP/BLPOP return a multi-bulk with the name * of the popped list */ addReplyMultiBulkLen(receiver,2); @@ -798,16 +798,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { } else { /* BRPOPLPUSH */ - robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); + robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; addReplyBulk(receiver,ele); - if (!handleClientsWaitingListPush(receiver, receiver->bstate.target, ele)) { + if (!handleClientsWaitingListPush(receiver, receiver->bpop.target, ele)) { /* Create the list if the key does not exist */ if (!dobj) { dobj = createZiplistObject(); - dbAdd(receiver->db, receiver->bstate.target, dobj); + dbAdd(receiver->db, receiver->bpop.target, dobj); } listTypePush(dobj, ele, REDIS_HEAD); From 8987bf23bfd7b4d8501db98f4ae7e37310f58fbd Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Tue, 9 Nov 2010 15:16:09 -0300 Subject: [PATCH 08/15] Adhere to conventions. --- src/t_list.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index f5739792aabe..0da70a040b90 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -795,8 +795,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,key); addReplyBulk(receiver,ele); - } - else { + } else { /* BRPOPLPUSH */ robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; From baa14ef913032baf34645faeae80f5cd6895f97a Mon Sep 17 00:00:00 2001 From: Michel Martens & Damian Janowski Date: Mon, 29 Nov 2010 23:47:45 -0300 Subject: [PATCH 09/15] Fix BRPOPLPUSH behavior for all use cases. --- src/t_list.c | 20 +++++++++---------- tests/unit/type/list.tcl | 43 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index 0da70a040b90..ceed70c31697 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -748,10 +748,6 @@ void unblockClientWaitingData(redisClient *c) { decrRefCount(c->bpop.keys[j]); } - if (c->bpop.target != NULL) { - decrRefCount(c->bpop.target); - } - /* Cleanup the client structure */ zfree(c->bpop.keys); c->bpop.keys = NULL; @@ -789,7 +785,11 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - if (receiver->bpop.target == NULL) { + robj *target = receiver->bpop.target; + + unblockClientWaitingData(receiver); + + if (target == NULL) { /* BRPOP/BLPOP return a multi-bulk with the name * of the popped list */ addReplyMultiBulkLen(receiver,2); @@ -797,23 +797,23 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyBulk(receiver,ele); } else { /* BRPOPLPUSH */ - robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); + robj *dobj = lookupKeyWrite(receiver->db,target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; addReplyBulk(receiver,ele); - if (!handleClientsWaitingListPush(receiver, receiver->bpop.target, ele)) { + if (!handleClientsWaitingListPush(receiver, target, ele)) { /* Create the list if the key does not exist */ if (!dobj) { dobj = createZiplistObject(); - dbAdd(receiver->db, receiver->bpop.target, dobj); + dbAdd(receiver->db, target, dobj); } - listTypePush(dobj, ele, REDIS_HEAD); } + + decrRefCount(target); } - unblockClientWaitingData(receiver); return 1; } diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 62ea159dd456..361644334446 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -191,6 +191,49 @@ start_server { assert_equal {foo} [r lrange blist 0 -1] } + test "linked BRPOPLPUSH" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + r del list1 list2 list3 + + $rd1 brpoplpush list1 list2 0 + $rd2 brpoplpush list2 list3 0 + + r rpush list1 foo + + assert_equal {} [r lrange list1 0 -1] + assert_equal {} [r lrange list2 0 -1] + assert_equal {foo} [r lrange list3 0 -1] + } + + test "circular BRPOPLPUSH" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + r del list1 list2 + + $rd1 brpoplpush list1 list2 0 + $rd2 brpoplpush list2 list1 0 + + r rpush list1 foo + + assert_equal {foo} [r lrange list1 0 -1] + assert_equal {} [r lrange list2 0 -1] + } + + test "self-referential BRPOPLPUSH" { + set rd [redis_deferring_client] + + r del blist + + $rd brpoplpush blist blist 0 + + r rpush blist foo + + assert_equal {foo} [r lrange blist 0 -1] + } + test "BRPOPLPUSH inside a transaction" { r del xlist target r lpush xlist foo From c8a0070a611011c706e67da52a7f17ebfc0c0c1a Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 13:45:48 +0100 Subject: [PATCH 10/15] Move timeout logic --- src/t_list.c | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index ceed70c31697..a47ab65c854c 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -817,20 +817,20 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { return 1; } -int checkTimeout(redisClient *c, robj *object, time_t *timeout) { - long long lltimeout; +int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { + long tval; - if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { - addReplyError(c, "timeout is not an integer"); + if (getLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != REDIS_OK) return REDIS_ERR; - } - if (lltimeout < 0) { - addReplyError(c, "timeout is negative"); + if (tval < 0) { + addReplyError(c,"timeout is negative"); return REDIS_ERR; } - *timeout = lltimeout; + if (tval > 0) tval += time(NULL); + *timeout = tval; return REDIS_OK; } @@ -841,9 +841,8 @@ void blockingPopGenericCommand(redisClient *c, int where) { time_t timeout; int j; - if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) { + if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK) return; - } for (j = 1; j < c->argc-1; j++) { o = lookupKeyWrite(c->db,c->argv[j]); @@ -894,7 +893,6 @@ void blockingPopGenericCommand(redisClient *c, int where) { } /* If the list is empty or the key does not exists we must block */ - if (timeout > 0) timeout += time(NULL); blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); } @@ -909,9 +907,8 @@ void brpopCommand(redisClient *c) { void brpoplpushCommand(redisClient *c) { time_t timeout; - if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) { + if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK) return; - } robj *key = lookupKeyWrite(c->db, c->argv[1]); @@ -922,8 +919,6 @@ void brpoplpushCommand(redisClient *c) { * returns immediately. */ addReply(c, shared.nullmultibulk); } else { - if (timeout > 0) timeout += time(NULL); - /* The list is empty and the client blocks. */ blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); } From 5fa95ad7639ae3f43e175d95a7d6384e4723b80e Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 14:05:01 +0100 Subject: [PATCH 11/15] Rename blpop_blocked_clients to bpop_blocked_clients --- src/redis.c | 6 +++--- src/redis.h | 2 +- src/t_list.c | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/redis.c b/src/redis.c index 58a796c037e1..8a5f9632aec7 100644 --- a/src/redis.c +++ b/src/redis.c @@ -573,7 +573,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } /* Close connections of timedout clients */ - if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients) + if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients) closeTimedoutClients(); /* Check if a background saving or AOF rewrite in progress terminated */ @@ -759,7 +759,7 @@ void initServerConfig() { server.rdbcompression = 1; server.activerehashing = 1; server.maxclients = 0; - server.blpop_blocked_clients = 0; + server.bpop_blocked_clients = 0; server.maxmemory = 0; server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; server.maxmemory_samples = 3; @@ -1170,7 +1170,7 @@ sds genRedisInfoString(void) { (float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000, listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), - server.blpop_blocked_clients, + server.bpop_blocked_clients, zmalloc_used_memory(), hmem, zmalloc_get_rss(), diff --git a/src/redis.h b/src/redis.h index f1142a5b34fc..27cb82598ede 100644 --- a/src/redis.h +++ b/src/redis.h @@ -433,7 +433,7 @@ struct redisServer { int maxmemory_policy; int maxmemory_samples; /* Blocked clients */ - unsigned int blpop_blocked_clients; + unsigned int bpop_blocked_clients; unsigned int vm_blocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ diff --git a/src/t_list.c b/src/t_list.c index a47ab65c854c..b46b04943706 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -725,7 +725,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj } /* Mark the client as a blocked client */ c->flags |= REDIS_BLOCKED; - server.blpop_blocked_clients++; + server.bpop_blocked_clients++; } /* Unblock a client that's waiting in a blocking operation such as BLPOP */ @@ -753,7 +753,7 @@ void unblockClientWaitingData(redisClient *c) { c->bpop.keys = NULL; c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); - server.blpop_blocked_clients--; + server.bpop_blocked_clients--; /* We want to process data if there is some command waiting * in the input buffer. Note that this is safe even if * unblockClientWaitingData() gets called from freeClient() because From ac06fc011df598372232a5dc1805683004240c0d Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 14:48:58 +0100 Subject: [PATCH 12/15] Move code for pushing on a (blocking) RPOPLPUSH --- src/t_list.c | 60 +++++++++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index b46b04943706..3ce0f992ff15 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -605,19 +605,37 @@ void lremCommand(redisClient *c) { /* This is the semantic of this command: * RPOPLPUSH srclist dstlist: - * IF LLEN(srclist) > 0 - * element = RPOP srclist - * LPUSH dstlist element - * RETURN element - * ELSE - * RETURN nil - * END + * IF LLEN(srclist) > 0 + * element = RPOP srclist + * LPUSH dstlist element + * RETURN element + * ELSE + * RETURN nil + * END * END * * The idea is to be able to get an element from a list in a reliable way * since the element is not just returned but pushed against another list * as well. This command was originally proposed by Ezra Zygmuntowicz. */ + +void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) { + if (!handleClientsWaitingListPush(c,dstkey,value)) { + /* Create the list if the key does not exist */ + if (!dstobj) { + dstobj = createZiplistObject(); + dbAdd(c->db,dstkey,dstobj); + } else { + touchWatchedKey(c->db,dstkey); + server.dirty++; + } + listTypePush(dstobj,value,REDIS_HEAD); + } + + /* Always send the pushed value to the client. */ + addReplyBulk(c,value); +} + void rpoplpushCommand(redisClient *c) { robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || @@ -629,20 +647,7 @@ void rpoplpushCommand(redisClient *c) { robj *dobj = lookupKeyWrite(c->db,c->argv[2]); if (dobj && checkType(c,dobj,REDIS_LIST)) return; value = listTypePop(sobj,REDIS_TAIL); - - /* Add the element to the target list (unless it's directly - * passed to some BLPOP-ing client */ - if (!handleClientsWaitingListPush(c,c->argv[2],value)) { - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(c->db,c->argv[2],dobj); - } - listTypePush(dobj,value,REDIS_HEAD); - } - - /* Send the element to the client as reply as well */ - addReplyBulk(c,value); + rpoplpushHandlePush(c,c->argv[2],dobj,value); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); @@ -799,18 +804,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { /* BRPOPLPUSH */ robj *dobj = lookupKeyWrite(receiver->db,target); if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - - addReplyBulk(receiver,ele); - - if (!handleClientsWaitingListPush(receiver, target, ele)) { - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(receiver->db, target, dobj); - } - listTypePush(dobj, ele, REDIS_HEAD); - } - + rpoplpushHandlePush(receiver,target,dobj,ele); decrRefCount(target); } From 8a88c368edbc12540eee3d129b8a017bd6a84cac Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 16:04:10 +0100 Subject: [PATCH 13/15] Check other blocked clients when value could not be pushed --- src/t_list.c | 64 +++++++++++++++++++++++++--------------- tests/unit/type/list.tcl | 14 +++++++++ 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index 3ce0f992ff15..866a6a3e2919 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -780,35 +780,53 @@ void unblockClientWaitingData(redisClient *c) { int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { struct dictEntry *de; redisClient *receiver; - list *l; + int numclients; + list *clients; listNode *ln; + robj *dstkey, *dstobj; de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; - l = dictGetEntryVal(de); - ln = listFirst(l); - redisAssert(ln != NULL); - receiver = ln->value; - - robj *target = receiver->bpop.target; - - unblockClientWaitingData(receiver); - - if (target == NULL) { - /* BRPOP/BLPOP return a multi-bulk with the name - * of the popped list */ - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); - } else { - /* BRPOPLPUSH */ - robj *dobj = lookupKeyWrite(receiver->db,target); - if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - rpoplpushHandlePush(receiver,target,dobj,ele); - decrRefCount(target); + clients = dictGetEntryVal(de); + numclients = listLength(clients); + + /* Try to handle the push as long as there are clients waiting for a push. + * Note that "numclients" is used because the list of clients waiting for a + * push on "key" is deleted by unblockClient() when empty. + * + * This loop will have more than 1 iteration when there is a BRPOPLPUSH + * that cannot push the target list because it does not contain a list. If + * this happens, it simply tries the next client waiting for a push. */ + while (numclients--) { + ln = listFirst(clients); + redisAssert(ln != NULL); + receiver = ln->value; + dstkey = receiver->bpop.target; + + /* This should remove the first element of the "clients" list. */ + unblockClientWaitingData(receiver); + redisAssert(ln != listFirst(clients)); + + if (dstkey == NULL) { + /* BRPOP/BLPOP */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + return 1; + } else { + /* BRPOPLPUSH */ + dstobj = lookupKeyWrite(receiver->db,dstkey); + if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { + decrRefCount(dstkey); + } else { + rpoplpushHandlePush(receiver,dstkey,dstobj,ele); + decrRefCount(dstkey); + return 1; + } + } } - return 1; + return 0; } int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 361644334446..8ac128c5957b 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -191,6 +191,20 @@ start_server { assert_equal {foo} [r lrange blist 0 -1] } + test "BRPOPLPUSH with multiple blocked clients" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + r del blist target1 target2 + r set target1 nolist + $rd1 brpoplpush blist target1 0 + $rd2 brpoplpush blist target2 0 + r lpush blist foo + + assert_error "ERR*wrong kind*" {$rd1 read} + assert_equal {foo} [$rd2 read] + assert_equal {foo} [r lrange target2 0 -1] + } + test "linked BRPOPLPUSH" { set rd1 [redis_deferring_client] set rd2 [redis_deferring_client] From ecf940141501e47dcc8dfecbc84a4e3f6ee7b0d3 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 16:04:42 +0100 Subject: [PATCH 14/15] Fix case and indent --- src/t_list.c | 2 +- tests/unit/type/list.tcl | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/t_list.c b/src/t_list.c index 866a6a3e2919..867e258a1e22 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -705,7 +705,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj c->bpop.target = target; if (target != NULL) { - incrRefCount(target); + incrRefCount(target); } for (j = 0; j < numkeys; j++) { diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 8ac128c5957b..6b128b726974 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -205,7 +205,7 @@ start_server { assert_equal {foo} [r lrange target2 0 -1] } - test "linked BRPOPLPUSH" { + test "Linked BRPOPLPUSH" { set rd1 [redis_deferring_client] set rd2 [redis_deferring_client] @@ -221,7 +221,7 @@ start_server { assert_equal {foo} [r lrange list3 0 -1] } - test "circular BRPOPLPUSH" { + test "Circular BRPOPLPUSH" { set rd1 [redis_deferring_client] set rd2 [redis_deferring_client] @@ -236,7 +236,7 @@ start_server { assert_equal {} [r lrange list2 0 -1] } - test "self-referential BRPOPLPUSH" { + test "Self-referential BRPOPLPUSH" { set rd [redis_deferring_client] r del blist From a4ce7581553b1f4e29a7ed2141add788e56142c5 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 6 Dec 2010 16:39:39 +0100 Subject: [PATCH 15/15] Don't execute commands for clients when they are unblocked --- src/redis.c | 19 +++++++++++++++++-- src/redis.h | 1 + src/t_list.c | 7 +------ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/redis.c b/src/redis.c index 8a5f9632aec7..a1653c36e458 100644 --- a/src/redis.c +++ b/src/redis.c @@ -646,15 +646,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * for ready file descriptors. */ void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + listNode *ln; + redisClient *c; /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; - listNode *ln; listRewind(server.io_ready_clients,&li); while((ln = listNext(&li))) { - redisClient *c = ln->value; + c = ln->value; struct redisCommand *cmd; /* Resume the client. */ @@ -672,6 +673,19 @@ void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + + /* Try to process pending commands for clients that were just unblocked. */ + while (listLength(server.unblocked_clients)) { + ln = listFirst(server.unblocked_clients); + redisAssert(ln != NULL); + c = ln->value; + listDelNode(server.unblocked_clients,ln); + + /* Process remaining data in the input buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) + processInputBuffer(c); + } + /* Write the AOF buffer on disk */ flushAppendOnlyFile(); } @@ -818,6 +832,7 @@ void initServer() { server.clients = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); + server.unblocked_clients = listCreate(); createSharedObjects(); server.el = aeCreateEventLoop(); server.db = zmalloc(sizeof(redisDb)*server.dbnum); diff --git a/src/redis.h b/src/redis.h index 27cb82598ede..3639f0623d5d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -435,6 +435,7 @@ struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; unsigned int vm_blocked_clients; + list *unblocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; diff --git a/src/t_list.c b/src/t_list.c index 867e258a1e22..7dc3f13934aa 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -759,12 +759,7 @@ void unblockClientWaitingData(redisClient *c) { c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); server.bpop_blocked_clients--; - /* We want to process data if there is some command waiting - * in the input buffer. Note that this is safe even if - * unblockClientWaitingData() gets called from freeClient() because - * freeClient() will be smart enough to call this function - * *after* c->querybuf was set to NULL. */ - if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); + listAddNodeTail(server.unblocked_clients,c); } /* This should be called from any function PUSHing into lists.