From d0101010b891e60fdab0f76ae5ce899747d6586e Mon Sep 17 00:00:00 2001 From: Claudiu Boriga Date: Wed, 3 May 2017 16:19:04 +0300 Subject: [PATCH] ndb_redis: add disable server on failure feature - if a server fails multiple consecutive times it is disabled temporarily and commands to it will not do anything. --- src/modules/ndb_redis/doc/ndb_redis_admin.xml | 58 ++++++++++++++ src/modules/ndb_redis/ndb_redis_mod.c | 4 + src/modules/ndb_redis/redis_client.c | 77 ++++++++++++++++++- src/modules/ndb_redis/redis_client.h | 9 +++ 4 files changed, 147 insertions(+), 1 deletion(-) diff --git a/src/modules/ndb_redis/doc/ndb_redis_admin.xml b/src/modules/ndb_redis/doc/ndb_redis_admin.xml index f7cc9449056..e7dc381038d 100644 --- a/src/modules/ndb_redis/doc/ndb_redis_admin.xml +++ b/src/modules/ndb_redis/doc/ndb_redis_admin.xml @@ -177,6 +177,64 @@ modparam("ndb_redis", "server", "name=127.0.0.1:26004;addr=127.0.0.1;port=26004" modparam("ndb_redis", "server", "name=127.0.0.1:26008;addr=127.0.0.1;port=26008") ... modparam("ndb_redis", "cluster", 1) +... + + + +
+ <varname>allowed_timeouts</varname> (integer) + + If this is set to a non-negative value, it sets the number + of consecutive REDIS commands that can fail before temporarily + disabling the REDIS server. This is similar to rtpengine_disable_tout + parameter from the rtpengine module. + + + When communicating with a REDIS server, if redis_cmd or redis_execute + will fail for more than allowed_timeouts consecutive + times, the server will be temporary disabled for a number of seconds + configured by the disable_time parameter. + + + Disabling a server means that further redis_cmd and redis_execute commands + will not do anything and return a negative value -2. + Messages are also logged when disabling and re-enabling a server. + + + The number of consecutive fails are counted by each Kamailio process, + so when disabling a server this is done just for that process, not globally. + + + + Default value is -1 (disabled). + + + + Set <varname>allowed_timeots</varname> parameter + +... +modparam("ndb_redis", "allowed_timeouts", 3) +... + + +
+
+ <varname>disable_time</varname> (integer) + + If allowed_timeouts is set to a non negative value this determines the + number of seconds the REDIS server will be disabled + + + + Default value is 0. + + + + Set <varname>disable_time</varname> parameter + +... +modparam("ndb_redis", "allowed_timeouts", 0) +modparam("ndb_redis", "disable_time", 30) ... diff --git a/src/modules/ndb_redis/ndb_redis_mod.c b/src/modules/ndb_redis/ndb_redis_mod.c index 36b4efd3fe3..0906aab441c 100644 --- a/src/modules/ndb_redis/ndb_redis_mod.c +++ b/src/modules/ndb_redis/ndb_redis_mod.c @@ -48,6 +48,8 @@ int init_without_redis = 0; int redis_connect_timeout_param = 1000; int redis_cmd_timeout_param = 1000; int redis_cluster_param = 0; +int disable_time=0; +int allowed_timeouts=-1; static int w_redis_cmd3(struct sip_msg* msg, char* ssrv, char* scmd, char* sres); @@ -120,6 +122,8 @@ static param_export_t params[]={ {"connect_timeout", INT_PARAM, &redis_connect_timeout_param}, {"cmd_timeout", INT_PARAM, &redis_cmd_timeout_param}, {"cluster", INT_PARAM, &redis_cluster_param}, + {"disable_time", INT_PARAM, &disable_time}, + {"allowed_timeouts", INT_PARAM, &allowed_timeouts}, {0, 0, 0} }; diff --git a/src/modules/ndb_redis/redis_client.c b/src/modules/ndb_redis/redis_client.c index 4ebdeb19473..728a93d656e 100644 --- a/src/modules/ndb_redis/redis_client.c +++ b/src/modules/ndb_redis/redis_client.c @@ -49,6 +49,8 @@ extern int init_without_redis; extern int redis_connect_timeout_param; extern int redis_cmd_timeout_param; extern int redis_cluster_param; +extern int disable_time; +extern int allowed_timeouts; /* backwards compatibility with hiredis < 0.12 */ #if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12) @@ -547,7 +549,15 @@ int redisc_exec_pipelined(redisc_server_t *rsrv) { redisc_reply_t *rpl; int i; + LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s); + + /* if server is disabled do nothing unless the disable time has passed */ + if (redis_check_server(rsrv)) + { + goto srv_disabled; + } + if (rsrv->piped.pending_commands == 0) { LM_WARN("call for redis_cmd without any pipelined commands\n"); @@ -584,12 +594,14 @@ int redisc_exec_pipelined(redisc_server_t *rsrv) redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis); if (rpl->rplRedis == NULL) { + redis_count_err_and_disable(rsrv); LM_ERR("Unable to read reply\n"); goto error_exec; } } else { + redis_count_err_and_disable(rsrv); goto error_exec; } } @@ -613,11 +625,16 @@ int redisc_exec_pipelined(redisc_server_t *rsrv) LM_DBG("reply is [%s]",rpl->rplRedis->str); } redisc_free_pipelined_cmds(rsrv); + rsrv->disable.consecutive_errors = 0; return 0; error_exec: redisc_free_pipelined_cmds(rsrv); return -1; + +srv_disabled: + redisc_free_pipelined_cmds(rsrv); + return -2; } int check_cluster_reply(redisReply *reply, redisc_server_t **rsrv) { @@ -711,6 +728,11 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer. Automatically call redis_execute"); redisc_exec_pipelined(rsrv); } + /* if server is disabled do nothing unless the disable time has passed */ + if (redis_check_server(rsrv)) + { + goto srv_disabled; + } rpl = redisc_get_reply(res); if(rpl==NULL) @@ -738,7 +760,15 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) if(redisc_reconnect_server(rsrv)==0) { rpl->rplRedis = redisvCommand(rsrv->ctxRedis, cmd->s, ap2); - } else { + if (rpl->rplRedis ==NULL) + { + redis_count_err_and_disable(rsrv); + goto error_exec; + } + } + else + { + redis_count_err_and_disable(rsrv); LM_ERR("unable to reconnect to redis server: %.*s\n", srv->len, srv->s); cmd->s[cmd->len] = c; @@ -781,6 +811,7 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) } } cmd->s[cmd->len] = c; + rsrv->disable.consecutive_errors = 0; va_end(ap); va_end(ap2); va_end(ap3); @@ -797,6 +828,13 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) va_end(ap4); return -1; +srv_disabled: + va_end(ap); + va_end(ap2); + va_end(ap3); + va_end(ap4); + return -2; + } /** @@ -983,3 +1021,40 @@ int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len) return REDIS_OK; } #endif + +int redis_check_server(redisc_server_t *rsrv) +{ + + if (rsrv->disable.disabled) + { + if (get_ticks() > rsrv->disable.restore_tick) + { + LM_INFO("REDIS server %.*s re-enabled",rsrv->sname->len,rsrv->sname->s); + rsrv->disable.disabled = 0; + rsrv->disable.consecutive_errors = 0; + } + else + { + return 1; + } + } + return 0; +} + +int redis_count_err_and_disable(redisc_server_t *rsrv) +{ + if (allowed_timeouts < 0) + { + return 0; + } + + rsrv->disable.consecutive_errors++; + if (rsrv->disable.consecutive_errors > allowed_timeouts) + { + rsrv->disable.disabled=1; + rsrv->disable.restore_tick=get_ticks() + disable_time; + LM_WARN("REDIS server %.*s disabled for %d seconds",rsrv->sname->len,rsrv->sname->s,disable_time); + return 1; + } + return 0; +} diff --git a/src/modules/ndb_redis/redis_client.h b/src/modules/ndb_redis/redis_client.h index f0dcf94ed66..de52d33945a 100644 --- a/src/modules/ndb_redis/redis_client.h +++ b/src/modules/ndb_redis/redis_client.h @@ -53,6 +53,12 @@ typedef struct redisc_piped_cmds { int pending_commands; } redisc_piped_cmds_t; +typedef struct redisc_srv_disable { + int disabled; + int consecutive_errors; + time_t restore_tick; +} redisc_srv_disable_t; + typedef struct redisc_server { str *sname; unsigned int hname; @@ -60,6 +66,7 @@ typedef struct redisc_server { redisContext *ctxRedis; struct redisc_server *next; redisc_piped_cmds_t piped; + redisc_srv_disable_t disable; } redisc_server_t; typedef struct redisc_pv { @@ -86,4 +93,6 @@ redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, redisc_reply_t *redisc_get_reply(str *name); int redisc_free_reply(str *name); int redisc_check_auth(redisc_server_t *rsrv, char *pass); +int redis_check_server(redisc_server_t *rsrv); +int redis_count_err_and_disable(redisc_server_t *rsrv); #endif