Skip to content

Commit

Permalink
Boost up performance for redis PUB-SUB patterns matching
Browse files Browse the repository at this point in the history
If lots of clients PSUBSCRIBE to same patterns, multiple pattens matching will take place. This commit change it into just one single pattern matching by using a `dict *` to store the unique pattern and which clients subscribe to it.
  • Loading branch information
leeyiw committed Mar 1, 2018
1 parent 3a5bf75 commit dfb12f0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
58 changes: 46 additions & 12 deletions src/pubsub.c
Expand Up @@ -125,6 +125,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {

/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
dictEntry *de;
list *clients;
int retval = 0;

if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
Expand All @@ -136,6 +138,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
/* Add the client to the pattern -> list of clients hash table */
de = dictFind(server.pubsub_patterns_dict,pattern);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_patterns_dict,pattern,clients);
incrRefCount(pattern);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
Expand All @@ -148,6 +160,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
pubsubPattern pat;
int retval = 0;
Expand All @@ -160,6 +174,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
pat.pattern = pattern;
ln = listSearchKey(server.pubsub_patterns,&pat);
listDelNode(server.pubsub_patterns,ln);
/* Remove the client from the pattern -> clients list hash table */
de = dictFind(server.pubsub_patterns_dict,pattern);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client. */
dictDelete(server.pubsub_patterns_dict,pattern);
}
}
/* Notify the client */
if (notify) {
Expand Down Expand Up @@ -225,6 +251,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
listNode *ln;
listIter li;

Expand All @@ -247,25 +274,32 @@ int pubsubPublishMessage(robj *channel, robj *message) {
}
}
/* Send to clients listening to matching channels */
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
di = dictGetIterator(server.pubsub_patterns_dict);
if (di) {
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;

if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
continue;
}
listRewind(clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);

addReply(c,shared.mbulkhdr[4]);
addReply(c,shared.pmessagebulk);
addReplyBulk(c,pattern);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
decrRefCount(channel);
dictReleaseIterator(di);
}
return receivers;
}
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Expand Up @@ -1900,6 +1900,7 @@ void initServer(void) {
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL);
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.cronloops = 0;
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Expand Up @@ -1163,6 +1163,7 @@ struct redisServer {
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
/* Cluster */
Expand Down

0 comments on commit dfb12f0

Please sign in to comment.