Skip to content

Commit

Permalink
ZPOP: change sync ZPOP to have a count argument instead of N keys.
Browse files Browse the repository at this point in the history
Usually blocking operations make a lot of sense with multiple keys so
that we can listen to multiple queues (or whatever the app models) with
a single connection. However in the synchronous case it is more useful
to be able to ask for N elements. This is a change that I also wanted to
perform soon or later in the blocking list variant, but here it is more
natural since there is no reply type difference.
  • Loading branch information
antirez committed May 11, 2018
1 parent 6efb6c1 commit 56bbab2
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 56 deletions.
2 changes: 1 addition & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ void handleClientsBlockedOnKeys(void) {
receiver->lastcmd->proc == bzpopminCommand)
? ZSET_MIN : ZSET_MAX;
unblockClient(receiver);
genericZpopCommand(receiver,&rl->key,1,where);
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);

propagate(where == ZSET_MIN ?
server.zpopminCommand : server.zpopmaxCommand,
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1632,7 +1632,7 @@ unsigned long zslGetRank(zskiplist *zsl, double score, sds o);
int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore);
long zsetRank(robj *zobj, sds ele, int reverse);
int zsetDel(robj *zobj, sds ele);
void genericZpopCommand(client *c, robj **keyv, int keyc, int reverse);
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg);
sds ziplistGetObject(unsigned char *sptr);
int zslValueGteMin(double value, zrangespec *spec);
int zslValueLteMax(double value, zrangespec *spec);
Expand Down
147 changes: 93 additions & 54 deletions src/t_zset.c
Original file line number Diff line number Diff line change
Expand Up @@ -3070,13 +3070,28 @@ void zscanCommand(client *c) {
}

/* This command implements the generic zpop operation, used by:
* ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX */
void genericZpopCommand(client *c, robj **keyv, int keyc, int where) {
* ZPOPMIN, ZPOPMAX, BZPOPMIN and BZPOPMAX. This function is also used
* inside blocked.c in the unblocking stage of BZPOPMIN and BZPOPMAX.
*
* If 'emitkey' is true also the key name is emitted, useful for the blocking
* behavior of BZPOP[MIN|MAX], since we can block into multiple keys.
*
* The synchronous version instead does not need to emit the key, but may
* use the 'count' argument to return multiple items if available. */
void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey, robj *countarg) {
int idx;
robj *key;
robj *zobj;
sds ele;
double score;
long count = 1;

/* If a count argument as passed, parse it or return an error. */
if (countarg) {
if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK)
return;
if (count < 0) count = 1;
}

/* Check type and break on the first error, otherwise identify candidate. */
idx = 0;
Expand All @@ -3094,70 +3109,94 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where) {
return;
}

if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
void *arraylen_ptr = addDeferredMultiBulkLength(c);
long arraylen = 0;

/* Get the first or last element in the sorted set. */
eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
serverAssertWithInfo(c,zobj,eptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
ele = sdsfromlonglong(vlong);
else
ele = sdsnewlen(vstr,vlen);
/* We emit the key only for the blocking variant. */
if (emitkey) addReplyBulk(c,key);

/* Get the score. */
sptr = ziplistNext(zl,eptr);
serverAssertWithInfo(c,zobj,sptr != NULL);
score = zzlGetScore(sptr);
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *zln;
/* Remove the element. */
do {
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;

// Get the first or last element in the sorted set
zln = (where == ZSET_MAX ? zsl->tail : zsl->header->level[0].forward);
/* Get the first or last element in the sorted set. */
eptr = ziplistIndex(zl,where == ZSET_MAX ? -2 : 0);
serverAssertWithInfo(c,zobj,eptr != NULL);
serverAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
ele = sdsfromlonglong(vlong);
else
ele = sdsnewlen(vstr,vlen);

// There must be an element in the sorted set
serverAssertWithInfo(c,zobj,zln != NULL);
ele = sdsdup(zln->ele);
score = zln->score;
} else {
serverPanic("Unknown sorted set encoding");
}
/* Get the score. */
sptr = ziplistNext(zl,eptr);
serverAssertWithInfo(c,zobj,sptr != NULL);
score = zzlGetScore(sptr);
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *zln;

/* Get the first or last element in the sorted set. */
zln = (where == ZSET_MAX ? zsl->tail :
zsl->header->level[0].forward);

/* There must be an element in the sorted set. */
serverAssertWithInfo(c,zobj,zln != NULL);
ele = sdsdup(zln->ele);
score = zln->score;
} else {
serverPanic("Unknown sorted set encoding");
}

// Remove the element
serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
server.dirty++;
signalModifiedKey(c->db,key);
char *events[2] = {"zpopmin","zpopmax"};
notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
serverAssertWithInfo(c,zobj,zsetDel(zobj,ele));
server.dirty++;

// Remove the key, if indeed needed
if (zsetLength(zobj) == 0) {
dbDelete(c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
}
if (arraylen == 0) { /* Do this only for the first iteration. */
char *events[2] = {"zpopmin","zpopmax"};
notifyKeyspaceEvent(NOTIFY_ZSET,events[where],key,c->db->id);
signalModifiedKey(c->db,key);
}

addReplyMultiBulkLen(c,3);
addReplyBulk(c,key);
addReplyDouble(c,score);
addReplyBulkCBuffer(c,ele,sdslen(ele));
sdsfree(ele);
addReplyDouble(c,score);
addReplyBulkCBuffer(c,ele,sdslen(ele));
sdsfree(ele);
arraylen += 2;

/* Remove the key, if indeed needed. */
if (zsetLength(zobj) == 0) {
dbDelete(c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
break;
}
} while(--count);

setDeferredMultiBulkLength(c,arraylen_ptr,arraylen + (emitkey != 0));
}

// ZPOPMIN key [key ...]
/* ZPOPMIN key [<count>] */
void zpopminCommand(client *c) {
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN);
if (c->argc > 3) {
addReply(c,shared.syntaxerr);
return;
}
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MIN,0,
c->argc == 3 ? c->argv[2] : NULL);
}

// ZMAXPOP key [key ...]
/* ZMAXPOP key [<count>] */
void zpopmaxCommand(client *c) {
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX);
if (c->argc > 3) {
addReply(c,shared.syntaxerr);
return;
}
genericZpopCommand(c,&c->argv[1],c->argc-1,ZSET_MAX,0,
c->argc == 3 ? c->argv[2] : NULL);
}

/* BZPOPMIN / BZPOPMAX actual implementation. */
Expand All @@ -3178,7 +3217,7 @@ void blockingGenericZpopCommand(client *c, int where) {
} else {
if (zsetLength(o) != 0) {
/* Non empty zset, this is like a normal Z[REV]POP. */
genericZpopCommand(c,&c->argv[j],1,where);
genericZpopCommand(c,&c->argv[j],1,where,1,NULL);
/* Replicate it as an Z[REV]POP instead of BZ[REV]POP. */
rewriteClientCommandVector(c,2,
where == ZSET_MAX ? shared.zpopmax : shared.zpopmin,
Expand Down

0 comments on commit 56bbab2

Please sign in to comment.