diff --git a/src/modules/topos_redis/Makefile b/src/modules/topos_redis/Makefile new file mode 100644 index 00000000000..1f26efce9a2 --- /dev/null +++ b/src/modules/topos_redis/Makefile @@ -0,0 +1,34 @@ +# +# WARNING: do not run this directly, it should be run by the master Makefile + +include ../../Makefile.defs +auto_gen= +NAME=topos_redis.so + +ifeq ($(CROSS_COMPILE),) +HIREDIS_BUILDER = $(shell \ + if pkg-config --exists hiredis; then \ + echo 'pkg-config hiredis'; \ + fi) +endif + +ifeq ($(HIREDIS_BUILDER),) + HIREDISDEFS=-I$(LOCALBASE)/include + HIREDISLIBS=-L$(LOCALBASE)/lib -lhiredis +else + HIREDISDEFS = $(shell $(HIREDIS_BUILDER) --cflags) + HIREDISLIBS = $(shell $(HIREDIS_BUILDER) --libs) + +ifeq ($(HIREDISLIBS),-L -lhiredis) + HIREDISDEFS = $(shell $(HIREDIS_BUILDER) --cflags) /opt/local/include + HIREDISLIBS = -L/opt/local/lib -lhiredis +endif + +endif + +DEFS+=$(HIREDISDEFS) +LIBS=$(HIREDISLIBS) + +DEFS+=-DKAMAILIO_MOD_INTERFACE + +include ../../Makefile.modules diff --git a/src/modules/topos_redis/README b/src/modules/topos_redis/README new file mode 100644 index 00000000000..c1b038dffc8 --- /dev/null +++ b/src/modules/topos_redis/README @@ -0,0 +1,94 @@ +TOPOS_REDIS Module + +Daniel-Constantin Mierla + + + +Edited by + +Daniel-Constantin Mierla + + + + Copyright © 2017 kamailio.org + + Copyright © 2017 flowroute.com + __________________________________________________________________ + + Table of Contents + + 1. Admin Guide + + 1. Overview + 2. Dependencies + + 2.1. Kamailio Modules + 2.2. External Libraries or Applications + + 3. Parameters + + 3.1. serverid (str) + + List of Examples + + 1.1. Set serverid parameter + +Chapter 1. Admin Guide + + Table of Contents + + 1. Overview + 2. Dependencies + + 2.1. Kamailio Modules + 2.2. External Libraries or Applications + + 3. Parameters + + 3.1. serverid (str) + +1. Overview + + This module offers REDIS storage support for TOPOS module. + +2. Dependencies + + 2.1. Kamailio Modules + 2.2. External Libraries or Applications + +2.1. Kamailio Modules + + The following modules must be loaded before this module: + * ndb_redis module - for interaction with a REDIS server. + * topos module - to bind to as a storage engine. + +2.2. External Libraries or Applications + + The following libraries or applications must be installed before + running Kamailio with this module loaded: + * hiredis - available at https://github.com/antirez/hiredis . + +3. Parameters + + 3.1. serverid (str) + +3.1. serverid (str) + + The name of the REDIS server to be used as provided to the 'name' + attribute of 'server' parameter for NDB_REDIS module. + + The 'storage' parameter for TOPOS module must be set to 'redis'. + + Default value is “NULL”. + + Example 1.1. Set serverid parameter +... +# ----- ndb_redis params ----- +modparam("ndb_redis", "server", "name=srv8;addr=127.0.0.1;port=6379;db=8") + +# ----- topos params ----- +modparam("topos", "storage", "redis") + +# ----- topos_redis params ----- +modparam("topos_redis", "serverid", "srv8") +... diff --git a/src/modules/topos_redis/doc/Makefile b/src/modules/topos_redis/doc/Makefile new file mode 100644 index 00000000000..74204068b5a --- /dev/null +++ b/src/modules/topos_redis/doc/Makefile @@ -0,0 +1,4 @@ +docs = topos_redis.xml + +docbook_dir = ../../../../doc/docbook +include $(docbook_dir)/Makefile.module diff --git a/src/modules/topos_redis/doc/topos_redis.xml b/src/modules/topos_redis/doc/topos_redis.xml new file mode 100644 index 00000000000..f6a5314d825 --- /dev/null +++ b/src/modules/topos_redis/doc/topos_redis.xml @@ -0,0 +1,40 @@ + + + +%docentities; + +]> + + + + TOPOS_REDIS Module + kamailio.org + + + Daniel-Constantin + Mierla + miconda@gmail.com + + + Daniel-Constantin + Mierla + miconda@gmail.com + + + + 2017 + kamailio.org + + + 2017 + flowroute.com + + + + + + + diff --git a/src/modules/topos_redis/doc/topos_redis_admin.xml b/src/modules/topos_redis/doc/topos_redis_admin.xml new file mode 100644 index 00000000000..0cd840d9806 --- /dev/null +++ b/src/modules/topos_redis/doc/topos_redis_admin.xml @@ -0,0 +1,94 @@ + + + +%docentities; + +]> + + + + + &adminguide; + +
+ Overview + + This module offers REDIS storage support for TOPOS module. + +
+
+ Dependencies +
+ &kamailio; Modules + + The following modules must be loaded before this module: + + + + ndb_redis module - for interaction with + a REDIS server. + + + + + topos module - to bind to as a storage + engine. + + + + +
+
+ External Libraries or Applications + + The following libraries or applications must be installed before running + &kamailio; with this module loaded: + + + + hiredis - available at + https://github.com/antirez/hiredis . + + + + +
+
+
+ Parameters +
+ <varname>serverid</varname> (str) + + The name of the REDIS server to be used as provided to the 'name' + attribute of 'server' parameter for NDB_REDIS module. + + + The 'storage' parameter for TOPOS module must be set to 'redis'. + + + + Default value is NULL. + + + + Set <varname>serverid</varname> parameter + +... +# ----- ndb_redis params ----- +modparam("ndb_redis", "server", "name=srv8;addr=127.0.0.1;port=6379;db=8") + +# ----- topos params ----- +modparam("topos", "storage", "redis") + +# ----- topos_redis params ----- +modparam("topos_redis", "serverid", "srv8") +... + + +
+
+
+ diff --git a/src/modules/topos_redis/topos_redis_mod.c b/src/modules/topos_redis/topos_redis_mod.c new file mode 100644 index 00000000000..0fdef7882c7 --- /dev/null +++ b/src/modules/topos_redis/topos_redis_mod.c @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2017 kamailio.org + * Copyright (C) 2017 flowroute.com + * + * This file is part of Kamailio, a free SIP server. + * + * Kamailio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * Kamailio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#include +#include +#include +#include + +#include "../../core/sr_module.h" +#include "../../core/dprint.h" +#include "../../core/mod_fix.h" + +#include "../ndb_redis/api.h" +#include "../topos/api.h" + +#include "topos_redis_storage.h" + +MODULE_VERSION + +str _topos_redis_serverid = STR_NULL; + +static int mod_init(void); +static void mod_destroy(void); +static int child_init(int rank); + +tps_storage_api_t _tps_storage_api = {0}; +topos_api_t _tps_api = {0}; + +ndb_redis_api_t _tps_redis_api = {0}; + +static cmd_export_t cmds[]={ + {0, 0, 0, 0, 0, 0} +}; + +static param_export_t params[]={ + {"serverid", PARAM_STR, &_topos_redis_serverid}, + {0, 0, 0} +}; + +struct module_exports exports = { + "topos_redis", + DEFAULT_DLFLAGS, /* dlopen flags */ + cmds, + params, + 0, + 0, /* exported MI functions */ + 0, /* exported pseudo-variables */ + 0, /* extra processes */ + mod_init, /* module initialization function */ + 0, /* response function */ + mod_destroy, /* destroy function */ + child_init /* per child init function */ +}; + + +/** + * + */ +static int mod_init(void) +{ + if(_topos_redis_serverid.s==NULL + || _topos_redis_serverid.len<=0) { + LM_ERR("invalid serverid parameter\n"); + return -1; + } + if(topos_load_api(&_tps_api)<0) { + LM_ERR("failed to bind to topos module\n"); + return -1; + } + if(ndb_redis_load_api(&_tps_redis_api)) { + LM_ERR("failed to bind to ndb_redis module\n"); + return -1; + } + + _tps_storage_api.insert_dialog = tps_redis_insert_dialog; + _tps_storage_api.clean_dialogs = tps_redis_clean_dialogs; + _tps_storage_api.insert_branch = tps_redis_insert_branch; + _tps_storage_api.clean_branches = tps_redis_clean_branches; + _tps_storage_api.load_branch = tps_redis_load_branch; + _tps_storage_api.load_dialog = tps_redis_load_dialog; + _tps_storage_api.update_dialog = tps_redis_update_dialog; + _tps_storage_api.end_dialog = tps_redis_end_dialog; + + if(_tps_api.set_storage_api(&_tps_storage_api)<0) { + LM_ERR("failed to set topos storae api\n"); + return -1; + } + return 0; +} + +/** + * + */ +static int child_init(int rank) +{ + /* skip child init for non-worker process ranks */ + if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) + return 0; + + return 0; +} + +/** + * + */ +static void mod_destroy(void) +{ + LM_DBG("cleaning up\n"); +} diff --git a/src/modules/topos_redis/topos_redis_storage.c b/src/modules/topos_redis/topos_redis_storage.c new file mode 100644 index 00000000000..204d9fbd07e --- /dev/null +++ b/src/modules/topos_redis/topos_redis_storage.c @@ -0,0 +1,838 @@ +/** + * Copyright (C) 2017 kamailio.org + * Copyright (C) 2017 flowroute.com + * + * This file is part of Kamailio, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/*! + * \file + * \brief Kamailio topos :: + * \ingroup topos + * Module: \ref topos + */ + +#include +#include +#include + +#include "../../core/dprint.h" +#include "../../core/ut.h" + +#include "../ndb_redis/api.h" +#include "../topos/api.h" + +#include "topos_redis_storage.h" + + +extern str _topos_redis_serverid; +extern ndb_redis_api_t _tps_redis_api; + +static str _tps_redis_bprefix = str_init("b:x:"); +static str _tps_redis_dprefix = str_init("d:z:"); + +// void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen); + +#define TPS_REDIS_NR_KEYS 32 +#define TPS_REDIS_DATA_SIZE 8192 + +static char _tps_redis_cbuf[TPS_DATA_SIZE]; + +/** + * storage keys + */ +str td_key_rectime = str_init("rectime"); +str td_key_a_callid = str_init("a_callid"); +str td_key_a_uuid = str_init("a_uuid"); +str td_key_b_uuid = str_init("b_uuid"); +str td_key_a_contact = str_init("a_contact"); +str td_key_b_contact = str_init("b_contact"); +str td_key_as_contact = str_init("as_contact"); +str td_key_bs_contact = str_init("bs_contact"); +str td_key_a_tag = str_init("a_tag"); +str td_key_b_tag = str_init("b_tag"); +str td_key_a_rr = str_init("a_rr"); +str td_key_b_rr = str_init("b_rr"); +str td_key_s_rr = str_init("s_rr"); +str td_key_iflags = str_init("iflags"); +str td_key_a_uri = str_init("a_uri"); +str td_key_b_uri = str_init("b_uri"); +str td_key_r_uri = str_init("r_uri"); +str td_key_a_srcaddr = str_init("a_srcaddr"); +str td_key_b_srcaddr = str_init("b_srcaddr"); +str td_key_s_method = str_init("s_method"); +str td_key_s_cseq = str_init("s_cseq"); + +str tt_key_rectime = str_init("rectime"); +str tt_key_a_callid = str_init("a_callid"); +str tt_key_a_uuid = str_init("a_uuid"); +str tt_key_b_uuid = str_init("b_uuid"); +str tt_key_direction = str_init("direction"); +str tt_key_x_via = str_init("x_via"); +str tt_key_x_vbranch = str_init("x_vbranch"); +str tt_key_x_rr = str_init("x_rr"); +str tt_key_y_rr = str_init("y_rr"); +str tt_key_s_rr = str_init("s_rr"); +str tt_key_x_uri = str_init("x_uri"); +str tt_key_x_tag = str_init("x_tag"); +str tt_key_s_method = str_init("s_method"); +str tt_key_s_cseq = str_init("s_cseq"); + +#define TPS_REDIS_SET_ARGS(sval, argc, akey, argv, argvlen) \ + do { \ + if((sval)->s!=NULL && (sval)->len>0) { \ + argv[argc] = (akey)->s; \ + argvlen[argc] = (akey)->len; \ + argc++; \ + argv[argc] = (sval)->s; \ + argvlen[argc] = (sval)->len; \ + argc++; \ + } \ + } while(0) + +#define TPS_REDIS_SET_ARGSX(sval, argc, akey, argv, argvlen) \ + do { \ + if((sval)!=NULL) { \ + TPS_REDIS_SET_ARGS(sval, argc, akey, argv, argvlen); \ + } \ + } while(0) + +#define TPS_REDIS_SET_ARGN(nval, rp, sval, argc, akey, argv, argvlen) \ + do { \ + (sval)->s = int2bstr((unsigned long)nval, rp, &(sval)->len); \ + rp = (sval)->s + (sval)->len + 1; \ + TPS_REDIS_SET_ARGS((sval), argc, akey, argv, argvlen); \ + } while(0) + +/** + * + */ +int tps_redis_insert_dialog(tps_data_t *td) +{ + char* argv[TPS_REDIS_NR_KEYS]; + size_t argvlen[TPS_REDIS_NR_KEYS]; + int argc = 0; + str rcmd = str_init("HMSET"); + str rkey = STR_NULL; + char *rp; + str rval = STR_NULL; + redisc_server_t *rsrv = NULL; + redisReply *rrpl = NULL; + unsigned long lval = 0; + + if(td->a_uuid.len<=0 && td->b_uuid.len<=0) { + LM_INFO("no uuid for this message\n"); + return -1; + } + + rsrv = _tps_redis_api.get_server(&_topos_redis_serverid); + if(rsrv==NULL) { + LM_ERR("cannot find redis server [%.*s]\n", + _topos_redis_serverid.len, _topos_redis_serverid.s); + return -1; + } + + memset(argv, 0, TPS_REDIS_NR_KEYS * sizeof(char*)); + memset(argvlen, 0, TPS_REDIS_NR_KEYS * sizeof(size_t)); + argc = 0; + + rp = _tps_redis_cbuf; + memcpy(rp, _tps_redis_dprefix.s, _tps_redis_dprefix.len); + + if(td->a_uuid.len>0) { + memcpy(rp + _tps_redis_dprefix.len, + td->a_uuid.s, td->a_uuid.len); + if(td->a_uuid.s[0]=='b') { + rp[_tps_redis_dprefix.len] = 'a'; + } + rp[_tps_redis_dprefix.len+td->a_uuid.len] = '\0'; + rkey.s = rp; + rkey.len = _tps_redis_dprefix.len+td->a_uuid.len; + rp += _tps_redis_dprefix.len+td->a_uuid.len+1; + } else { + memcpy(rp + _tps_redis_dprefix.len, + td->b_uuid.s, td->b_uuid.len); + if(td->b_uuid.s[0]=='b') { + rp[_tps_redis_dprefix.len] = 'a'; + } + rp[_tps_redis_dprefix.len+td->b_uuid.len] = '\0'; + rkey.s = rp; + rkey.len = _tps_redis_dprefix.len+td->b_uuid.len; + rp += _tps_redis_dprefix.len+td->b_uuid.len+1; + } + + argv[argc] = rcmd.s; + argvlen[argc] = rcmd.len; + argc++; + + argv[argc] = rkey.s; + argvlen[argc] = rkey.len; + argc++; + + lval = (unsigned long)time(NULL); + TPS_REDIS_SET_ARGN(lval, rp, &rval, argc, &td_key_rectime, + argv, argvlen); + TPS_REDIS_SET_ARGS(&td->a_callid, argc, &td_key_a_callid, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->a_uuid, argc, &td_key_a_uuid, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->b_uuid, argc, &td_key_b_uuid, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->a_contact, argc, &td_key_a_contact, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->b_contact, argc, &td_key_b_contact, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->as_contact, argc, &td_key_as_contact, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->bs_contact, argc, &td_key_bs_contact, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->a_tag, argc, &td_key_a_tag, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->b_tag, argc, &td_key_b_tag, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->a_rr, argc, &td_key_a_rr, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->b_rr, argc, &td_key_b_rr, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->s_rr, argc, &td_key_s_rr, argv, argvlen); + + TPS_REDIS_SET_ARGN(td->iflags, rp, &rval, argc, &td_key_iflags, + argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->a_uri, argc, &td_key_a_uri, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->b_uri, argc, &td_key_b_uri, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->r_uri, argc, &td_key_r_uri, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->a_srcaddr, argc, &td_key_a_srcaddr, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->b_srcaddr, argc, &td_key_b_srcaddr, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->s_method, argc, &td_key_s_method, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->s_cseq, argc, &td_key_s_cseq, argv, argvlen); + + rrpl = _tps_redis_api.exec_argv(rsrv, argc, (const char **)argv, argvlen); + if(rrpl==NULL) { + LM_ERR("failed to execute redis command\n"); + if(rsrv->ctxRedis->err) { + LM_ERR("redis error: %s\n", rsrv->ctxRedis->errstr); + } + return -1; + } + LM_DBG("inserted dialog record for [%.*s] with argc %d\n", + rkey.len, rkey.s, argc); + freeReplyObject(rrpl); + + return 0; +} + +/** + * + */ +int tps_redis_clean_dialogs(void) +{ + return 0; +} + +/** + * + */ +int tps_redis_insert_branch(tps_data_t *td) +{ + char* argv[TPS_REDIS_NR_KEYS]; + size_t argvlen[TPS_REDIS_NR_KEYS]; + int argc = 0; + str rcmd = str_init("HMSET"); + str rkey = STR_NULL; + char *rp; + str rval = STR_NULL; + redisc_server_t *rsrv = NULL; + redisReply *rrpl = NULL; + unsigned long lval = 0; + + if(td->x_vbranch1.len<=0) { + LM_INFO("no via branch for this message\n"); + return -1; + } + + rsrv = _tps_redis_api.get_server(&_topos_redis_serverid); + if(rsrv==NULL) { + LM_ERR("cannot find redis server [%.*s]\n", + _topos_redis_serverid.len, _topos_redis_serverid.s); + return -1; + } + + memset(argv, 0, TPS_REDIS_NR_KEYS * sizeof(char*)); + memset(argvlen, 0, TPS_REDIS_NR_KEYS * sizeof(size_t)); + argc = 0; + + rp = _tps_redis_cbuf; + memcpy(rp, _tps_redis_bprefix.s, _tps_redis_bprefix.len); + memcpy(rp + _tps_redis_bprefix.len, + td->x_vbranch1.s, td->x_vbranch1.len); + rp[_tps_redis_bprefix.len+td->x_vbranch1.len] = '\0'; + rkey.s = rp; + rkey.len = _tps_redis_bprefix.len+td->x_vbranch1.len; + rp += _tps_redis_bprefix.len+td->x_vbranch1.len+1; + + argv[argc] = rcmd.s; + argvlen[argc] = rcmd.len; + argc++; + + argv[argc] = rkey.s; + argvlen[argc] = rkey.len; + argc++; + + lval = (unsigned long)time(NULL); + TPS_REDIS_SET_ARGN(lval, rp, &rval, argc, &tt_key_rectime, + argv, argvlen); + TPS_REDIS_SET_ARGS(&td->a_callid, argc, &tt_key_a_callid, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->a_uuid, argc, &tt_key_a_uuid, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->b_uuid, argc, &tt_key_b_uuid, argv, argvlen); + + TPS_REDIS_SET_ARGN(td->direction, rp, &rval, argc, &tt_key_direction, + argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->x_via, argc, &tt_key_x_via, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->x_vbranch1, argc, &tt_key_x_vbranch, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->x_rr, argc, &tt_key_x_rr, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->y_rr, argc, &tt_key_y_rr, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->s_rr, argc, &tt_key_s_rr, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->x_uri, argc, &tt_key_x_uri, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->x_tag, argc, &tt_key_x_tag, argv, argvlen); + + TPS_REDIS_SET_ARGS(&td->s_method, argc, &tt_key_s_method, argv, argvlen); + TPS_REDIS_SET_ARGS(&td->s_cseq, argc, &tt_key_s_cseq, argv, argvlen); + + rrpl = _tps_redis_api.exec_argv(rsrv, argc, (const char **)argv, argvlen); + if(rrpl==NULL) { + LM_ERR("failed to execute redis command\n"); + if(rsrv->ctxRedis->err) { + LM_ERR("redis error: %s\n", rsrv->ctxRedis->errstr); + } + return -1; + } + LM_DBG("inserting branch record for [%.*s] with argc %d\n", + rkey.len, rkey.s, argc); + + freeReplyObject(rrpl); + + return 0; +} + +/** + * + */ +int tps_redis_clean_branches(void) +{ + return 0; +} + +#define TPS_REDIS_DATA_APPEND(_sd, _k, _v, _r) \ + do { \ + if((_sd)->cp + (_v)->len >= (_sd)->cbuf + TPS_DATA_SIZE) { \ + LM_ERR("not enough space for %.*s\n", (_k)->len, (_k)->s); \ + goto error; \ + } \ + if((_v)->len>0) { \ + (_r)->s = (_sd)->cp; \ + (_r)->len = (_v)->len; \ + memcpy((_sd)->cp, (_v)->s, (_v)->len); \ + (_sd)->cp += (_v)->len; \ + (_sd)->cp[0] = '\0'; \ + (_sd)->cp++; \ + } \ + } while(0) + +/** + * + */ +int tps_redis_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd) +{ + char* argv[TPS_REDIS_NR_KEYS]; + size_t argvlen[TPS_REDIS_NR_KEYS]; + int argc = 0; + str rcmd = str_init("HGETALL"); + str rkey = STR_NULL; + char *rp; + int i; + redisc_server_t *rsrv = NULL; + redisReply *rrpl = NULL; + str skey = STR_NULL; + str sval = STR_NULL; + + if(msg==NULL || md==NULL || sd==NULL) + return -1; + + if(md->x_vbranch1.len<=0) { + LM_INFO("no via branch for this message\n"); + return -1; + } + + rsrv = _tps_redis_api.get_server(&_topos_redis_serverid); + if(rsrv==NULL) { + LM_ERR("cannot find redis server [%.*s]\n", + _topos_redis_serverid.len, _topos_redis_serverid.s); + return -1; + } + + memset(argv, 0, TPS_REDIS_NR_KEYS * sizeof(char*)); + memset(argvlen, 0, TPS_REDIS_NR_KEYS * sizeof(size_t)); + argc = 0; + + rp = _tps_redis_cbuf; + memcpy(rp, _tps_redis_bprefix.s, _tps_redis_bprefix.len); + memcpy(rp + _tps_redis_bprefix.len, + md->x_vbranch1.s, md->x_vbranch1.len); + rp[_tps_redis_bprefix.len+md->x_vbranch1.len] = '\0'; + rkey.s = rp; + rkey.len = _tps_redis_bprefix.len+md->x_vbranch1.len; + rp += _tps_redis_bprefix.len+md->x_vbranch1.len+1; + + argv[argc] = rcmd.s; + argvlen[argc] = rcmd.len; + argc++; + + argv[argc] = rkey.s; + argvlen[argc] = rkey.len; + argc++; + + LM_DBG("loading branch record for [%.*s]\n", rkey.len, rkey.s); + + rrpl = _tps_redis_api.exec_argv(rsrv, argc, (const char **)argv, argvlen); + if(rrpl==NULL) { + LM_ERR("failed to execute redis command\n"); + if(rsrv->ctxRedis->err) { + LM_ERR("redis error: %s\n", rsrv->ctxRedis->errstr); + } + return -1; + } + + if(rrpl->type != REDIS_REPLY_ARRAY) { + LM_WARN("invalid redis result type: %d\n", rrpl->type); + freeReplyObject(rrpl); + return -1; + } + + if(rrpl->elements<=0) { + LM_DBG("hmap with key [%.*s] not found\n", rkey.len, rkey.s); + freeReplyObject(rrpl); + return 1; + } + if(rrpl->elements % 2) { + LM_DBG("hmap with key [%.*s] has invalid result\n", rkey.len, rkey.s); + freeReplyObject(rrpl); + return -1; + } + + sd->cp = sd->cbuf; + + for(i=0; ielements; i++) { + if(rrpl->element[i]->type != REDIS_REPLY_STRING) { + LM_ERR("invalid type for hmap[%.*s] key pos[%d]\n", + rkey.len, rkey.s, i); + freeReplyObject(rrpl); + return -1; + } + skey.s = rrpl->element[i]->str; + skey.len = rrpl->element[i]->len; + i++; + if(rrpl->element[i]==NULL) { + continue; + } + sval.s = NULL; + switch(rrpl->element[i]->type) { + case REDIS_REPLY_STRING: + LM_DBG("r[%d]: s[%.*s]\n", i, rrpl->element[i]->len, + rrpl->element[i]->str); + sval.s = rrpl->element[i]->str; + sval.len = rrpl->element[i]->len; + break; + case REDIS_REPLY_INTEGER: + LM_DBG("r[%d]: n[%lld]\n", i, rrpl->element[i]->integer); + break; + default: + LM_WARN("unexpected type [%d] at pos [%d]\n", + rrpl->element[i]->type, i); + } + if(sval.s==NULL) { + continue; + } + + if(skey.len==tt_key_rectime.len + && strncmp(skey.s, tt_key_rectime.s, skey.len)==0) { + /* skip - not needed */ + } else if(skey.len==tt_key_a_callid.len + && strncmp(skey.s, tt_key_a_callid.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_callid); + } else if(skey.len==tt_key_a_uuid.len + && strncmp(skey.s, tt_key_a_uuid.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_uuid); + } else if(skey.len==tt_key_b_uuid.len + && strncmp(skey.s, tt_key_b_uuid.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->b_uuid); + } else if(skey.len==tt_key_direction.len + && strncmp(skey.s, tt_key_direction.s, skey.len)==0) { + /* skip - not needed */ + } else if(skey.len==tt_key_x_via.len + && strncmp(skey.s, tt_key_x_via.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->x_via); + } else if(skey.len==tt_key_x_vbranch.len + && strncmp(skey.s, tt_key_x_vbranch.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->x_vbranch1); + } else if(skey.len==tt_key_x_rr.len + && strncmp(skey.s, tt_key_x_rr.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->x_rr); + } else if(skey.len==tt_key_y_rr.len + && strncmp(skey.s, tt_key_y_rr.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->y_rr); + } else if(skey.len==tt_key_s_rr.len + && strncmp(skey.s, tt_key_s_rr.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->s_rr); + } else if(skey.len==tt_key_x_uri.len + && strncmp(skey.s, tt_key_x_uri.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->x_uri); + } else if(skey.len==tt_key_x_tag.len + && strncmp(skey.s, tt_key_x_tag.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->x_tag); + } else if(skey.len==tt_key_s_method.len + && strncmp(skey.s, tt_key_s_method.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->s_method); + } else if(skey.len==tt_key_s_cseq.len + && strncmp(skey.s, tt_key_s_cseq.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->s_cseq); + } else { + LM_WARN("unknow key[%.*s]\n", skey.len, skey.s); + } + } + + freeReplyObject(rrpl); + return 0; + +error: + if(rrpl) freeReplyObject(rrpl); + return -1; +} + +/** + * + */ +int tps_redis_load_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd) +{ + char* argv[TPS_REDIS_NR_KEYS]; + size_t argvlen[TPS_REDIS_NR_KEYS]; + int argc = 0; + str rcmd = str_init("HGETALL"); + str rkey = STR_NULL; + char *rp; + int i; + redisc_server_t *rsrv = NULL; + redisReply *rrpl = NULL; + str skey = STR_NULL; + str sval = STR_NULL; + + if(msg==NULL || md==NULL || sd==NULL) + return -1; + + if(md->a_uuid.len<=0 && md->b_uuid.len<=0) { + LM_DBG("no dlg uuid provided\n"); + return -1; + } + rsrv = _tps_redis_api.get_server(&_topos_redis_serverid); + if(rsrv==NULL) { + LM_ERR("cannot find redis server [%.*s]\n", + _topos_redis_serverid.len, _topos_redis_serverid.s); + return -1; + } + + memset(argv, 0, TPS_REDIS_NR_KEYS * sizeof(char*)); + memset(argvlen, 0, TPS_REDIS_NR_KEYS * sizeof(size_t)); + argc = 0; + + rp = _tps_redis_cbuf; + memcpy(rp, _tps_redis_dprefix.s, _tps_redis_dprefix.len); + + if(md->a_uuid.len>0) { + memcpy(rp + _tps_redis_dprefix.len, + md->a_uuid.s, md->a_uuid.len); + if(md->a_uuid.s[0]=='b') { + rp[_tps_redis_dprefix.len] = 'a'; + } + rp[_tps_redis_dprefix.len+md->a_uuid.len] = '\0'; + rkey.s = rp; + rkey.len = _tps_redis_dprefix.len+md->a_uuid.len; + rp += _tps_redis_dprefix.len+md->a_uuid.len+1; + } else { + memcpy(rp + _tps_redis_dprefix.len, + md->b_uuid.s, md->b_uuid.len); + if(md->b_uuid.s[0]=='b') { + rp[_tps_redis_dprefix.len] = 'a'; + } + rp[_tps_redis_dprefix.len+md->b_uuid.len] = '\0'; + rkey.s = rp; + rkey.len = _tps_redis_dprefix.len+md->b_uuid.len; + rp += _tps_redis_dprefix.len+md->b_uuid.len+1; + } + + argv[argc] = rcmd.s; + argvlen[argc] = rcmd.len; + argc++; + + argv[argc] = rkey.s; + argvlen[argc] = rkey.len; + argc++; + + LM_DBG("loading dialog record for [%.*s]\n", rkey.len, rkey.s); + + rrpl = _tps_redis_api.exec_argv(rsrv, argc, (const char **)argv, argvlen); + if(rrpl==NULL) { + LM_ERR("failed to execute redis command\n"); + if(rsrv->ctxRedis->err) { + LM_ERR("redis error: %s\n", rsrv->ctxRedis->errstr); + } + return -1; + } + + if(rrpl->type != REDIS_REPLY_ARRAY) { + LM_WARN("invalid redis result type: %d\n", rrpl->type); + freeReplyObject(rrpl); + return -1; + } + + if(rrpl->elements<=0) { + LM_DBG("hmap with key [%.*s] not found\n", rkey.len, rkey.s); + freeReplyObject(rrpl); + return 1; + } + if(rrpl->elements % 2) { + LM_DBG("hmap with key [%.*s] has invalid result\n", rkey.len, rkey.s); + freeReplyObject(rrpl); + return -1; + } + + sd->cp = sd->cbuf; + + for(i=0; ielements; i++) { + if(rrpl->element[i]->type != REDIS_REPLY_STRING) { + LM_ERR("invalid type for hmap[%.*s] key pos[%d]\n", + rkey.len, rkey.s, i); + freeReplyObject(rrpl); + return -1; + } + skey.s = rrpl->element[i]->str; + skey.len = rrpl->element[i]->len; + i++; + if(rrpl->element[i]==NULL) { + continue; + } + sval.s = NULL; + switch(rrpl->element[i]->type) { + case REDIS_REPLY_STRING: + LM_DBG("r[%d]: s[%.*s]\n", i, rrpl->element[i]->len, + rrpl->element[i]->str); + sval.s = rrpl->element[i]->str; + sval.len = rrpl->element[i]->len; + break; + case REDIS_REPLY_INTEGER: + LM_DBG("r[%d]: n[%lld]\n", i, rrpl->element[i]->integer); + break; + default: + LM_WARN("unexpected type [%d] at pos [%d]\n", + rrpl->element[i]->type, i); + } + if(sval.s==NULL) { + continue; + } + if(skey.len==td_key_rectime.len + && strncmp(skey.s, td_key_rectime.s, skey.len)==0) { + /* skip - not needed */ + } else if(skey.len==td_key_a_callid.len + && strncmp(skey.s, td_key_a_callid.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_callid); + } else if(skey.len==td_key_a_uuid.len + && strncmp(skey.s, td_key_a_uuid.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_uuid); + } else if(skey.len==td_key_b_uuid.len + && strncmp(skey.s, td_key_b_uuid.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->b_uuid); + } else if(skey.len==td_key_a_contact.len + && strncmp(skey.s, td_key_a_contact.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_contact); + } else if(skey.len==td_key_b_contact.len + && strncmp(skey.s, td_key_b_contact.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->b_contact); + } else if(skey.len==td_key_as_contact.len + && strncmp(skey.s, td_key_as_contact.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->as_contact); + } else if(skey.len==td_key_bs_contact.len + && strncmp(skey.s, td_key_bs_contact.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->bs_contact); + } else if(skey.len==td_key_a_tag.len + && strncmp(skey.s, td_key_a_tag.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_tag); + } else if(skey.len==td_key_b_tag.len + && strncmp(skey.s, td_key_b_tag.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->b_tag); + } else if(skey.len==td_key_a_rr.len + && strncmp(skey.s, td_key_a_rr.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_rr); + } else if(skey.len==td_key_b_rr.len + && strncmp(skey.s, td_key_b_rr.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->b_rr); + } else if(skey.len==td_key_s_rr.len + && strncmp(skey.s, td_key_s_rr.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->s_rr); + } else if(skey.len==td_key_iflags.len + && strncmp(skey.s, td_key_iflags.s, skey.len)==0) { + /* skip - not needed */ + } else if(skey.len==td_key_a_uri.len + && strncmp(skey.s, td_key_a_uri.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_uri); + } else if(skey.len==td_key_b_uri.len + && strncmp(skey.s, td_key_b_uri.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->b_uri); + } else if(skey.len==td_key_r_uri.len + && strncmp(skey.s, td_key_r_uri.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->r_uri); + } else if(skey.len==td_key_a_srcaddr.len + && strncmp(skey.s, td_key_a_srcaddr.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->a_srcaddr); + } else if(skey.len==td_key_b_srcaddr.len + && strncmp(skey.s, td_key_b_srcaddr.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->b_srcaddr); + } else if(skey.len==td_key_s_method.len + && strncmp(skey.s, td_key_s_method.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->s_method); + } else if(skey.len==td_key_s_cseq.len + && strncmp(skey.s, td_key_s_cseq.s, skey.len)==0) { + TPS_REDIS_DATA_APPEND(sd, &skey, &sval, &sd->s_cseq); + } else { + LM_WARN("unknow key[%.*s]\n", skey.len, skey.s); + } + } + + freeReplyObject(rrpl); + return 0; + +error: + if(rrpl) freeReplyObject(rrpl); + return -1; +} + +/** + * + */ +int tps_redis_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd) +{ + char* argv[TPS_REDIS_NR_KEYS]; + size_t argvlen[TPS_REDIS_NR_KEYS]; + int argc = 0; + str rcmd = str_init("HMSET"); + str rkey = STR_NULL; + char *rp; + str rval = STR_NULL; + redisc_server_t *rsrv = NULL; + redisReply *rrpl = NULL; + int32_t liflags; + + if(sd->a_uuid.len<=0 && sd->b_uuid.len<=0) { + LM_INFO("no uuid for this message\n"); + return -1; + } + + rsrv = _tps_redis_api.get_server(&_topos_redis_serverid); + if(rsrv==NULL) { + LM_ERR("cannot find redis server [%.*s]\n", + _topos_redis_serverid.len, _topos_redis_serverid.s); + return -1; + } + + memset(argv, 0, TPS_REDIS_NR_KEYS * sizeof(char*)); + memset(argvlen, 0, TPS_REDIS_NR_KEYS * sizeof(size_t)); + argc = 0; + + rp = _tps_redis_cbuf; + memcpy(rp, _tps_redis_dprefix.s, _tps_redis_dprefix.len); + + if(sd->a_uuid.len>0) { + memcpy(rp + _tps_redis_dprefix.len, + sd->a_uuid.s, sd->a_uuid.len); + if(sd->a_uuid.s[0]=='b') { + rp[_tps_redis_dprefix.len] = 'a'; + } + rp[_tps_redis_dprefix.len+sd->a_uuid.len] = '\0'; + rkey.s = rp; + rkey.len = _tps_redis_dprefix.len+sd->a_uuid.len; + rp += _tps_redis_dprefix.len+sd->a_uuid.len+1; + } else { + memcpy(rp + _tps_redis_dprefix.len, + sd->b_uuid.s, sd->b_uuid.len); + if(sd->b_uuid.s[0]=='b') { + rp[_tps_redis_dprefix.len] = 'a'; + } + rp[_tps_redis_dprefix.len+sd->b_uuid.len] = '\0'; + rkey.s = rp; + rkey.len = _tps_redis_dprefix.len+sd->b_uuid.len; + rp += _tps_redis_dprefix.len+sd->b_uuid.len+1; + } + + argv[argc] = rcmd.s; + argvlen[argc] = rcmd.len; + argc++; + + argv[argc] = rkey.s; + argvlen[argc] = rkey.len; + argc++; + + TPS_REDIS_SET_ARGS(&md->b_contact, argc, &td_key_b_contact, argv, argvlen); + + if(msg->first_line.type==SIP_REPLY) { + if(sd->b_tag.len<=0 + && msg->first_line.u.reply.statuscode>=200 + && msg->first_line.u.reply.statuscode<300) { + + if((sd->iflags&TPS_IFLAG_DLGON) == 0) { + TPS_REDIS_SET_ARGS(&md->b_rr, argc, &td_key_b_rr, argv, argvlen); + } + + TPS_REDIS_SET_ARGS(&md->b_tag, argc, &td_key_b_tag, argv, argvlen); + + liflags = sd->iflags|TPS_IFLAG_DLGON; + TPS_REDIS_SET_ARGN(liflags, rp, &rval, argc, &td_key_iflags, + argv, argvlen); + } + } + + rrpl = _tps_redis_api.exec_argv(rsrv, argc, (const char **)argv, argvlen); + if(rrpl==NULL) { + LM_ERR("failed to execute redis command\n"); + if(rsrv->ctxRedis->err) { + LM_ERR("redis error: %s\n", rsrv->ctxRedis->errstr); + } + return -1; + } + LM_DBG("updated dialog record for [%.*s] with argc %d\n", + rkey.len, rkey.s, argc); + freeReplyObject(rrpl); + + return 0; +} + +/** + * + */ +int tps_redis_end_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd) +{ + return 0; +} diff --git a/src/modules/topos_redis/topos_redis_storage.h b/src/modules/topos_redis/topos_redis_storage.h new file mode 100644 index 00000000000..cdeb56f43e5 --- /dev/null +++ b/src/modules/topos_redis/topos_redis_storage.h @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2017 kamailio.org + * Copyright (C) 2017 flowroute.com + * + * This file is part of Kamailio, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/*! + * \file + * \brief Kamailio topos :: + * \ingroup topos + * Module: \ref topos + */ + +#ifndef _TOPOS_REDIS_STORAGE_H_ +#define _TOPOS_REDIS_STORAGE_H_ + +#include "../topos/api.h" + +int tps_redis_insert_dialog(tps_data_t *td); +int tps_redis_clean_dialogs(void); +int tps_redis_insert_branch(tps_data_t *td); +int tps_redis_clean_branches(void); +int tps_redis_load_branch(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd); +int tps_redis_load_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd); +int tps_redis_update_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd); +int tps_redis_end_dialog(sip_msg_t *msg, tps_data_t *md, tps_data_t *sd); + +#endif