From 3c1f6a51e830f6b3c3d98b32178c3cf6af16f62d Mon Sep 17 00:00:00 2001 From: Carsten Bock Date: Mon, 24 Apr 2017 14:23:47 +0200 Subject: [PATCH] Fix ndb_redis Cluster implementation --- src/modules/ndb_redis/doc/ndb_redis_admin.xml | 10 + src/modules/ndb_redis/ndb_redis_mod.c | 15 +- src/modules/ndb_redis/redis_client.c | 266 ++++++++++-------- src/modules/ndb_redis/redis_client.h | 2 - 4 files changed, 154 insertions(+), 139 deletions(-) diff --git a/src/modules/ndb_redis/doc/ndb_redis_admin.xml b/src/modules/ndb_redis/doc/ndb_redis_admin.xml index cad2893327f..50bd0334577 100644 --- a/src/modules/ndb_redis/doc/ndb_redis_admin.xml +++ b/src/modules/ndb_redis/doc/ndb_redis_admin.xml @@ -157,6 +157,12 @@ modparam("ndb_redis", "cmd_timeout", 500) If set to 1, the module will connect to servers indicated in the "MOVED" reply. + + The module needs to know all existing REDIS nodes at startup. + The nodes are looked up by the name "ip:port", e.g. if REDIS + replies with "MOVED 127.0.0.1:4711", ndb_redis needs a server + defined with the name "127.0.0.1:4711". + Default value is 0 (disabled). @@ -166,6 +172,10 @@ modparam("ndb_redis", "cmd_timeout", 500) Set <varname>cluster</varname> parameter ... +modparam("ndb_redis", "server", "name=127.0.0.1:26001;addr=127.0.0.1;port=26001") +modparam("ndb_redis", "server", "name=127.0.0.1:26004;addr=127.0.0.1;port=26004") +modparam("ndb_redis", "server", "name=127.0.0.1:26008;addr=127.0.0.1;port=26008") +... modparam("ndb_redis", "cluster", 1) ... 
diff --git a/src/modules/ndb_redis/ndb_redis_mod.c b/src/modules/ndb_redis/ndb_redis_mod.c index 6bb74d9fba4..862bda0a735 100644 --- a/src/modules/ndb_redis/ndb_redis_mod.c +++ b/src/modules/ndb_redis/ndb_redis_mod.c @@ -61,7 +61,6 @@ static int fixup_redis_cmd6(void** param, int param_no); static int w_redis_free_reply(struct sip_msg* msg, char* res); -static int mod_init(void); static void mod_destroy(void); static int child_init(int rank); @@ -114,7 +113,7 @@ struct module_exports exports = { 0, /* exported MI functions */ mod_pvs, /* exported pseudo-variables */ 0, /* extra processes */ - mod_init, /* module initialization function */ + 0, /* module initialization function */ 0, /* response function */ mod_destroy, /* destroy function */ child_init /* per child init function */ @@ -136,18 +135,6 @@ static int child_init(int rank) return 0; } -/** - * - */ -static int mod_init(void) -{ - if (init_list() < 0) { - LM_ERR("failed to initialize redis connections\n"); - return -1; - } - return 0; -} - /** * */ diff --git a/src/modules/ndb_redis/redis_client.c b/src/modules/ndb_redis/redis_client.c index 407967a7050..26f86da949a 100644 --- a/src/modules/ndb_redis/redis_client.c +++ b/src/modules/ndb_redis/redis_client.c @@ -41,7 +41,7 @@ #define redisCommandNR(a...) (int)({ void *__tmp; __tmp = redisCommand(a); if (__tmp) freeReplyObject(__tmp); __tmp ? 
0 : -1;}) -static redisc_server_t ** _redisc_srv_list=NULL; +static redisc_server_t * _redisc_srv_list=NULL; static redisc_reply_t *_redisc_rpl_list=NULL; @@ -55,8 +55,9 @@ extern int redis_cluster_param; */ int redisc_init(void) { - char *addr, *pass, *unix_sock_path = NULL; - unsigned int port, db; + char addr[256], pass[256], unix_sock_path[256]; + + unsigned int port, db, sock = 0, haspass = 0; redisc_server_t *rsrv=NULL; param_t *pit = NULL; struct timeval tv_conn; @@ -68,27 +69,30 @@ int redisc_init(void) tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000; tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000; - if(*_redisc_srv_list==NULL) + if(_redisc_srv_list==NULL) { LM_ERR("no redis servers defined\n"); return -1; } - for(rsrv=*_redisc_srv_list; rsrv; rsrv=rsrv->next) + for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next) { - addr = "127.0.0.1"; port = 6379; db = 0; - pass = NULL; + haspass = 0; + sock = 0; + + memset(addr, 0, sizeof(addr)); + memset(pass, 0, sizeof(pass)); + memset(unix_sock_path, 0, sizeof(unix_sock_path)); for (pit = rsrv->attrs; pit; pit=pit->next) { if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) { - unix_sock_path = pit->body.s; - unix_sock_path[pit->body.len] = '\0'; + snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s", pit->body.len, pit->body.s); + sock = 1; } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) { - addr = pit->body.s; - addr[pit->body.len] = '\0'; + snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s); } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) { if(str2int(&pit->body, &port) < 0) port = 6379; @@ -96,38 +100,52 @@ int redisc_init(void) if(str2int(&pit->body, &db) < 0) db = 0; } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) { - pass = pit->body.s; - pass[pit->body.len] = '\0'; + snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s); + haspass = 1; } } - if(unix_sock_path != NULL) { + if(sock != 0) { 
LM_DBG("Connecting to unix socket: %s\n", unix_sock_path); rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn); } else { + LM_DBG("Connecting to %s:%d\n", addr, port); rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn); } - if(!rsrv->ctxRedis) + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); + + if(!rsrv->ctxRedis) { + LM_ERR("Failed to create REDIS-Context.\n"); goto err; - if (rsrv->ctxRedis->err) + } + if (rsrv->ctxRedis->err) { + LM_ERR("Failed to create REDIS returned an error: %s\n", rsrv->ctxRedis->errstr); goto err2; - if ((pass != NULL) && redisc_check_auth(rsrv, pass)) + } + if ((haspass != 0) && redisc_check_auth(rsrv, pass)) { + LM_ERR("Authentication failed.\n"); goto err2; - if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) + } + if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) { + LM_ERR("Failed to set timeout.\n"); goto err2; - if (redisCommandNR(rsrv->ctxRedis, "PING")) + } + if (redisCommandNR(rsrv->ctxRedis, "PING")) { + LM_ERR("Failed to send PING (REDIS returned %s).\n", rsrv->ctxRedis->errstr); goto err2; - if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) + } + if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) { + LM_ERR("Failed to send \"SELECT %i\" (REDIS returned \"%s\", and not in cluster mode).\n", db, rsrv->ctxRedis->errstr); goto err2; - + } } return 0; err2: - if (unix_sock_path != NULL) { + if (sock != 0) { LM_ERR("error communicating with redis server [%.*s]" " (unix:%s db:%d): %s\n", rsrv->sname->len, rsrv->sname->s, unix_sock_path, db, @@ -190,38 +208,21 @@ int redisc_destroy(void) if(_redisc_srv_list==NULL) return -1; - if(*_redisc_srv_list==NULL) - return -1; - rsrv=*_redisc_srv_list; + rsrv=_redisc_srv_list; while(rsrv!=NULL) { rsrv1 = rsrv; rsrv=rsrv->next; - if (rsrv1->settings != NULL) - shm_free(rsrv1->settings); if (rsrv1->ctxRedis!=NULL) redisFree(rsrv1->ctxRedis); free_params(rsrv1->attrs); - shm_free(rsrv1); + pkg_free(rsrv1); } - 
shm_free(*_redisc_srv_list); - *_redisc_srv_list = NULL; _redisc_srv_list = NULL; return 0; } -int init_list(void) { - if (_redisc_srv_list == NULL) { - _redisc_srv_list = (redisc_server_t **)shm_malloc(sizeof(redisc_server_t*)); - if(!_redisc_srv_list) { - LM_ERR("Out of memory\n"); - return -1; - } - } - return 0; -} - /** * */ @@ -241,17 +242,10 @@ int redisc_add_server(char *spec) LM_ERR("failed parsing params value\n"); goto error; } - if (_redisc_srv_list == NULL) { - _redisc_srv_list = (redisc_server_t **)shm_malloc(sizeof(redisc_server_t*)); - if(!_redisc_srv_list) { - LM_ERR("Out of memory\n"); - return -1; - } - } - rsrv = (redisc_server_t*)shm_malloc(sizeof(redisc_server_t)); + rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t)); if(rsrv==NULL) { - LM_ERR("no more shm\n"); + LM_ERR("no more pkg\n"); goto error; } memset(rsrv, 0, sizeof(redisc_server_t)); @@ -269,15 +263,15 @@ int redisc_add_server(char *spec) LM_ERR("no server name\n"); goto error; } - rsrv->next = *_redisc_srv_list; - *_redisc_srv_list = rsrv; + rsrv->next = _redisc_srv_list; + _redisc_srv_list = rsrv; return 0; error: if(pit!=NULL) free_params(pit); if(rsrv!=NULL) - shm_free(rsrv); + pkg_free(rsrv); return -1; } @@ -291,7 +285,7 @@ redisc_server_t *redisc_get_server(str *name) hname = get_hash1_raw(name->s, name->len); LM_DBG("Hash %u (%.*s)\n", hname, name->len, name->s); - rsrv=*_redisc_srv_list; + rsrv=_redisc_srv_list; while(rsrv!=NULL) { LM_DBG("Entry %u (%.*s)\n", rsrv->hname, rsrv->sname->len, rsrv->sname->s); @@ -309,8 +303,8 @@ redisc_server_t *redisc_get_server(str *name) */ int redisc_reconnect_server(redisc_server_t *rsrv) { - char *addr, *pass, *unix_sock_path = NULL; - unsigned int port, db; + char addr[256], pass[256], unix_sock_path[256]; + unsigned int port, db, sock = 0, haspass = 0; param_t *pit = NULL; struct timeval tv_conn; struct timeval tv_cmd; @@ -321,18 +315,18 @@ int redisc_reconnect_server(redisc_server_t *rsrv) tv_cmd.tv_sec = (int) 
redis_cmd_timeout_param / 1000; tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000; - addr = "127.0.0.1"; + memset(addr, 0, sizeof(addr)); port = 6379; db = 0; - pass = NULL; + memset(pass, 0, sizeof(pass)); + memset(unix_sock_path, 0, sizeof(unix_sock_path)); for (pit = rsrv->attrs; pit; pit=pit->next) { if(pit->name.len==4 && strncmp(pit->name.s, "unix", 4)==0) { - unix_sock_path = pit->body.s; - unix_sock_path[pit->body.len] = '\0'; + snprintf(unix_sock_path, sizeof(unix_sock_path)-1, "%.*s", pit->body.len, pit->body.s); + sock = 1; } else if(pit->name.len==4 && strncmp(pit->name.s, "addr", 4)==0) { - addr = pit->body.s; - addr[pit->body.len] = '\0'; + snprintf(addr, sizeof(addr)-1, "%.*s", pit->body.len, pit->body.s); } else if(pit->name.len==4 && strncmp(pit->name.s, "port", 4)==0) { if(str2int(&pit->body, &port) < 0) port = 6379; @@ -340,25 +334,28 @@ int redisc_reconnect_server(redisc_server_t *rsrv) if(str2int(&pit->body, &db) < 0) db = 0; } else if(pit->name.len==4 && strncmp(pit->name.s, "pass", 4)==0) { - pass = pit->body.s; - pass[pit->body.len] = '\0'; + snprintf(pass, sizeof(pass)-1, "%.*s", pit->body.len, pit->body.s); + haspass = 1; } } + + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(rsrv->ctxRedis!=NULL) { redisFree(rsrv->ctxRedis); rsrv->ctxRedis = NULL; } - if(unix_sock_path != NULL) { + if(sock != 0) { rsrv->ctxRedis = redisConnectUnixWithTimeout(unix_sock_path, tv_conn); } else { rsrv->ctxRedis = redisConnectWithTimeout(addr, port, tv_conn); } + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); if(!rsrv->ctxRedis) goto err; if (rsrv->ctxRedis->err) goto err2; - if ((pass != NULL) && redisc_check_auth(rsrv, pass)) + if ((haspass) && redisc_check_auth(rsrv, pass)) goto err2; if (redisSetTimeout(rsrv->ctxRedis, tv_cmd)) goto err2; @@ -393,27 +390,13 @@ int redisc_reconnect_server(redisc_server_t *rsrv) int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) { redisc_server_t *rsrv_new; - char *pass; - char 
buffer[100], buffername[100]; - unsigned int port, db; + char buffername[100]; + unsigned int port; str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0}; - param_t *pit = NULL; if (redis_cluster_param) { LM_DBG("Redis replied: \"%.*s\"\n", reply->len, reply->str); if ((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) { port = 6379; - db = 0; - pass = NULL; - // Copy DB and password from current server: - for (pit = (*rsrv)->attrs; pit; pit=pit->next) { - if (pit->name.len==2 && strncmp(pit->name.s, "db", 2) == 0) { - if (str2int(&pit->body, &db) < 0) - db = 0; - } else if (pit->name.len==4 && strncmp(pit->name.s, "pass", 4) == 0) { - pass = pit->body.s; - pass[pit->body.len] = '\0'; - } - } if (strchr(reply->str, ':') > 0) { tmpstr.s = strchr(reply->str, ':') + 1; tmpstr.len = reply->len - (tmpstr.s - reply->str); @@ -433,7 +416,8 @@ int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) { return 0; } - name.len = snprintf(buffername, sizeof(buffername), "%.*s-%i-%i", addr.len, addr.s, port, db); + memset(buffername, 0, sizeof(buffername)); + name.len = snprintf(buffername, sizeof(buffername), "%.*s:%i", addr.len, addr.s, port); name.s = buffername; LM_DBG("Name of new connection: %.*s\n", name.len, name.s); rsrv_new = redisc_get_server(&name); @@ -442,30 +426,7 @@ int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) { *rsrv = rsrv_new; return 1; } else { - LM_DBG("New Connection\n"); - if (pass) { - tmpstr.len = snprintf(buffer, sizeof(buffer)-1, "name=%.*s;addr=%.*s;port=%i;db=%i;pass=%s", name.len, name.s, addr.len, addr.s, port, db, pass); - tmpstr.s = shm_malloc(tmpstr.len + 1); - memcpy(tmpstr.s, buffer, tmpstr.len); - } else { - tmpstr.len = snprintf(buffer, sizeof(buffer)-1, "name=%.*s;addr=%.*s;port=%i;db=%i", name.len, name.s, addr.len, addr.s, port, db); - tmpstr.s = shm_malloc(tmpstr.len + 1); - memcpy(tmpstr.s, buffer, tmpstr.len); - } - tmpstr.s[tmpstr.len] = '\0'; - LM_DBG("Connection setup: %.*s\n", tmpstr.len, 
tmpstr.s); - if (redisc_add_server(tmpstr.s) == 0) { - rsrv_new = redisc_get_server(&name); - if (rsrv_new) { - rsrv_new->settings = tmpstr.s; - redisc_reconnect_server(rsrv_new); - LM_DBG("Connection successful\n"); - *rsrv = rsrv_new; - return 1; - } - } else { - LM_ERR("Failed to add Connection (%.*s)\n", tmpstr.len, tmpstr.s); - } + LM_ERR("No Connection with name (%.*s)\n", name.len, name.s); } } } @@ -480,7 +441,12 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) redisc_server_t *rsrv=NULL; redisc_reply_t *rpl; char c; - va_list ap; + va_list ap, ap2, ap3, ap4; + + va_start(ap, cmd); + va_copy(ap2, ap); + va_copy(ap3, ap); + va_copy(ap4, ap); if(srv==NULL || cmd==NULL || res==NULL) { @@ -498,28 +464,31 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) LM_ERR("no redis server found: %.*s\n", srv->len, srv->s); goto error_exec; } + + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); + if(rsrv->ctxRedis==NULL) { LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s); goto error_exec; } + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); + rpl = redisc_get_reply(res); if(rpl==NULL) { LM_ERR("no redis reply id found: %.*s\n", res->len, res->s); goto error_exec; } -query: - va_start(ap, cmd); - + c = cmd->s[cmd->len]; + cmd->s[cmd->len] = '\0'; if(rpl->rplRedis!=NULL) { /* clean up previous redis reply */ freeReplyObject(rpl->rplRedis); rpl->rplRedis = NULL; } - c = cmd->s[cmd->len]; - cmd->s[cmd->len] = '\0'; + rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap ); if(rpl->rplRedis == NULL) { @@ -530,8 +499,7 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) } if(redisc_reconnect_server(rsrv)==0) { - va_end(ap); - rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap); + rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2); } else { LM_ERR("unable to reconnect to redis server: %.*s\n", srv->len, srv->s); @@ -540,15 +508,55 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) 
} } if (check_cluster_reply(rpl->rplRedis, &rsrv)) { - va_end(ap); - goto query; + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); + if(rsrv->ctxRedis==NULL) + { + LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s); + goto error_exec; + } + + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); + + if(rpl->rplRedis!=NULL) + { + /* clean up previous redis reply */ + freeReplyObject(rpl->rplRedis); + rpl->rplRedis = NULL; + } + rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap3 ); + if(rpl->rplRedis == NULL) + { + /* null reply, reconnect and try again */ + if(rsrv->ctxRedis->err) + { + LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr); + } + if(redisc_reconnect_server(rsrv)==0) + { + rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap4); + } else { + LM_ERR("unable to reconnect to redis server: %.*s\n", + srv->len, srv->s); + cmd->s[cmd->len] = c; + goto error_exec; + } + } } cmd->s[cmd->len] = c; va_end(ap); + va_end(ap2); + va_end(ap3); + va_end(ap4); + + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); + return 0; error_exec: va_end(ap); + va_end(ap2); + va_end(ap3); + va_end(ap4); return -1; } @@ -568,13 +576,23 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, { redisReply *res=NULL; - if(rsrv==NULL || rsrv->ctxRedis==NULL) + if(rsrv==NULL) { LM_ERR("no redis context found for server %.*s\n", (rsrv)?rsrv->sname->len:0, (rsrv)?rsrv->sname->s:""); return NULL; } + + LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); + if(rsrv->ctxRedis==NULL) + { + LM_ERR("no redis context found for server %.*s\n", + (rsrv)?rsrv->sname->len:0, + (rsrv)?rsrv->sname->s:""); + return NULL; + } + if(argc<=0) { LM_ERR("invalid parameters\n"); @@ -587,6 +605,13 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, } again: res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen); + + /* null reply, reconnect and try again */ + if(rsrv->ctxRedis->err) + { + LM_ERR("Redis error: %s\n", 
rsrv->ctxRedis->errstr); + } + if(res) { if (check_cluster_reply(res, &rsrv)) { @@ -595,11 +620,6 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, return res; } - /* null reply, reconnect and try again */ - if(rsrv->ctxRedis->err) - { - LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr); - } if(redisc_reconnect_server(rsrv)==0) { res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen); diff --git a/src/modules/ndb_redis/redis_client.h b/src/modules/ndb_redis/redis_client.h index e8b0b5b4b00..ce334dfe7b8 100644 --- a/src/modules/ndb_redis/redis_client.h +++ b/src/modules/ndb_redis/redis_client.h @@ -34,7 +34,6 @@ #include "../../core/parser/parse_param.h" #include "../../core/mod_fix.h" -int init_list(void); int redisc_init(void); int redisc_destroy(void); int redisc_add_server(char *spec); @@ -46,7 +45,6 @@ typedef struct redisc_server { param_t *attrs; redisContext *ctxRedis; struct redisc_server *next; - char * settings; } redisc_server_t; typedef struct redisc_reply {