Skip to content

Commit

Permalink
nats: add KEMI publish function and event_callback param (#3154)
Browse files Browse the repository at this point in the history
  • Loading branch information
seven1240 committed Jun 24, 2022
1 parent 0667917 commit 25d42b7
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 19 deletions.
3 changes: 3 additions & 0 deletions src/modules/nats/defs.h
Expand Up @@ -27,6 +27,7 @@

#include <nats/nats.h>
#include <uv.h>
#include "../../core/str.h"

#define NATS_DEFAULT_URL "nats://localhost:4222"
#define NATS_MAX_SERVERS 10
Expand Down Expand Up @@ -63,6 +64,8 @@ typedef struct _init_nats_server
typedef struct _nats_on_message
{
int rt;
char *_evname;
str evname;
} nats_on_message, *nats_on_message_ptr;

struct nats_consumer_worker
Expand Down
33 changes: 32 additions & 1 deletion src/modules/nats/doc/nats_admin.xml
Expand Up @@ -173,6 +173,37 @@ modparam("nats", "subject_queue_group", "Kamailio-World:2020")
modparam("nats", "subject_queue_group", "Kamailio-World:2021") // this will create two processes for the Kamailio-World subject
modparam("nats", "subject_queue_group", "MyQueue1:2021")
modparam("nats", "subject_queue_group", "MyQueue2:2021")
...
</programlisting>
</example>
</section>
<section>
<title>
<varname>event_callback</varname>
(str)
</title>
<para>
Name of the KEMI function to be executed instead of the event route.
</para>
<para>
<emphasis>Default value is not set.</emphasis>
</para>
<example>
<title>
Set
<varname>event_callback</varname>
parameter
</title>
<programlisting format="linespecific">
...
modparam("nats", "event_callback", "ksr_nats_event")

-- event callback function implemented in Lua
function ksr_nats_event(evname)
KSR.info("===== nats module received event: " .. evname ..
", data:" .. KSR.pv.gete('$natsData') .. "\n");
return 1;
end
...
</programlisting>
</example>
Expand Down Expand Up @@ -254,4 +285,4 @@ event_route[nats:MyQueue1]
</section>


</chapter>
</chapter>
85 changes: 68 additions & 17 deletions src/modules/nats/nats_mod.c
Expand Up @@ -24,6 +24,8 @@

#include "defs.h"
#include "nats_mod.h"
#include "nats_pub.h"
#include "../../core/kemi.h"

MODULE_VERSION

Expand All @@ -38,6 +40,7 @@ char *eventData = NULL;

int *nats_pub_worker_pipes_fds = NULL;
int *nats_pub_worker_pipes = NULL;
static str nats_event_callback = STR_NULL;

static nats_evroutes_t _nats_rts;

Expand All @@ -50,7 +53,10 @@ static param_export_t params[] = {
{"nats_url", PARAM_STRING | USE_FUNC_PARAM, (void *)_init_nats_server_url_add},
{"num_publish_workers", INT_PARAM, &nats_pub_workers_num},
{"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM,
(void *)_init_nats_sub_add}};
(void *)_init_nats_sub_add},
{"event_callback", PARAM_STR, &nats_event_callback},
{0, 0, 0}
};

static cmd_export_t cmds[] = {{"nats_publish", (cmd_function)w_nats_publish_f,
2, fixup_publish_get_value,
Expand All @@ -73,39 +79,35 @@ static void onMsg(
natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
nats_on_message_ptr on_message = (nats_on_message_ptr)closure;
char *s = (char *)natsMsg_GetSubject(msg);
char *data = (char *)natsMsg_GetData(msg);
if(on_message->rt < 0 || event_rt.rlist[on_message->rt] == NULL) {
LM_INFO("event-route [nats:%s] does not exist\n", s);
goto end;
}
eventData = data;
nats_run_cfg_route(on_message->rt);

end:
nats_run_cfg_route(on_message->rt, &on_message->evname);
eventData = NULL;
natsMsg_Destroy(msg);
}

static void connectedCB(natsConnection *nc, void *closure)
{
char url[NATS_URL_MAX_SIZE];
str evname = str_init("nats:connected");
natsConnection_GetConnectedUrl(nc, url, sizeof(url));
nats_run_cfg_route(_nats_rts.connected);
nats_run_cfg_route(_nats_rts.connected, &evname);
}

static void disconnectedCb(natsConnection *nc, void *closure)
{
char url[NATS_URL_MAX_SIZE];
str evname = str_init("nats:disconnected");
natsConnection_GetConnectedUrl(nc, url, sizeof(url));
nats_run_cfg_route(_nats_rts.disconnected);
nats_run_cfg_route(_nats_rts.disconnected, &evname);
}

static void reconnectedCb(natsConnection *nc, void *closure)
{
char url[NATS_URL_MAX_SIZE];
str evname = str_init("nats:connected");
natsConnection_GetConnectedUrl(nc, url, sizeof(url));
nats_run_cfg_route(_nats_rts.connected);
nats_run_cfg_route(_nats_rts.connected, &evname);
}

static void closedCB(natsConnection *nc, void *closure)
Expand Down Expand Up @@ -247,9 +249,13 @@ int init_worker(
if(rt < 0 || event_rt.rlist[rt] == NULL) {
LM_INFO("route [%s] does not exist\n", routename);
worker->on_message->rt = -1;
return 0;
} else {
worker->on_message->rt = rt;
}
worker->on_message->rt = rt;
worker->on_message->_evname = malloc(buffsize);
strcpy(worker->on_message->_evname, routename);
worker->on_message->evname.s = worker->on_message->_evname;
worker->on_message->evname.len = strlen(worker->on_message->_evname);
worker->nc = nc;
return 0;
}
Expand Down Expand Up @@ -565,6 +571,9 @@ int nats_destroy_workers()
}
}
if(worker->on_message != NULL) {
if (worker->on_message->_evname) {
free(worker->on_message->_evname);
}
shm_free(worker->on_message);
}
shm_free(worker);
Expand Down Expand Up @@ -657,22 +666,32 @@ int _init_nats_sub_add(modparam_t type, void *val)
/**
* Invoke a event route block
*/
int nats_run_cfg_route(int rt)
int nats_run_cfg_route(int rt, str *evname)
{
struct run_act_ctx ctx;
sr_kemi_eng_t *keng = NULL;
sip_msg_t *fmsg;
sip_msg_t tmsg;

keng = sr_kemi_eng_get();

// check for valid route pointer
if(rt < 0) {
return 0;
if(rt < 0 || !event_rt.rlist[rt]) {
if (keng == NULL) return 0;
}

fmsg = faked_msg_next();
memcpy(&tmsg, fmsg, sizeof(sip_msg_t));
fmsg = &tmsg;
set_route_type(EVENT_ROUTE);
init_run_actions_ctx(&ctx);
if (rt < 0 && keng) {
if (sr_kemi_route(keng, fmsg, EVENT_ROUTE,
&nats_event_callback, evname) < 0) {
LM_ERR("error running event route kemi callback\n");
}
return 0;
}
run_top_route(event_rt.rlist[rt], fmsg, 0);
return 0;
}
Expand Down Expand Up @@ -791,3 +810,35 @@ int nats_pv_get_event_payload(
return eventData == NULL ? pv_get_null(msg, param, res)
: pv_get_strzval(msg, param, res, eventData);
}

/**
*
*/
int ki_nats_publish(sip_msg_t *msg, str *subject, str *payload)
{
return w_nats_publish(msg, *subject, *payload);
}

/**
*
*/
/* clang-format off */
static sr_kemi_t sr_kemi_nats_exports[] = {
{ str_init("nats"), str_init("publish"),
SR_KEMIP_INT, ki_nats_publish,
{ SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
},

{ {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
};
/* clang-format on */

/**
*
*/
int mod_register(char *path, int *dlflags, void *p1, void *p2)
{
sr_kemi_modules_add(sr_kemi_nats_exports);
return 0;
}
2 changes: 1 addition & 1 deletion src/modules/nats/nats_mod.h
Expand Up @@ -40,7 +40,7 @@ extern int fixup_publish_get_value(void **param, int param_no);
extern int fixup_publish_get_value_free(void **param, int param_no);
extern void _nats_pub_worker_cb(uv_poll_t *handle, int status, int events);

int nats_run_cfg_route(int rt);
int nats_run_cfg_route(int rt, str *evname);
void nats_init_environment();

int _init_nats_server_url_add(modparam_t type, void *val);
Expand Down
5 changes: 5 additions & 0 deletions src/modules/nats/nats_pub.c
Expand Up @@ -90,6 +90,11 @@ int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload)
return -1;
}

return w_nats_publish(msg, subj_s, payload_s);
}

int w_nats_publish(sip_msg_t *msg, str subj_s, str payload_s)
{
// round-robin pub workers
pub_worker++;
if(pub_worker >= nats_pub_workers_num) {
Expand Down
1 change: 1 addition & 0 deletions src/modules/nats/nats_pub.h
Expand Up @@ -38,6 +38,7 @@ typedef struct _nats_pub_delivery
nats_pub_delivery_ptr _nats_pub_delivery_new(str subject, str payload);
void nats_pub_free_delivery_ptr(nats_pub_delivery_ptr ptr);
int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload);
int w_nats_publish(sip_msg_t *msg, str subj_s, str payload_s);
int fixup_publish_get_value(void **param, int param_no);
int fixup_publish_get_value_free(void **param, int param_no);

Expand Down

0 comments on commit 25d42b7

Please sign in to comment.