From 25d42b7e69cc6242c66887a2816499d841e3ba89 Mon Sep 17 00:00:00 2001 From: Seven Du Date: Fri, 24 Jun 2022 18:44:28 +0800 Subject: [PATCH] nats: add KEMI publish function and event_callback param (#3154) --- src/modules/nats/defs.h | 3 + src/modules/nats/doc/nats_admin.xml | 33 ++++++++++- src/modules/nats/nats_mod.c | 85 +++++++++++++++++++++++------ src/modules/nats/nats_mod.h | 2 +- src/modules/nats/nats_pub.c | 5 ++ src/modules/nats/nats_pub.h | 1 + 6 files changed, 110 insertions(+), 19 deletions(-) diff --git a/src/modules/nats/defs.h b/src/modules/nats/defs.h index 0ec06451e55..f6b6b639eee 100644 --- a/src/modules/nats/defs.h +++ b/src/modules/nats/defs.h @@ -27,6 +27,7 @@ #include #include +#include "../../core/str.h" #define NATS_DEFAULT_URL "nats://localhost:4222" #define NATS_MAX_SERVERS 10 @@ -63,6 +64,8 @@ typedef struct _init_nats_server typedef struct _nats_on_message { int rt; + char *_evname; + str evname; } nats_on_message, *nats_on_message_ptr; struct nats_consumer_worker diff --git a/src/modules/nats/doc/nats_admin.xml b/src/modules/nats/doc/nats_admin.xml index b530b6b9f18..ed6ca4a29d0 100644 --- a/src/modules/nats/doc/nats_admin.xml +++ b/src/modules/nats/doc/nats_admin.xml @@ -173,6 +173,37 @@ modparam("nats", "subject_queue_group", "Kamailio-World:2020") modparam("nats", "subject_queue_group", "Kamailio-World:2021") // this will create two processes for the Kamailio-World subject modparam("nats", "subject_queue_group", "MyQueue1:2021") modparam("nats", "subject_queue_group", "MyQueue2:2021") +... + + + +
+ + <varname>event_callback</varname> + (str) + + + Name of the KEMI function to be executed instead of the event route. + + + Default value is not set. + + + + Set + <varname>event_callback</varname> + parameter + + +... +modparam("nats", "event_callback", "ksr_nats_event") + +-- event callback function implemented in Lua +function ksr_nats_event(evname) + KSR.info("===== nats module received event: " .. evname .. + ", data:" .. KSR.pv.gete('$natsData') .. "\n"); + return 1; +end ... @@ -254,4 +285,4 @@ event_route[nats:MyQueue1]
- \ No newline at end of file + diff --git a/src/modules/nats/nats_mod.c b/src/modules/nats/nats_mod.c index 6108d555e81..852d0fbbab2 100644 --- a/src/modules/nats/nats_mod.c +++ b/src/modules/nats/nats_mod.c @@ -24,6 +24,8 @@ #include "defs.h" #include "nats_mod.h" +#include "nats_pub.h" +#include "../../core/kemi.h" MODULE_VERSION @@ -38,6 +40,7 @@ char *eventData = NULL; int *nats_pub_worker_pipes_fds = NULL; int *nats_pub_worker_pipes = NULL; +static str nats_event_callback = STR_NULL; static nats_evroutes_t _nats_rts; @@ -50,7 +53,10 @@ static param_export_t params[] = { {"nats_url", PARAM_STRING | USE_FUNC_PARAM, (void *)_init_nats_server_url_add}, {"num_publish_workers", INT_PARAM, &nats_pub_workers_num}, {"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM, - (void *)_init_nats_sub_add}}; + (void *)_init_nats_sub_add}, + {"event_callback", PARAM_STR, &nats_event_callback}, + {0, 0, 0} +}; static cmd_export_t cmds[] = {{"nats_publish", (cmd_function)w_nats_publish_f, 2, fixup_publish_get_value, @@ -73,16 +79,9 @@ static void onMsg( natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { nats_on_message_ptr on_message = (nats_on_message_ptr)closure; - char *s = (char *)natsMsg_GetSubject(msg); char *data = (char *)natsMsg_GetData(msg); - if(on_message->rt < 0 || event_rt.rlist[on_message->rt] == NULL) { - LM_INFO("event-route [nats:%s] does not exist\n", s); - goto end; - } eventData = data; - nats_run_cfg_route(on_message->rt); - -end: + nats_run_cfg_route(on_message->rt, &on_message->evname); eventData = NULL; natsMsg_Destroy(msg); } @@ -90,22 +89,25 @@ static void onMsg( static void connectedCB(natsConnection *nc, void *closure) { char url[NATS_URL_MAX_SIZE]; + str evname = str_init("nats:connected"); natsConnection_GetConnectedUrl(nc, url, sizeof(url)); - nats_run_cfg_route(_nats_rts.connected); + nats_run_cfg_route(_nats_rts.connected, &evname); } static void disconnectedCb(natsConnection *nc, void *closure) { char url[NATS_URL_MAX_SIZE]; + str evname = str_init("nats:disconnected"); natsConnection_GetConnectedUrl(nc, url, sizeof(url)); - nats_run_cfg_route(_nats_rts.disconnected); + nats_run_cfg_route(_nats_rts.disconnected, &evname); } static void reconnectedCb(natsConnection *nc, void *closure) { char url[NATS_URL_MAX_SIZE]; + str evname = str_init("nats:connected"); natsConnection_GetConnectedUrl(nc, url, sizeof(url)); - nats_run_cfg_route(_nats_rts.connected); + nats_run_cfg_route(_nats_rts.connected, &evname); } static void closedCB(natsConnection *nc, void *closure) @@ -247,9 +249,13 @@ int init_worker( if(rt < 0 || event_rt.rlist[rt] == NULL) { LM_INFO("route [%s] does not exist\n", routename); worker->on_message->rt = -1; - return 0; + } else { + worker->on_message->rt = rt; } - worker->on_message->rt = rt; + worker->on_message->_evname = malloc(buffsize); + strcpy(worker->on_message->_evname, routename); + worker->on_message->evname.s = worker->on_message->_evname; + worker->on_message->evname.len = strlen(worker->on_message->_evname); worker->nc = nc; return 0; } @@ -565,6 +571,9 @@ int nats_destroy_workers() } } if(worker->on_message != NULL) { + if (worker->on_message->_evname) { + free(worker->on_message->_evname); + } shm_free(worker->on_message); } shm_free(worker); @@ -657,15 +666,18 @@ int _init_nats_sub_add(modparam_t type, void *val) /** * Invoke a event route block */ -int nats_run_cfg_route(int rt) +int nats_run_cfg_route(int rt, str *evname) { struct run_act_ctx ctx; + sr_kemi_eng_t *keng = NULL; sip_msg_t *fmsg; sip_msg_t tmsg; + keng = sr_kemi_eng_get(); + // check for valid route pointer - if(rt < 0) { - return 0; + if(rt < 0 || !event_rt.rlist[rt]) { + if (keng == NULL) return 0; } fmsg = faked_msg_next(); @@ -673,6 +685,13 @@ int nats_run_cfg_route(int rt) fmsg = &tmsg; set_route_type(EVENT_ROUTE); init_run_actions_ctx(&ctx); + if (rt < 0 && keng) { + if (sr_kemi_route(keng, fmsg, EVENT_ROUTE, + &nats_event_callback, evname) < 0) { + LM_ERR("error running event route kemi callback\n"); + } + return 0; + } run_top_route(event_rt.rlist[rt], fmsg, 0); return 0; } @@ -791,3 +810,35 @@ int nats_pv_get_event_payload( return eventData == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventData); } + +/** + * + */ +int ki_nats_publish(sip_msg_t *msg, str *subject, str *payload) +{ + return w_nats_publish(msg, *subject, *payload); +} + +/** + * + */ +/* clang-format off */ +static sr_kemi_t sr_kemi_nats_exports[] = { + { str_init("nats"), str_init("publish"), + SR_KEMIP_INT, ki_nats_publish, + { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + + { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } } +}; +/* clang-format on */ + +/** + * + */ +int mod_register(char *path, int *dlflags, void *p1, void *p2) +{ + sr_kemi_modules_add(sr_kemi_nats_exports); + return 0; +} diff --git a/src/modules/nats/nats_mod.h b/src/modules/nats/nats_mod.h index 215c4f50d9e..3bec1f5270b 100644 --- a/src/modules/nats/nats_mod.h +++ b/src/modules/nats/nats_mod.h @@ -40,7 +40,7 @@ extern int fixup_publish_get_value(void **param, int param_no); extern int fixup_publish_get_value_free(void **param, int param_no); extern void _nats_pub_worker_cb(uv_poll_t *handle, int status, int events); -int nats_run_cfg_route(int rt); +int nats_run_cfg_route(int rt, str *evname); void nats_init_environment(); int _init_nats_server_url_add(modparam_t type, void *val); diff --git a/src/modules/nats/nats_pub.c b/src/modules/nats/nats_pub.c index e1d3cc181ec..25d55362081 100644 --- a/src/modules/nats/nats_pub.c +++ b/src/modules/nats/nats_pub.c @@ -90,6 +90,11 @@ int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload) return -1; } + return w_nats_publish(msg, subj_s, payload_s); +} + +int w_nats_publish(sip_msg_t *msg, str subj_s, str payload_s) +{ // round-robin pub workers pub_worker++; if(pub_worker >= nats_pub_workers_num) { diff --git a/src/modules/nats/nats_pub.h b/src/modules/nats/nats_pub.h index a509c6145aa..1c0e7dee50f 100644 --- a/src/modules/nats/nats_pub.h +++ b/src/modules/nats/nats_pub.h @@ -38,6 +38,7 @@ typedef struct _nats_pub_delivery nats_pub_delivery_ptr _nats_pub_delivery_new(str subject, str payload); void nats_pub_free_delivery_ptr(nats_pub_delivery_ptr ptr); int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload); +int w_nats_publish(sip_msg_t *msg, str subj_s, str payload_s); int fixup_publish_get_value(void **param, int param_no); int fixup_publish_get_value_free(void **param, int param_no);