Skip to content

Commit

Permalink
Find our callback by pattern with PSUBSCRIBE
Browse files Browse the repository at this point in the history
* Use the pattern Redis provides us not the channel, if this is
a wildcard based `PSUBSCRIBE` payload.

* Don't test whether our slots match in `SSUBSCRIBE` when not in cluster
  mode.

Fixes #2395
  • Loading branch information
michael-grunder committed Sep 26, 2023
1 parent f4c2ac2 commit 2f276dc
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
27 changes: 16 additions & 11 deletions library.c
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,9 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
/* Multibulk response, {[pattern], type, channel, payload } */
while (redis_sock->subs[i]) {
zval z_ret, z_args[4], *z_type, *z_chan, *z_pat = NULL, *z_data;
HashTable *ht_tab;
int tab_idx = 1, is_pmsg = 0;
HashTable *ht_tab;
zend_string *zs;

ZVAL_NULL(&z_resp);
if (!redis_sock_read_multibulk_reply_zval(redis_sock, &z_resp)) {
Expand All @@ -573,22 +574,26 @@ PHP_REDIS_API int redis_subscribe_response(INTERNAL_FUNCTION_PARAMETERS,
}

// Extract pattern if it's a pmessage
if(is_pmsg) {
if ((z_pat = zend_hash_index_find(ht_tab, tab_idx++)) == NULL) {
if (is_pmsg) {
z_pat = zend_hash_index_find(ht_tab, tab_idx++);
if (z_pat == NULL || Z_TYPE_P(z_pat) != IS_STRING)
goto failure;
}
}

// Extract channel and data
if ((z_chan = zend_hash_index_find(ht_tab, tab_idx++)) == NULL ||
(z_data = zend_hash_index_find(ht_tab, tab_idx++)) == NULL
) {
/* Extract channel */
z_chan = zend_hash_index_find(ht_tab, tab_idx++);
if (z_chan == NULL || Z_TYPE_P(z_chan) != IS_STRING)
goto failure;
}

if ((cb = zend_hash_str_find_ptr(redis_sock->subs[i], Z_STRVAL_P(z_chan), Z_STRLEN_P(z_chan))) == NULL) {
/* Finally, extract data */
z_data = zend_hash_index_find(ht_tab, tab_idx++);
if (z_data == NULL)
goto failure;

/* Find our callback, either by channel or pattern string */
zs = z_pat != NULL ? Z_STR_P(z_pat) : Z_STR_P(z_chan);
if ((cb = zend_hash_find_ptr(redis_sock->subs[i], zs)) == NULL)
goto failure;
}

// Different args for SUBSCRIBE and PSUBSCRIBE
z_args[0] = *getThis();
Expand Down
2 changes: 1 addition & 1 deletion redis_commands.c
Original file line number Diff line number Diff line change
Expand Up @@ -1580,7 +1580,7 @@ int redis_subscribe_cmd(INTERNAL_FUNCTION_PARAMETERS, RedisSock *redis_sock,
ZEND_HASH_FOREACH_VAL(ht_chan, z_chan) {
redis_cmd_append_sstr_key_zval(&cmdstr, z_chan, redis_sock, slot ? &s2 : NULL);

if (shardslot != REDIS_CLUSTER_SLOTS && s2 != shardslot) {
if (slot && (shardslot != REDIS_CLUSTER_SLOTS && s2 != shardslot)) {
php_error_docref(NULL, E_WARNING, "All shard channels needs to belong to a single slot");
smart_string_free(&cmdstr);
efree(sctx);
Expand Down

0 comments on commit 2f276dc

Please sign in to comment.