Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds lpoprpush, rpoprpush and lpoplpush commands #2664

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ struct redisCommand redisCommandTable[] = {
{"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
{"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
{"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
{"blpoprpush",blpoprpushCommand,4,"wms",0,NULL,1,2,1,0,0},
{"brpoprpush",brpoprpushCommand,4,"wms",0,NULL,1,2,1,0,0},
{"blpoplpush",blpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
{"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
{"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
Expand All @@ -158,6 +161,9 @@ struct redisCommand redisCommandTable[] = {
{"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
{"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
{"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
{"lpoprpush",lpoprpushCommand,3,"wm",0,NULL,1,2,1,0,0},
{"rpoprpush",rpoprpushCommand,3,"wm",0,NULL,1,2,1,0,0},
{"lpoplpush",lpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
{"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
{"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
Expand Down
9 changes: 8 additions & 1 deletion src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ typedef struct blockingState {
dict *keys; /* The keys we are waiting to terminate a blocking
* operation such as BLPOP. Otherwise NULL. */
robj *target; /* The key that should receive the element,
* for BRPOPLPUSH. */
* for BRPOPLPUSH, BLPOPRPUSH, BRPOPRPUSH and
* BLPOPLPUSH. */

/* REDIS_BLOCK_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
Expand Down Expand Up @@ -1471,6 +1472,9 @@ void flushallCommand(redisClient *c);
void sortCommand(redisClient *c);
void lremCommand(redisClient *c);
void rpoplpushCommand(redisClient *c);
void lpoprpushCommand(redisClient *c);
void rpoprpushCommand(redisClient *c);
void lpoplpushCommand(redisClient *c);
void infoCommand(redisClient *c);
void mgetCommand(redisClient *c);
void monitorCommand(redisClient *c);
Expand Down Expand Up @@ -1508,6 +1512,9 @@ void discardCommand(redisClient *c);
void blpopCommand(redisClient *c);
void brpopCommand(redisClient *c);
void brpoplpushCommand(redisClient *c);
void blpoprpushCommand(redisClient *c);
void brpoprpushCommand(redisClient *c);
void blpoplpushCommand(redisClient *c);
void appendCommand(redisClient *c);
void strlenCommand(redisClient *c);
void zrankCommand(redisClient *c);
Expand Down
93 changes: 66 additions & 27 deletions src/t_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,23 +517,8 @@ void lremCommand(redisClient *c) {
if (removed) signalModifiedKey(c->db,c->argv[1]);
}

/* This is the semantic of this command:
* RPOPLPUSH srclist dstlist:
* IF LLEN(srclist) > 0
* element = RPOP srclist
* LPUSH dstlist element
* RETURN element
* ELSE
* RETURN nil
* END
* END
*
* The idea is to be able to get an element from a list in a reliable way
* since the element is not just returned but pushed against another list
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/

void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
void poppushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value, int where) {
/* Create the list if the key does not exist */
if (!dstobj) {
dstobj = createQuicklistObject();
Expand All @@ -542,13 +527,15 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
dbAdd(c->db,dstkey,dstobj);
}
signalModifiedKey(c->db,dstkey);
listTypePush(dstobj,value,REDIS_HEAD);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"lpush",dstkey,c->db->id);
listTypePush(dstobj,value,where);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,
where == REDIS_HEAD ? "lpush" : "rpush",
dstkey,c->db->id);
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}

void rpoplpushCommand(redisClient *c) {
void poppushGenericCommand(redisClient *c, int wherefrom, int whereto) {
robj *sobj, *value;
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,sobj,REDIS_LIST)) return;
Expand All @@ -562,18 +549,20 @@ void rpoplpushCommand(redisClient *c) {
robj *touchedkey = c->argv[1];

if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL);
/* We saved touched key, and protect it, since rpoplpushHandlePush
value = listTypePop(sobj,wherefrom);
/* We saved touched key, and protect it, since poppushHandlePush
* may change the client command argument vector (it does not
* currently). */
incrRefCount(touchedkey);
rpoplpushHandlePush(c,c->argv[2],dobj,value);
poppushHandlePush(c,c->argv[2],dobj,value,whereto);

/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);

/* Delete the source list when it is empty */
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"rpop",touchedkey,c->db->id);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,
wherefrom == REDIS_HEAD ? "lpop" : "rpop",
touchedkey,c->db->id);
if (listTypeLength(sobj) == 0) {
dbDelete(c->db,touchedkey);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
Expand All @@ -585,6 +574,40 @@ void rpoplpushCommand(redisClient *c) {
}
}



/* This is the semantic of this command:
* RPOPLPUSH srclist dstlist:
* IF LLEN(srclist) > 0
* element = RPOP srclist
* LPUSH dstlist element
* RETURN element
* ELSE
* RETURN nil
* END
* END
*
* The idea is to be able to get an element from a list in a reliable way
* since the element is not just returned but pushed against another list
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/

void rpoplpushCommand(redisClient *c) {
poppushGenericCommand(c, REDIS_TAIL, REDIS_HEAD);
}

void lpoprpushCommand(redisClient *c) {
poppushGenericCommand(c, REDIS_HEAD, REDIS_TAIL);
}

void rpoprpushCommand(redisClient *c) {
poppushGenericCommand(c, REDIS_TAIL, REDIS_TAIL);
}

void lpoplpushCommand(redisClient *c) {
poppushGenericCommand(c, REDIS_HEAD, REDIS_HEAD);
}

/*-----------------------------------------------------------------------------
* Blocking POP operations
*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -752,8 +775,8 @@ int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, red
db->id,argv,2,
REDIS_PROPAGATE_AOF|
REDIS_PROPAGATE_REPL);
rpoplpushHandlePush(receiver,dstkey,dstobj,
value);
poppushHandlePush(receiver,dstkey,dstobj,
value,REDIS_HEAD);
/* Propagate the LPUSH operation. */
argv[0] = shared.lpush;
argv[1] = dstkey;
Expand Down Expand Up @@ -927,7 +950,7 @@ void brpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_TAIL);
}

void brpoplpushCommand(redisClient *c) {
void blockingPopPushGenericCommand(redisClient *c, int wherefrom, int whereto) {
mstime_t timeout;

if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
Expand All @@ -951,7 +974,23 @@ void brpoplpushCommand(redisClient *c) {
/* The list exists and has elements, so
* the regular rpoplpushCommand is executed. */
redisAssertWithInfo(c,key,listTypeLength(key) > 0);
rpoplpushCommand(c);
poppushGenericCommand(c,wherefrom,whereto);
}
}
}

void brpoplpushCommand(redisClient *c) {
blockingPopPushGenericCommand(c,REDIS_TAIL,REDIS_HEAD);
}

void blpoprpushCommand(redisClient *c) {
blockingPopPushGenericCommand(c,REDIS_HEAD,REDIS_TAIL);
}

void brpoprpushCommand(redisClient *c) {
blockingPopPushGenericCommand(c,REDIS_TAIL,REDIS_TAIL);
}

void blpoplpushCommand(redisClient *c) {
blockingPopPushGenericCommand(c,REDIS_HEAD,REDIS_HEAD);
}