diff --git a/src/modules/ndb_redis/doc/ndb_redis.xml b/src/modules/ndb_redis/doc/ndb_redis.xml index cc62fe3063d..e33a8a787a3 100644 --- a/src/modules/ndb_redis/doc/ndb_redis.xml +++ b/src/modules/ndb_redis/doc/ndb_redis.xml @@ -33,6 +33,11 @@ Isaksen misak@uni-tel.dk + + Carsten + Bock + carsten@ng-voice.com + 2011 @@ -42,6 +47,10 @@ 2012 www.systemonenoc.com + + 2017 + ng-voice GmbH + diff --git a/src/modules/ndb_redis/doc/ndb_redis_admin.xml b/src/modules/ndb_redis/doc/ndb_redis_admin.xml index 0337dbbc9d5..cad2893327f 100644 --- a/src/modules/ndb_redis/doc/ndb_redis_admin.xml +++ b/src/modules/ndb_redis/doc/ndb_redis_admin.xml @@ -148,6 +148,25 @@ modparam("ndb_redis", "connect_timeout", 500) ... modparam("ndb_redis", "cmd_timeout", 500) +... + + + +
+ <varname>cluster</varname> (integer) + + If set to 1, the module will connect to servers indicated in the "MOVED" reply. + + + + Default value is 0 (disabled). + + + + Set <varname>cluster</varname> parameter + +... +modparam("ndb_redis", "cluster", 1) ... diff --git a/src/modules/ndb_redis/ndb_redis_mod.c b/src/modules/ndb_redis/ndb_redis_mod.c index 5bfe9a5fe28..6bb74d9fba4 100644 --- a/src/modules/ndb_redis/ndb_redis_mod.c +++ b/src/modules/ndb_redis/ndb_redis_mod.c @@ -4,6 +4,9 @@ * Copyright (C) 2012 Vicente Hernando Ara (System One: www.systemonenoc.com) * - for: redis array reply support * + * Copyright (C) 2017 Carsten Bock (ng-voice GmbH) + * - for: Cluster support + * * This file is part of Kamailio, a free SIP server. * * Kamailio is free software; you can redistribute it and/or modify @@ -44,6 +47,7 @@ int redis_srv_param(modparam_t type, void *val); int init_without_redis = 0; int redis_connect_timeout_param = 1000; int redis_cmd_timeout_param = 1000; +int redis_cluster_param = 0; static int w_redis_cmd3(struct sip_msg* msg, char* ssrv, char* scmd, char* sres); @@ -57,6 +61,7 @@ static int fixup_redis_cmd6(void** param, int param_no); static int w_redis_free_reply(struct sip_msg* msg, char* res); +static int mod_init(void); static void mod_destroy(void); static int child_init(int rank); @@ -96,6 +101,7 @@ static param_export_t params[]={ {"init_without_redis", INT_PARAM, &init_without_redis}, {"connect_timeout", INT_PARAM, &redis_connect_timeout_param}, {"cmd_timeout", INT_PARAM, &redis_cmd_timeout_param}, + {"cluster", INT_PARAM, &redis_cluster_param}, {0, 0, 0} }; @@ -108,7 +114,7 @@ struct module_exports exports = { 0, /* exported MI functions */ mod_pvs, /* exported pseudo-variables */ 0, /* extra processes */ - 0, /* module initialization function */ + mod_init, /* module initialization function */ 0, /* response function */ mod_destroy, /* destroy function */ child_init /* per child init function */ @@ -130,6 +136,18 @@ static int child_init(int rank) return 0; } +/** + * + */ +static int mod_init(void) +{ + if (init_list() < 0) { + LM_ERR("failed to initialize redis connections\n"); + return -1; + } + return 0; +} + /** * */ diff --git a/src/modules/ndb_redis/redis_client.c b/src/modules/ndb_redis/redis_client.c index c60066d6bd8..3310f8d0600 100644 --- a/src/modules/ndb_redis/redis_client.c +++ b/src/modules/ndb_redis/redis_client.c @@ -1,6 +1,12 @@ /** * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com) * + * Copyright (C) 2012 Vicente Hernando Ara (System One: www.systemonenoc.com) + * - for: redis array reply support + * + * Copyright (C) 2017 Carsten Bock (ng-voice GmbH) + * - for: Cluster support + * * This file is part of Kamailio, a free SIP server. * * Kamailio is free software; you can redistribute it and/or modify @@ -35,14 +41,14 @@ #define redisCommandNR(a...) (int)({ void *__tmp; __tmp = redisCommand(a); if (__tmp) freeReplyObject(__tmp); __tmp ? 0 : -1;}) -static redisc_server_t *_redisc_srv_list=NULL; +static redisc_server_t ** _redisc_srv_list=NULL; static redisc_reply_t *_redisc_rpl_list=NULL; extern int init_without_redis; extern int redis_connect_timeout_param; extern int redis_cmd_timeout_param; - +extern int redis_cluster_param; /** * @@ -62,13 +68,13 @@ int redisc_init(void) tv_cmd.tv_sec = (int) redis_cmd_timeout_param / 1000; tv_cmd.tv_usec = (int) (redis_cmd_timeout_param % 1000) * 1000; - if(_redisc_srv_list==NULL) + if(*_redisc_srv_list==NULL) { LM_ERR("no redis servers defined\n"); return -1; } - for(rsrv=_redisc_srv_list; rsrv; rsrv=rsrv->next) + for(rsrv=*_redisc_srv_list; rsrv; rsrv=rsrv->next) { addr = "127.0.0.1"; port = 6379; @@ -113,7 +119,7 @@ int redisc_init(void) goto err2; if (redisCommandNR(rsrv->ctxRedis, "PING")) goto err2; - if (redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) + if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) goto err2; } @@ -175,7 +181,7 @@ int redisc_destroy(void) freeReplyObject(rpl->rplRedis); if(rpl->rname.s != NULL) - pkg_free(rpl->rname.s); + shm_free(rpl->rname.s); pkg_free(rpl); rpl = next_rpl; @@ -184,21 +190,38 @@ int redisc_destroy(void) if(_redisc_srv_list==NULL) return -1; - rsrv=_redisc_srv_list; + if(*_redisc_srv_list==NULL) + return -1; + rsrv=*_redisc_srv_list; while(rsrv!=NULL) { rsrv1 = rsrv; rsrv=rsrv->next; - if(rsrv1->ctxRedis!=NULL) + if (rsrv1->settings != NULL) + shm_free(rsrv1->settings); + if (rsrv1->ctxRedis!=NULL) redisFree(rsrv1->ctxRedis); free_params(rsrv1->attrs); - pkg_free(rsrv1); + shm_free(rsrv1); } + shm_free(*_redisc_srv_list); + *_redisc_srv_list = NULL; _redisc_srv_list = NULL; return 0; } +int init_list(void) { + if (_redisc_srv_list == NULL) { + _redisc_srv_list = (redisc_server_t **)shm_malloc(sizeof(redisc_server_t*)); + if(!_redisc_srv_list) { + LM_ERR("Out of memory\n"); + return -1; + } + } + return 0; +} + /** * */ @@ -218,10 +241,17 @@ int redisc_add_server(char *spec) LM_ERR("failed parsing params value\n"); goto error; } - rsrv = (redisc_server_t*)pkg_malloc(sizeof(redisc_server_t)); + if (_redisc_srv_list == NULL) { + _redisc_srv_list = (redisc_server_t **)shm_malloc(sizeof(redisc_server_t*)); + if(!_redisc_srv_list) { + LM_ERR("Out of memory\n"); + return -1; + } + } + rsrv = (redisc_server_t*)shm_malloc(sizeof(redisc_server_t)); if(rsrv==NULL) { - LM_ERR("no more pkg\n"); + LM_ERR("no more shm\n"); goto error; } memset(rsrv, 0, sizeof(redisc_server_t)); @@ -239,15 +269,15 @@ int redisc_add_server(char *spec) LM_ERR("no server name\n"); goto error; } - rsrv->next = _redisc_srv_list; - _redisc_srv_list = rsrv; + rsrv->next = *_redisc_srv_list; + *_redisc_srv_list = rsrv; return 0; error: if(pit!=NULL) free_params(pit); if(rsrv!=NULL) - pkg_free(rsrv); + shm_free(rsrv); return -1; } @@ -260,14 +290,17 @@ redisc_server_t *redisc_get_server(str *name) unsigned int hname; hname = get_hash1_raw(name->s, name->len); - rsrv=_redisc_srv_list; + LM_DBG("Hash %u (%.*s)\n", hname, name->len, name->s); + rsrv=*_redisc_srv_list; while(rsrv!=NULL) { + LM_DBG("Entry %u (%.*s)\n", rsrv->hname, rsrv->sname->len, rsrv->sname->s); if(rsrv->hname==hname && rsrv->sname->len==name->len && strncmp(rsrv->sname->s, name->s, name->len)==0) return rsrv; rsrv=rsrv->next; } + LM_DBG("No entry found.\n"); return NULL; } @@ -331,7 +364,7 @@ int redisc_reconnect_server(redisc_server_t *rsrv) goto err2; if (redisCommandNR(rsrv->ctxRedis, "PING")) goto err2; - if (redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) + if ((redis_cluster_param == 0) && redisCommandNR(rsrv->ctxRedis, "SELECT %i", db)) goto err2; return 0; @@ -358,6 +391,87 @@ int redisc_reconnect_server(redisc_server_t *rsrv) return -1; } +int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) { + redisc_server_t *rsrv_new; + char *pass; + char buffer[100], buffername[100]; + unsigned int port, db; + str addr = {0, 0}, tmpstr = {0, 0}, name = {0, 0}; + param_t *pit = NULL; + if (redis_cluster_param) { + LM_DBG("Redis replied: \"%.*s\"\n", reply->len, reply->str); + if ((reply->len > 7) && (strncmp(reply->str, "MOVED", 5) == 0)) { + port = 6379; + db = 0; + pass = NULL; + // Copy DB and password from current server: + for (pit = (*rsrv)->attrs; pit; pit=pit->next) { + if (pit->name.len==2 && strncmp(pit->name.s, "db", 2) == 0) { + if (str2int(&pit->body, &db) < 0) + db = 0; + } else if (pit->name.len==4 && strncmp(pit->name.s, "pass", 4) == 0) { + pass = pit->body.s; + pass[pit->body.len] = '\0'; + } + } + if (strchr(reply->str, ':') > 0) { + tmpstr.s = strchr(reply->str, ':') + 1; + tmpstr.len = reply->len - (tmpstr.s - reply->str); + if(str2int(&tmpstr, &port) < 0) + port = 6379; + LM_DBG("Port \"%.*s\" [%i] => %i\n", tmpstr.len, tmpstr.s, tmpstr.len, port); + } else { + LM_ERR("No Port in REDIS MOVED Reply (%.*s)\n", reply->len, reply->str); + return 0; + } + if (strchr(reply->str+6, ' ') > 0) { + addr.len = tmpstr.s - strchr(reply->str+6, ' ') - 2; + addr.s = strchr(reply->str+6, ' ') + 1; + LM_DBG("Host \"%.*s\" [%i]\n", addr.len, addr.s, addr.len); + } else { + LM_ERR("No Host in REDIS MOVED Reply (%.*s)\n", reply->len, reply->str); + return 0; + } + + name.len = snprintf(buffername, sizeof(buffername), "%.*s-%i-%i", addr.len, addr.s, port, db); + name.s = buffername; + LM_DBG("Name of new connection: %.*s\n", name.len, name.s); + rsrv_new = redisc_get_server(&name); + if (rsrv_new) { + LM_DBG("Reusing Connection\n"); + *rsrv = rsrv_new; + return 1; + } else { + LM_DBG("New Connection\n"); + if (pass) { + tmpstr.len = snprintf(buffer, sizeof(buffer)-1, "name=%.*s;addr=%.*s;port=%i;db=%i;pass=%s", name.len, name.s, addr.len, addr.s, port, db, pass); + tmpstr.s = shm_malloc(tmpstr.len + 1); + memcpy(tmpstr.s, buffer, tmpstr.len); + } else { + tmpstr.len = snprintf(buffer, sizeof(buffer)-1, "name=%.*s;addr=%.*s;port=%i;db=%i", name.len, name.s, addr.len, addr.s, port, db); + tmpstr.s = shm_malloc(tmpstr.len + 1); + memcpy(tmpstr.s, buffer, tmpstr.len); + } + tmpstr.s[tmpstr.len] = '\0'; + LM_DBG("Connection setup: %.*s\n", tmpstr.len, tmpstr.s); + if (redisc_add_server(tmpstr.s) == 0) { + rsrv_new = redisc_get_server(&name); + if (rsrv_new) { + rsrv_new->settings = tmpstr.s; + redisc_reconnect_server(rsrv_new); + LM_DBG("Connection successful\n"); + *rsrv = rsrv_new; + return 1; + } + } else { + LM_ERR("Failed to add Connection (%.*s)\n", tmpstr.len, tmpstr.s); + } + } + } + } + return 0; +} + /** * */ @@ -366,10 +480,7 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) redisc_server_t *rsrv=NULL; redisc_reply_t *rpl; char c; - va_list ap, ap2; - - va_start(ap, cmd); - va_copy(ap2, ap); + va_list ap; if(srv==NULL || cmd==NULL || res==NULL) { @@ -398,6 +509,9 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) LM_ERR("no redis reply id found: %.*s\n", res->len, res->s); goto error_exec; } +query: + va_start(ap, cmd); + if(rpl->rplRedis!=NULL) { /* clean up previous redis reply */ @@ -416,7 +530,8 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) } if(redisc_reconnect_server(rsrv)==0) { - rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2); + va_end(ap); + rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap); } else { LM_ERR("unable to reconnect to redis server: %.*s\n", srv->len, srv->s); @@ -424,14 +539,16 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) goto error_exec; } } + if (check_cluster_reply(rpl->rplRedis, &rsrv)) { + va_end(ap); + goto query; + } cmd->s[cmd->len] = c; va_end(ap); - va_end(ap2); return 0; error_exec: va_end(ap); - va_end(ap2); return -1; } @@ -468,9 +585,13 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, LM_ERR("invalid parameters\n"); return NULL; } +again: res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen); if(res) { + if (check_cluster_reply(res, &rsrv)) { + goto again; + } return res; } @@ -482,6 +603,11 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, if(redisc_reconnect_server(rsrv)==0) { res = redisCommandArgv(rsrv->ctxRedis, argc, argv, argvlen); + if (res) { + if (check_cluster_reply(res, &rsrv)) { + goto again; + } + } } else { diff --git a/src/modules/ndb_redis/redis_client.h b/src/modules/ndb_redis/redis_client.h index 3f16784e5c6..e8b0b5b4b00 100644 --- a/src/modules/ndb_redis/redis_client.h +++ b/src/modules/ndb_redis/redis_client.h @@ -1,6 +1,12 @@ /** * Copyright (C) 2011 Daniel-Constantin Mierla (asipto.com) * + * Copyright (C) 2012 Vicente Hernando Ara (System One: www.systemonenoc.com) + * - for: redis array reply support + * + * Copyright (C) 2017 Carsten Bock (ng-voice GmbH) + * - for: Cluster support + * * This file is part of Kamailio, a free SIP server. * * Kamailio is free software; you can redistribute it and/or modify @@ -28,6 +34,7 @@ #include "../../core/parser/parse_param.h" #include "../../core/mod_fix.h" +int init_list(void); int redisc_init(void); int redisc_destroy(void); int redisc_add_server(char *spec); @@ -39,6 +46,7 @@ typedef struct redisc_server { param_t *attrs; redisContext *ctxRedis; struct redisc_server *next; + char * settings; } redisc_server_t; typedef struct redisc_reply {