From 03c20c99ba1fae9b86ed7754bc7574ec2467b820 Mon Sep 17 00:00:00 2001 From: Claudiu Boriga Date: Thu, 27 Apr 2017 17:05:25 +0300 Subject: [PATCH 1/2] ndb_redis: fix connection problems with pipelining -fix problem when a connection with a REDIS server fails and the pipelined command line is lost, while a new connection will not be established --- src/modules/ndb_redis/redis_client.c | 112 +++++++++++++++++++++++---- src/modules/ndb_redis/redis_client.h | 11 ++- 2 files changed, 104 insertions(+), 19 deletions(-) diff --git a/src/modules/ndb_redis/redis_client.c b/src/modules/ndb_redis/redis_client.c index 4b90c5f7235..029816c7e4b 100644 --- a/src/modules/ndb_redis/redis_client.c +++ b/src/modules/ndb_redis/redis_client.c @@ -50,6 +50,15 @@ extern int redis_connect_timeout_param; extern int redis_cmd_timeout_param; extern int redis_cluster_param; +/* backwards compatibility with hiredis < 0.12 */ +#if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12) +typedef char *sds; +sds sdscatlen(sds s, const void *t, size_t len); +int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len); +#else +#define redis_append_formatted_command redisAppendFormattedCommand +#endif + /** * */ @@ -421,7 +430,7 @@ int redisc_append_cmd(str *srv, str *res, str *cmd, ...) LM_ERR("no redis context for server: %.*s\n", srv->len, srv->s); goto error_cmd; } - if (rsrv->pendingReplies >= MAXIMUM_PIPELINED_COMMANDS) + if (rsrv->piped.pending_commands >= MAXIMUM_PIPELINED_COMMANDS) { LM_ERR("Too many pipelined commands, maximum is %d\n",MAXIMUM_PIPELINED_COMMANDS); goto error_cmd; @@ -435,13 +444,17 @@ int redisc_append_cmd(str *srv, str *res, str *cmd, ...) c = cmd->s[cmd->len]; cmd->s[cmd->len] = '\0'; - if (redisvAppendCommand(rsrv->ctxRedis,cmd->s,ap) != REDIS_OK) + rsrv->piped.commands[rsrv->piped.pending_commands].len = redisvFormatCommand( + &rsrv->piped.commands[rsrv->piped.pending_commands].s, + cmd->s, + ap); + if (rsrv->piped.commands[rsrv->piped.pending_commands].len < 0) { LM_ERR("Invalid redis command : %s\n",cmd->s); goto error_cmd; } - rsrv->pipelinedReplies[rsrv->pendingReplies]=rpl; - rsrv->pendingReplies++; + rsrv->piped.replies[rsrv->piped.pending_commands]=rpl; + rsrv->piped.pending_commands++; cmd->s[cmd->len] = c; va_end(ap); @@ -495,7 +508,7 @@ int redisc_exec_pipelined_cmd_all() rsrv=_redisc_srv_list; while(rsrv!=NULL) { - if ((rsrv->ctxRedis != NULL) && (rsrv->pendingReplies != 0)) + if ((rsrv->ctxRedis != NULL) && (rsrv->piped.pending_commands != 0)) { redisc_exec_pipelined(rsrv); } @@ -505,6 +518,48 @@ int redisc_exec_pipelined_cmd_all() return 0; } +/** + * + */ +int redisc_create_pipelined_message(redisc_server_t *rsrv) +{ + int i; + + if (rsrv->ctxRedis->err) + { + LM_DBG("Reconnecting server because of error %d: \"%s\"",rsrv->ctxRedis->err,rsrv->ctxRedis->errstr); + if (redisc_reconnect_server(rsrv)) + { + LM_ERR("unable to reconnect to REDIS server: %.*s\n", rsrv->sname->len,rsrv->sname->s); + return -1; + } + } + + for (i=0;ipiped.pending_commands;i++) + { + if (redis_append_formatted_command(rsrv->ctxRedis,rsrv->piped.commands[i].s,rsrv->piped.commands[i].len) != REDIS_OK) + { + LM_ERR("Error while appending command %d",i); + return -1; + } + } + return 0; +} + +/** + * + */ +void redisc_free_pipelined_cmds(redisc_server_t *rsrv) +{ + int i; + for (i=0;ipiped.pending_commands;i++) + { + free(rsrv->piped.commands[i].s); + rsrv->piped.commands[i].len=0; + } + rsrv->piped.pending_commands=0; +} + /** * */ @@ -513,14 +568,19 @@ int redisc_exec_pipelined(redisc_server_t *rsrv) redisc_reply_t *rpl; int i; LM_DBG("redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s); - if (rsrv->pendingReplies == 0) + if (rsrv->piped.pending_commands == 0) { - LM_ERR("call for redis_cmd without any pipelined commands\n"); + LM_WARN("call for redis_cmd without any pipelined commands\n"); return -1; } + if(rsrv->ctxRedis==NULL) + { + LM_ERR("no redis context for server: %.*s\n", rsrv->sname->len,rsrv->sname->s); + goto error_exec; + } - /* send the first command and wait for the replies */ - rpl=rsrv->pipelinedReplies[0]; + /* send the commands and retrieve the first reply */ + rpl=rsrv->piped.replies[0]; if(rpl->rplRedis!=NULL) { @@ -529,7 +589,9 @@ int redisc_exec_pipelined(redisc_server_t *rsrv) rpl->rplRedis = NULL; } + redisc_create_pipelined_message(rsrv); redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis); + if (rpl->rplRedis == NULL) { /* null reply, reconnect and try again */ @@ -537,7 +599,7 @@ int redisc_exec_pipelined(redisc_server_t *rsrv) { LM_ERR("Redis error: %s\n", rsrv->ctxRedis->errstr); } - if (redisc_reconnect_server(rsrv) == 0) + if (redisc_create_pipelined_message(rsrv) == 0) { redisGetReply(rsrv->ctxRedis, (void**) &rpl->rplRedis); if (rpl->rplRedis == NULL) @@ -548,28 +610,27 @@ int redisc_exec_pipelined(redisc_server_t *rsrv) } else { - LM_ERR("unable to reconnect to redis server: %.*s\n", rsrv->sname->len,rsrv->sname->s); goto error_exec; } } LM_DBG("reply is [%s]",rpl->rplRedis->str); /* replies are received just retrieve them */ - for (i=1;ipendingReplies;i++) + for (i=1;ipiped.pending_commands;i++) { - rpl=rsrv->pipelinedReplies[i]; + rpl=rsrv->piped.replies[i]; if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis) != REDIS_OK) { LM_ERR("Unable to read reply\n"); - goto error_exec; + continue; } LM_DBG("reply is [%s]",rpl->rplRedis->str); } - rsrv->pendingReplies = 0; + redisc_free_pipelined_cmds(rsrv); return 0; error_exec: - rsrv->pendingReplies = 0; + redisc_free_pipelined_cmds(rsrv); return -1; } @@ -659,7 +720,7 @@ int redisc_exec(str *srv, str *res, str *cmd, ...) } LM_DBG("rsrv->ctxRedis = %p\n", rsrv->ctxRedis); - if (rsrv->pendingReplies != 0) + if (rsrv->piped.pending_commands != 0) { LM_NOTICE("Calling redis_cmd with pipelined commands in the buffer. Automatically call redis_execute"); redisc_exec_pipelined(rsrv); @@ -919,3 +980,20 @@ int redisc_check_auth(redisc_server_t *rsrv, char *pass) freeReplyObject(reply); return retval; } + +/* backwards compatibility with hiredis < 0.12 */ +#if (HIREDIS_MAJOR == 0) && (HIREDIS_MINOR < 12) +int redis_append_formatted_command(redisContext *c, const char *cmd, size_t len) +{ + sds newbuf; + + newbuf = sdscatlen(c->obuf,cmd,len); + if (newbuf == NULL) { + c->err = REDIS_ERR_OOM; + strcpy(c->errstr,"Out of memory"); + return REDIS_ERR; + } + c->obuf = newbuf; + return REDIS_OK; +} +#endif diff --git a/src/modules/ndb_redis/redis_client.h b/src/modules/ndb_redis/redis_client.h index b99da28b81b..fe47583744b 100644 --- a/src/modules/ndb_redis/redis_client.h +++ b/src/modules/ndb_redis/redis_client.h @@ -47,14 +47,19 @@ typedef struct redisc_reply { struct redisc_reply *next; } redisc_reply_t; +typedef struct redisc_piped_cmds { + str commands[MAXIMUM_PIPELINED_COMMANDS]; + redisc_reply_t *replies[MAXIMUM_PIPELINED_COMMANDS]; + int pending_commands; +} redisc_piped_cmds_t; + typedef struct redisc_server { str *sname; unsigned int hname; param_t *attrs; redisContext *ctxRedis; struct redisc_server *next; - redisc_reply_t *pipelinedReplies[MAXIMUM_PIPELINED_COMMANDS]; - int pendingReplies; + redisc_piped_cmds_t piped; } redisc_server_t; typedef struct redisc_pv { @@ -75,6 +80,8 @@ int redisc_append_cmd(str *srv, str *res, str *cmd, ...); int redisc_exec_pipelined_cmd(str *srv); int redisc_exec_pipelined_cmd_all(); int redisc_exec_pipelined(redisc_server_t *rsrv); +int redisc_create_pipelined_message(redisc_server_t *rsrv); +void redisc_free_pipelined_cmds(redisc_server_t *rsrv); redisReply* redisc_exec_argv(redisc_server_t *rsrv, int argc, const char **argv, const size_t *argvlen); redisc_reply_t *redisc_get_reply(str *name); From 4ac34dc72488f98c3deba727fe9cffbb186a5b1e Mon Sep 17 00:00:00 2001 From: Claudiu Boriga Date: Fri, 28 Apr 2017 10:00:11 +0300 Subject: [PATCH 2/2] ndb_redis: fix memory leak - fix memory leak when re-using a reply-id --- src/modules/ndb_redis/redis_client.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/modules/ndb_redis/redis_client.c b/src/modules/ndb_redis/redis_client.c index 029816c7e4b..1e4a573daf0 100644 --- a/src/modules/ndb_redis/redis_client.c +++ b/src/modules/ndb_redis/redis_client.c @@ -619,6 +619,12 @@ int redisc_exec_pipelined(redisc_server_t *rsrv) for (i=1;ipiped.pending_commands;i++) { rpl=rsrv->piped.replies[i]; + if(rpl->rplRedis!=NULL) + { + /* clean up previous redis reply */ + freeReplyObject(rpl->rplRedis); + rpl->rplRedis = NULL; + } if (redisGetReplyFromReader(rsrv->ctxRedis, (void**) &rpl->rplRedis) != REDIS_OK) { LM_ERR("Unable to read reply\n");