diff --git a/src/modules/siprepo/siprepo_data.c b/src/modules/siprepo/siprepo_data.c index 603d3744c11..f4e4d71a94f 100644 --- a/src/modules/siprepo/siprepo_data.c +++ b/src/modules/siprepo/siprepo_data.c @@ -38,6 +38,7 @@ #include "../../core/globals.h" #include "../../core/dset.h" #include "../../core/route.h" +#include "../../core/async_task.h" #include "../../core/kemi.h" #include "siprepo_data.h" @@ -47,6 +48,16 @@ static siprepo_slot_t *_siprepo_table = NULL; extern int _siprepo_table_size; extern int _siprepo_expire; +/* clang-format off */ +typedef struct siprepo_task_param { + str callid; + str msgid; + str rname; + int rmode; +} siprepo_task_param_t; +/* clang-format off */ + + /** * */ @@ -80,7 +91,7 @@ int siprepo_table_init(void) /** * */ -siprepo_msg_t *siprepo_msg_find(sip_msg_t *msg, str *callid, str *msgid, int lmode) +siprepo_msg_t *siprepo_msg_find(str *callid, str *msgid, int lmode) { unsigned int hid; unsigned int slotid; @@ -175,7 +186,7 @@ int siprepo_msg_set(sip_msg_t *msg, str *msgid, int rmode) hid = get_hash1_raw(scallid.s, scallid.len); slotid = hid % _siprepo_table_size; - if(siprepo_msg_find(msg, &scallid, msgid, 1)!=NULL) { + if(siprepo_msg_find(&scallid, msgid, 1)!=NULL) { LM_DBG("msg [%.*s] found in repo\n", msgid->len, msgid->s); lock_release(&_siprepo_table[slotid].lock); return 1; @@ -230,12 +241,12 @@ int siprepo_msg_set(sip_msg_t *msg, str *msgid, int rmode) /** * */ -int siprepo_msg_rm(sip_msg_t *msg, str *callid, str *msgid) +int siprepo_msg_rm(str *callid, str *msgid) { unsigned int slotid; siprepo_msg_t *it = NULL; - it = siprepo_msg_find(msg, callid, msgid, 1); + it = siprepo_msg_find(callid, msgid, 1); if(it==NULL) { LM_DBG("msg [%.*s] not found in repo\n", msgid->len, msgid->s); slotid = get_hash1_raw(callid->s, callid->len) % _siprepo_table_size; @@ -322,8 +333,7 @@ int siprepo_msg_check(sip_msg_t *msg) /** * */ -int siprepo_msg_pull(sip_msg_t *msg, str *callid, str *msgid, str *rname, - int rmode) +int siprepo_msg_pull(str *callid, str *msgid, str *rname, int rmode) { unsigned int slotid; sip_msg_t lmsg; @@ -335,7 +345,7 @@ int siprepo_msg_pull(sip_msg_t *msg, str *callid, str *msgid, str *rname, str evname = str_init("siprepo:msg"); char lbuf[BUF_SIZE]; - it = siprepo_msg_find(msg, callid, msgid, 1); + it = siprepo_msg_find(callid, msgid, 1); if(it==NULL) { LM_DBG("msg [%.*s] not found in repo\n", msgid->len, msgid->s); slotid = get_hash1_raw(callid->s, callid->len) % _siprepo_table_size; @@ -404,6 +414,94 @@ int siprepo_msg_pull(sip_msg_t *msg, str *callid, str *msgid, str *rname, return 0; } +/** + * + */ +void siprepo_exec_task(void *param) +{ + siprepo_task_param_t *stp; + int ret; + + stp = (siprepo_task_param_t *)param; + + + LM_DBG("received task [%p] - callid [%.*s] msgid [%.*s]\n", stp, + stp->callid.len, stp->callid.s, stp->msgid.len, stp->msgid.s); + + ret = siprepo_msg_pull(&stp->callid, &stp->msgid, &stp->rname, stp->rmode); + + LM_DBG("execution return code: %d\n", ret); + shm_free(stp); + + return; +} + +/** + * + */ +int siprepo_send_task(str *gname, siprepo_task_param_t *stp) +{ + async_task_t *at = NULL; + int ret; + + at = (async_task_t *)shm_malloc(sizeof(async_task_t)); + if(at == NULL) { + LM_ERR("no more shm memory\n"); + return -1; + } + memset(at, 0, sizeof(async_task_t)); + at->exec = siprepo_exec_task; + at->param = stp; + + ret = async_task_group_push(gname, at); + if(ret < 0) { + shm_free(at); + return ret; + } + return 0; +} + +/** + * + */ +int siprepo_msg_async_pull(str *callid, str *msgid, str *gname, str *rname, + int rmode) +{ + size_t dsize; + siprepo_task_param_t *stp; + int ret; + + dsize = ROUND_POINTER(sizeof(siprepo_task_param_t)) + + ROUND_POINTER(callid->len + 1) + ROUND_POINTER(msgid->len + 1) + + ROUND_POINTER(rname->len + 1); + + stp = (siprepo_task_param_t*)shm_mallocxz(dsize); + if(stp == NULL) { + SHM_MEM_ERROR_FMT("new repo structure\n"); + return -1; + } + stp->callid.s = (char*)stp + ROUND_POINTER(sizeof(siprepo_task_param_t)); + memcpy(callid->s, stp->callid.s, callid->len); + stp->callid.len = callid->len; + + stp->msgid.s = stp->callid.s + ROUND_POINTER(callid->len + 1); + memcpy(msgid->s, stp->msgid.s, msgid->len); + stp->msgid.len = msgid->len; + + stp->rname.s = stp->msgid.s + ROUND_POINTER(msgid->len + 1); + memcpy(rname->s, stp->rname.s, rname->len); + stp->rname.len = rname->len; + + stp->rmode = rmode; + + ret = siprepo_send_task(gname, stp); + if(ret < 0) { + shm_free(stp); + return ret; + } + return 0; +} + /** * */ @@ -419,7 +517,7 @@ void siprepo_timer_exec(unsigned int ticks, int worker, void *param) lock_get(&_siprepo_table[i].lock); for(it=_siprepo_table[i].plist; it!=NULL; it=it->next) { if(it->itime+_siprepo_expire < tnow) { - siprepo_msg_unlink(it, slotid); + siprepo_msg_unlink(it, i); if(elist) { it->next = elist; elist = it; diff --git a/src/modules/siprepo/siprepo_data.h b/src/modules/siprepo/siprepo_data.h index 683348c8b0a..9301883b02c 100644 --- a/src/modules/siprepo/siprepo_data.h +++ b/src/modules/siprepo/siprepo_data.h @@ -56,9 +56,11 @@ typedef struct siprepo_slot { int siprepo_table_init(void); int siprepo_msg_set(sip_msg_t *msg, str *msgid, int rmode); -int siprepo_msg_rm(sip_msg_t *msg, str *callid, str *msgid); -int siprepo_msg_pull(sip_msg_t *msg, str *callid, str *msgid, str *rname, +int siprepo_msg_rm(str *callid, str *msgid); +int siprepo_msg_pull(str *callid, str *msgid, str *rname, int rmode); +int siprepo_msg_async_pull(str *callid, str *msgid, str *gname, + str *rname, int rmode); int siprepo_msg_check(sip_msg_t *msg); void siprepo_msg_timer(unsigned int ticks, int worker, void *param); diff --git a/src/modules/siprepo/siprepo_mod.c b/src/modules/siprepo/siprepo_mod.c index 35a97787431..940099622d2 100644 --- a/src/modules/siprepo/siprepo_mod.c +++ b/src/modules/siprepo/siprepo_mod.c @@ -189,7 +189,7 @@ static int ki_sr_msg_pull(sip_msg_t *msg, str *callid, str *msgid, str *rname, { int ret; - ret = siprepo_msg_pull(msg, callid, msgid, rname, rmode); + ret = siprepo_msg_pull(callid, msgid, rname, rmode); if(ret<0) { return ret; @@ -234,6 +234,13 @@ static int w_sr_msg_pull(sip_msg_t *msg, char *pcallid, char *pmsgid, char *prna static int ki_sr_msg_async_pull(sip_msg_t *msg, str *callid, str *msgid, str *gname, str *rname, int rmode) { + int ret; + + ret = siprepo_msg_async_pull(callid, msgid, gname, rname, rmode); + + if(ret<0) { + return ret; + } return 1; } @@ -281,7 +288,7 @@ static int ki_sr_msg_rm(sip_msg_t *msg, str *callid, str *msgid) { int ret; - ret = siprepo_msg_rm(msg, callid, msgid); + ret = siprepo_msg_rm(callid, msgid); if(ret<0) { return ret;