From 9b3a7227e2dc12823be28bf1b261de1ebdb3f954 Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Thu, 2 Jan 2020 13:40:55 +0100 Subject: [PATCH 1/3] mqtt: module to support mqtt pubsub messaging --- src/modules/mqtt/Makefile | 37 ++ src/modules/mqtt/README | 349 ++++++++++++++ src/modules/mqtt/doc/Makefile | 4 + src/modules/mqtt/doc/mqtt.xml | 37 ++ src/modules/mqtt/doc/mqtt_admin.xml | 399 ++++++++++++++++ src/modules/mqtt/mqtt_dispatch.c | 711 ++++++++++++++++++++++++++++ src/modules/mqtt/mqtt_dispatch.h | 71 +++ src/modules/mqtt/mqtt_mod.c | 394 +++++++++++++++ 8 files changed, 2002 insertions(+) create mode 100644 src/modules/mqtt/Makefile create mode 100644 src/modules/mqtt/README create mode 100644 src/modules/mqtt/doc/Makefile create mode 100644 src/modules/mqtt/doc/mqtt.xml create mode 100644 src/modules/mqtt/doc/mqtt_admin.xml create mode 100644 src/modules/mqtt/mqtt_dispatch.c create mode 100644 src/modules/mqtt/mqtt_dispatch.h create mode 100644 src/modules/mqtt/mqtt_mod.c diff --git a/src/modules/mqtt/Makefile b/src/modules/mqtt/Makefile new file mode 100644 index 00000000000..dfe4be0cf4d --- /dev/null +++ b/src/modules/mqtt/Makefile @@ -0,0 +1,37 @@ +# +# WARNING: do not run this directly, it should be run by the master Makefile + +include ../../Makefile.defs +auto_gen= +NAME=mqtt.so + +ifeq ($(CROSS_COMPILE),) + BUILDER = $(shell which pkg-config) +ifneq ($(BUILDER),) + PKGLIBUV = $(shell $(BUILDER) --exists libmosquitto > /dev/null 2>&1 ; echo $$? ) +ifneq ($(PKGLIBUV),0) + BUILDER = +endif +endif +endif + +ifneq ($(BUILDER),) + DEFS += $(shell $(BUILDER) --cflags libmosquitto) + LIBS += $(shell $(BUILDER) --libs libmosquitto) + DEFS += $(shell $(BUILDER) --cflags libev) + LIBS += $(shell $(BUILDER) --libs libev) +else +ifneq (,$(findstring darwin,$(OS))) + DEFS += -I/opt/local/include -I$(LOCALBASE)/include + LIBS += -L/opt/local/lib -L$(LOCALBASE)/lib -lmosquitto -lev +else + DEFS += -I$(LOCALBASE)/include -I$(SYSBASE)/include + LIBS += -L$(LOCALBASE)/lib -L$(SYSBASE)/lib -lmosquitto -lev +endif +endif + + +SERLIBPATH=../../lib +SER_LIBS+=$(SERLIBPATH)/srutils/srutils + +include ../../Makefile.modules diff --git a/src/modules/mqtt/README b/src/modules/mqtt/README new file mode 100644 index 00000000000..92c720a0ebf --- /dev/null +++ b/src/modules/mqtt/README @@ -0,0 +1,349 @@ +MQTT Module + +Thomas Weber + + + +Edited by + +Thomas weber + + + + Copyright © 2019 Thomas Weber + __________________________________________________________________ + + Table of Contents + + 1. Admin Guide + + 1. Overview + 2. Dependencies + + 2.1. External Libraries or Applications + + 3. Parameters + + 3.1. host (str) + 3.2. port (int) + 3.3. keepalive (int) + 3.4. id (str) + 3.5. username (str) + 3.6. password (str) + 3.7. will_topic (str) + 3.8. will (str) + 3.9. event_callback (str) + + 4. Functions + + 4.1. mqtt_subscribe(topic, qos) + 4.2. mqtt_unsubscribe(topic) + 4.3. mqtt_publish(topic, message, qos) + + 5. Event routes + + 5.1. mqtt:connected + 5.2. mqtt:disconnected + 5.3. mqtt:message + + 6. Exported pseudo-variables + + List of Examples + + 1.1. Set host parameter + 1.2. Set port parameter + 1.3. Set keepalive parameter + 1.4. Set id parameter + 1.5. Set username parameter + 1.6. Set password parameter + 1.7. Set will_topic parameter + 1.8. Set will parameter + 1.9. Set event_callback parameter + 1.10. mqtt_subscribe usage + 1.11. mqtt_unsubscribe usage + 1.12. mqtt_publish usage + +Chapter 1. Admin Guide + + Table of Contents + + 1. Overview + 2. Dependencies + + 2.1. External Libraries or Applications + + 3. Parameters + + 3.1. host (str) + 3.2. port (int) + 3.3. keepalive (int) + 3.4. id (str) + 3.5. username (str) + 3.6. password (str) + 3.7. will_topic (str) + 3.8. will (str) + 3.9. event_callback (str) + + 4. Functions + + 4.1. mqtt_subscribe(topic, qos) + 4.2. mqtt_unsubscribe(topic) + 4.3. mqtt_publish(topic, message, qos) + + 5. Event routes + + 5.1. mqtt:connected + 5.2. mqtt:disconnected + 5.3. mqtt:message + + 6. Exported pseudo-variables + +1. Overview + + The MQTT module allows bidirectional publish/subscribe communication by + connection Kamailio to a MQTT Broker. + + Messages can be published from any point in the routing script. Also + subscription can be fully controlled by scripting commands. + +2. Dependencies + + 2.1. External Libraries or Applications + +2.1. External Libraries or Applications + + The following libraries or applications must be installed before + running Kamailio with this module loaded: + * libev - http://software.schmorp.de/pkg/libev + * libmosquitto - https://mosquitto.org + +3. Parameters + + 3.1. host (str) + 3.2. port (int) + 3.3. keepalive (int) + 3.4. id (str) + 3.5. username (str) + 3.6. password (str) + 3.7. will_topic (str) + 3.8. will (str) + 3.9. event_callback (str) + +3.1. host (str) + + MQTT Broker IP/Hostname + + No default, this parameter is mandatory. + + Example 1.1. Set host parameter +... +modparam("mqtt", "host", "1.2.3.4") +... + +3.2. port (int) + + MQTT Broker port number. + + Default value is 1883. + + Example 1.2. Set port parameter +... +modparam("mqtt", "port", 1883) +... + +3.3. keepalive (int) + + The number of seconds after which the broker should send a PING message + to the kamailio if no other messages have been exchanged in that time. + + Default value is 5. + + Example 1.3. Set keepalive parameter +... +modparam("mqtt", "keepalive", 5) +... + +3.4. id (str) + + String to use as the mqtt client id. If NULL, a random client id will + be generated. + + Default value is NULL. + + Example 1.4. Set id parameter +... +modparam("mqtt", "id", "kamailio123") +... + +3.5. username (str) + + The username to send as a string or NULL to disable authentication. + + Default value is NULL (no authentication). Must be used together with + password. + + Example 1.5. Set username parameter +... +modparam("mqtt", "username", "kamailio-user") +... + +3.6. password (str) + + The password to send as a string or NULL to disable authentication. + + Default value is NULL (no authentication). Must be used together with + username. + + Example 1.6. Set password parameter +... +modparam("mqtt", "password", "supers3cre7") +... + +3.7. will_topic (str) + + The topic on which to publish the mqtt will. + + Default value is NULL. Must be used together with will. + + Example 1.7. Set will_topic parameter +... +modparam("mqtt", "will_topic", "kamailio123") +... + +3.8. will (str) + + The mqtt will payload to be published. + + Default value is NULL. Must be used together with will_topic. + + Example 1.8. Set will parameter +... +modparam("mqtt", "will", "gone") +... + +3.9. event_callback (str) + + The name of the function in the kemi configuration file (embedded + scripting language such as Lua, Python, ...) to be executed instead of + event_route[...] blocks. + + The function receives a string parameter with the name of the event, + the values are: 'mqtt:connected', 'mqtt:disconnected', 'mqtt:message'. + + Default value is 'empty' (no function is executed for events). + + Example 1.9. Set event_callback parameter +... +modparam("mqtt", "event_callback", "ksr_mqtt_event") +... +-- event callback function implemented in Lua +function ksr_mqtt_event(evname) + KSR.info("===== mqtt module triggered event: " .. evname .. "\n"); + return 1; +end + +-- event callback function implemented in Python +function ksr_mqtt_event(self, msg, evname) + KSR.info("===== mqtt module triggered event: %s\n" % evname); + return 1; +end +... + +4. Functions + + 4.1. mqtt_subscribe(topic, qos) + 4.2. mqtt_unsubscribe(topic) + 4.3. mqtt_publish(topic, message, qos) + +4.1. mqtt_subscribe(topic, qos) + + Subscribe to the given topic. Mqtt qos levels 0, 1 and 2 can be used. + + The function is passing the task to mqtt dispatcher process, therefore + the SIP worker process is not blocked. + + Incoming messages for this topic are then handled by the same process + and exposed to the event_route[mqtt:message]. + + This function can be used from ANY_ROUTE. + + Example 1.10. mqtt_subscribe usage +... +mqtt_subscribe("kamailio/script", 0); +... + +4.2. mqtt_unsubscribe(topic) + + Unsubscribe to a previously subscribed topic. The mqtt broker will stop + forwarding messages for this topic. + + This function can be used from ANY_ROUTE. + + Example 1.11. mqtt_unsubscribe usage +... +mqtt_unsubscribe("kamailio/script"); +... + +4.3. mqtt_publish(topic, message, qos) + + Send out a message to a topic with a specified mqtt qos level (0, 1, + 2). Again the actual sending is done in a mqtt dispatcher process and + will not block the SIP worker. + + Example 1.12. mqtt_publish usage +... +mqtt_publish("kamailio/event", "some message", 0); +... + +5. Event routes + + 5.1. mqtt:connected + 5.2. mqtt:disconnected + 5.3. mqtt:message + +5.1. mqtt:connected + + If defined, the module calls event_route[mqtt:connected] when a + outgoing broker connection is established. + + MQTT subscriptions are not durable, so you should use this event route + to manage your subscriptions. +... +event_route[mqtt:connected] { + xlog("mqtt connected !\n"); + mqtt_subscribe("kamailio/script", 0); +} +... + +5.2. mqtt:disconnected + + If defined, the module calls event_route[mqtt:disconnected] when the + broker connection is lost. + + The module will automatically try to reconnect to the broker every 3 + seconds. +... +event_route[mqtt:disconnected] { + xlog("Lost mqtt connection !\n"); +} +... + +5.3. mqtt:message + + If defined, the module calls event_route[mqtt:message] when a message + is received from the broker. + + All incoming messages are handled in a single mqtt dispatcher process. +... +event_route[mqtt:message] { + xlog("mqtt message received: $mqtt(topic) -> $mqtt(msg) !\n"); +} +... + +6. Exported pseudo-variables + + * $mqtt(topic) - Received topic (only in mqtt:message) + * $mqtt(msg) - Received message (only in mqtt:message) + + Exported pseudo-variables are documented at + https://www.kamailio.org/wiki/. diff --git a/src/modules/mqtt/doc/Makefile b/src/modules/mqtt/doc/Makefile new file mode 100644 index 00000000000..4b5795f0869 --- /dev/null +++ b/src/modules/mqtt/doc/Makefile @@ -0,0 +1,4 @@ +docs = mqtt.xml + +docbook_dir = ../../../../doc/docbook +include $(docbook_dir)/Makefile.module diff --git a/src/modules/mqtt/doc/mqtt.xml b/src/modules/mqtt/doc/mqtt.xml new file mode 100644 index 00000000000..cc79ad0aed6 --- /dev/null +++ b/src/modules/mqtt/doc/mqtt.xml @@ -0,0 +1,37 @@ + + + +%docentities; + +]> + + + + MQTT Module + kamailio.org + + + Thomas + Weber + thomas.weber@pascom.net + + + Thomas + weber + thomas.weber@pascom.net + + + + 2019 + Thomas Weber + + + + + + + + diff --git a/src/modules/mqtt/doc/mqtt_admin.xml b/src/modules/mqtt/doc/mqtt_admin.xml new file mode 100644 index 00000000000..e6eb7ab895e --- /dev/null +++ b/src/modules/mqtt/doc/mqtt_admin.xml @@ -0,0 +1,399 @@ + + + +%docentities; + +]> + + + + + &adminguide; + +
+ Overview + + The MQTT module allows bidirectional publish/subscribe communication + by connection Kamailio to a MQTT Broker. + + + Messages can be published from any point in the routing script. + Also subscription can be fully controlled by scripting commands. + +
+ +
+ Dependencies +
+ External Libraries or Applications + + The following libraries or applications must be installed before running + &kamailio; with this module loaded: + + + + libev - http://software.schmorp.de/pkg/libev + + + + + libmosquitto - https://mosquitto.org + + + + +
+
+
+ Parameters +
+ <varname>host</varname> (str) + + MQTT Broker IP/Hostname + + + + No default, this parameter is mandatory. + + + + Set <varname>host</varname> parameter + +... +modparam("mqtt", "host", "1.2.3.4") +... + + +
+
+ <varname>port</varname> (int) + + MQTT Broker port number. + + + + Default value is 1883. + + + + Set <varname>port</varname> parameter + +... +modparam("mqtt", "port", 1883) +... + + +
+
+ <varname>keepalive</varname> (int) + + The number of seconds after which the broker should send a PING message to the kamailio + if no other messages have been exchanged in that time. + + + + Default value is 5. + + + + Set <varname>keepalive</varname> parameter + +... +modparam("mqtt", "keepalive", 5) +... + + +
+
+ <varname>id</varname> (str) + + String to use as the mqtt client id. + If NULL, a random client id will be generated. + + + + Default value is NULL. + + + + Set <varname>id</varname> parameter + +... +modparam("mqtt", "id", "kamailio123") +... + + +
+
+ <varname>username</varname> (str) + + The username to send as a string or NULL to disable authentication. + + + + Default value is NULL (no authentication). + Must be used together with password. + + + + Set <varname>username</varname> parameter + +... +modparam("mqtt", "username", "kamailio-user") +... + + +
+
+ <varname>password</varname> (str) + + The password to send as a string or NULL to disable authentication. + + + + Default value is NULL (no authentication). + Must be used together with username. + + + + Set <varname>password</varname> parameter + +... +modparam("mqtt", "password", "supers3cre7") +... + + +
+
+ <varname>will_topic</varname> (str) + + The topic on which to publish the mqtt will. + + + + Default value is NULL. Must be used together with will. + + + + Set <varname>will_topic</varname> parameter + +... +modparam("mqtt", "will_topic", "kamailio123") +... + + +
+
+ <varname>will</varname> (str) + + The mqtt will payload to be published. + + + + Default value is NULL. Must be used together with will_topic. + + + + Set <varname>will</varname> parameter + +... +modparam("mqtt", "will", "gone") +... + + +
+
+ <varname>event_callback</varname> (str) + + The name of the function in the kemi configuration file (embedded + scripting language such as Lua, Python, ...) to be executed instead + of event_route[...] blocks. + + + The function receives a string parameter with the name of the event, + the values are: 'mqtt:connected', 'mqtt:disconnected', + 'mqtt:message'. + + + + Default value is 'empty' (no function is executed for events). + + + + Set <varname>event_callback</varname> parameter + +... +modparam("mqtt", "event_callback", "ksr_mqtt_event") +... +-- event callback function implemented in Lua +function ksr_mqtt_event(evname) + KSR.info("===== mqtt module triggered event: " .. evname .. "\n"); + return 1; +end + +-- event callback function implemented in Python +function ksr_mqtt_event(self, msg, evname) + KSR.info("===== mqtt module triggered event: %s\n" % evname); + return 1; +end +... + + +
+
+ +
+ Functions +
+ + <function moreinfo="none">mqtt_subscribe(topic, qos)</function> + + + Subscribe to the given topic. Mqtt qos levels 0, 1 and 2 can be used. + + + The function is passing the task to mqtt dispatcher process, therefore + the SIP worker process is not blocked. + + + Incoming messages for this topic are then handled by the same process + and exposed to the event_route[mqtt:message]. + + + This function can be used from ANY_ROUTE. + + + <function>mqtt_subscribe</function> usage + +... +mqtt_subscribe("kamailio/script", 0); +... + + +
+ +
+ + <function moreinfo="none">mqtt_unsubscribe(topic)</function> + + + Unsubscribe to a previously subscribed topic. The mqtt broker will stop + forwarding messages for this topic. + + + This function can be used from ANY_ROUTE. + + + <function>mqtt_unsubscribe</function> usage + +... +mqtt_unsubscribe("kamailio/script"); +... + + +
+ +
+ + <function moreinfo="none">mqtt_publish(topic, message, qos)</function> + + + Send out a message to a topic with a specified mqtt qos level (0, 1, 2). + Again the actual sending is done in a mqtt dispatcher process and will not + block the SIP worker. + + + <function>mqtt_publish</function> usage + +... +mqtt_publish("kamailio/event", "some message", 0); +... + + +
+
+ +
+ Event routes +
+ + <function moreinfo="none">mqtt:connected</function> + + + If defined, the module calls event_route[mqtt:connected] + when a outgoing broker connection is established. + + + MQTT subscriptions are not durable, so you should use this + event route to manage your subscriptions. + + +... +event_route[mqtt:connected] { + xlog("mqtt connected !\n"); + mqtt_subscribe("kamailio/script", 0); +} +... + +
+
+ + <function moreinfo="none">mqtt:disconnected</function> + + + If defined, the module calls event_route[mqtt:disconnected] + when the broker connection is lost. + + + The module will automatically try to reconnect to the broker every 3 seconds. + + +... +event_route[mqtt:disconnected] { + xlog("Lost mqtt connection !\n"); +} +... + +
+
+ + <function moreinfo="none">mqtt:message</function> + + + If defined, the module calls event_route[mqtt:message] + when a message is received from the broker. + + + All incoming messages are handled in a single mqtt dispatcher + process. + + +... +event_route[mqtt:message] { + xlog("mqtt message received: $mqtt(topic) -> $mqtt(msg) !\n"); +} +... + +
+
+ +
+ Exported pseudo-variables + + + $mqtt(topic) - Received topic (only in mqtt:message) + + + $mqtt(msg) - Received message (only in mqtt:message) + + + + Exported pseudo-variables are documented at &kamwikilink;. + +
+ +
+ diff --git a/src/modules/mqtt/mqtt_dispatch.c b/src/modules/mqtt/mqtt_dispatch.c new file mode 100644 index 00000000000..a4cbdf2e9fa --- /dev/null +++ b/src/modules/mqtt/mqtt_dispatch.c @@ -0,0 +1,711 @@ +/** + * Copyright (C) 2019 Thomas Weber, pascom.net + * + * This file is part of Kamailio, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#include + +#include +#include + +#include "../../core/sr_module.h" +#include "../../core/fmsg.h" +#include "../../core/cfg/cfg_struct.h" +#include "../../core/kemi.h" +#include "../../core/pt.h" + +#include "mqtt_dispatch.h" + +// a socket pair to send data from any sip worker to the dispatcher +static int _mqtt_notify_sockets[2]; + +// time in seconds for periodic mqtt housekeeping +const static float _mqtt_timer_freq = 1.0; + +// in case of connectivity loss: how many ticks to wait until we retry +// actual reconnect time is _mqtt_timer_freq * _reconnect_wait_ticks +const static int _reconnect_wait_ticks = 3; + +// libmosquitto handle +static struct mosquitto *_mosquitto; + +// the dispatchers event loop +static struct ev_loop *loop; +// periodic timer for housekeeping and reconnects +static struct ev_timer timer_notify; +// select() on mosquitto socket to get a callback when data arrives +static struct ev_io socket_notify; + +// the kemi callback name, see mqtt_mod.c +extern str _mqtt_event_callback; + +void mqtt_socket_notify(struct ev_loop *loop, struct ev_io *watcher, int revents); +void mqtt_request_notify(struct ev_loop *loop, struct ev_io *watcher, int revents); +void mqtt_timer_notify(struct ev_loop *loop, ev_timer *timer, int revents); +void mqtt_on_connect(struct mosquitto *, void *, int); +void mqtt_on_disconnect(struct mosquitto *, void *, int); +void mqtt_on_message(struct mosquitto *, void *, const struct mosquitto_message *); +int mqtt_run_cfg_route( int rt, str *rtname, sip_msg_t *fake_message); +int mqtt_publish(str *topic, str *payload, int qos); +int mqtt_subscribe(str *topic, int qos); +int mqtt_unsubscribe(str *topic); + +// pointers for event routes, initialized in mqtt_init_environment() +typedef struct _mqtt_evroutes { + int connected; + str connected_name; + int disconnected; + str disconnected_name; + int msg_received; + str msg_received_name; +} mqtt_evroutes_t; +static mqtt_evroutes_t _mqtt_rts; + +/** + * Prepare event route pointers. + */ +void mqtt_init_environment() +{ + memset(&_mqtt_rts, 0, sizeof(mqtt_evroutes_t)); + + _mqtt_rts.connected_name.s = "mqtt:connected"; + _mqtt_rts.connected_name.len = strlen(_mqtt_rts.connected_name.s); + _mqtt_rts.connected = route_lookup(&event_rt, "mqtt:connected"); + if (_mqtt_rts.connected < 0 || event_rt.rlist[_mqtt_rts.connected] == NULL) + _mqtt_rts.connected = -1; + + _mqtt_rts.disconnected_name.s = "mqtt:disconnected"; + _mqtt_rts.disconnected_name.len = strlen(_mqtt_rts.disconnected_name.s); + _mqtt_rts.disconnected = route_lookup(&event_rt, "mqtt:disconnected"); + if (_mqtt_rts.disconnected < 0 || event_rt.rlist[_mqtt_rts.disconnected] == NULL) + _mqtt_rts.disconnected = -1; + + _mqtt_rts.msg_received_name.s = "mqtt:message"; + _mqtt_rts.msg_received_name.len = strlen(_mqtt_rts.msg_received_name.s); + _mqtt_rts.msg_received = route_lookup(&event_rt, "mqtt:message"); + if (_mqtt_rts.msg_received < 0 || event_rt.rlist[_mqtt_rts.msg_received] == NULL) + _mqtt_rts.msg_received = -1; +} + +/** + * Create the ipc socket pair for worker->dispatcher messaging. + */ +int mqtt_init_notify_sockets(void) +{ + if (socketpair(PF_UNIX, SOCK_STREAM, 0, _mqtt_notify_sockets) < 0) { + LM_ERR("opening notify stream socket pair\n"); + return -1; + } + LM_DBG("inter-process event notification sockets initialized: %d ~ %d\n", + _mqtt_notify_sockets[0], _mqtt_notify_sockets[1]); + return 0; +} + +/** + * Close the sending socket. + * Done for dispatcher process. + */ +void mqtt_close_notify_sockets_child(void) +{ + LM_DBG("closing the notification socket used by children\n"); + close(_mqtt_notify_sockets[1]); + _mqtt_notify_sockets[1] = -1; +} + +/** + * Close the receiving socket + * Done in all non-dispatcher processes. + */ +void mqtt_close_notify_sockets_parent(void) +{ + LM_DBG("closing the notification socket used by parent\n"); + close(_mqtt_notify_sockets[0]); + _mqtt_notify_sockets[0] = -1; +} + +/** + * Main loop of the dispatcher process (blocking) + */ +int mqtt_run_dispatcher(mqtt_dispatcher_cfg_t* cfg) +{ + int res; + struct ev_io request_notify; + + // prepare and init libmosquitto handle + LM_DBG("starting mqtt dispatcher processing\n"); + if (mosquitto_lib_init() != MOSQ_ERR_SUCCESS) { + LM_ERR("failed to init libmosquitto\n"); + return -1; + } + + _mosquitto = mosquitto_new(cfg->id, true, 0); + if (_mosquitto == 0) { + LM_ERR("failed to allocate mosquitto struct\n"); + return -1; + } + + if (cfg->will != NULL && cfg->will_topic != NULL) { + LM_DBG("setting will to [%s] -> [%s]\n", cfg->will_topic, cfg->will); + res = mosquitto_will_set(_mosquitto, cfg->will_topic, strlen(cfg->will), cfg->will, 0, false); + if (res != MOSQ_ERR_SUCCESS) { + LM_DBG("unable to set will: code=[%d]\n", res); + return -1; + } + } + + if (cfg->username != NULL && cfg->password != NULL) { + res = mosquitto_username_pw_set(_mosquitto, cfg->username, cfg->password); + if (res != MOSQ_ERR_SUCCESS) { + LM_DBG("unable to set password: code=[%d]\n", res); + return -1; + } + } + + // callback for arriving messages + mosquitto_message_callback_set(_mosquitto, mqtt_on_message); + // callback for outgoing connections + mosquitto_connect_callback_set(_mosquitto, mqtt_on_connect); + // callback for connection loss + mosquitto_disconnect_callback_set(_mosquitto, mqtt_on_disconnect); + + // prepare event loop + loop = ev_default_loop(0); + if(loop==NULL) { + LM_ERR("cannot get libev loop\n"); + return -1; + } + + // listen for data on internal ipc socket + ev_io_init(&request_notify, mqtt_request_notify, _mqtt_notify_sockets[0], EV_READ); + ev_io_start(loop, &request_notify); + + // periodic timer for mqtt keepalive + ev_timer_init(&timer_notify, mqtt_timer_notify, _mqtt_timer_freq, 0.); + ev_timer_start(loop, &timer_notify); + + res = mosquitto_connect(_mosquitto, cfg->host, cfg->port, cfg->keepalive); + if (res == MOSQ_ERR_INVAL) { + LM_ERR("invalid connect parameters\n"); + return -1; + } + if (res == MOSQ_ERR_ERRNO) { + // it's not a problem if the initial connection failed, + // we will retry periodically to reconnect + LM_DBG("mosquitto_connect() failed: %d %s\n",errno, strerror(errno)); + } + + // the actual main loop, it drives libev + while(1) { + ev_loop (loop, 0); + } + + return 0; +} + +/** + * libev notifies us because some data is waiting on the mosquitto socket. + */ +void mqtt_socket_notify(struct ev_loop *loop, struct ev_io *watcher, int revents) +{ + + if(EV_ERROR & revents) { + perror("received invalid event\n"); + return; + } + + // delegate mosquitto loop to read data from sockets + mqtt_timer_notify(loop,0, 0); + +} + +/** + * Periodic mqtt housekeeping. + */ +void mqtt_timer_notify(struct ev_loop *loop, ev_timer *timer, int revents) +{ + int res; + int recres; + static int wait_ticks = 0; + + // spend up to 30 secs in the mosquitto loop + // the loop will delegate work to mosquitto_on_..... callbacks. + res = mosquitto_loop(_mosquitto, 30, 1); + switch (res) { + case MOSQ_ERR_SUCCESS: + break; + case MOSQ_ERR_ERRNO: + LM_ERR("mosquitto_loop() failed: %s\n", strerror(errno)); + break; + case MOSQ_ERR_NO_CONN: + case MOSQ_ERR_CONN_LOST: + // is it time to reconnect? + if (wait_ticks > 0) { + // not yet... + wait_ticks --; + break; + } + LM_DBG("Reconnecting\n"); + recres = mosquitto_reconnect(_mosquitto); + if (recres != MOSQ_ERR_SUCCESS) { + LM_ERR("mosquitto_reconnect() failed: %d\n", recres); + // let's wait again N ticks + wait_ticks = _reconnect_wait_ticks; + } + break; + default: + LM_ERR("mosquitto_loop() failed: case %i\n", res); + break; + } + + if (timer != 0) { + // be sure to keep the timer going. + timer->repeat = _mqtt_timer_freq; + ev_timer_again (loop, timer); + } + +} + +/** + * libmosquitto established a connection. + */ +void mqtt_on_connect(struct mosquitto *mosquitto, void *userdata, int rc) +{ + int mosquitto_fd; + if (rc == 0) { + LM_DBG("mqtt connected\n"); + + // listen for incoming data on mqtt connection + mosquitto_fd = mosquitto_socket(_mosquitto); + // init/refresh the libev notifier + ev_io_init(&socket_notify, mqtt_socket_notify, mosquitto_fd, EV_READ); + ev_io_start(loop, &socket_notify); + + // tell the script about the connection + mqtt_run_cfg_route(_mqtt_rts.connected, &_mqtt_rts.connected_name, 0); + } else { + LM_DBG("mqtt connect error [%i]\n", rc); + } +} + +/** + * libmosquitto lost connection + */ +void mqtt_on_disconnect(struct mosquitto *mosquitto, void *userdata, int rc) +{ + LM_DBG("mqtt disconnected [rc %i]\n", rc); + // the mosquitto read socket is invalid now, so detach libev + ev_io_stop(loop, &socket_notify); + // tell the script about the disconnection + mqtt_run_cfg_route(_mqtt_rts.disconnected, &_mqtt_rts.disconnected_name, 0); +} + +/** + * libmosquitto received a messag + */ +void mqtt_on_message(struct mosquitto *mosquitto, void *userdata, const struct mosquitto_message *message) +{ + sip_msg_t *fmsg; + sip_msg_t tmsg; + + str topic, payload; + topic.s = message->topic; + topic.len = strlen(message->topic); + payload.s = (char*) message->payload; + payload.len = message->payloadlen; + LM_DBG("mqtt message [%s] -> [%s]\n", topic.s, payload.s); + + cfg_update(); + + fmsg = faked_msg_next(); + memcpy(&tmsg, fmsg, sizeof(sip_msg_t)); + fmsg = &tmsg; + // use hdr date as pointer for the mqtt-message, not used in faked msg + fmsg->date=(hdr_field_t*)message; + mqtt_run_cfg_route(_mqtt_rts.msg_received, &_mqtt_rts.msg_received_name, fmsg); +} + +/** + * Invoke a event route block + */ +int mqtt_run_cfg_route(int rt, str *rtname, sip_msg_t *fake_msg) +{ + int backup_rt; + struct run_act_ctx ctx; + sip_msg_t *fmsg; + sip_msg_t tmsg; + sr_kemi_eng_t *keng = NULL; + + // check for valid route pointer + if((rt<0) && (_mqtt_event_callback.s==NULL || _mqtt_event_callback.len<=0)) + return 0; + + // create empty fake message, if needed + if (fake_msg == NULL) { + fmsg = faked_msg_next(); + memcpy(&tmsg, fmsg, sizeof(sip_msg_t)); + fmsg = &tmsg; + } else { + fmsg = fake_msg; + } + backup_rt = get_route_type(); + set_route_type(EVENT_ROUTE); + init_run_actions_ctx(&ctx); + LM_DBG("Run route [%.*s] [%s]\n", rtname->len, rtname->s, my_desc()); + if(rt>=0) { + run_top_route(event_rt.rlist[rt], fmsg, 0); + } else { + keng = sr_kemi_eng_get(); + if(keng!=NULL) { + if(sr_kemi_route(keng, fmsg, EVENT_ROUTE, + &_mqtt_event_callback, rtname)<0) { + LM_ERR("error running event route kemi callback\n"); + } + } + } + set_route_type(backup_rt); + return 0; +} + +/** + * prepare $mqtt pv call + */ +int pv_parse_mqtt_name(pv_spec_t *sp, str *in) +{ + if(sp==NULL || in==NULL || in->len<=0) + return -1; + + switch(in->len) + { + case 3: + if(strncmp(in->s, "msg", 3)==0) + sp->pvp.pvn.u.isname.name.n = 1; + else goto error; + break; + case 5: + if(strncmp(in->s, "topic", 5)==0) + sp->pvp.pvn.u.isname.name.n = 0; + else goto error; + break; + default: + goto error; + } + sp->pvp.pvn.type = PV_NAME_INTSTR; + sp->pvp.pvn.u.isname.type = 0; + + return 0; + +error: + LM_ERR("unknown PV msrp name %.*s\n", in->len, in->s); + return -1; +} + +/** + * Populate $mqtt pv + */ +int pv_get_mqtt(sip_msg_t *msg, pv_param_t *param, pv_value_t *res) +{ + struct mosquitto_message* message; + str topic, payload; + + if(param==NULL || res==NULL) + return -1; + + + // check fake message date hdr, it should point to a mosquitto message + message = (struct mosquitto_message*)msg->date; + + if (message==NULL) { + return pv_get_null(msg, param, res); + } else { + topic.s = message->topic; + topic.len = strlen(message->topic); + payload.s = (char*) message->payload; + payload.len = message->payloadlen; + } + + // populate value depeding on the param name + // see pv_parse_mqtt_name() + switch(param->pvn.u.isname.name.n) + { + case 0: + return pv_get_strval(msg, param, res, &topic); + case 1: + return pv_get_strval(msg, param, res, &payload); + default: + return pv_get_null(msg, param, res); + } + + return 0; +} + +/** + * The pv $mqtt is read only, nothing to do here. + */ +int pv_set_mqtt(sip_msg_t *msg, pv_param_t *param, int op, + pv_value_t *val) +{ + return 0; +} + +/** + * + */ +int mqtt_prepare_publish(str *topic, str *payload, int qos) +{ + int len; + mqtt_request_t *request; + + if(topic->s==NULL || topic->len == 0) { + LM_ERR("invalid topic parameter\n"); + return -1; + } + if(payload->s==NULL || payload->len == 0) { + LM_ERR("invalid payload parameter\n"); + return -1; + } + if(qos<0 || qos>2) { + LM_ERR("invalid qos level\n"); + return -1; + } + + LM_DBG("publishing [%.*s] -> [%.*s]\n", + topic->len, topic->s, payload->len, payload->s); + + len = sizeof(mqtt_request_t); + len += topic->len+16; + len += payload->len+16; + + request = (mqtt_request_t*)shm_malloc(len); + if(request==NULL) { + LM_ERR("no more shared memory\n"); + return -1; + } + memset(request, 0, len); + request->type = PUBLISH; + request->qos = qos; + request->topic.s = (char*)request + sizeof(mqtt_request_t); + request->topic.len = snprintf(request->topic.s, topic->len+16, + "%.*s", + topic->len, topic->s); + request->payload.s = request->topic.s + topic->len+16; + request->payload.len = snprintf(request->payload.s, payload->len+16, + "%.*s", + payload->len, payload->s); + + if(_mqtt_notify_sockets[1]!=-1) { + len = write(_mqtt_notify_sockets[1], &request, sizeof(mqtt_request_t*)); + if(len<=0) { + shm_free(request); + LM_ERR("failed to pass the pointer to mqtt dispatcher\n"); + return -1; + } + } else { + cfg_update(); + mqtt_publish(topic, payload, qos); + shm_free(request); + } + + return 0; +} + +/** + * + */ +void mqtt_request_notify(struct ev_loop *loop, struct ev_io *watcher, int revents) +{ + mqtt_request_t *request = NULL; + int rlen; + + if(EV_ERROR & revents) { + perror("received invalid event\n"); + return; + } + + cfg_update(); + + /* read message from client */ + rlen = read(watcher->fd, &request, sizeof(mqtt_request_t*)); + + if(rlen != sizeof(mqtt_request_t*) || request==NULL) { + LM_ERR("cannot read the sip worker message\n"); + return; + } + + LM_DBG("received [%p] [%i] [%.*s]\n", request, + request->type, request->topic.len, request->topic.s); + switch(request->type) { + case PUBLISH: + mqtt_publish(&request->topic, &request->payload, request->qos); + break; + case SUBSCRIBE: + mqtt_subscribe(&request->topic, request->qos); + break; + case UNSUBSCRIBE: + mqtt_unsubscribe(&request->topic); + break; + default: + LM_ERR("unknown request [%d] from sip worker\n", request->type); + } + shm_free(request); +} + +/** + * + */ +int mqtt_publish(str *topic, str *payload,int qos) { + int res; + + LM_DBG("publish [%s] %s -> %s (%d)\n", my_desc(), topic->s, payload->s, payload->len); + res = mosquitto_publish(_mosquitto, NULL, topic->s, payload->len, payload->s, qos, false); + if (res != MOSQ_ERR_SUCCESS) { + LM_WARN("unable to publish [%s] -> [%s], rc=%d\n", topic->s, payload->s, res); + return -1; + } + return 0; +} + +/** + * + */ +int mqtt_prepare_subscribe(str *topic, int qos) +{ + int len; + mqtt_request_t *request; + + if(topic->s==NULL || topic->len == 0) { + LM_ERR("invalid topic parameter\n"); + return -1; + } + + if(qos<0 || qos>2) { + LM_ERR("invalid qos level\n"); + return -1; + } + + LM_DBG("prepare subscribe [%s] [%.*s]\n", my_desc(), topic->len, topic->s); + len = sizeof(mqtt_request_t); + len += topic->len+16; + + request = (mqtt_request_t*)shm_malloc(len); + if(request==NULL) { + LM_ERR("no more shared memory\n"); + return -1; + } + memset(request, 0, len); + request->type = SUBSCRIBE; + request->qos = qos; + request->topic.s = (char*)request + sizeof(mqtt_request_t); + request->topic.len = snprintf(request->topic.s, topic->len+16, + "%.*s", + topic->len, topic->s); + + if(_mqtt_notify_sockets[1]!=-1) { + len = write(_mqtt_notify_sockets[1], &request, sizeof(mqtt_request_t*)); + if(len<=0) { + shm_free(request); + LM_ERR("failed to pass the pointer to mqtt dispatcher\n"); + return -1; + } + } else { + cfg_update(); + mqtt_subscribe(topic, qos); + shm_free(request); + } + + return 0; + +} + +/** + * + */ +int mqtt_subscribe(str *topic, int qos) { + int res; + + LM_DBG("subscribe [%s] %s\n", my_desc(), topic->s); + res = mosquitto_subscribe(_mosquitto, NULL, topic->s, qos); + if (res != MOSQ_ERR_SUCCESS) { + LM_WARN("unable to subscribe [%s], rc=%d\n", topic->s, res); + return -1; + } + return 0; +} + +/** + * + */ +int mqtt_prepare_unsubscribe(str *topic) +{ + int len; + mqtt_request_t *request; + + if(topic->s==NULL || topic->len == 0) { + LM_ERR("invalid topic parameter\n"); + return -1; + } + + + LM_DBG("prepare unsubscribe [%s] [%.*s]\n", my_desc(), topic->len, topic->s); + len = sizeof(mqtt_request_t); + len += topic->len+16; + + request = (mqtt_request_t*)shm_malloc(len); + if(request==NULL) { + LM_ERR("no more shared memory\n"); + return -1; + } + memset(request, 0, len); + request->type = UNSUBSCRIBE; + request->topic.s = (char*)request + sizeof(mqtt_request_t); + request->topic.len = snprintf(request->topic.s, topic->len+16, + "%.*s", + topic->len, topic->s); + + if(_mqtt_notify_sockets[1]!=-1) { + len = write(_mqtt_notify_sockets[1], &request, sizeof(mqtt_request_t*)); + if(len<=0) { + shm_free(request); + LM_ERR("failed to pass the pointer to mqtt dispatcher\n"); + return -1; + } + } else { + cfg_update(); + mqtt_unsubscribe(topic); + shm_free(request); + } + + return 0; + +} + +/** + * + */ +int mqtt_unsubscribe(str *topic) { + int res; + + LM_DBG("unsubscribe %s\n", topic->s); + res = mosquitto_unsubscribe(_mosquitto, NULL, topic->s); + if (res != MOSQ_ERR_SUCCESS) { + LM_WARN("unable to subscribe [%s], rc=%d\n", topic->s, res); + return -1; + } + return 0; +} \ No newline at end of file diff --git a/src/modules/mqtt/mqtt_dispatch.h b/src/modules/mqtt/mqtt_dispatch.h new file mode 100644 index 00000000000..1a4ba4619f9 --- /dev/null +++ b/src/modules/mqtt/mqtt_dispatch.h @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2019 Thomas Weber, pascom.net + * + * This file is part of Kamailio, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + +#ifndef _MQTT_DISPATCH_ +#define _MQTT_DISPATCH_ + +#include "../../core/pvar.h" + +void mqtt_init_environment(); + +typedef struct mqtt_dispatcher_cfg { + char *host; + int port; + char *id; + char *username; + char *password; + int keepalive; + char *will_topic; + char *will; +} mqtt_dispatcher_cfg_t; + +int mqtt_init_notify_sockets(void); + +void mqtt_close_notify_sockets_child(void); + +void mqtt_close_notify_sockets_parent(void); + +int mqtt_run_dispatcher(mqtt_dispatcher_cfg_t* cfg); + +int pv_parse_mqtt_name(pv_spec_t *sp, str *in); +int pv_get_mqtt(sip_msg_t *msg, pv_param_t *param, pv_value_t *res); +int pv_set_mqtt(sip_msg_t *msg, pv_param_t *param, int op, + pv_value_t *val); + +int mqtt_prepare_publish(str *topic, str *payload, int qos); +int mqtt_prepare_subscribe(str *topic, int qos); +int mqtt_prepare_unsubscribe(str *topic); + +enum mqtt_request_type { + PUBLISH, + SUBSCRIBE, + UNSUBSCRIBE +}; + +typedef struct _mqtt_request { + enum mqtt_request_type type; + str topic; + str payload; + int qos; +} mqtt_request_t; + +#endif \ No newline at end of file diff --git a/src/modules/mqtt/mqtt_mod.c b/src/modules/mqtt/mqtt_mod.c new file mode 100644 index 00000000000..db53c809d7e --- /dev/null +++ b/src/modules/mqtt/mqtt_mod.c @@ -0,0 +1,394 @@ +/** + * Copyright (C) 2019 Thomas Weber, pascom.net + * + * This file is part of Kamailio, a free SIP server. + * + * This file is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version + * + * + * This file is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + */ + + +#include "../../core/sr_module.h" +#include "../../core/fmsg.h" +#include "../../core/mod_fix.h" +#include "../../core/cfg/cfg_struct.h" +#include "../../core/kemi.h" + +#include "mqtt_dispatch.h" + +MODULE_VERSION + +static char *_mqtt_host = NULL; +static int _mqtt_port = 1883; +static char *_mqtt_id = NULL; +static char *_mqtt_username = NULL; +static char *_mqtt_password = NULL; +static char *_mqtt_will = NULL; +static char *_mqtt_willtopic = NULL; +static int _mqtt_keepalive = 5; + +str _mqtt_event_callback = STR_NULL; +int _mqtt_dispatcher_pid = -1; + +static int mod_init(void); +static int child_init(int); + +static int cmd_mqtt_publish(sip_msg_t* msg, char* topic, char* payload, char* qos); +static int fixup_mqtt_publish(void** param, int param_no); +static int ki_mqtt_publish(sip_msg_t* msg, str* topic, str* payload, int qos); + +static int cmd_mqtt_subscribe(sip_msg_t* msg, char* topic, char* qos); +static int fixup_mqtt_subscribe(void** param, int param_no); +static int ki_mqtt_subscribe(sip_msg_t* msg, str* topic, int qos); + +static int cmd_mqtt_unsubscribe(sip_msg_t* msg, char* topic); +static int ki_mqtt_unsubscribe(sip_msg_t* msg, str* topic); + +static cmd_export_t cmds[]={ + {"mqtt_publish", (cmd_function)cmd_mqtt_publish, 3, fixup_mqtt_publish, + 0, ANY_ROUTE}, + {"mqtt_subscribe", (cmd_function)cmd_mqtt_subscribe, 2, fixup_mqtt_subscribe, + 0, ANY_ROUTE}, + {"mqtt_unsubscribe", (cmd_function)cmd_mqtt_unsubscribe, 1, fixup_spve_all, + 0, ANY_ROUTE}, + {0, 0, 0, 0, 0, 0} +}; + +static param_export_t params[]={ + {"host", PARAM_STRING, &_mqtt_host}, + {"port", INT_PARAM, &_mqtt_port}, + {"id", PARAM_STRING, &_mqtt_id}, + {"username", PARAM_STRING, &_mqtt_username}, + {"password", PARAM_STRING, &_mqtt_password}, + {"will_topic", PARAM_STRING, &_mqtt_willtopic}, + {"will", PARAM_STRING, &_mqtt_will}, + {"keepalive", INT_PARAM, &_mqtt_keepalive}, + {"event_callback", PARAM_STR, &_mqtt_event_callback}, + {0,0,0} +}; + +static pv_export_t mod_pvs[] = { + { {"mqtt", (sizeof("mqtt")-1)}, PVT_OTHER, pv_get_mqtt, + pv_set_mqtt, pv_parse_mqtt_name, 0, 0, 0}, + + { {0, 0}, 0, 0, 0, 0, 0, 0, 0 } +}; + +struct module_exports exports = { + "mqtt", /* module name */ + DEFAULT_DLFLAGS, /* dlopen flags */ + cmds, /* exported functions */ + params, /* exported parameters */ + 0, /* RPC method exports */ + mod_pvs, /* exported pseudo-variables */ + 0, /* response handling function */ + mod_init, /* module initialization function */ + child_init, /* per-child init function */ + 0 /* module destroy function */ +}; + +/*! + * \brief Module initialization function + * \return 0 on success, -1 on failure + */ +static int mod_init(void) +{ + if(faked_msg_init()<0) { + LM_ERR("failed to init faked sip message\n"); + return -1; + } + + if (_mqtt_host == NULL) { + LM_ERR("MQTT host parameter not set\n"); + return -1; + } + + /* add space for mqtt dispatcher */ + register_procs(1 ); + + /* add child to update local config framework structures */ + cfg_register_child(1); + + /* prepare some things for all processes */ + mqtt_init_environment(); + + return 0; +} + + +/** + * @brief Initialize async module children + */ +static int child_init(int rank) +{ + int pid; + + if (rank==PROC_INIT) { + if(mqtt_init_notify_sockets()<0) { + LM_ERR("failed to initialize notify sockets\n"); + return -1; + } + return 0; + } + + if (rank!=PROC_MAIN) { + if(_mqtt_dispatcher_pid!=getpid()) { + mqtt_close_notify_sockets_parent(); + } + return 0; + } + + pid=fork_process(PROC_NOCHLDINIT, "mqtt dispatcher", 1); + if (pid<0) + return -1; /* error */ + if(pid==0) { + /* child */ + _mqtt_dispatcher_pid = getpid(); + + /* do child init to allow execution of rpc like functions */ + if(init_child(PROC_RPC) < 0) { + LM_DBG("failed to do RPC child init for dispatcher\n"); + return -1; + } + /* initialize the config framework */ + if (cfg_child_init()) + return -1; + /* main function for dispatcher */ + mqtt_close_notify_sockets_child(); + + /* module parameter hand over to dispatcher */ + mqtt_dispatcher_cfg_t cfg; + cfg.host = _mqtt_host; + cfg.port = _mqtt_port; + cfg.id = _mqtt_id; + cfg.username = _mqtt_username; + cfg.password = _mqtt_password; + cfg.keepalive = _mqtt_keepalive; + cfg.will = _mqtt_will; + cfg.will_topic = _mqtt_willtopic; + + /* this process becomes the dispatcher, block now */ + return mqtt_run_dispatcher(&cfg); + } + + return 0; +} + +/** + * Send out a message to a topic with a specified mqtt qos level (0, 1, 2). + * Used in cfg script. + */ +static int cmd_mqtt_publish(sip_msg_t* msg, char* topic, char* payload, char* qos) +{ + str stopic; + str spayload; + unsigned int iqos=0; + + if(topic==0) { + LM_ERR("invalid topic\n"); + return -1; + } + + if(payload==0) { + LM_ERR("invalid payload\n"); + return -1; + } + + if(fixup_get_svalue(msg, (gparam_t*)topic, &stopic)!=0) { + LM_ERR("unable to get topic\n"); + return -1; + } + + if(fixup_get_svalue(msg, (gparam_t*)payload, &spayload)!=0) { + LM_ERR("unable to get payload\n"); + return -1; + } + + iqos = (unsigned int)(unsigned long)qos; + + // pass the request to the dispatcher + if(mqtt_prepare_publish(&stopic, &spayload, iqos)<0) { + LM_ERR("failed to prepare publish: [%.*s] -> [%.*s] (qos=%d)\n", stopic.len, stopic.s, spayload.len, spayload.s, iqos); + return -1; + } + + return 1; +} + +/** + * Send out a message to a topic with a specified mqtt qos level (0, 1, 2). + * Used in kemi script. + */ +static int ki_mqtt_publish(sip_msg_t* msg, str* topic, str* payload, int qos) +{ + int ret; + + ret = mqtt_prepare_publish(topic, payload, qos); + if (ret<0) return ret; + + return (ret+1); +} + +/** + * + */ +static int fixup_mqtt_publish(void** param, int param_no) +{ + switch (param_no) { + case 1: + return fixup_spve_spve(param, param_no); + case 2: + return fixup_spve_spve(param, param_no); + case 3: + return fixup_uint_uint(param, param_no); + default: + return -1; + } +} + +/** + * Subscribe to the given topic. + * Mqtt qos levels 0, 1 and 2 can be used. + * Used in cfg script. + */ +static int cmd_mqtt_subscribe(sip_msg_t* msg, char* topic, char* qos) +{ + str stopic; + unsigned int iqos=0; + + if(topic==0) { + LM_ERR("invalid topic\n"); + return -1; + } + + if(fixup_get_svalue(msg, (gparam_t*)topic, &stopic)!=0) { + LM_ERR("unable to get topic\n"); + return -1; + } + + iqos = (unsigned int)(unsigned long)qos; + + // pass the request to the dispatcher + if(mqtt_prepare_subscribe(&stopic, iqos)<0) { + LM_ERR("failed to prepare subscribe: [%.*s] (qos=%d)\n", stopic.len, stopic.s, iqos); + return -1; + } + + return 1; +} + +/** + * Subscribe to the given topic. + * Mqtt qos levels 0, 1 and 2 can be used. + * Used in kemi script. + */ +static int ki_mqtt_subscribe(sip_msg_t* msg, str* topic, int qos) +{ + int ret; + + ret = mqtt_prepare_subscribe(topic, qos); + + if(ret<0) return ret; + + return (ret+1); +} + +/** + * + */ +static int fixup_mqtt_subscribe(void** param, int param_no) +{ + switch (param_no) { + case 1: + return fixup_spve_spve(param, param_no); + case 2: + return fixup_uint_uint(param, param_no); + default: + return -1; + } +} + +/** + * Unsubscribe to a previously subscribed topic. + * Used in cfg script. + */ +static int cmd_mqtt_unsubscribe(sip_msg_t* msg, char* topic) +{ + str stopic; + + if(topic==0) { + LM_ERR("invalid topic\n"); + return -1; + } + + if(fixup_get_svalue(msg, (gparam_t*)topic, &stopic)!=0) { + LM_ERR("unable to get topic\n"); + return -1; + } + // pass the request to the dispatcher + if(mqtt_prepare_unsubscribe(&stopic)<0) { + LM_ERR("failed to prepare unsubscribe: [%.*s]\n", stopic.len, stopic.s); + return -1; + } + + return 1; +} + +/** + * Unsubscribe to a previously subscribed topic. + * Used in kemi script. + */ +static int ki_mqtt_unsubscribe(sip_msg_t* msg, str* topic) +{ + int ret; + + ret = mqtt_prepare_unsubscribe(topic); + + if(ret<0) return ret; + + return (ret+1); +} +/** + * Define kemi compatible commands. + */ +/* clang-format off */ +static sr_kemi_t sr_kemi_corex_exports[] = { + { str_init("mqtt"), str_init("publish"), + SR_KEMIP_INT, ki_mqtt_publish, + { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_INT, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { str_init("mqtt"), str_init("subscribe"), + SR_KEMIP_INT, ki_mqtt_subscribe, + { SR_KEMIP_STR, SR_KEMIP_INT, SR_KEMIP_NONE, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { str_init("mqtt"), str_init("unsubscribe"), + SR_KEMIP_INT, ki_mqtt_unsubscribe, + { SR_KEMIP_STR, SR_KEMIP_NONE, SR_KEMIP_NONE, + SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE } + }, + { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } } +}; + +/** + * Register in kemi framework + */ +int mod_register(char *path, int *dlflags, void *p1, void *p2) +{ + sr_kemi_modules_add(sr_kemi_corex_exports); + return 0; +} \ No newline at end of file From 81852e5ba1fa6983df44f591c40154ea5fcef6b8 Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Fri, 3 Jan 2020 17:05:14 +0100 Subject: [PATCH 2/3] mqtt: add tls support --- src/modules/mqtt/doc/mqtt_admin.xml | 176 +++++++++++++++++++++++++++- src/modules/mqtt/mqtt_dispatch.c | 31 ++++- src/modules/mqtt/mqtt_dispatch.h | 11 +- src/modules/mqtt/mqtt_mod.c | 71 +++++++---- 4 files changed, 260 insertions(+), 29 deletions(-) diff --git a/src/modules/mqtt/doc/mqtt_admin.xml b/src/modules/mqtt/doc/mqtt_admin.xml index e6eb7ab895e..791daa5920d 100644 --- a/src/modules/mqtt/doc/mqtt_admin.xml +++ b/src/modules/mqtt/doc/mqtt_admin.xml @@ -202,7 +202,7 @@ modparam("mqtt", "will_topic", "kamailio123") ... modparam("mqtt", "will", "gone") ... - +
@@ -243,6 +243,180 @@ end
+
+ <varname>ca_file</varname> (str) + + Path to a file containing the PEM encoded trusted CA certificate files. + + + + Default value is NULL. + Set either this parameter or ca_path if you want to connect via TLS. + + + + Set <varname>ca_file</varname> parameter + +... +modparam("mqtt", "ca_file", "/etc/ssl/certs/myca.pem") +... + + +
+
+ <varname>ca_path</varname> (str) + + Used to define a directory that contains PEM encoded CA certificates + that are trusted. For ca_path to work correctly, the certificates files must have ".pem" as the + file ending and you must run "openssl rehash /your/ca_path" each time you add/remove a certificate. + + + + Default value is NULL. + Set either this parameter or ca_file if you want to connect via TLS. + ca_file and ca_path are mutual exclusive. + + + + Set <varname>ca_path</varname> parameter + +... +modparam("mqtt", "ca_path", "/etc/ssl/certs") +... + + +
+
+ <varname>tls_method</varname> (str) + + The version of the SSL/TLS protocol to use as a string. + If NULL, the default value is used. + The default value and the available values depend on the version of openssl that libmosquitto was compiled against. + + + Possible values: + + + + + tlsv1.3 is available with openssl >= 1.1.1 together with libmosquitto v1.6.8 and newer. + + + + + For openssl >= 1.0.1, the available options are tlsv1.2, tlsv1.1 and tlsv1, with tlv1.2 as the default. + + + + + For openssl < 1.0.1, only tlsv1 is available. + + + + + + + Default value is NULL. + + + + Set <varname>tls_method</varname> parameter + +... +modparam("mqtt", "tls_method", "tlsv1.3") +... + + +
+
+ <varname>certificate</varname> (str) + + Path to a file containing the PEM encoded certificate file for a TLS client connection. + + + + Default value is NULL. + If NULL, private_key must also be NULL and no client certificate will be used. + + + + Set <varname>certificate</varname> parameter + +... +modparam("mqtt", "certificate", "/etc/ssl/certs/myclient.pem") +... + + +
+
+ <varname>private_key</varname> (str) + + Path to a file containing the PEM encoded private key for a TLS client connection. + + + + Default value is NULL. + If NULL, certificate must also be NULL and no client certificate will be used. + + + + Set <varname>private_key</varname> parameter + +... +modparam("mqtt", "private_key", "/etc/ssl/private/myclient.key") +... + + +
+
+ <varname>cipher_list</varname> (str) + + A string describing the ciphers available for use. See the + cipher(1) OpenSSL man page. + If NULL, the libssl default ciphers will be used. + + + + Default value is NULL. + + + + Set <varname>cipher_list</varname> parameter + +... +modparam("mqtt", "cipher_list", "HIGH") +... + + +
+
+ <varname>verify_certificate</varname> (str) + + Configure verification of the server certificate. + If value is set to 0, it is impossible to guarantee that the host you are connecting to is not + impersonating your server. + + + This can be useful in initial server testing, but makes it possible for a + malicious third party to impersonate your server through DNS spoofing, for example. + + + Do not disable verification in a real system as it makes the connection encryption pointless. + + + + Default value is 1. + + + + Set <varname>verify_certificate</varname> parameter + +... +modparam("mqtt", "verify_certificate", "0") +... + + +
diff --git a/src/modules/mqtt/mqtt_dispatch.c b/src/modules/mqtt/mqtt_dispatch.c index a4cbdf2e9fa..7a39ede29b7 100644 --- a/src/modules/mqtt/mqtt_dispatch.c +++ b/src/modules/mqtt/mqtt_dispatch.c @@ -145,7 +145,7 @@ void mqtt_close_notify_sockets_parent(void) */ int mqtt_run_dispatcher(mqtt_dispatcher_cfg_t* cfg) { - int res; + int res, cert_req; struct ev_io request_notify; // prepare and init libmosquitto handle @@ -200,6 +200,32 @@ int mqtt_run_dispatcher(mqtt_dispatcher_cfg_t* cfg) ev_timer_init(&timer_notify, mqtt_timer_notify, _mqtt_timer_freq, 0.); ev_timer_start(loop, &timer_notify); + + // prepare tls configuration if at least a ca is configured + if (cfg->ca_file != NULL || cfg->ca_path != NULL) { + LM_DBG("Preparing TLS connection"); + if (cfg->verify_certificate == 0) { + cert_req = 0; + } else if (cfg->verify_certificate == 1) { + cert_req = 1; + } else { + LM_ERR("invalid verify_certificate parameter\n"); + return -1; + } + res = mosquitto_tls_opts_set(_mosquitto, cert_req, cfg->tls_method, cfg->cipher_list); + if (res != MOSQ_ERR_SUCCESS) { + LM_ERR("invalid tls_method or cipher_list parameters\n"); + LM_ERR("mosquitto_tls_opts_set() failed: %d %s\n",errno, strerror(errno)); + return -1; + } + res = mosquitto_tls_set(_mosquitto, cfg->ca_file, cfg->ca_path, cfg->certificate, cfg->private_key, NULL); + if (res != MOSQ_ERR_SUCCESS) { + LM_ERR("invalid ca_file, ca_path, certificate or private_key parameters\n"); + LM_ERR("mosquitto_tls_set() failed: %d %s\n",errno, strerror(errno)); + return -1; + } + } + res = mosquitto_connect(_mosquitto, cfg->host, cfg->port, cfg->keepalive); if (res == MOSQ_ERR_INVAL) { LM_ERR("invalid connect parameters\n"); @@ -269,6 +295,9 @@ void mqtt_timer_notify(struct ev_loop *loop, ev_timer *timer, int revents) wait_ticks = _reconnect_wait_ticks; } break; + case MOSQ_ERR_TLS: + LM_ERR("mosquitto_loop() failed, tls error\n"); + break; default: LM_ERR("mosquitto_loop() failed: case %i\n", res); break; diff --git a/src/modules/mqtt/mqtt_dispatch.h b/src/modules/mqtt/mqtt_dispatch.h index 1a4ba4619f9..a7f109df63d 100644 --- a/src/modules/mqtt/mqtt_dispatch.h +++ b/src/modules/mqtt/mqtt_dispatch.h @@ -29,13 +29,20 @@ void mqtt_init_environment(); typedef struct mqtt_dispatcher_cfg { char *host; - int port; + int port; char *id; char *username; char *password; - int keepalive; + int keepalive; char *will_topic; char *will; + char *ca_file; + char *ca_path; + char *certificate; + char *private_key; + char *tls_method; + int verify_certificate; + char *cipher_list; } mqtt_dispatcher_cfg_t; int mqtt_init_notify_sockets(void); diff --git a/src/modules/mqtt/mqtt_mod.c b/src/modules/mqtt/mqtt_mod.c index db53c809d7e..0dad3a9dbf9 100644 --- a/src/modules/mqtt/mqtt_mod.c +++ b/src/modules/mqtt/mqtt_mod.c @@ -31,14 +31,21 @@ MODULE_VERSION -static char *_mqtt_host = NULL; -static int _mqtt_port = 1883; -static char *_mqtt_id = NULL; -static char *_mqtt_username = NULL; -static char *_mqtt_password = NULL; -static char *_mqtt_will = NULL; -static char *_mqtt_willtopic = NULL; -static int _mqtt_keepalive = 5; +static char *_mqtt_host = NULL; +static int _mqtt_port = 1883; +static char *_mqtt_id = NULL; +static char *_mqtt_username = NULL; +static char *_mqtt_password = NULL; +static char *_mqtt_will = NULL; +static char *_mqtt_willtopic = NULL; +static int _mqtt_keepalive = 5; +static char *_mqtt_ca_file = NULL; +static char *_mqtt_ca_path = NULL; +static char *_mqtt_certificate = NULL; +static char *_mqtt_private_key = NULL; +static char *_mqtt_tls_method = NULL; +static int _mqtt_verify_certificate = 1; +static char *_mqtt_cipher_list = NULL; str _mqtt_event_callback = STR_NULL; int _mqtt_dispatcher_pid = -1; @@ -68,15 +75,22 @@ static cmd_export_t cmds[]={ }; static param_export_t params[]={ - {"host", PARAM_STRING, &_mqtt_host}, - {"port", INT_PARAM, &_mqtt_port}, - {"id", PARAM_STRING, &_mqtt_id}, - {"username", PARAM_STRING, &_mqtt_username}, - {"password", PARAM_STRING, &_mqtt_password}, - {"will_topic", PARAM_STRING, &_mqtt_willtopic}, - {"will", PARAM_STRING, &_mqtt_will}, - {"keepalive", INT_PARAM, &_mqtt_keepalive}, - {"event_callback", PARAM_STR, &_mqtt_event_callback}, + {"host", PARAM_STRING, &_mqtt_host}, + {"port", INT_PARAM, &_mqtt_port}, + {"id", PARAM_STRING, &_mqtt_id}, + {"username", PARAM_STRING, &_mqtt_username}, + {"password", PARAM_STRING, &_mqtt_password}, + {"will_topic", PARAM_STRING, &_mqtt_willtopic}, + {"will", PARAM_STRING, &_mqtt_will}, + {"keepalive", INT_PARAM, &_mqtt_keepalive}, + {"event_callback", PARAM_STR, &_mqtt_event_callback}, + {"tls_method", PARAM_STRING, &_mqtt_tls_method}, + {"ca_file", PARAM_STRING, &_mqtt_ca_file}, + {"ca_path", PARAM_STRING, &_mqtt_ca_path}, + {"certificate", PARAM_STRING, &_mqtt_certificate}, + {"private_key", PARAM_STRING, &_mqtt_private_key}, + {"verify_certificate", INT_PARAM, &_mqtt_verify_certificate}, + {"cipher_list", PARAM_STRING, &_mqtt_cipher_list}, {0,0,0} }; @@ -171,14 +185,21 @@ static int child_init(int rank) /* module parameter hand over to dispatcher */ mqtt_dispatcher_cfg_t cfg; - cfg.host = _mqtt_host; - cfg.port = _mqtt_port; - cfg.id = _mqtt_id; - cfg.username = _mqtt_username; - cfg.password = _mqtt_password; - cfg.keepalive = _mqtt_keepalive; - cfg.will = _mqtt_will; - cfg.will_topic = _mqtt_willtopic; + cfg.host = _mqtt_host; + cfg.port = _mqtt_port; + cfg.id = _mqtt_id; + cfg.username = _mqtt_username; + cfg.password = _mqtt_password; + cfg.keepalive = _mqtt_keepalive; + cfg.will = _mqtt_will; + cfg.will_topic = _mqtt_willtopic; + cfg.ca_file = _mqtt_ca_file; + cfg.ca_path = _mqtt_ca_path; + cfg.certificate = _mqtt_certificate; + cfg.private_key = _mqtt_private_key; + cfg.tls_method = _mqtt_tls_method; + cfg.verify_certificate = _mqtt_verify_certificate; + cfg.cipher_list = _mqtt_cipher_list; /* this process becomes the dispatcher, block now */ return mqtt_run_dispatcher(&cfg); From ea699419df555b4453e1eb96659cfec8912ac3bc Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Fri, 3 Jan 2020 17:31:34 +0100 Subject: [PATCH 3/3] mqtt: expose received message qos level via pv --- src/modules/mqtt/doc/mqtt_admin.xml | 3 +++ src/modules/mqtt/mqtt_dispatch.c | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/modules/mqtt/doc/mqtt_admin.xml b/src/modules/mqtt/doc/mqtt_admin.xml index 791daa5920d..20277ff289c 100644 --- a/src/modules/mqtt/doc/mqtt_admin.xml +++ b/src/modules/mqtt/doc/mqtt_admin.xml @@ -563,6 +563,9 @@ event_route[mqtt:message] { $mqtt(msg) - Received message (only in mqtt:message) + + $mqtt(qos) - The received message QOS level: 0, 1 ,2 (only in mqtt:message) + Exported pseudo-variables are documented at &kamwikilink;. diff --git a/src/modules/mqtt/mqtt_dispatch.c b/src/modules/mqtt/mqtt_dispatch.c index 7a39ede29b7..a32d8c312dc 100644 --- a/src/modules/mqtt/mqtt_dispatch.c +++ b/src/modules/mqtt/mqtt_dispatch.c @@ -354,11 +354,13 @@ void mqtt_on_message(struct mosquitto *mosquitto, void *userdata, const struct m sip_msg_t tmsg; str topic, payload; + int qos; topic.s = message->topic; topic.len = strlen(message->topic); payload.s = (char*) message->payload; payload.len = message->payloadlen; - LM_DBG("mqtt message [%s] -> [%s]\n", topic.s, payload.s); + qos = message->qos; + LM_DBG("mqtt message [%s] -> [%s] (qos %d)\n", topic.s, payload.s, qos); cfg_update(); @@ -425,6 +427,8 @@ int pv_parse_mqtt_name(pv_spec_t *sp, str *in) case 3: if(strncmp(in->s, "msg", 3)==0) sp->pvp.pvn.u.isname.name.n = 1; + else if(strncmp(in->s, "qos", 3)==0) + sp->pvp.pvn.u.isname.name.n = 2; else goto error; break; case 5: @@ -452,6 +456,7 @@ int pv_get_mqtt(sip_msg_t *msg, pv_param_t *param, pv_value_t *res) { struct mosquitto_message* message; str topic, payload; + int qos; if(param==NULL || res==NULL) return -1; @@ -467,6 +472,7 @@ int pv_get_mqtt(sip_msg_t *msg, pv_param_t *param, pv_value_t *res) topic.len = strlen(message->topic); payload.s = (char*) message->payload; payload.len = message->payloadlen; + qos = message->qos; } // populate value depeding on the param name @@ -477,6 +483,8 @@ int pv_get_mqtt(sip_msg_t *msg, pv_param_t *param, pv_value_t *res) return pv_get_strval(msg, param, res, &topic); case 1: return pv_get_strval(msg, param, res, &payload); + case 2: + return pv_get_sintval(msg, param, res, qos); default: return pv_get_null(msg, param, res); }