diff --git a/src/modules/db_redis/Makefile b/src/modules/db_redis/Makefile index ca8020d5439..179bc6f4068 100644 --- a/src/modules/db_redis/Makefile +++ b/src/modules/db_redis/Makefile @@ -10,6 +10,12 @@ HIREDIS_BUILDER = $(shell \ if pkg-config --exists hiredis; then \ echo 'pkg-config hiredis'; \ fi) + +HIREDIS_CLUSTER_BUILDER = $(shell \ + if pkg-config --exists hiredis_cluster; then \ + echo 'pkg-config hiredis_cluster'; \ + fi) + endif ifeq ($(HIREDIS_BUILDER),) @@ -33,6 +39,14 @@ endif DEFS+=$(HIREDISDEFS) LIBS=$(HIREDISLIBS) +ifneq ($(HIREDIS_CLUSTER_BUILDER),) + HIREDISCLUSTERDEFS = $(shell $(HIREDIS_CLUSTER_BUILDER) --cflags) + HIREDISCLUSTERLIBS = $(shell $(HIREDIS_CLUSTER_BUILDER) --libs) + DEFS+=-DWITH_HIREDIS_CLUSTER + DEFS+=$(HIREDISCLUSTERDEFS) + LIBS+=$(HIREDISCLUSTERLIBS) +endif + SERLIBPATH=../../lib SER_LIBS=$(SERLIBPATH)/srdb2/srdb2 $(SERLIBPATH)/srdb1/srdb1 diff --git a/src/modules/db_redis/doc/db_redis_admin.xml b/src/modules/db_redis/doc/db_redis_admin.xml index 6439c79b6f8..b3b54c08a20 100644 --- a/src/modules/db_redis/doc/db_redis_admin.xml +++ b/src/modules/db_redis/doc/db_redis_admin.xml @@ -146,6 +146,17 @@ location=entry:ruid&usrdom:username,domain&timer:partition,keepalive;acc + + The following library is an optional dependency to support redis cluster protocol: + + + + hiredis-cluster - available at + https://github.com/Nordix/hiredis-cluster + + + + @@ -216,6 +227,10 @@ modparam("db_redis", "verbosity", 0) 'redis://[username]@host:port/database'. Username is optional. The database portion must be a valid Redis database number. + + For cluster support you need to set the "db_url" modparam with a comma separated list of cluster hosts: + 'redis://host1:port1,host2:port2/'. The database portion is not supported in cluster mode. + Usage diff --git a/src/modules/db_redis/redis_connection.c b/src/modules/db_redis/redis_connection.c index dd421a78440..1166117961c 100644 --- a/src/modules/db_redis/redis_connection.c +++ b/src/modules/db_redis/redis_connection.c @@ -25,6 +25,15 @@ #include "redis_table.h" #include "redis_dbase.h" +#ifdef WITH_HIREDIS_CLUSTER +static unsigned int MAX_URL_LENGTH = 1023; +#define redisCommand redisClusterCommand +#define redisFree redisClusterFree +#define redisCommandArgv redisClusterCommandArgv +#define redisAppendCommandArgv redisClusterAppendCommandArgv +#define redisGetReply redisClusterGetReply +#endif + extern int db_redis_verbosity; static void print_query(redis_key_t *query) { @@ -104,21 +113,59 @@ static redis_key_t* db_redis_shift_query(km_redis_con_t *con) { int db_redis_connect(km_redis_con_t *con) { struct timeval tv; redisReply *reply; +#ifndef WITH_HIREDIS_CLUSTER int db; +#endif tv.tv_sec = 1; tv.tv_usec = 0; - +#ifndef WITH_HIREDIS_CLUSTER db = atoi(con->id->database); +#endif reply = NULL; // TODO: introduce require_master mod-param and check if we're indeed master // TODO: on carrier, if we have db fail-over, the currently connected // redis server will become slave without dropping connections? - +#ifdef WITH_HIREDIS_CLUSTER + int status; + char hosts[MAX_URL_LENGTH]; + char* host_begin; + char* host_end; + LM_DBG("connecting to redis cluster at %.*s\n", con->id->url.len, con->id->url.s); + host_begin = strstr(con->id->url.s, "redis://"); + if (host_begin) { + host_begin += 8; + } else { + LM_ERR("invalid url scheme\n"); + goto err; + } + host_end = strstr(host_begin, "/"); + if (! host_end) { + LM_ERR("invalid url: cannot find end of host part\n"); + goto err; + } + if ((host_end - host_begin) > (MAX_URL_LENGTH-1)) { + LM_ERR("url too long\n"); + goto err; + } + strncpy(hosts, host_begin, (host_end - host_begin)); + hosts[MAX_URL_LENGTH-1] = '\0'; + con->con = redisClusterContextInit(); + if (! con->con) { + LM_ERR("no private memory left\n"); + goto err; + } + redisClusterSetOptionAddNodes(con->con, hosts); + redisClusterSetOptionConnectTimeout(con->con, tv); + status = redisClusterConnect2(con->con); + if (status != REDIS_OK) { + LM_ERR("cannot open connection to cluster with hosts: %s, error: %s\n", hosts, con->con->errstr); + goto err; + } +#else LM_DBG("connecting to redis at %s:%d\n", con->id->host, con->id->port); con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv); - if (!con->con) { LM_ERR("cannot open connection: %.*s\n", con->id->url.len, con->id->url.s); goto err; @@ -128,6 +175,7 @@ int db_redis_connect(km_redis_con_t *con) { con->con->errstr); goto err; } +#endif if (con->id->password) { reply = redisCommand(con->con, "AUTH %s", con->id->password); @@ -144,6 +192,7 @@ int db_redis_connect(km_redis_con_t *con) { freeReplyObject(reply); reply = NULL; } +#ifndef WITH_HIREDIS_CLUSTER reply = redisCommand(con->con, "PING"); if (!reply) { LM_ERR("cannot ping server on connection %.*s: %s\n", @@ -169,8 +218,10 @@ int db_redis_connect(km_redis_con_t *con) { goto err; } freeReplyObject(reply); reply = NULL; +#endif LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s); +#ifndef WITH_HIREDIS_CLUSTER reply = redisCommand(con->con, "SCRIPT LOAD %s", SREM_KEY_LUA); if (!reply) { LM_ERR("failed to load LUA script to server %.*s: %s\n", @@ -194,6 +245,7 @@ int db_redis_connect(km_redis_con_t *con) { } strcpy(con->srem_key_lua, reply->str); freeReplyObject(reply); reply = NULL; +#endif LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s); return 0; @@ -291,6 +343,63 @@ void db_redis_free_connection(struct pool_con* con) { pkg_free(_c); } +#ifdef WITH_HIREDIS_CLUSTER +void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node) { + char **argv = NULL; + int argc; + #define MAX_CMD_LENGTH 256 + char cmd[MAX_CMD_LENGTH] = ""; + size_t cmd_len = MAX_CMD_LENGTH - 1; + int i; + + print_query(query); + + argc = db_redis_key_list2arr(query, &argv); + if (argc < 0) { + LM_ERR("Failed to allocate memory for query array\n"); + return NULL; + } + LM_DBG("query has %d args\n", argc); + + for (i=0; i cmd_len) + break; + strncat(cmd, argv[i], cmd_len); + cmd_len = cmd_len - arg_len; + if (cmd_len == 0) + break; + + if (i != argc - 1) { + strncat(cmd, " ", cmd_len); + cmd_len--; + } + + } + + LM_DBG("cmd is %s\n", cmd); + + redisReply *reply = redisClusterCommandToNode(con->con, node, cmd); + if (con->con->err != REDIS_OK) { + LM_DBG("redis connection is gone, try reconnect. (%d:%s)\n", con->con->err, con->con->errstr); + if (db_redis_connect(con) != 0) { + LM_ERR("Failed to reconnect to redis db\n"); + pkg_free(argv); + if (con->con) { + redisFree(con->con); + con->con = NULL; + } + return NULL; + } + reply = redisClusterCommandToNode(con->con, node, cmd); + } + pkg_free(argv); + return reply; +} + +#endif + void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) { char **argv = NULL; int argc; diff --git a/src/modules/db_redis/redis_connection.h b/src/modules/db_redis/redis_connection.h index 22392838c56..ce5e569938e 100644 --- a/src/modules/db_redis/redis_connection.h +++ b/src/modules/db_redis/redis_connection.h @@ -23,14 +23,19 @@ #ifndef _REDIS_CONNECTION_H_ #define _REDIS_CONNECTION_H_ +#ifdef WITH_HIREDIS_CLUSTER +#include +#else #ifdef WITH_HIREDIS_PATH #include #else #include #endif +#endif #include "db_redis_mod.h" +#ifndef WITH_REDIS_CLUSTER #define db_redis_check_reply(con, reply, err) do { \ if (!(reply) && !(con)->con) { \ LM_ERR("Failed to fetch type entry: no connection to server\n"); \ @@ -49,6 +54,26 @@ goto err; \ } \ } while(0); +#else +#define db_redis_check_reply(con, reply, err) do { \ + if (!(reply) && !(con)->con) { \ + LM_ERR("Failed to fetch type entry: no connection to server\n"); \ + goto err; \ + } \ + if (!(reply)) { \ + LM_ERR("Failed to fetch type entry: %s\n", \ + (con)->con->errstr); \ + redisClusterFree((con)->con); \ + (con)->con = NULL; \ + goto err; \ + } \ + if ((reply)->type == REDIS_REPLY_ERROR) { \ + LM_ERR("Failed to fetch type entry: %s\n", \ + (reply)->str); \ + goto err; \ + } \ +} while(0); +#endif typedef struct redis_key redis_key_t; @@ -61,8 +86,11 @@ typedef struct km_redis_con { struct db_id* id; unsigned int ref; struct pool_con* next; - +#ifdef WITH_HIREDIS_CLUSTER + redisClusterContext *con; +#else redisContext *con; +#endif redis_command_t *command_queue; unsigned int append_counter; struct str_hash_table tables; @@ -86,4 +114,8 @@ void db_redis_consume_replies(km_redis_con_t *con); void db_redis_free_reply(redisReply **reply); const char *db_redis_get_error(km_redis_con_t *con); +#ifdef WITH_HIREDIS_CLUSTER +void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node); +#endif + #endif /* _REDIS_CONNECTION_H_ */ diff --git a/src/modules/db_redis/redis_dbase.c b/src/modules/db_redis/redis_dbase.c index a53d1a34431..dc0dc2136f5 100644 --- a/src/modules/db_redis/redis_dbase.c +++ b/src/modules/db_redis/redis_dbase.c @@ -832,10 +832,23 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc goto err; } +#ifdef WITH_HIREDIS_CLUSTER +nodeIterator niter; +cluster_node *node; +initNodeIterator(&niter, con->con); +while ((node = nodeNext(&niter)) != NULL) { + if (node->role != REDIS_ROLE_MASTER) + continue; + reply = db_redis_command_argv_to_node(con, query_v, node); + if (!reply) { + LM_ERR("Invalid null reply from node %s\n", node->addr); + goto err; + } + +#else reply = db_redis_command_argv(con, query_v); - db_redis_key_free(&query_v); +#endif db_redis_check_reply(con, reply, err); - keys_list = reply; #endif @@ -880,6 +893,10 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc } while (cursor > 0); #endif +#ifdef WITH_HIREDIS_CLUSTER + } +#endif + // for full table scans, we have to manually match all given keys // but only do this once for repeated invocations if (!*manual_keys) { @@ -898,6 +915,8 @@ static int db_redis_scan_query_keys_pattern(km_redis_con_t *con, const str *matc if (reply) { db_redis_free_reply(&reply); } + + db_redis_key_free(&query_v); LM_DBG("got %lu entries by scan\n", (unsigned long) i); return 0; @@ -1636,6 +1655,10 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con redis_key_t *type_key; redis_key_t *set_key; +#ifdef WITH_HIREDIS_CLUSTER + long long scard; +#endif + if (!*keys_count && do_table_scan) { if (!ts_scan_start) LM_WARN("performing full table scan on table '%.*s' while performing delete\n", @@ -1806,6 +1829,57 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con for (type_key = type_keys, set_key = set_keys; type_key; type_key = type_key->next, set_key = set_key->next) { +#ifdef WITH_HIREDIS_CLUSTER + if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &type_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, error); + db_redis_free_reply(&reply); + + if (db_redis_key_add_string(&query_v, "SCARD", 5) != 0) { + LM_ERR("Failed to add scard command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &type_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, error); + scard = reply->integer; + db_redis_free_reply(&reply); + + if (scard != 0) + continue; + + if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &set_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &type_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, error); +#else if (db_redis_key_add_string(&query_v, "EVALSHA", 7) != 0) { LM_ERR("Failed to add srem command to post-delete query\n"); goto error; @@ -1834,6 +1908,7 @@ static int db_redis_perform_delete(const db1_con_t* _h, km_redis_con_t *con, con db_redis_key_free(&query_v); db_redis_check_reply(con, reply, error); db_redis_free_reply(&reply); +#endif } LM_DBG("done with loop '%.*s'\n", k->key.len, k->key.s); db_redis_key_free(&type_keys); @@ -1883,6 +1958,9 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con redis_key_t *new_type_keys = NULL; int new_type_keys_count = 0; redis_key_t *all_type_key; +#ifdef WITH_HIREDIS_CLUSTER + long long scard; +#endif if (!(*keys_count) && do_table_scan) { LM_WARN("performing full table scan on table '%.*s' while performing update\n", @@ -2194,6 +2272,58 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con db_redis_key_free(&query_v); +#ifdef WITH_HIREDIS_CLUSTER + if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &type_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, error); + db_redis_free_reply(&reply); + + if (db_redis_key_add_string(&query_v, "SCARD", 5) != 0) { + LM_ERR("Failed to add scard command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &type_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, error); + scard = reply->integer; + db_redis_free_reply(&reply); + + if (scard != 0) + continue; + + if (db_redis_key_add_string(&query_v, "SREM", 4) != 0) { + LM_ERR("Failed to add srem command to post-delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &set_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + if (db_redis_key_add_str(&query_v, &type_key->key) != 0) { + LM_ERR("Failed to add key to delete query\n"); + goto error; + } + reply = db_redis_command_argv(con, query_v); + db_redis_key_free(&query_v); + db_redis_check_reply(con, reply, error); + update_queries++; +#else if (db_redis_key_add_string(&query_v, "EVAL", 4) != 0) { LM_ERR("Failed to add srem command to post-delete query\n"); goto error; @@ -2226,6 +2356,7 @@ static int db_redis_perform_update(const db1_con_t* _h, km_redis_con_t *con, con } db_redis_key_free(&query_v); +#endif } }