diff --git a/src/modules/nats/defs.h b/src/modules/nats/defs.h new file mode 100644 index 00000000000..5f7dfafe0bd --- /dev/null +++ b/src/modules/nats/defs.h @@ -0,0 +1,92 @@ +/* + * NATS module interface + * + * Copyright (C) 2021 Voxcom Inc + * + * This file is part of Kamailio, a free SIP server. + * + * Kamailio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * Kamailio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * + */ + +#ifndef __NATS_DEFS_H_ +#define __NATS_DEFS_H_ + +#include +#include + +#define NATS_DEFAULT_URL "nats://localhost:4222" +#define NATS_MAX_SERVERS 10 +#define NATS_URL_MAX_SIZE 256 +#define DEFAULT_NUM_PUB_WORKERS 2 + +typedef struct _nats_connection +{ + natsConnection *conn; + natsOptions *opts; + char *servers[NATS_MAX_SERVERS]; +} nats_connection, *nats_connection_ptr; + + +typedef struct _nats_evroutes +{ + int connected; + int disconnected; +} nats_evroutes_t; +static nats_evroutes_t _nats_rts; + +typedef struct _init_nats_sub +{ + char *sub; + char *queue_group; + struct _init_nats_sub *next; +} init_nats_sub, *init_nats_sub_ptr; + +typedef struct _init_nats_server +{ + char *url; + struct _init_nats_server *next; +} init_nats_server, *init_nats_server_ptr; + +typedef struct _nats_on_message +{ + int rt; +} nats_on_message, *nats_on_message_ptr; + +struct nats_consumer_worker +{ + char *subject; + char *queue_group; + int pid; + natsSubscription *subscription; + uv_loop_t *uvLoop; + nats_connection_ptr nc; + nats_on_message_ptr on_message; +}; +typedef struct nats_consumer_worker nats_consumer_worker_t; + +struct nats_pub_worker +{ + int pid; + int fd; + uv_loop_t *uvLoop; + uv_pipe_t pipe; + uv_poll_t poll; + nats_connection_ptr nc; +}; +typedef struct nats_pub_worker nats_pub_worker_t; + +#endif diff --git a/src/modules/nats/doc/nats_admin.xml b/src/modules/nats/doc/nats_admin.xml index 344e25e0d10..b530b6b9f18 100644 --- a/src/modules/nats/doc/nats_admin.xml +++ b/src/modules/nats/doc/nats_admin.xml @@ -117,6 +117,33 @@ modparam("nats", "nats_url", "nats://127.0.0.1:4222") modparam("nats", "nats_url", "nats://user1:pass1127.0.1.2:4222") // with auth modparam("nats", "nats_url", "nats://127.1.2.3:4222") ... + + + +
+ + <varname>num_publish_workers</varname> + (int) + + + The number of worker threads for publishing messages. + + + Usage: nats related. + + + Default value is 2. + + + + Set + <varname>num_publish_workers</varname> + parameter + + +... +modparam("nats", "num_publish_workers", 4) +...
@@ -146,6 +173,29 @@ modparam("nats", "subject_queue_group", "Kamailio-World:2020") modparam("nats", "subject_queue_group", "Kamailio-World:2021") // this will create two processes for the Kamailio-World subject modparam("nats", "subject_queue_group", "MyQueue1:2021") modparam("nats", "subject_queue_group", "MyQueue2:2021") +... + + + + +
+ Functions +
+ + <function moreinfo="none">nats_publish(subject, payload)</function> + + + Publishes the payload to subject. + + + + <function>nats_publish</function> + usage + + +... +$var(my_info) = "$ci=" + $ci + " $fU=" + $fU; +nats_publish("mysubject", "$var(my_info)"); # publish $var(my_info) to "mysubject" ... diff --git a/src/modules/nats/nats_mod.c b/src/modules/nats/nats_mod.c index a1374b20c68..05f71ea4438 100644 --- a/src/modules/nats/nats_mod.c +++ b/src/modules/nats/nats_mod.c @@ -22,6 +22,7 @@ * */ +#include "defs.h" #include "nats_mod.h" MODULE_VERSION @@ -29,23 +30,34 @@ MODULE_VERSION init_nats_sub_ptr _init_nats_sc = NULL; init_nats_server_ptr _init_nats_srv = NULL; nats_consumer_worker_t *nats_workers = NULL; -nats_connection_ptr _nats_connection = NULL; +nats_pub_worker_t *nats_pub_workers = NULL; +int nats_pub_workers_num = DEFAULT_NUM_PUB_WORKERS; + int _nats_proc_count; char *eventData = NULL; +int *nats_pub_worker_pipes_fds = NULL; +int *nats_pub_worker_pipes = NULL; + static pv_export_t nats_mod_pvs[] = { {{"natsData", (sizeof("natsData") - 1)}, PVT_OTHER, nats_pv_get_event_payload, 0, 0, 0, 0, 0}, {{0, 0}, 0, 0, 0, 0, 0, 0, 0}}; -static param_export_t params[] = {{"nats_url", PARAM_STRING | USE_FUNC_PARAM, - (void *)_init_nats_server_url_add}, +static param_export_t params[] = { + {"nats_url", PARAM_STRING | USE_FUNC_PARAM, (void *)_init_nats_server_url_add}, + {"num_publish_workers", INT_PARAM, &nats_pub_workers_num}, {"subject_queue_group", PARAM_STRING | USE_FUNC_PARAM, (void *)_init_nats_sub_add}}; +static cmd_export_t cmds[] = {{"nats_publish", (cmd_function)w_nats_publish_f, + 2, fixup_publish_get_value, + fixup_publish_get_value_free, ANY_ROUTE}, + {0, 0, 0, 0, 0, 0}}; + struct module_exports exports = { "nats", DEFAULT_DLFLAGS, /* dlopen flags */ - 0, /* Exported functions */ + cmds, /* Exported functions */ params, /* Exported parameters */ 0, /* exported MI functions */ nats_mod_pvs, /* exported pseudo-variables */ @@ -104,12 +116,13 @@ static void closedCB(natsConnection *nc, void *closure) } void nats_consumer_worker_proc( - nats_consumer_worker_t *worker, nats_connection_ptr c) + nats_consumer_worker_t *worker) { natsStatus s = NATS_OK; // create a loop natsLibuv_Init(); + worker->uvLoop = uv_default_loop(); if(worker->uvLoop != NULL) { natsLibuv_SetThreadLocalLoop(worker->uvLoop); @@ -119,23 +132,20 @@ void nats_consumer_worker_proc( if(s != NATS_OK) { LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s)); } - if((s = natsConnection_Connect(&worker->conn, c->opts)) != NATS_OK) { + if((s = natsConnection_Connect(&worker->nc->conn, worker->nc->opts)) + != NATS_OK) { LM_ERR("could not connect to nats servers [%s]\n", natsStatus_GetText(s)); } - s = natsOptions_SetEventLoop(c->opts, (void *)worker->uvLoop, + s = natsOptions_SetEventLoop(worker->nc->opts, (void *)worker->uvLoop, natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write, natsLibuv_Detach); if(s != NATS_OK) { LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s)); } - if(s) { - LM_ERR("error setting options [%s]\n", natsStatus_GetText(s)); - } - - s = natsConnection_QueueSubscribe(&worker->subscription, worker->conn, + s = natsConnection_QueueSubscribe(&worker->subscription, worker->nc->conn, worker->subject, worker->queue_group, onMsg, worker->on_message); if(s != NATS_OK) { LM_ERR("could not subscribe [%s]\n", natsStatus_GetText(s)); @@ -159,17 +169,30 @@ void nats_consumer_worker_proc( static int mod_init(void) { + int i = 0; + int total_procs = _nats_proc_count + nats_pub_workers_num; if(faked_msg_init() < 0) { LM_ERR("failed to init faked sip message\n"); return -1; } - nats_init_environment(); - _nats_connection = _init_nats_connection(); - if(nats_init_connection(_nats_connection) < 0) { - LM_ERR("failed to init nat connections\n"); - return -1; + register_procs(total_procs); + + nats_pub_worker_pipes_fds = + (int *)shm_malloc(sizeof(int) * (nats_pub_workers_num)*2); + nats_pub_worker_pipes = + (int *)shm_malloc(sizeof(int) * nats_pub_workers_num); + for(i = 0; i < nats_pub_workers_num; i++) { + nats_pub_worker_pipes_fds[i * 2] = + nats_pub_worker_pipes_fds[i * 2 + 1] = -1; + if(pipe(&nats_pub_worker_pipes_fds[i * 2]) < 0) { + LM_ERR("worker pipe(%d) failed\n", i); + return -1; + } + } + for(i = 0; i < nats_pub_workers_num; i++) { + nats_pub_worker_pipes[i] = nats_pub_worker_pipes_fds[i * 2 + 1]; } - register_procs(_nats_proc_count); + nats_workers = shm_malloc(_nats_proc_count * sizeof(nats_consumer_worker_t)); if(nats_workers == NULL) { @@ -177,6 +200,15 @@ static int mod_init(void) return -1; } memset(nats_workers, 0, _nats_proc_count * sizeof(nats_consumer_worker_t)); + + nats_pub_workers = + shm_malloc(nats_pub_workers_num * sizeof(nats_pub_worker_t)); + if(nats_pub_workers == NULL) { + LM_ERR("error in shm_malloc\n"); + return -1; + } + memset(nats_pub_workers, 0, + nats_pub_workers_num * sizeof(nats_pub_worker_t)); return 0; } @@ -186,6 +218,14 @@ int init_worker( int buffsize = strlen(subject) + 6; char routename[buffsize]; int rt; + nats_connection_ptr nc = NULL; + + nats_init_environment(); + nc = _init_nats_connection(); + if(nats_init_connection(nc) < 0) { + LM_ERR("failed to init nat connections\n"); + return -1; + } memset(worker, 0, sizeof(*worker)); worker->subject = shm_malloc(strlen(subject) + 1); @@ -208,18 +248,77 @@ int init_worker( return 0; } worker->on_message->rt = rt; + worker->nc = nc; return 0; } -void worker_loop(int id, nats_connection_ptr c) +int init_pub_worker( + nats_pub_worker_t *worker) +{ + nats_connection_ptr nc = NULL; + nc = _init_nats_connection(); + if(nats_init_connection(nc) < 0) { + LM_ERR("failed to init nat connections\n"); + return -1; + } + memset(worker, 0, sizeof(*worker)); + worker->nc = nc; + return 0; +} + +void worker_loop(int id) { nats_consumer_worker_t *worker = &nats_workers[id]; - nats_consumer_worker_proc(worker, c); + nats_consumer_worker_proc(worker); for(;;) { sleep(1000); } } +int _nats_pub_worker_proc( + nats_pub_worker_t *worker, int fd) +{ + natsStatus s = NATS_OK; + + natsLibuv_Init(); + worker->fd = fd; + worker->uvLoop = uv_default_loop(); + if(worker->uvLoop != NULL) { + natsLibuv_SetThreadLocalLoop(worker->uvLoop); + } else { + s = NATS_ERR; + } + if(s != NATS_OK) { + LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s)); + } + + if((s = natsConnection_Connect(&worker->nc->conn, worker->nc->opts)) + != NATS_OK) { + LM_ERR("could not connect to nats servers [%s]\n", + natsStatus_GetText(s)); + } + s = natsOptions_SetEventLoop(worker->nc->opts, (void *)worker->uvLoop, + natsLibuv_Attach, natsLibuv_Read, natsLibuv_Write, + natsLibuv_Detach); + if(s != NATS_OK) { + LM_ERR("could not set event loop [%s]\n", natsStatus_GetText(s)); + } + + uv_pipe_init(worker->uvLoop, &worker->pipe, 0); + uv_pipe_open(&worker->pipe, worker->fd); + if(uv_poll_init(worker->uvLoop, &worker->poll, worker->fd) < 0) { + LM_ERR("uv_poll_init failed\n"); + return 0; + } + uv_handle_set_data((uv_handle_t *)&worker->poll, (nats_pub_worker_t *)worker); + if(uv_poll_start(&worker->poll, UV_READABLE | UV_DISCONNECT, _nats_pub_worker_cb) + < 0) { + LM_ERR("uv_poll_start failed\n"); + return 0; + } + return uv_run(worker->uvLoop, UV_RUN_DEFAULT); +} + /** * @brief Initialize async module children */ @@ -233,28 +332,51 @@ static int mod_child_init(int rank) n = _init_nats_sc; while(n) { if(init_worker(&nats_workers[i], n->sub, n->queue_group) < 0) { - LM_ERR("failed to init struct for worker[%d]\n", i); + LM_ERR("failed to init struct for worker [%d]\n", i); return -1; } n = n->next; i++; } + + for(i = 0; i < nats_pub_workers_num; i++) { + if(init_pub_worker(&nats_pub_workers[i]) < 0) { + LM_ERR("failed to init struct for pub worker[%d]\n", i); + return -1; + } + } + return 0; } if(rank == PROC_MAIN) { for(i = 0; i < _nats_proc_count; i++) { - newpid = fork_process(PROC_RPC, "NATS WORKER", 1); + newpid = fork_process(PROC_RPC, "NATS Subscriber", 1); if(newpid < 0) { LM_ERR("failed to fork worker process %d\n", i); return -1; } else if(newpid == 0) { - worker_loop(i, _nats_connection); + worker_loop(i); } else { nats_workers[i].pid = newpid; } } - return 0; + + for(i = 0; i < nats_pub_workers_num; i++) { + newpid = fork_process(PROC_NOCHLDINIT, "NATS Publisher", 1); + if(newpid < 0) { + LM_ERR("failed to fork worker process %d\n", i); + return -1; + } else if(newpid == 0) { + if(cfg_child_init()) + return -1; + close(nats_pub_worker_pipes_fds[i * 2 + 1]); + cfg_update(); + return (_nats_pub_worker_proc(&nats_pub_workers[i], nats_pub_worker_pipes_fds[i * 2])); + } else { + nats_pub_workers[i].pid = newpid; + } + } } return 0; @@ -395,6 +517,10 @@ int nats_cleanup_init_servers() int nats_cleanup_connection(nats_connection_ptr c) { + if(c->conn != NULL) { + natsConnection_Close(c->conn); + natsConnection_Destroy(c->conn); + } if(c->opts != NULL) { natsOptions_Destroy(c->opts); } @@ -411,6 +537,7 @@ int nats_destroy_workers() { int i; nats_consumer_worker_t *worker; + nats_pub_worker_t *pub_worker; for(i = 0; i < _nats_proc_count; i++) { worker = &nats_workers[i]; if(worker != NULL) { @@ -418,10 +545,6 @@ int nats_destroy_workers() natsSubscription_Unsubscribe(worker->subscription); natsSubscription_Destroy(worker->subscription); } - if(worker->conn != NULL) { - natsConnection_Close(worker->conn); - natsConnection_Destroy(worker->conn); - } if(worker->uvLoop != NULL) { uv_loop_close(worker->uvLoop); } @@ -431,12 +554,30 @@ int nats_destroy_workers() if(worker->queue_group != NULL) { shm_free(worker->queue_group); } + if(worker->nc != NULL) { + if(nats_cleanup_connection(worker->nc) < 0) { + LM_ERR("could not cleanup worker connection\n"); + } + } if(worker->on_message != NULL) { shm_free(worker->on_message); } shm_free(worker); } } + + for(i = 0; i < nats_pub_workers_num; i++) { + pub_worker = &nats_pub_workers[i]; + if(pub_worker != NULL) { + if(pub_worker->nc != NULL) { + if(nats_cleanup_connection(pub_worker->nc) < 0) { + LM_ERR("could not cleanup worker connection\n"); + } + } + uv_poll_stop(&pub_worker->poll); + shm_free(pub_worker); + } + } return 0; } @@ -451,12 +592,15 @@ static void mod_destroy(void) if(nats_cleanup_init_sub() < 0) { LM_INFO("could not cleanup init data\n"); } - if(nats_cleanup_connection(_nats_connection) < 0) { - LM_INFO("could not cleanup connection\n"); - } if(nats_cleanup_init_servers() < 0) { LM_INFO("could not cleanup init server data\n"); } + if(nats_pub_worker_pipes_fds) { + shm_free(nats_pub_worker_pipes_fds); + } + if(nats_pub_worker_pipes) { + shm_free(nats_pub_worker_pipes); + } } int _init_nats_server_url_add(modparam_t type, void *val) diff --git a/src/modules/nats/nats_mod.h b/src/modules/nats/nats_mod.h index 747d059ba31..215c4f50d9e 100644 --- a/src/modules/nats/nats_mod.h +++ b/src/modules/nats/nats_mod.h @@ -26,63 +26,20 @@ #define __NATS_MOD_H_ #include -#include #include #include "../json/api.h" #include "../../core/cfg/cfg_struct.h" #include "../../core/fmsg.h" -#define NATS_DEFAULT_URL "nats://localhost:4222" -#define NATS_MAX_SERVERS 10 -#define NATS_URL_MAX_SIZE 256 - -typedef struct _nats_evroutes -{ - int connected; - int disconnected; -} nats_evroutes_t; -static nats_evroutes_t _nats_rts; - -typedef struct _init_nats_sub -{ - char *sub; - char *queue_group; - struct _init_nats_sub *next; -} init_nats_sub, *init_nats_sub_ptr; - -typedef struct _init_nats_server -{ - char *url; - struct _init_nats_server *next; -} init_nats_server, *init_nats_server_ptr; - -typedef struct _nats_on_message -{ - int rt; -} nats_on_message, *nats_on_message_ptr; - -typedef struct _nats_connection -{ - natsOptions *opts; - char *servers[NATS_MAX_SERVERS]; -} nats_connection, *nats_connection_ptr; - -struct nats_consumer_worker -{ - char *subject; - char *queue_group; - int pid; - natsConnection *conn; - natsSubscription *subscription; - uv_loop_t *uvLoop; - nats_on_message_ptr on_message; -}; -typedef struct nats_consumer_worker nats_consumer_worker_t; - static int mod_init(void); static int mod_child_init(int); static void mod_destroy(void); +extern int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload); +extern int fixup_publish_get_value(void **param, int param_no); +extern int fixup_publish_get_value_free(void **param, int param_no); +extern void _nats_pub_worker_cb(uv_poll_t *handle, int status, int events); + int nats_run_cfg_route(int rt); void nats_init_environment(); @@ -100,7 +57,8 @@ int init_nats_sub_add(char *sub); int nats_cleanup_init_sub(); void nats_consumer_worker_proc( - nats_consumer_worker_t *worker, nats_connection_ptr c); + nats_consumer_worker_t *worker); int nats_pv_get_event_payload(struct sip_msg *, pv_param_t *, pv_value_t *); + #endif diff --git a/src/modules/nats/nats_pub.c b/src/modules/nats/nats_pub.c new file mode 100644 index 00000000000..e1d3cc181ec --- /dev/null +++ b/src/modules/nats/nats_pub.c @@ -0,0 +1,130 @@ +/* + * NATS module interface + * + * Copyright (C) 2021 Voxcom Inc + * + * This file is part of Kamailio, a free SIP server. + * + * Kamailio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * Kamailio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * + */ + +#include "defs.h" +#include "nats_pub.h" + +extern int *nats_pub_worker_pipes; +extern int nats_pub_workers_num; +extern nats_pub_worker_t *nats_pub_workers; +int pub_worker = 0; + +int fixup_publish_get_value(void **param, int param_no) +{ + if(param_no == 1 || param_no == 2) { + return fixup_spve_null(param, 1); + } + LM_ERR("invalid parameter number <%d>\n", param_no); + return -1; +} + +int fixup_publish_get_value_free(void **param, int param_no) +{ + if(param_no == 1 || param_no == 2) { + fixup_free_spve_null(param, 1); + return 0; + } + LM_ERR("invalid parameter number <%d>\n", param_no); + return -1; +} + +nats_pub_delivery_ptr _nats_pub_delivery_new(str subject, str payload) +{ + nats_pub_delivery_ptr p = + (nats_pub_delivery_ptr)shm_malloc(sizeof(nats_pub_delivery)); + memset(p, 0, sizeof(nats_pub_delivery)); + + p->subject = shm_malloc(subject.len + 1); + strcpy(p->subject, subject.s); + p->subject[subject.len] = '\0'; + + p->payload = shm_malloc(payload.len + 1); + strcpy(p->payload, payload.s); + p->payload[payload.len] = '\0'; + + return p; +} + +static int _w_nats_publish_f(str subj, str payload, int worker) +{ + nats_pub_delivery_ptr ptr = _nats_pub_delivery_new(subj, payload); + if(write(nats_pub_worker_pipes[worker], &ptr, sizeof(ptr)) != sizeof(ptr)) { + LM_ERR("failed to publish message %d, write to " + "command pipe: %s\n", + getpid(), strerror(errno)); + } + return 1; +} + +int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload) +{ + str subj_s = STR_NULL; + str payload_s = STR_NULL; + if(fixup_get_svalue(msg, (gparam_t *)subj, &subj_s) < 0) { + LM_ERR("failed to get subj value\n"); + return -1; + } + if(fixup_get_svalue(msg, (gparam_t *)payload, &payload_s) < 0) { + LM_ERR("failed to get subj value\n"); + return -1; + } + + // round-robin pub workers + pub_worker++; + if(pub_worker >= nats_pub_workers_num) { + pub_worker = 0; + } + + return _w_nats_publish_f(subj_s, payload_s, pub_worker); +} + +void _nats_pub_worker_cb(uv_poll_t *handle, int status, int events) +{ + natsStatus s = NATS_OK; + nats_pub_delivery_ptr ptr; + nats_pub_worker_t *worker = + (nats_pub_worker_t *)uv_handle_get_data((uv_handle_t *)handle); + + if(read(worker->fd, &ptr, sizeof(ptr)) != sizeof(ptr)) { + LM_ERR("failed to read from command pipe: %s\n", strerror(errno)); + return; + } + if((s = natsConnection_PublishString(worker->nc->conn, ptr->subject, ptr->payload)) + != NATS_OK) { + LM_ERR("could not publish to subject [%s] payload [%s] error [%s]\n", ptr->subject, ptr->payload, + natsStatus_GetText(s)); + } + nats_pub_free_delivery_ptr(ptr); +} + +void nats_pub_free_delivery_ptr(nats_pub_delivery_ptr ptr) +{ + if(ptr == NULL) + return; + if(ptr->subject) + shm_free(ptr->subject); + if(ptr->payload) + shm_free(ptr->payload); + shm_free(ptr); +} diff --git a/src/modules/nats/nats_pub.h b/src/modules/nats/nats_pub.h new file mode 100644 index 00000000000..a509c6145aa --- /dev/null +++ b/src/modules/nats/nats_pub.h @@ -0,0 +1,44 @@ +/* + * NATS module interface + * + * Copyright (C) 2021 Voxcom Inc + * + * This file is part of Kamailio, a free SIP server. + * + * Kamailio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * Kamailio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * + */ + +#ifndef __NATS_PUB_H_ +#define __NATS_PUB_H_ + +#include +#include "../../core/fmsg.h" +#include "../../core/mod_fix.h" + +typedef struct _nats_pub_delivery +{ + char *subject; + char *payload; +} nats_pub_delivery, *nats_pub_delivery_ptr; + +nats_pub_delivery_ptr _nats_pub_delivery_new(str subject, str payload); +void nats_pub_free_delivery_ptr(nats_pub_delivery_ptr ptr); +int w_nats_publish_f(sip_msg_t *msg, char *subj, char *payload); +int fixup_publish_get_value(void **param, int param_no); +int fixup_publish_get_value_free(void **param, int param_no); + +#endif