Skip to content

Commit

Permalink
db_redis: use cluster api
Browse files Browse the repository at this point in the history
  • Loading branch information
Riccardo-78 authored and miconda committed Jan 20, 2022
1 parent f15c768 commit 8eec011
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 6 deletions.
115 changes: 112 additions & 3 deletions src/modules/db_redis/redis_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@
#include "redis_table.h"
#include "redis_dbase.h"

#ifdef WITH_HIREDIS_CLUSTER
static unsigned int MAX_URL_LENGTH = 1023;
#define redisCommand redisClusterCommand
#define redisFree redisClusterFree
#define redisCommandArgv redisClusterCommandArgv
#define redisAppendCommandArgv redisClusterAppendCommandArgv
#define redisGetReply redisClusterGetReply
#endif

extern int db_redis_verbosity;

static void print_query(redis_key_t *query) {
Expand Down Expand Up @@ -104,21 +113,59 @@ static redis_key_t* db_redis_shift_query(km_redis_con_t *con) {
int db_redis_connect(km_redis_con_t *con) {
struct timeval tv;
redisReply *reply;
#ifndef WITH_HIREDIS_CLUSTER
int db;
#endif

tv.tv_sec = 1;
tv.tv_usec = 0;

#ifndef WITH_HIREDIS_CLUSTER
db = atoi(con->id->database);
#endif
reply = NULL;

// TODO: introduce require_master mod-param and check if we're indeed master
// TODO: on carrier, if we have db fail-over, the currently connected
// redis server will become slave without dropping connections?

#ifdef WITH_HIREDIS_CLUSTER
int status;
char hosts[MAX_URL_LENGTH];
char* host_begin;
char* host_end;
LM_DBG("connecting to redis cluster at %.*s\n", con->id->url.len, con->id->url.s);
host_begin = strstr(con->id->url.s, "redis://");
if (host_begin) {
host_begin += 8;
} else {
LM_ERR("invalid url scheme\n");
goto err;
}
host_end = strstr(host_begin, "/");
if (! host_end) {
LM_ERR("invalid url: cannot find end of host part\n");
goto err;
}
if ((host_end - host_begin) > (MAX_URL_LENGTH-1)) {
LM_ERR("url too long\n");
goto err;
}
strncpy(hosts, host_begin, (host_end - host_begin));
hosts[MAX_URL_LENGTH-1] = '\0';
con->con = redisClusterContextInit();
if (! con->con) {
LM_ERR("no private memory left\n");
goto err;
}
redisClusterSetOptionAddNodes(con->con, hosts);
redisClusterSetOptionConnectTimeout(con->con, tv);
status = redisClusterConnect2(con->con);
if (status != REDIS_OK) {
LM_ERR("cannot open connection to cluster with hosts: %s, error: %s\n", hosts, con->con->errstr);
goto err;
}
#else
LM_DBG("connecting to redis at %s:%d\n", con->id->host, con->id->port);
con->con = redisConnectWithTimeout(con->id->host, con->id->port, tv);

if (!con->con) {
LM_ERR("cannot open connection: %.*s\n", con->id->url.len, con->id->url.s);
goto err;
Expand All @@ -128,6 +175,7 @@ int db_redis_connect(km_redis_con_t *con) {
con->con->errstr);
goto err;
}
#endif

if (con->id->password) {
reply = redisCommand(con->con, "AUTH %s", con->id->password);
Expand All @@ -144,6 +192,7 @@ int db_redis_connect(km_redis_con_t *con) {
freeReplyObject(reply); reply = NULL;
}

#ifndef WITH_HIREDIS_CLUSTER
reply = redisCommand(con->con, "PING");
if (!reply) {
LM_ERR("cannot ping server on connection %.*s: %s\n",
Expand All @@ -169,8 +218,10 @@ int db_redis_connect(km_redis_con_t *con) {
goto err;
}
freeReplyObject(reply); reply = NULL;
#endif
LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);

#ifndef WITH_HIREDIS_CLUSTER
reply = redisCommand(con->con, "SCRIPT LOAD %s", SREM_KEY_LUA);
if (!reply) {
LM_ERR("failed to load LUA script to server %.*s: %s\n",
Expand All @@ -194,6 +245,7 @@ int db_redis_connect(km_redis_con_t *con) {
}
strcpy(con->srem_key_lua, reply->str);
freeReplyObject(reply); reply = NULL;
#endif
LM_DBG("connection opened to %.*s\n", con->id->url.len, con->id->url.s);

return 0;
Expand Down Expand Up @@ -291,6 +343,63 @@ void db_redis_free_connection(struct pool_con* con) {
pkg_free(_c);
}

#ifdef WITH_HIREDIS_CLUSTER
void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node) {
char **argv = NULL;
int argc;
#define MAX_CMD_LENGTH 256
char cmd[MAX_CMD_LENGTH] = "";
size_t cmd_len = MAX_CMD_LENGTH - 1;
int i;

print_query(query);

argc = db_redis_key_list2arr(query, &argv);
if (argc < 0) {
LM_ERR("Failed to allocate memory for query array\n");
return NULL;
}
LM_DBG("query has %d args\n", argc);

for (i=0; i<argc; i++)
{
size_t arg_len = strlen(argv[i]);
if (arg_len > cmd_len)
break;
strncat(cmd, argv[i], cmd_len);
cmd_len = cmd_len - arg_len;
if (cmd_len == 0)
break;

if (i != argc - 1) {
strncat(cmd, " ", cmd_len);
cmd_len--;
}

}

LM_DBG("cmd is %s\n", cmd);

redisReply *reply = redisClusterCommandToNode(con->con, node, cmd);
if (con->con->err != REDIS_OK) {
LM_DBG("redis connection is gone, try reconnect. (%d:%s)\n", con->con->err, con->con->errstr);
if (db_redis_connect(con) != 0) {
LM_ERR("Failed to reconnect to redis db\n");
pkg_free(argv);
if (con->con) {
redisFree(con->con);
con->con = NULL;
}
return NULL;
}
reply = redisClusterCommandToNode(con->con, node, cmd);
}
pkg_free(argv);
return reply;
}

#endif

void *db_redis_command_argv(km_redis_con_t *con, redis_key_t *query) {
char **argv = NULL;
int argc;
Expand Down
34 changes: 33 additions & 1 deletion src/modules/db_redis/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@
#ifndef _REDIS_CONNECTION_H_
#define _REDIS_CONNECTION_H_

#ifdef WITH_HIREDIS_CLUSTER
#include <hircluster.h>
#else
#ifdef WITH_HIREDIS_PATH
#include <hiredis/hiredis.h>
#else
#include <hiredis.h>
#endif
#endif

#include "db_redis_mod.h"

#ifndef WITH_REDIS_CLUSTER
#define db_redis_check_reply(con, reply, err) do { \
if (!(reply) && !(con)->con) { \
LM_ERR("Failed to fetch type entry: no connection to server\n"); \
Expand All @@ -49,6 +54,26 @@
goto err; \
} \
} while(0);
#else
#define db_redis_check_reply(con, reply, err) do { \
if (!(reply) && !(con)->con) { \
LM_ERR("Failed to fetch type entry: no connection to server\n"); \
goto err; \
} \
if (!(reply)) { \
LM_ERR("Failed to fetch type entry: %s\n", \
(con)->con->errstr); \
redisClusterFree((con)->con); \
(con)->con = NULL; \
goto err; \
} \
if ((reply)->type == REDIS_REPLY_ERROR) { \
LM_ERR("Failed to fetch type entry: %s\n", \
(reply)->str); \
goto err; \
} \
} while(0);
#endif

typedef struct redis_key redis_key_t;

Expand All @@ -61,8 +86,11 @@ typedef struct km_redis_con {
struct db_id* id;
unsigned int ref;
struct pool_con* next;

#ifdef WITH_HIREDIS_CLUSTER
redisClusterContext *con;
#else
redisContext *con;
#endif
redis_command_t *command_queue;
unsigned int append_counter;
struct str_hash_table tables;
Expand All @@ -86,4 +114,8 @@ void db_redis_consume_replies(km_redis_con_t *con);
void db_redis_free_reply(redisReply **reply);
const char *db_redis_get_error(km_redis_con_t *con);

#ifdef WITH_HIREDIS_CLUSTER
void *db_redis_command_argv_to_node(km_redis_con_t *con, redis_key_t *query, cluster_node *node);
#endif

#endif /* _REDIS_CONNECTION_H_ */

0 comments on commit 8eec011

Please sign in to comment.