diff --git a/redis-cli.c b/redis-cli.c index 3f63aeaa6658..5760da15a803 100644 --- a/redis-cli.c +++ b/redis-cli.c @@ -159,6 +159,9 @@ static struct redisCommand cmdTable[] = { {"hgetall",2,REDIS_CMD_INLINE}, {"hexists",3,REDIS_CMD_BULK}, {"config",-2,REDIS_CMD_BULK}, + {"subscribe",-2,REDIS_CMD_INLINE}, + {"unsubscribe",-1,REDIS_CMD_INLINE}, + {"publish",3,REDIS_CMD_BULK}, {NULL,0,0} }; diff --git a/redis.c b/redis.c index e84ae6b5fad6..44db9d683681 100644 --- a/redis.c +++ b/redis.c @@ -327,6 +327,7 @@ typedef struct redisClient { * is >= blockingto then the operation timed out. */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ + dict *pubsub_classes; /* Classes a client is interested in (SUBSCRIBE) */ } redisClient; struct saveparam { @@ -435,6 +436,9 @@ struct redisServer { unsigned long long vm_stats_swapped_objects; unsigned long long vm_stats_swapouts; unsigned long long vm_stats_swapins; + /* Pubsub */ + dict *pubsub_classes; /* Associate classes to list of subscribed clients */ + /* Misc */ FILE *devnull; }; @@ -501,7 +505,8 @@ struct sharedObjectsStruct { *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *plus, *select0, *select1, *select2, *select3, *select4, - *select5, *select6, *select7, *select8, *select9; + *select5, *select6, *select7, *select8, *select9, + *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3; } shared; /* Global vars that are actally used as constants. The following double @@ -601,6 +606,8 @@ static struct redisCommand *lookupCommand(char *name); static void call(redisClient *c, struct redisCommand *cmd); static void resetClient(redisClient *c); static void convertToRealHash(robj *o); +static int pubsubUnsubscribeAll(redisClient *c, int notify); +static void usage(); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -698,6 +705,9 @@ static void hgetallCommand(redisClient *c); static void hexistsCommand(redisClient *c); static void configCommand(redisClient *c); static void hincrbyCommand(redisClient *c); +static void subscribeCommand(redisClient *c); +static void unsubscribeCommand(redisClient *c); +static void publishCommand(redisClient *c); /*================================= Globals ================================= */ @@ -801,11 +811,12 @@ static struct redisCommand cmdTable[] = { {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0}, + {"subscribe",subscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, + {"unsubscribe",unsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, + {"publish",publishCommand,3,REDIS_CMD_BULK,NULL,0,0,0}, {NULL,NULL,0,0,NULL,0,0,0} }; -static void usage(); - /*============================ Utility functions ============================ */ /* Glob-style pattern matching. */ @@ -1473,6 +1484,10 @@ static void createSharedObjects(void) { shared.select7 = createStringObject("select 7\r\n",10); shared.select8 = createStringObject("select 8\r\n",10); shared.select9 = createStringObject("select 9\r\n",10); + shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); + shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); + shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",17); + shared.mbulk3 = createStringObject("*3\r\n",4); } static void appendServerSaveParams(time_t seconds, int changes) { @@ -1576,6 +1591,7 @@ static void initServer() { server.db[j].io_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; } + server.pubsub_classes = dictCreate(&keylistDictType,NULL); server.cronloops = 0; server.bgsavechildpid = -1; server.bgrewritechildpid = -1; @@ -1839,6 +1855,10 @@ static void freeClient(redisClient *c) { if (c->flags & REDIS_BLOCKED) unblockClientWaitingData(c); + /* Unsubscribe from all the pubsub classes */ + pubsubUnsubscribeAll(c,0); + dictRelease(c->pubsub_classes); + /* Obvious cleanup */ aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); listRelease(c->reply); @@ -1861,7 +1881,7 @@ static void freeClient(redisClient *c) { dontWaitForSwappedKey(c,ln->value); } listRelease(c->io_keys); - /* Other cleanup */ + /* Master/slave cleanup */ if (c->flags & REDIS_SLAVE) { if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) close(c->repldbfd); @@ -1874,6 +1894,7 @@ static void freeClient(redisClient *c) { server.master = NULL; server.replstate = REDIS_REPL_CONNECT; } + /* Release memory */ zfree(c->argv); zfree(c->mbargv); freeClientMultiState(c); @@ -2480,6 +2501,7 @@ static redisClient *createClient(int fd) { c->blockingkeys = NULL; c->blockingkeysnum = 0; c->io_keys = listCreate(); + c->pubsub_classes = dictCreate(&setDictType,NULL); listSetFreeMethod(c->io_keys,decrRefCount); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c) == AE_ERR) { @@ -9236,6 +9258,132 @@ static void configCommand(redisClient *c) { (char*) c->argv[1]->ptr)); } +/* =========================== Pubsub implementation ======================== */ + +/* Subscribe a client to a class. Returns 1 if the operation succeeded, or + * 0 if the client was already subscribed to that class. */ +static int pubsubSubscribe(redisClient *c, robj *class) { + struct dictEntry *de; + list *clients = NULL; + int retval = 0; + + /* Add the class to the client -> classes hash table */ + if (dictAdd(c->pubsub_classes,class,NULL) == DICT_OK) { + retval = 1; + incrRefCount(class); + /* Add the client to the class -> list of clients hash table */ + de = dictFind(server.pubsub_classes,class); + if (de == NULL) { + clients = listCreate(); + dictAdd(server.pubsub_classes,class,clients); + incrRefCount(class); + } else { + clients = dictGetEntryVal(de); + } + listAddNodeTail(clients,c); + } + /* Notify the client */ + addReply(c,shared.mbulk3); + addReply(c,shared.subscribebulk); + addReplyBulk(c,class); + addReplyLong(c,dictSize(c->pubsub_classes)); + return retval; +} + +/* Unsubscribe a client from a class. Returns 1 if the operation succeeded, or + * 0 if the client was not subscribed to the specified class. */ +static int pubsubUnsubscribe(redisClient *c, robj *class, int notify) { + struct dictEntry *de; + list *clients; + listNode *ln; + int retval = 0; + + /* Remove the class from the client -> classes hash table */ + if (dictDelete(c->pubsub_classes,class) == DICT_OK) { + retval = 1; + /* Remove the client from the class -> clients list hash table */ + de = dictFind(server.pubsub_classes,class); + assert(de != NULL); + clients = dictGetEntryVal(de); + ln = listSearchKey(clients,c); + assert(ln != NULL); + listDelNode(clients,ln); + } + /* Notify the client */ + if (notify) { + addReply(c,shared.mbulk3); + addReply(c,shared.unsubscribebulk); + addReplyBulk(c,class); + addReplyLong(c,dictSize(c->pubsub_classes)); + } + return retval; +} + +/* Unsubscribe from all the classes. Return the number of classes the + * client was subscribed to. */ +static int pubsubUnsubscribeAll(redisClient *c, int notify) { + dictIterator *di = dictGetIterator(c->pubsub_classes); + dictEntry *de; + int count = 0; + + while((de = dictNext(di)) != NULL) { + robj *class = dictGetEntryKey(de); + + count += pubsubUnsubscribe(c,class,notify); + } + dictReleaseIterator(di); + return count; +} + +/* Publish a message */ +static int pubsubPublishMessage(robj *class, robj *message) { + int receivers = 0; + struct dictEntry *de; + + de = dictFind(server.pubsub_classes,class); + if (de) { + list *list = dictGetEntryVal(de); + listNode *ln; + listIter li; + + listRewind(list,&li); + while ((ln = listNext(&li)) != NULL) { + redisClient *c = ln->value; + + addReply(c,shared.mbulk3); + addReply(c,shared.messagebulk); + addReplyBulk(c,class); + addReplyBulk(c,message); + receivers++; + } + } + return receivers; +} + +static void subscribeCommand(redisClient *c) { + int j; + + for (j = 1; j < c->argc; j++) + pubsubSubscribe(c,c->argv[j]); +} + +static void unsubscribeCommand(redisClient *c) { + if (c->argc == 1) { + pubsubUnsubscribeAll(c,1); + return; + } else { + int j; + + for (j = 1; j < c->argc; j++) + pubsubUnsubscribe(c,c->argv[j],1); + } +} + +static void publishCommand(redisClient *c) { + int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); + addReplyLong(c,receivers); +} + /* ================================= Debugging ============================== */ static void debugCommand(redisClient *c) {