From cf739e4d8274b63790e992a82610736a010d580e Mon Sep 17 00:00:00 2001 From: Eloy Coto Date: Fri, 6 May 2016 14:42:34 +0100 Subject: [PATCH] evapi: Added evapi_unicast and evapi_async_unicast methods --- modules/evapi/doc/evapi_admin.xml | 45 ++++++++++- modules/evapi/evapi_dispatch.c | 26 ++++++- modules/evapi/evapi_dispatch.h | 1 + modules/evapi/evapi_mod.c | 125 ++++++++++++++++++++++++++++-- 4 files changed, 185 insertions(+), 12 deletions(-) diff --git a/modules/evapi/doc/evapi_admin.xml b/modules/evapi/doc/evapi_admin.xml index 7d060006792..085968e622a 100644 --- a/modules/evapi/doc/evapi_admin.xml +++ b/modules/evapi/doc/evapi_admin.xml @@ -10,9 +10,9 @@ - + &adminguide; - +
Overview @@ -215,7 +215,7 @@ evapi_async_relay("{ \"event\": \"suspend\",\n \"data\":" <function>evapi_multicast</function> usage ... -evapi_relay("{ \"event\": \"test\",\n \"data\": { \"fU\": \"$fU\" }\n}", "tagx"); +evapi_multicast("{ \"event\": \"test\",\n \"data\": { \"fU\": \"$fU\" }\n}", "tagx"); ... @@ -241,6 +241,45 @@ evapi_async_multicast("{ \"event\": \"suspend\",\n \"data\":"
+
+ + <function moreinfo="none">evapi_unicast(evdata, etag)</function> + + + Relay the event data given as parameter to the first connection that + match the tag provided by etag value. The etag can be a variable. For + more see evapi_relay() and evapi_set_tag(). + + + <function>evapi_unicast</function> usage + +... +evapi_unicast("{ \"event\": \"test\",\n \"data\": { \"fU\": \"$fU\" }\n}", "tagx"); +... + + +
+ +
+ + <function moreinfo="none">evapi_async_unicast(evdata, etag)</function> + + + Async relay the event data given as parameter to the first connection + that match the tag provided by etag value. The etag can be a variable. + For more see evapi_async_relay() and evapi_set_tag(). + + + <function>evapi_async_unicast</function> usage + +... +evapi_async_unicast("{ \"event\": \"suspend\",\n \"data\":" + " { \"index\": \"$T(id_index)\", \"label\": \"$T(id_label)\" }\n}", "tagx"); +... + + +
+
<function moreinfo="none">evapi_close()</function> diff --git a/modules/evapi/evapi_dispatch.c b/modules/evapi/evapi_dispatch.c index 36cd75dcc7b..1dfc578d2cf 100644 --- a/modules/evapi/evapi_dispatch.c +++ b/modules/evapi/evapi_dispatch.c @@ -68,6 +68,7 @@ typedef struct _evapi_env { typedef struct _evapi_msg { str data; str tag; + int unicast; } evapi_msg_t; #define EVAPI_MAX_CLIENTS 8 @@ -263,6 +264,9 @@ int evapi_dispatch_notify(evapi_msg_t *emsg) wlen, emsg->data.len, _evapi_clients[i].sock, i); } n++; + if (emsg->unicast){ + break; + } } } } @@ -638,7 +642,7 @@ int evapi_run_worker(int prank) /** * */ -int evapi_relay_multicast(str *evdata, str *ctag) +int _evapi_relay(str *evdata, str *ctag, int unicast) { #define EVAPI_RELAY_FORMAT "%d:%.*s," @@ -680,6 +684,10 @@ int evapi_relay_multicast(str *evdata, str *ctag) emsg->tag.len = ctag->len; } + if (unicast){ + emsg->unicast = unicast; + } + LM_DBG("sending [%p] [%.*s] (%d)\n", emsg, emsg->data.len, emsg->data.s, emsg->data.len); len = write(_evapi_notify_sockets[1], &emsg, sizeof(evapi_msg_t*)); if(len<=0) { @@ -694,7 +702,21 @@ int evapi_relay_multicast(str *evdata, str *ctag) */ int evapi_relay(str *evdata) { - return evapi_relay_multicast(evdata, NULL); + return _evapi_relay(evdata, NULL, 0); +} + +/** + * + */ +int evapi_relay_multicast(str *evdata, str *ctag){ + return _evapi_relay(evdata, ctag, 0); +} + +/** + * + */ +int evapi_relay_unicast(str *evdata, str *ctag){ + return _evapi_relay(evdata, ctag, 1); } #if 0 diff --git a/modules/evapi/evapi_dispatch.h b/modules/evapi/evapi_dispatch.h index 10af96b9dc3..96aa8086575 100644 --- a/modules/evapi/evapi_dispatch.h +++ b/modules/evapi/evapi_dispatch.h @@ -37,6 +37,7 @@ int evapi_run_worker(int prank); int evapi_relay(str *evdata); int evapi_relay_multicast(str *evdata, str *ctag); +int evapi_relay_unicast(str *evdata, str *ctag); void evapi_init_environment(int dformat); diff --git a/modules/evapi/evapi_mod.c b/modules/evapi/evapi_mod.c index 12686bde9b8..055c9a395b0 100644 --- a/modules/evapi/evapi_mod.c +++ b/modules/evapi/evapi_mod.c @@ -60,25 +60,31 @@ static int w_evapi_relay(sip_msg_t* msg, char* evdata, char* p2); static int w_evapi_async_relay(sip_msg_t* msg, char* evdata, char* p2); static int w_evapi_multicast(sip_msg_t* msg, char* evdata, char* ptag); static int w_evapi_async_multicast(sip_msg_t* msg, char* evdata, char* ptag); +static int w_evapi_unicast(sip_msg_t *msg, char *evdata, char *ptag); +static int w_evapi_async_unicast(sip_msg_t *msg, char *evdata, char *ptag); static int w_evapi_close(sip_msg_t* msg, char* p1, char* p2); static int w_evapi_set_tag(sip_msg_t* msg, char* ptag, char* p2); static int fixup_evapi_relay(void** param, int param_no); static int fixup_evapi_multicast(void** param, int param_no); static cmd_export_t cmds[]={ - {"evapi_relay", (cmd_function)w_evapi_relay, 1, fixup_evapi_relay, + {"evapi_relay", (cmd_function)w_evapi_relay, 1, fixup_evapi_relay, 0, ANY_ROUTE}, - {"evapi_async_relay", (cmd_function)w_evapi_async_relay, 1, fixup_evapi_relay, + {"evapi_async_relay", (cmd_function)w_evapi_async_relay, 1, fixup_evapi_relay, 0, REQUEST_ROUTE}, - {"evapi_multicast", (cmd_function)w_evapi_multicast, 1, fixup_evapi_multicast, + {"evapi_multicast", (cmd_function)w_evapi_multicast, 2, fixup_evapi_multicast, 0, ANY_ROUTE}, - {"evapi_async_multicast", (cmd_function)w_evapi_async_multicast, 1, fixup_evapi_multicast, + {"evapi_async_multicast", (cmd_function)w_evapi_async_multicast, 2, fixup_evapi_multicast, 0, REQUEST_ROUTE}, - {"evapi_close", (cmd_function)w_evapi_close, 0, NULL, + {"evapi_unicast", (cmd_function)w_evapi_unicast, 2, fixup_evapi_multicast, 0, ANY_ROUTE}, - {"evapi_close", (cmd_function)w_evapi_close, 1, NULL, + {"evapi_async_unicast", (cmd_function)w_evapi_async_unicast,2, fixup_evapi_multicast, + 0, REQUEST_ROUTE}, + {"evapi_close", (cmd_function)w_evapi_close, 0, NULL, + 0, ANY_ROUTE}, + {"evapi_close", (cmd_function)w_evapi_close, 1, NULL, 0, ANY_ROUTE}, - {"evapi_set_tag", (cmd_function)w_evapi_set_tag, 1, fixup_spve_null, + {"evapi_set_tag", (cmd_function)w_evapi_set_tag, 1, fixup_spve_null, 0, ANY_ROUTE}, {0, 0, 0, 0, 0, 0} }; @@ -414,6 +420,111 @@ static int w_evapi_async_multicast(sip_msg_t *msg, char *evdata, char *ptag) return 1; } + +/** + * + */ +static int w_evapi_unicast(sip_msg_t *msg, char *evdata, char *ptag) +{ + str sdata; + str stag; + + if(evdata==0) { + LM_ERR("invalid parameters\n"); + return -1; + } + + if(fixup_get_svalue(msg, (gparam_t*)evdata, &sdata)!=0) { + LM_ERR("unable to get data\n"); + return -1; + } + if(sdata.s==NULL || sdata.len == 0) { + LM_ERR("invalid data parameter\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t*)ptag, &stag)!=0) { + LM_ERR("unable to get tag\n"); + return -1; + } + if(stag.s==NULL || stag.len == 0) { + LM_ERR("invalid tag parameter\n"); + return -1; + } + if(evapi_relay_unicast(&sdata, &stag)<0) { + LM_ERR("failed to relay event: [[%.*s]] to [%.*s] \n", + sdata.len, sdata.s, stag.len, stag.s); + return -1; + } + return 1; +} + + +static int w_evapi_async_unicast(sip_msg_t *msg, char *evdata, char *ptag) +{ + str sdata; + str stag; + unsigned int tindex; + unsigned int tlabel; + tm_cell_t *t = 0; + + if(evdata==0) { + LM_ERR("invalid parameters\n"); + return -1; + } + + if(tmb.t_suspend==NULL) { + LM_ERR("evapi async relay is disabled - tm module not loaded\n"); + return -1; + } + + t = tmb.t_gett(); + if (t==NULL || t==T_UNDEFINED) + { + if(tmb.t_newtran(msg)<0) + { + LM_ERR("cannot create the transaction\n"); + return -1; + } + t = tmb.t_gett(); + if (t==NULL || t==T_UNDEFINED) + { + LM_ERR("cannot lookup the transaction\n"); + return -1; + } + } + if(tmb.t_suspend(msg, &tindex, &tlabel)<0) + { + LM_ERR("failed to suspend request processing\n"); + return -1; + } + + LM_DBG("transaction suspended [%u:%u]\n", tindex, tlabel); + + if(fixup_get_svalue(msg, (gparam_t*)evdata, &sdata)!=0) { + LM_ERR("unable to get data\n"); + return -1; + } + if(sdata.s==NULL || sdata.len == 0) { + LM_ERR("invalid data parameter\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t*)ptag, &stag)!=0) { + LM_ERR("unable to get tag\n"); + return -1; + } + if(stag.s==NULL || stag.len == 0) { + LM_ERR("invalid tag parameter\n"); + return -1; + } + + if(evapi_relay_unicast(&sdata, &stag)<0) { + LM_ERR("failed to relay event: [[%.*s]] to [%.*s] \n", + sdata.len, sdata.s, stag.len, stag.s); + return -2; + } + return 1; +} + /** * */