Skip to content

Commit

Permalink
Implements [B]Z[REV]POP and the respective unit tests
Browse files Browse the repository at this point in the history
An implementation of the
[Ze POP Redis Module](https://github.com/itamarhaber/zpop) as core
Redis commands.

Fixes #1861.
  • Loading branch information
itamarhaber committed Apr 29, 2018
1 parent e6b0e8d commit 438125b
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 20 deletions.
70 changes: 56 additions & 14 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ void processUnblockedClients(void) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
Expand All @@ -162,7 +164,9 @@ void unblockClient(client *c) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_STREAM) {
if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM) {
addReply(c,shared.nullmultibulk);
} else if (c->btype == BLOCKED_WAIT) {
addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
Expand Down Expand Up @@ -244,7 +248,7 @@ void handleClientsBlockedOnKeys(void) {
client *receiver = clientnode->value;

if (receiver->btype != BLOCKED_LIST) {
/* Put on the tail, so that at the next call
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listDelNode(clients,clientnode);
listAddNodeTail(clients,receiver);
Expand Down Expand Up @@ -289,6 +293,43 @@ void handleClientsBlockedOnKeys(void) {
* when an element was pushed on the list. */
}

/* Serve clients blocked on sorted set key. */
else if (o != NULL && o->type == OBJ_ZSET) {
dictEntry *de;

/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);

while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;

if (receiver->btype != BLOCKED_ZSET) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listDelNode(clients,clientnode);
listAddNodeTail(clients,receiver);
continue;
}

int reverse = (receiver->lastcmd &&
receiver->lastcmd->proc == bzpopCommand) ?
0 : 1;
unblockClient(receiver);
genericZpopCommand(receiver,&rl->key,1,reverse);

propagate(reverse ?
server.zrevpopCommand : server.zpopCommand,
receiver->db->id,receiver->argv,receiver->argc,
PROPAGATE_AOF|PROPAGATE_REPL);
}
}
}

/* Serve clients blocked on stream key. */
else if (o != NULL && o->type == OBJ_STREAM) {
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
Expand Down Expand Up @@ -371,8 +412,9 @@ void handleClientsBlockedOnKeys(void) {
}
}

/* This is how the current blocking lists/streams work, we use BLPOP as
* example, but the concept is the same for other list ops and XREAD.
/* This is how the current blocking lists/sorted sets/streams work, we use
* BLPOP as example, but the concept is the same for other list ops, sorted
* sets and XREAD.
* - If the user calls BLPOP and the key exists and contains a non empty list
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
* if blocking is not required.
Expand All @@ -389,14 +431,14 @@ void handleClientsBlockedOnKeys(void) {
* to the number of elements we have in the ready list.
*/

/* Set a client in blocking mode for the specified key (list or stream), with
* the specified timeout. The 'type' argument is BLOCKED_LIST or BLOCKED_STREAM
* depending on the kind of operation we are waiting for an empty key in
* order to awake the client. The client is blocked for all the 'numkeys'
* keys as in the 'keys' argument. When we block for stream keys, we also
* provide an array of streamID structures: clients will be unblocked only
* when items with an ID greater or equal to the specified one is appended
* to the stream. */
/* Set a client in blocking mode for the specified key (list, zset or stream),
* with the specified timeout. The 'type' argument is BLOCKED_LIST,
* BLOCKED_ZSET or BLOCKED_STREAM depending on the kind of operation we are
* waiting for an empty key in order to awake the client. The client is blocked
* for all the 'numkeys' keys as in the 'keys' argument. When we block for
* stream keys, we also provide an array of streamID structures: clients will
* be unblocked only when an item with an ID greater than or equal to the
* specified one is appended to the stream. */
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
dictEntry *de;
list *l;
Expand All @@ -409,7 +451,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo

for (j = 0; j < numkeys; j++) {
/* The value associated with the key name in the bpop.keys dictionary
* is NULL for lists, or the stream ID for streams. */
* is NULL for lists and sorted sets, or the stream ID for streams. */
void *key_data = NULL;
if (btype == BLOCKED_STREAM) {
key_data = zmalloc(sizeof(streamID));
Expand Down
4 changes: 3 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
int retval = dictAdd(db->dict, copy, val);

serverAssertWithInfo(NULL,key,retval == DICT_OK);
if (val->type == OBJ_LIST) signalKeyAsReady(db, key);
if (val->type == OBJ_LIST ||
val->type == OBJ_ZSET)
signalKeyAsReady(db, key);
if (server.cluster_enabled) slotToKeyAdd(key);
}

Expand Down
8 changes: 8 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ struct redisCommand redisCommandTable[] = {
{"zrank",zrankCommand,3,"rF",0,NULL,1,1,1,0,0},
{"zrevrank",zrevrankCommand,3,"rF",0,NULL,1,1,1,0,0},
{"zscan",zscanCommand,-3,"rR",0,NULL,1,1,1,0,0},
{"zpop",zpopCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"zrevpop",zrevpopCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"bzpop",bzpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0},
{"bzrevpop",bzrevpopCommand,-2,"wsF",0,NULL,1,-2,1,0,0},
{"hset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
{"hsetnx",hsetnxCommand,4,"wmF",0,NULL,1,1,1,0,0},
{"hget",hgetCommand,3,"rF",0,NULL,1,1,1,0,0},
Expand Down Expand Up @@ -1369,6 +1373,8 @@ void createSharedObjects(void) {
shared.rpop = createStringObject("RPOP",4);
shared.lpop = createStringObject("LPOP",4);
shared.lpush = createStringObject("LPUSH",5);
shared.zpop = createStringObject("ZPOP",4);
shared.zrevpop = createStringObject("ZREVPOP",7);
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
Expand Down Expand Up @@ -1562,6 +1568,8 @@ void initServerConfig(void) {
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
server.zpopCommand = lookupCommandByCString("zpop");
server.zrevpopCommand = lookupCommandByCString("zrevpop");
server.sremCommand = lookupCommandByCString("srem");
server.execCommand = lookupCommandByCString("exec");
server.expireCommand = lookupCommandByCString("expire");
Expand Down
16 changes: 11 additions & 5 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define BLOCKED_WAIT 2 /* WAIT for synchronous replication. */
#define BLOCKED_MODULE 3 /* Blocked by a loadable module. */
#define BLOCKED_STREAM 4 /* XREAD. */
#define BLOCKED_NUM 5 /* Number of blocked states. */
#define BLOCKED_ZSET 5 /* BZPOP et al. */
#define BLOCKED_NUM 6 /* Number of blocked states. */

/* Client request types */
#define PROTO_REQ_INLINE 1
Expand Down Expand Up @@ -646,7 +647,7 @@ typedef struct blockingState {
mstime_t timeout; /* Blocking operation timeout. If UNIX current time
* is > timeout then the operation timed out. */

/* BLOCKED_LIST and BLOCKED_STREAM */
/* BLOCKED_LIST, BLOCKED_ZSET and BLOCKED_STREAM */
dict *keys; /* The keys we are waiting to terminate a blocking
* operation such as BLPOP or XREAD. Or NULL. */
robj *target; /* The key that should receive the element,
Expand Down Expand Up @@ -762,7 +763,7 @@ struct sharedObjectsStruct {
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
*rpop, *lpop, *lpush, *emptyscan,
*rpop, *lpop, *lpush, *zpop, *zrevpop, *emptyscan,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
Expand Down Expand Up @@ -960,8 +961,8 @@ struct redisServer {
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand;
*rpopCommand, *zpopCommand, *zrevpopCommand, *sremCommand,
*execCommand, *expireCommand, *pexpireCommand, *xclaimCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
Expand Down Expand Up @@ -1628,6 +1629,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore);
long zsetRank(robj *zobj, sds ele, int reverse);
int zsetDel(robj *zobj, sds ele);
void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse);
sds ziplistGetObject(unsigned char *sptr);
int zslValueGteMin(double value, zrangespec *spec);
int zslValueLteMax(double value, zrangespec *spec);
Expand Down Expand Up @@ -1968,6 +1970,10 @@ void zremCommand(client *c);
void zscoreCommand(client *c);
void zremrangebyscoreCommand(client *c);
void zremrangebylexCommand(client *c);
void zpopCommand(client *c);
void zrevpopCommand(client *c);
void bzpopCommand(client *c);
void bzrevpopCommand(client *c);
void multiCommand(client *c);
void execCommand(client *c);
void discardCommand(client *c);
Expand Down
144 changes: 144 additions & 0 deletions src/t_zset.c
Original file line number Diff line number Diff line change
Expand Up @@ -3068,3 +3068,147 @@ void zscanCommand(client *c) {
checkType(c,o,OBJ_ZSET)) return;
scanGenericCommand(c,o,cursor);
}

/* Generic implementation of the sorted set pop operation, shared by
 * ZPOP, ZREVPOP, BZPOP and BZREVPOP.
 *
 * The 'keyc' keys in 'keyv' are scanned in order and the first one that
 * exists is selected. If checkType() rejects it as not being a sorted set
 * the function returns without popping anything (checkType() is expected
 * to have sent the error reply). If no key exists, or 'keyc' is not
 * positive, the reply is shared.emptymultibulk.
 *
 * When 'reverse' is zero the element at the head of the sorted set is
 * popped, otherwise the one at the tail. On success the reply is a three
 * items multi bulk: the key name, the score and the popped element. The
 * key is deleted from the database when the pop leaves the set empty. */
void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse) {
    char *events[2] = {"zpop","zrevpop"};
    robj *key = NULL;
    robj *zobj = NULL;
    sds ele = NULL;
    double score = 0;
    int idx;

    /* Normalize the flag: it is used below as an index into 'events'. */
    reverse = reverse ? 1 : 0;

    /* Pick the first existing key, stopping at the first type error.
     * 'zobj' starts as NULL so that an empty key vector is handled like
     * "no candidate found" instead of reading an indeterminate pointer. */
    for (idx = 0; idx < keyc; idx++) {
        key = keyv[idx];
        zobj = lookupKeyWrite(c->db,key);
        if (zobj == NULL) continue;
        if (checkType(c,zobj,OBJ_ZSET)) return;
        break;
    }

    /* No candidate for popping, reply with an empty multi bulk. */
    if (zobj == NULL) {
        addReply(c,shared.emptymultibulk);
        return;
    }

    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        int found;

        /* Entries are stored as element,score pairs: index 0 is the first
         * element, index -2 is the element of the last pair. There must be
         * at least one element in the sorted set. */
        eptr = ziplistIndex(zl,reverse ? -2 : 0);
        serverAssertWithInfo(c,zobj,eptr != NULL);

        /* Fetch the value outside of the assertion so that the call does
         * not depend on how the assertion macro evaluates its argument. */
        found = ziplistGet(eptr,&vstr,&vlen,&vlong);
        serverAssertWithInfo(c,zobj,found);
        if (vstr == NULL)
            ele = sdsfromlonglong(vlong);
        else
            ele = sdsnewlen(vstr,vlen);

        /* The score is the entry right after the element. */
        sptr = ziplistNext(zl,eptr);
        serverAssertWithInfo(c,zobj,sptr != NULL);
        score = zzlGetScore(sptr);
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        zskiplistNode *ln;

        /* Get the first or the last node of the skiplist. There must be
         * at least one element in the sorted set. */
        ln = (reverse ? zsl->tail : zsl->header->level[0].forward);
        serverAssertWithInfo(c,zobj,ln != NULL);

        /* Copy the element: the node is released by the deletion below. */
        ele = sdsdup(ln->ele);
        score = ln->score;
    } else {
        serverPanic("Unknown sorted set encoding");
    }

    /* Remove the element. As above, the call with the side effect is kept
     * outside of the assertion. */
    int deleted = zsetDel(zobj,ele);
    serverAssertWithInfo(c,zobj,deleted);
    server.dirty++;
    signalModifiedKey(c->db,key);
    notifyKeyspaceEvent(NOTIFY_ZSET,events[reverse],key,c->db->id);

    /* Remove the key if the sorted set is now empty. */
    if (zsetLength(zobj) == 0) {
        dbDelete(c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
    }

    /* Reply with key name, score and element, then release our copy. */
    addReplyMultiBulkLen(c,3);
    addReplyBulk(c,key);
    addReplyDouble(c,score);
    addReplyBulkCBuffer(c,ele,sdslen(ele));
    sdsfree(ele);
}

/* ZPOP key [key ...]
 *
 * Forwards every argument after the command name as a candidate key to
 * genericZpopCommand(), with the 'reverse' flag cleared. */
void zpopCommand(client *c) {
    robj **keys = c->argv+1;
    int numkeys = c->argc-1;

    genericZpopCommand(c,keys,numkeys,0);
}

/* ZREVPOP key [key ...]
 *
 * Forwards every argument after the command name as a candidate key to
 * genericZpopCommand(), with the 'reverse' flag set. */
void zrevpopCommand(client *c) {
    robj **keys = c->argv+1;
    int numkeys = c->argc-1;

    genericZpopCommand(c,keys,numkeys,1);
}

/* Blocking variant of Z[REV]POP, shared by BZPOP and BZREVPOP.
 *
 * The last argument is parsed as a timeout in seconds, every argument
 * between the command name and the timeout is a key. Keys are checked in
 * order: the first one holding a non empty sorted set is served at once
 * via genericZpopCommand(), while an existing key of another type yields
 * a wrong type error. If no key can be served the client is blocked on
 * all the keys, unless it is inside a MULTI/EXEC: in that case a null
 * multi bulk is replied instead.
 *
 * 'reverse' is passed through to genericZpopCommand(). */
void blockingGenericZpopCommand(client *c, int reverse) {
    mstime_t timeout;
    int j;

    /* Nothing else to do here if the timeout argument cannot be parsed. */
    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,
                                    UNIT_SECONDS) != C_OK)
    {
        return;
    }

    for (j = 1; j < c->argc-1; j++) {
        robj *zobj = lookupKeyWrite(c->db,c->argv[j]);

        /* Missing keys are skipped, we may end up blocking on them. */
        if (zobj == NULL) continue;

        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            return;
        }

        /* Keep scanning if there is nothing to pop from this key. */
        if (zsetLength(zobj) == 0) continue;

        /* Non empty zset, this is like a normal Z[REV]POP. */
        genericZpopCommand(c,&c->argv[j],1,reverse);

        /* Replicate it as a Z[REV]POP instead of BZ[REV]POP. */
        robj *cmdname = reverse ? shared.zrevpop : shared.zpop;
        rewriteClientCommandVector(c,2,cmdname,c->argv[j]);
        return;
    }

    /* Inside a MULTI/EXEC we cannot block: when there is nothing to pop
     * the only thing we can do is to treat it as a timeout (even with
     * timeout 0). */
    if (c->flags & CLIENT_MULTI) {
        addReply(c,shared.nullmultibulk);
        return;
    }

    /* None of the keys can be served right now, so we must block. The
     * key vector excludes the command name and the trailing timeout. */
    blockForKeys(c,BLOCKED_ZSET,c->argv + 1,c->argc - 2,timeout,NULL,NULL);
}

/* BZPOP key [key ...] timeout
 *
 * Blocking pop with the 'reverse' flag cleared. */
void bzpopCommand(client *c) {
    int reverse = 0;

    blockingGenericZpopCommand(c,reverse);
}

/* BZREVPOP key [key ...] timeout
 *
 * Blocking pop with the 'reverse' flag set. */
void bzrevpopCommand(client *c) {
    int reverse = 1;

    blockingGenericZpopCommand(c,reverse);
}

0 comments on commit 438125b

Please sign in to comment.