diff --git a/src/modules/siprepo/Makefile b/src/modules/siprepo/Makefile new file mode 100644 index 00000000000..68eac6436f5 --- /dev/null +++ b/src/modules/siprepo/Makefile @@ -0,0 +1,9 @@ +# +# WARNING: do not run this directly, it should be run by the main Makefile + +include ../../Makefile.defs +auto_gen= +NAME=siprepo.so +LIBS= + +include ../../Makefile.modules diff --git a/src/modules/siprepo/README b/src/modules/siprepo/README new file mode 100644 index 00000000000..fde17e281af --- /dev/null +++ b/src/modules/siprepo/README @@ -0,0 +1,239 @@ +SIPREPO Module + +Daniel-Constantin Mierla + + + +Edited by + +Daniel-Constantin Mierla + + + + Copyright © 2022 asipto.com + __________________________________________________________________ + + Table of Contents + + 1. Admin Guide + + 1. Overview + 2. Dependencies + + 2.1. Kamailio Modules + 2.2. External Libraries or Applications + + 3. Parameters + + 3.1. hash_size (int) + 3.2. expire (int) + 3.3. timer_interval (int) + 3.4. timer_procs (int) + + 4. Functions + + 4.1. sr_msg_push(msgid) + 4.2. sr_msg_pull(callid, msgid, rname) + 4.3. sr_msg_async_pull(callid, msgid, gname, rname) + 4.4. sr_msg_rm(callid, msgid) + 4.5. sr_msg_check() + + List of Examples + + 1.1. hash_size usage + 1.2. expire usage + 1.3. timer_interval usage + 1.4. timer_procs usage + 1.5. sr_msg_push() usage + 1.6. sr_msg_pull() usage + 1.7. sr_msg_async_pull() usage + 1.8. sr_msg_rm() usage + 1.9. sr_msg_checkj() usage + +Chapter 1. Admin Guide + + Table of Contents + + 1. Overview + 2. Dependencies + + 2.1. Kamailio Modules + 2.2. External Libraries or Applications + + 3. Parameters + + 3.1. hash_size (int) + 3.2. expire (int) + 3.3. timer_interval (int) + 3.4. timer_procs (int) + + 4. Functions + + 4.1. sr_msg_push(msgid) + 4.2. sr_msg_pull(callid, msgid, rname) + 4.3. sr_msg_async_pull(callid, msgid, gname, rname) + 4.4. sr_msg_rm(callid, msgid) + 4.5. sr_msg_check() + +1. Overview + + This module can store and fetch SIP message content in an in-memory + hash table. + +2. Dependencies + + 2.1. Kamailio Modules + 2.2. External Libraries or Applications + +2.1. Kamailio Modules + + The following modules must be loaded before this module: + * None. + +2.2. External Libraries or Applications + + The following libraries or applications must be installed before + running Kamailio with this module loaded: + * None + +3. Parameters + + 3.1. hash_size (int) + 3.2. expire (int) + 3.3. timer_interval (int) + 3.4. timer_procs (int) + +3.1. hash_size (int) + + Number of slots in the hash table. It should be power of 2. + + Default value: 256. + + Example 1.1. hash_size usage +... +modparam("siprepo", "hash_size", 1024) +... + +3.2. expire (int) + + Number of seconds until the stored message content expires. + + Default value: 180. + + Example 1.2. expire usage +... +modparam("siprepo", "expire", 240) +... + +3.3. timer_interval (int) + + Number of seconds to run the timer routine. + + Default value: 10. + + Example 1.3. timer_interval usage +... +modparam("siprepo", "timer_interval", 5) +... + +3.4. timer_procs (int) + + Number of timer processes. + + Default value: 1. + + Example 1.4. timer_procs usage +... +modparam("siprepo", "timer_procs", 2) +... + +4. Functions + + 4.1. sr_msg_push(msgid) + 4.2. sr_msg_pull(callid, msgid, rname) + 4.3. sr_msg_async_pull(callid, msgid, gname, rname) + 4.4. sr_msg_rm(callid, msgid) + 4.5. sr_msg_check() + +4.1. sr_msg_push(msgid) + + Push the message content to hash table and associate it with `msgid`. + The Call-Id and msgid are needed to pull the message. + + This function can be used from ANY_ROUTE. + + Example 1.5. sr_msg_push() usage +... +request_route { + ... + $var(msgid) = $sruid; + if(sr_msg_push("$var(msgid)")) { + } + ... +} +... + +4.2. sr_msg_pull(callid, msgid, rname) + + Pull the message content and execute the route block 'rname'. + + This function can be used from ANY_ROUTE. + + Example 1.6. sr_msg_pull() usage +... +request_route { + ... + if(sr_msg_pull("$var(callid)", "$var(msgid)", "REPOPULL")) { + } + ... +} +... + +4.3. sr_msg_async_pull(callid, msgid, gname, rname) + + Pull the message content and execute the route block 'rname' via async + group 'gname'. + + This function can be used from ANY_ROUTE. + + Example 1.7. sr_msg_async_pull() usage +... +request_route { + ... + if(sr_msg_async_pull("$var(callid)", "$var(msgid)", "WG01", "REPOPULL")) { + } + ... +} +... + +4.4. sr_msg_rm(callid, msgid) + + Remove the message content. + + This function can be used from ANY_ROUTE. + + Example 1.8. sr_msg_rm() usage +... +request_route { + ... + if(sr_msg_rm("$var(callid)", "$var(msgid)")) { + } + ... +} +... + +4.5. sr_msg_check() + + Check if the message is stored. + + This function can be used from ANY_ROUTE. + + Example 1.9. sr_msg_checkj() usage +... +request_route { + ... + if(sr_msg_check()) { + } + ... +} +... diff --git a/src/modules/siprepo/doc/Makefile b/src/modules/siprepo/doc/Makefile new file mode 100644 index 00000000000..4c63c2f89f1 --- /dev/null +++ b/src/modules/siprepo/doc/Makefile @@ -0,0 +1,4 @@ +docs = siprepo.xml + +docbook_dir = ../../../../doc/docbook +include $(docbook_dir)/Makefile.module diff --git a/src/modules/siprepo/doc/siprepo.xml b/src/modules/siprepo/doc/siprepo.xml new file mode 100644 index 00000000000..6cee4ceb1d2 --- /dev/null +++ b/src/modules/siprepo/doc/siprepo.xml @@ -0,0 +1,36 @@ + + + +%docentities; + +]> + + + + SIPREPO Module + kamailio.org + + + Daniel-Constantin + Mierla + miconda@gmail.com + + + Daniel-Constantin + Mierla + miconda@gmail.com + + + + 2022 + asipto.com + + + + + + + diff --git a/src/modules/siprepo/doc/siprepo_admin.xml b/src/modules/siprepo/doc/siprepo_admin.xml new file mode 100644 index 00000000000..306eaf701eb --- /dev/null +++ b/src/modules/siprepo/doc/siprepo_admin.xml @@ -0,0 +1,263 @@ + + + +%docentities; + +]> + + + + + &adminguide; + +
+ Overview + + This module can store and fetch SIP message content in an in-memory + hash table. + +
+ +
+ Dependencies +
+ &kamailio; Modules + + The following modules must be loaded before this module: + + + + None. + + + + +
+
+ External Libraries or Applications + + The following libraries or applications must be installed before running + &kamailio; with this module loaded: + + + + None + + + + +
+
+ +
+ Parameters +
+ + <function moreinfo="none">hash_size (int)</function> + + + Number of slots in the hash table. It should be power of 2. + + + Default value: 256. + + + <function>hash_size</function> usage + +... +modparam("siprepo", "hash_size", 1024) +... + + +
+
+ + <function moreinfo="none">expire (int)</function> + + + Number of seconds until the stored message content expires. + + + Default value: 180. + + + <function>expire</function> usage + +... +modparam("siprepo", "expire", 240) +... + + +
+
+ + <function moreinfo="none">timer_interval (int)</function> + + + Number of seconds to run the timer routine. + + + Default value: 10. + + + <function>timer_interval</function> usage + +... +modparam("siprepo", "timer_interval", 5) +... + + +
+
+ + <function moreinfo="none">timer_procs (int)</function> + + + Number of timer processes. + + + Default value: 1. + + + <function>timer_procs</function> usage + +... +modparam("siprepo", "timer_procs", 2) +... + + +
+ +
+ +
+ Functions +
+ + <function moreinfo="none">sr_msg_push(msgid)</function> + + + Push the message content to hash table and associate it with + `msgid`. The Call-Id and msgid are needed to pull the message. + + + This function can be used from ANY_ROUTE. + + + <function>sr_msg_push()</function> usage + +... +request_route { + ... + $var(msgid) = $sruid; + if(sr_msg_push("$var(msgid)")) { + } + ... +} +... + + +
+
+ + <function moreinfo="none">sr_msg_pull(callid, msgid, rname)</function> + + + Pull the message content and execute the route block 'rname'. + + + This function can be used from ANY_ROUTE. + + + <function>sr_msg_pull()</function> usage + +... +request_route { + ... + if(sr_msg_pull("$var(callid)", "$var(msgid)", "REPOPULL")) { + } + ... +} +... + + +
+
+ + <function moreinfo="none">sr_msg_async_pull(callid, msgid, gname, rname)</function> + + + Pull the message content and execute the route block 'rname' via + async group 'gname'. + + + This function can be used from ANY_ROUTE. + + + <function>sr_msg_async_pull()</function> usage + +... +request_route { + ... + if(sr_msg_async_pull("$var(callid)", "$var(msgid)", "WG01", "REPOPULL")) { + } + ... +} +... + + +
+
+ + <function moreinfo="none">sr_msg_rm(callid, msgid)</function> + + + Remove the message content. + + + This function can be used from ANY_ROUTE. + + + <function>sr_msg_rm()</function> usage + +... +request_route { + ... + if(sr_msg_rm("$var(callid)", "$var(msgid)")) { + } + ... +} +... + + +
+
+ + <function moreinfo="none">sr_msg_check()</function> + + + Check if the message is stored. + + + This function can be used from ANY_ROUTE. + + + <function>sr_msg_checkj()</function> usage + +... +request_route { + ... + if(sr_msg_check()) { + } + ... +} +... + + +
+ +
+
diff --git a/src/modules/siprepo/siprepo_data.c b/src/modules/siprepo/siprepo_data.c new file mode 100644 index 00000000000..9010eceb6d4 --- /dev/null +++ b/src/modules/siprepo/siprepo_data.c @@ -0,0 +1,344 @@ +/** + * Copyright (C) 2022 Daniel-Constantin Mierla (asipto.com) + * + * This file is part of Kamailio, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#include +#include +#include + +#include "../../core/dprint.h" +#include "../../core/mem/shm_mem.h" +#include "../../core/locking.h" +#include "../../core/hashes.h" +#include "../../core/config.h" +#include "../../core/parser/parse_via.h" +#include "../../core/parser/parse_from.h" +#include "../../core/route.h" +#include "../../core/trim.h" +#include "../../core/ut.h" +#include "../../core/receive.h" +#include "../../core/route.h" +#include "../../core/kemi.h" + +#include "siprepo_data.h" + + +static siprepo_slot_t *_siprepo_table = NULL; +extern int _siprepo_table_size; + +/** + * + */ +int siprepo_table_init(void) +{ + int n; + + _siprepo_table = (siprepo_slot_t*)shm_malloc(_siprepo_table_size + * sizeof(siprepo_slot_t)); + if(_siprepo_table == NULL) { + SHM_MEM_ERROR; + return -1; + } + memset(_siprepo_table, 0, _siprepo_table_size * sizeof(siprepo_slot_t)); + for(n=0; n<_siprepo_table_size; n++) { + if(lock_init(&_siprepo_table[n].lock)==NULL) { + LM_ERR("cannot init the lock %d\n", n); + n--; + while(n>=0) { + lock_destroy(&_siprepo_table[n].lock); + n--; + } + shm_free(_siprepo_table); + _siprepo_table = 0; + return -1; + } + } + return 0; +} + +/** + * + */ +siprepo_msg_t *siprepo_msg_find(sip_msg_t *msg, str *callid, str *msgid, int lmode) +{ + unsigned int hid; + unsigned int slotid; + siprepo_msg_t *it; + + hid = get_hash1_raw(callid->s, callid->len); + slotid = hid % _siprepo_table_size; + + lock_get(&_siprepo_table[slotid].lock); + for(it=_siprepo_table[slotid].plist; it!=NULL; it=it->next) { + if(hid==it->hid && callid->len==it->callid.len + && msgid->len==it->msgid.len + && memcmp(callid->s, it->callid.s, callid->len)==0 + && memcmp(msgid->s, it->msgid.s, msgid->len)==0) { + if(lmode==0) { + lock_release(&_siprepo_table[slotid].lock); + } + return it; + } + } + if(lmode==0) { + lock_release(&_siprepo_table[slotid].lock); + } + + return 0; +} + +/** + * + */ +int siprepo_msg_set(sip_msg_t *msg, str *msgid) +{ + unsigned int hid; + unsigned int slotid; + size_t dsize; + struct via_param *vbr; + str scallid; + siprepo_msg_t *it = NULL; + + if(_siprepo_table == NULL) { + LM_ERR("hash table not initialized\n"); + return -1; + } + + if(parse_headers(msg, HDR_FROM_F|HDR_VIA1_F|HDR_CALLID_F|HDR_CSEQ_F, 0)<0) { + LM_ERR("failed to parse required headers\n"); + return -1; + } + if (msg->callid==NULL || msg->callid->body.s==NULL) { + LM_ERR("failed to parse callid headers\n"); + return -1; + } + if(msg->cseq==NULL || msg->cseq->parsed==NULL) { + LM_ERR("failed to parse cseq headers\n"); + return -1; + } + if(get_cseq(msg)->method_id==METHOD_ACK + || get_cseq(msg)->method_id==METHOD_CANCEL) { + LM_DBG("no pre-transaction management for ACK or CANCEL\n"); + return -1; + } + if (msg->via1==0) { + LM_ERR("failed to get Via header\n"); + return -1; + } + if (parse_from_header(msg)<0 || get_from(msg)->tag_value.len==0) { + LM_ERR("failed to get From header\n"); + return -1; + } + + scallid = msg->callid->body; + trim(&scallid); + hid = get_hash1_raw(scallid.s, scallid.len); + slotid = hid % _siprepo_table_size; + + if(siprepo_msg_find(msg, &scallid, msgid, 1)!=NULL) { + LM_DBG("msg [%.*s] found in repo\n", msgid->len, msgid->s); + lock_release(&_siprepo_table[slotid].lock); + return 1; + } + + dsize = ROUND_POINTER(sizeof(siprepo_msg_t)) + + ROUND_POINTER(msgid->len + 1) + ROUND_POINTER(msg->len + 1); + + it = (siprepo_msg_t*)shm_malloc(dsize); + if(it == NULL) { + SHM_MEM_ERROR_FMT("new repo structure\n"); + lock_release(&_siprepo_table[slotid].lock); + return -1; + } + memset(it, 0, dsize); + + it->dbuf.s = (char*)it + ROUND_POINTER(sizeof(siprepo_msg_t)); + it->callid.len = scallid.len; + it->callid.s = translate_pointer(it->dbuf.s, msg->buf, it->callid.s); + + it->cseqmet = get_cseq(msg)->method; + trim(&it->cseqmet); + it->cseqnum = get_cseq(msg)->number; + trim(&it->cseqnum); + it->ftag = get_from(msg)->tag_value; + trim(&it->ftag); + + vbr = msg->via1->branch; + if(likely(vbr!=NULL)) { + it->vbranch = vbr->value; + trim(&it->vbranch); + } + + it->hid = get_hash1_raw(it->callid.s, it->callid.len); + slotid = it->hid % _siprepo_table_size; + + it->mtype = msg->first_line.type; + it->itime = time(NULL); + + _siprepo_table[slotid].plist->prev = it; + it->next = _siprepo_table[slotid].plist; + _siprepo_table[slotid].plist = it; + + lock_release(&_siprepo_table[slotid].lock); + + return 0; +} + +/** + * + */ +int siprepo_msg_rm(sip_msg_t *msg, str *callid, str *msgid) +{ + unsigned int slotid; + siprepo_msg_t *it = NULL; + + it = siprepo_msg_find(msg, callid, msgid, 1); + if(it==NULL) { + LM_DBG("msg [%.*s] not found in repo\n", msgid->len, msgid->s); + slotid = get_hash1_raw(callid->s, callid->len) % _siprepo_table_size; + lock_release(&_siprepo_table[slotid].lock); + return 1; + } + slotid = it->hid % _siprepo_table_size; + if(it->prev==NULL) { + _siprepo_table[slotid].plist = it->next; + if(_siprepo_table[slotid].plist) { + _siprepo_table[slotid].plist->prev = NULL; + } + } else { + it->prev->next = it->next; + } + if(it->next!=NULL) { + it->next->prev = it->prev; + } + lock_release(&_siprepo_table[slotid].lock); + shm_free(it); + + return 0; +} + +/** + * + */ +int siprepo_msg_check(sip_msg_t *msg) +{ + unsigned int hid; + unsigned int slotid; + str scallid; + siprepo_msg_t *it = NULL; + + if(_siprepo_table == NULL) { + LM_ERR("hash table not initialized\n"); + return -1; + } + + if(parse_headers(msg, HDR_FROM_F|HDR_VIA1_F|HDR_CALLID_F|HDR_CSEQ_F, 0)<0) { + LM_ERR("failed to parse required headers\n"); + return -1; + } + if (msg->callid==NULL || msg->callid->body.s==NULL) { + LM_ERR("failed to parse callid headers\n"); + return -1; + } + if(msg->cseq==NULL || msg->cseq->parsed==NULL) { + LM_ERR("failed to parse cseq headers\n"); + return -1; + } + if(get_cseq(msg)->method_id==METHOD_ACK + || get_cseq(msg)->method_id==METHOD_CANCEL) { + LM_DBG("no pre-transaction management for ACK or CANCEL\n"); + return -1; + } + if (msg->via1==0) { + LM_ERR("failed to get Via header\n"); + return -1; + } + if (parse_from_header(msg)<0 || get_from(msg)->tag_value.len==0) { + LM_ERR("failed to get From header\n"); + return -1; + } + + scallid = msg->callid->body; + trim(&scallid); + hid = get_hash1_raw(scallid.s, scallid.len); + slotid = hid % _siprepo_table_size; + + lock_get(&_siprepo_table[slotid].lock); + for(it=_siprepo_table[slotid].plist; it!=NULL; it=it->next) { + if(hid==it->hid && scallid.len==it->callid.len + && memcmp(scallid.s, it->callid.s, scallid.len)==0) { + lock_release(&_siprepo_table[slotid].lock); + return 1; + } + } + lock_release(&_siprepo_table[slotid].lock); + + return 0; +} + +/** + * + */ +int siprepo_msg_pull(sip_msg_t *msg, str *callid, str *msgid, str *rname) +{ + unsigned int slotid; + sip_msg_t lmsg; + siprepo_msg_t *it = NULL; + int rtype; + int rtbk; + int rtno; + sr_kemi_eng_t *keng = NULL; + str evname = str_init("siprepo:msg"); + + it = siprepo_msg_find(msg, callid, msgid, 1); + if(it==NULL) { + LM_DBG("msg [%.*s] not found in repo\n", msgid->len, msgid->s); + slotid = get_hash1_raw(callid->s, callid->len) % _siprepo_table_size; + lock_release(&_siprepo_table[slotid].lock); + return 1; + } + memset(&lmsg, 0, sizeof(sip_msg_t)); + slotid = it->hid % _siprepo_table_size; + + if(it->mtype==SIP_REQUEST) { + rtype = REQUEST_ROUTE; + } else { + rtype = CORE_ONREPLY_ROUTE; + } + rtbk = get_route_type(); + set_route_type(REQUEST_ROUTE); + keng = sr_kemi_eng_get(); + if(keng==NULL) { + rtno = route_lookup(&main_rt, rname->s); + if(rtno>=0 && main_rt.rlist[rtno]!=NULL) { + run_top_route(main_rt.rlist[rtno], &lmsg, 0); + } + } else { + if(sr_kemi_route(keng, &lmsg, rtype, rname, &evname)<0) { + LM_ERR("error running route kemi callback [%.*s]\n", + rname->len, rname->s); + } + } + set_route_type(rtbk); + ksr_msg_env_reset(); + + return 0; +} diff --git a/src/modules/siprepo/siprepo_data.h b/src/modules/siprepo/siprepo_data.h new file mode 100644 index 00000000000..f7c01fbf28c --- /dev/null +++ b/src/modules/siprepo/siprepo_data.h @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2022 Daniel-Constantin Mierla (asipto.com) + * + * This file is part of Kamailio, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#ifndef _SIPREPO_DATA_ +#define _SIPREPO_DATA_ + +#include + +#include "../../core/parser/msg_parser.h" + +typedef struct siprepo_msg { + unsigned int hid; + int mtype; + str msgid; + str callid; + str ftag; + str cseqnum; + str cseqmet; + str vbranch; + str dbuf; + unsigned int cseqmetid; + int pid; + receive_info_t rcv; + time_t itime; + struct siprepo_msg *next; + struct siprepo_msg *prev; +} siprepo_msg_t; + +typedef struct siprepo_slot { + siprepo_msg_t *plist; + gen_lock_t lock; +} siprepo_slot_t; + +int siprepo_table_init(void); +int siprepo_msg_set(sip_msg_t *msg, str *msgid); +int siprepo_msg_rm(sip_msg_t *msg, str *callid, str *msgid); +int siprepo_msg_pull(sip_msg_t *msg, str *callid, str *msgid, str *rname); +int siprepo_msg_check(sip_msg_t *msg); + +#endif diff --git a/src/modules/siprepo/siprepo_mod.c b/src/modules/siprepo/siprepo_mod.c new file mode 100644 index 00000000000..129317b853d --- /dev/null +++ b/src/modules/siprepo/siprepo_mod.c @@ -0,0 +1,340 @@ +/** + * Copyright (C) 2022 Daniel-Constantin Mierla (asipto.com) + * + * This file is part of Kamailio, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#include +#include +#include +#include + +#include "../../core/sr_module.h" +#include "../../core/dprint.h" +#include "../../core/ut.h" +#include "../../core/fmsg.h" +#include "../../core/receive.h" +#include "../../core/mod_fix.h" +#include "../../core/async_task.h" +#include "../../core/kemi.h" + +#include "siprepo_data.h" + +MODULE_VERSION + +int _siprepo_table_size = 256; +int _siprepo_expire = 180; +int _siprepo_timer_interval = 10; +int _siprepo_timer_procs = 1; + +static int mod_init(void); +static int child_init(int); +static void mod_destroy(void); + +static int w_sr_msg_push(sip_msg_t *msg, char *pmsgid, char *p2); +static int w_sr_msg_pull(sip_msg_t *msg, char *pcallid, char *pmsgid, char *prname); +static int w_sr_msg_async_pull(sip_msg_t *msg, char *pcallid, char *pmsgid, + char *pgname, char *prname); +static int w_sr_msg_rm(sip_msg_t *msg, char *pcallid, char *pmsgid); +static int w_sr_msg_check(sip_msg_t *msg, char *p1, char *p2); + +/* clang-format off */ +typedef struct sworker_task_param { + char *buf; + int len; + receive_info_t rcv; + str xdata; +} sworker_task_param_t; + +static cmd_export_t cmds[]={ + {"sr_msg_push", (cmd_function)w_sr_msg_push, 1, fixup_spve_null, + fixup_free_spve_null, REQUEST_ROUTE|CORE_ONREPLY_ROUTE}, + {"sr_msg_pull", (cmd_function)w_sr_msg_pull, 2, fixup_spve_spve, + fixup_free_spve_spve, REQUEST_ROUTE|CORE_ONREPLY_ROUTE}, + {"sr_msg_async_pull", (cmd_function)w_sr_msg_async_pull, 4, fixup_spve_all, + fixup_free_spve_all, ANY_ROUTE}, + {"sr_msg_rm", (cmd_function)w_sr_msg_rm, 2, fixup_spve_spve, + fixup_free_spve_spve, REQUEST_ROUTE|CORE_ONREPLY_ROUTE}, + {"sr_msg_check", (cmd_function)w_sr_msg_check, 0, 0, + 0, ANY_ROUTE}, + {0, 0, 0, 0, 0, 0} +}; + +static param_export_t params[]={ + {"hash_size", PARAM_INT, &_siprepo_table_size}, + {"expire", PARAM_INT, &_siprepo_expire}, + {"timer_interval", PARAM_INT, &_siprepo_timer_interval}, + {"timer_procs", PARAM_INT, &_siprepo_timer_procs}, + {0, 0, 0} +}; + +struct module_exports exports = { + "siprepo", /* module name */ + DEFAULT_DLFLAGS, /* dlopen flags */ + cmds, /* exported functions */ + params, /* exported parameters */ + 0, /* exported RPC methods */ + 0, /* exported pseudo-variables */ + 0, /* response function */ + mod_init, /* module initialization function */ + child_init, /* per child init function */ + mod_destroy /* destroy function */ +}; +/* clang-format on */ + + +/** + * init module function + */ +static int mod_init(void) +{ + if(siprepo_table_init()<0) { + LM_ERR("failed to initialize hash table\n"); + return -1; + } + return 0; +} + +/** + * @brief Initialize async module children + */ +static int child_init(int rank) +{ + return 0; +} + +/** + * destroy module function + */ +static void mod_destroy(void) +{ +} + +/** + * + */ +static int ki_sr_msg_push(sip_msg_t *msg, str *msgid) +{ + int ret; + + ret = siprepo_msg_set(msg, msgid); + + if(ret<0) { + return ret; + } + return 1; +} + +/** + * + */ +static int w_sr_msg_push(sip_msg_t *msg, char *pmsgid, char *p2) +{ + str msgid = STR_NULL; + + if(fixup_get_svalue(msg, (gparam_t *)pmsgid, &msgid) != 0) { + LM_ERR("cannot get msgid value\n"); + return -1; + } + + return ki_sr_msg_push(msg, &msgid); +} + +/** + * + */ +static int ki_sr_msg_pull(sip_msg_t *msg, str *callid, str *msgid, str *rname) +{ + int ret; + + ret = siprepo_msg_pull(msg, callid, msgid, rname); + + if(ret<0) { + return ret; + } + return 1; +} + +/** + * + */ +static int w_sr_msg_pull(sip_msg_t *msg, char *pcallid, char *pmsgid, char *prname) +{ + str callid = STR_NULL; + str msgid = STR_NULL; + str rname = STR_NULL; + + if(fixup_get_svalue(msg, (gparam_t *)pcallid, &callid) != 0) { + LM_ERR("cannot get callid value\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t *)pmsgid, &msgid) != 0) { + LM_ERR("cannot get msgid value\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t *)prname, &rname) != 0) { + LM_ERR("cannot get route name\n"); + return -1; + } + + return ki_sr_msg_pull(msg, &callid, &msgid, &rname); +} + +/** + * + */ +static int ki_sr_msg_async_pull(sip_msg_t *msg, str *callid, str *msgid, + str *gname, str *rname) +{ + return 1; +} + +/** + * + */ +static int w_sr_msg_async_pull(sip_msg_t *msg, char *pcallid, char *pmsgid, + char *pgname, char *prname) +{ + str callid = STR_NULL; + str msgid = STR_NULL; + str gname = STR_NULL; + str rname = STR_NULL; + + if(fixup_get_svalue(msg, (gparam_t *)pcallid, &callid) != 0) { + LM_ERR("cannot get callid value\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t *)pmsgid, &msgid) != 0) { + LM_ERR("cannot get msgid value\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t *)pgname, &gname) != 0) { + LM_ERR("cannot get aync group name\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t *)prname, &rname) != 0) { + LM_ERR("cannot get route name\n"); + return -1; + } + + return ki_sr_msg_async_pull(msg, &callid, &msgid, &gname, &rname); +} + + +/** + * + */ +static int ki_sr_msg_rm(sip_msg_t *msg, str *callid, str *msgid) +{ + int ret; + + ret = siprepo_msg_rm(msg, callid, msgid); + + if(ret<0) { + return ret; + } + return 1; +} + +/** + * + */ +static int w_sr_msg_rm(sip_msg_t *msg, char *pcallid, char *pmsgid) +{ + str callid = STR_NULL; + str msgid = STR_NULL; + + if(fixup_get_svalue(msg, (gparam_t *)pcallid, &callid) != 0) { + LM_ERR("cannot get callid value\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t *)pmsgid, &msgid) != 0) { + LM_ERR("cannot get msgid value\n"); + return -1; + } + + return ki_sr_msg_rm(msg, &callid, &msgid); +} + +/** + * + */ +static int ki_sr_msg_check(sip_msg_t *msg) +{ + int ret; + + ret = siprepo_msg_check(msg); + + if(ret<=0) { + return (ret-1); + } + return ret; +} + +/** + * + */ +static int w_sr_msg_check(sip_msg_t *msg, char *p1, char *p2) +{ + return ki_sr_msg_check(msg); +} + +/** + * + */ +/* clang-format off */ +static sr_kemi_t sr_kemi_sworker_exports[] = { + { str_init("siprepo"), str_init("sr_msg_push"), + SR_KEMIP_INT, ki_sr_msg_push, + { SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { str_init("siprepo"), str_init("sr_msg_pull"), + SR_KEMIP_INT, ki_sr_msg_pull, + { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { str_init("siprepo"), str_init("sr_msg_async_pull"), + SR_KEMIP_INT, ki_sr_msg_async_pull, + { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR, + SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { str_init("siprepo"), str_init("sr_msg_rm"), + SR_KEMIP_INT, ki_sr_msg_rm, + { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { str_init("siprepo"), str_init("sr_msg_check"), + SR_KEMIP_INT, ki_sr_msg_check, + { SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + + { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } } +}; +/* clang-format on */ + +/** + * + */ +int mod_register(char *path, int *dlflags, void *p1, void *p2) +{ + sr_kemi_modules_add(sr_kemi_sworker_exports); + return 0; +}