diff --git a/modules/nsq/Makefile b/modules/nsq/Makefile
new file mode 100644
index 00000000000..dc685f8b67c
--- /dev/null
+++ b/modules/nsq/Makefile
@@ -0,0 +1,23 @@
+# $Id: $
+#
+# NSQ
+#
+#
+# WARNING: do not run this directly, it should be run by the master Makefile
+
+include ../../Makefile.defs
+
+auto_gen=
+NAME=nsq.so
+
+LIBS=-lnsq -lev -levbuffsock -lcurl -ljson-c
+DEFS+=-I$(LOCALBASE)/include -I/usr/local/include $(shell pkg-config --cflags json-c)
+
+DEFS+=-DKAMAILIO_MOD_INTERFACE
+
+SERLIBPATH=../../lib
+SER_LIBS=$(SERLIBPATH)/srdb2/srdb2 $(SERLIBPATH)/srdb1/srdb1
+SER_LIBS+=$(SERLIBPATH)/kmi/kmi
+SER_LIBS+=$(SERLIBPATH)/kcore/kcore
+
+include ../../Makefile.modules
diff --git a/modules/nsq/defs.h b/modules/nsq/defs.h
new file mode 100644
index 00000000000..7e75756ea02
--- /dev/null
+++ b/modules/nsq/defs.h
@@ -0,0 +1,106 @@
+/*
+ * defs.h
+ *
+ */
+
+#ifndef _NSQ_DEFS_H_
+#define _NSQ_DEFS_H_
+
+#define BLF_MAX_DIALOGS 8
+#define BLF_JSON_PRES "Presentity"
+#define BLF_JSON_PRES_USER "Presentity-User"
+#define BLF_JSON_PRES_REALM "Presentity-Realm"
+#define BLF_JSON_FROM "From"
+#define BLF_JSON_FROM_USER "From-User"
+#define BLF_JSON_FROM_REALM "From-Realm"
+#define BLF_JSON_FROM_URI "From-URI"
+#define BLF_JSON_TO "To"
+#define BLF_JSON_TO_USER "To-User"
+#define BLF_JSON_TO_REALM "To-Realm"
+#define BLF_JSON_TO_URI "To-URI"
+#define BLF_JSON_CALLID "Call-ID"
+#define BLF_JSON_TOTAG "To-Tag"
+#define BLF_JSON_FROMTAG "From-Tag"
+#define BLF_JSON_STATE "State"
+#define BLF_JSON_USER "User"
+#define BLF_JSON_QUEUE "Queue"
+#define BLF_JSON_EXPIRES "Expires"
+#define BLF_JSON_APP_NAME "App-Name"
+#define BLF_JSON_APP_VERSION "App-Version"
+#define BLF_JSON_NODE "Node"
+#define BLF_JSON_SERVERID "Server-ID"
+#define BLF_JSON_EVENT_CATEGORY "Event-Category"
+#define BLF_JSON_EVENT_NAME "Event-Name"
+#define BLF_JSON_TYPE "Type"
+#define BLF_JSON_MSG_ID "Msg-ID"
+#define BLF_JSON_DIRECTION "Direction"
+
+#define BLF_JSON_CONTACT "Contact"
+#define BLF_JSON_EVENT_PKG "Event-Package"
+#define MWI_JSON_WAITING "Messages-Waiting"
+#define MWI_JSON_NEW "Messages-New"
+#define MWI_JSON_SAVED "Messages-Saved"
+#define MWI_JSON_URGENT "Messages-Urgent"
+#define MWI_JSON_URGENT_SAVED "Messages-Urgent-Saved"
+#define MWI_JSON_ACCOUNT "Message-Account"
+#define MWI_JSON_FROM "From"
+#define MWI_JSON_TO "To"
+
+#define DIALOGINFO_BODY_BUFFER_SIZE 8192
+#define MWI_BODY_BUFFER_SIZE 2048
+#define PRESENCE_BODY_BUFFER_SIZE 4096
+
+#define MWI_BODY "Messages-Waiting: %.*s\r\nMessage-Account: %.*s\r\nVoice-Message: %.*s/%.*s (%.*s/%.*s)\r\n"
+#define PRESENCE_BODY "\
+ \
+\
+\
+%s\
+\
+\
+%s\
+\
+%s\
+%s\
+\
+"
+
+#define DIALOGINFO_EMPTY_BODY "\
+ \
+\
+"
+
+#define LOCAL_TAG "local-tag=\"%.*s\""
+#define REMOTE_TAG "remote-tag=\"%.*s\""
+
+#define DIALOGINFO_BODY "\
+\
+\
+"
+
+#define DIALOGINFO_BODY_2 "\
+\
+\
+"
+
+#endif /* _NSQ_DEFS_H_ */
diff --git a/modules/nsq/doc/Makefile b/modules/nsq/doc/Makefile
new file mode 100644
index 00000000000..6b631173310
--- /dev/null
+++ b/modules/nsq/doc/Makefile
@@ -0,0 +1,4 @@
+docs = nsq.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module
diff --git a/modules/nsq/doc/nsq.xml b/modules/nsq/doc/nsq.xml
new file mode 100644
index 00000000000..12635fe4ae8
--- /dev/null
+++ b/modules/nsq/doc/nsq.xml
@@ -0,0 +1,37 @@
+
+
+
+%docentities;
+
+]>
+
+
+
+ NSQ Module
+ &kamailioname;
+
+
+ Weave Communications
+ comm@getweave.com
+
+
+ Emmanuel
+ Schmidbauer
+ emmanuel@getweave.com
+
+
+
+ 2016
+ Weave Communications
+
+
+
+
+
+
+
+
+
diff --git a/modules/nsq/doc/nsq_admin.xml b/modules/nsq/doc/nsq_admin.xml
new file mode 100644
index 00000000000..980c79a523b
--- /dev/null
+++ b/modules/nsq/doc/nsq_admin.xml
@@ -0,0 +1,492 @@
+
+
+
+%docentities;
+
+]>
+
+
+
+ &adminguide;
+
+
+
+ Overview
+ The NSQ module an NSQ consumer. It exposes only consume capabilities into Kamailio.
+
+
+From a high-level, the purpose of the module might be for things like:
+
+
+
+Provide a real-time integration into your program, instead of your database, so you can overlay additional logic in your preferred language while also utilizing a message bus
+
+
+
+
+Utilize messaging to have a distributed messaging layer, such that machines processing requests/responses/events can go up/down or share the workload and your Kamailio node will still be happy
+
+
+
+
+
+
+
+supported operations are:
+
+
+
+subscribe to a Topic and Channel
+
+
+
+
+
+The NSQ module also has support to publish updates to presence module through the nsq_pua_publish function
+
+
+
+
+ How it works
+
+The module works with a main forked process that does the communication with NSQ for consuming messages. When it consumes a message it defers the process to a worker process so that it doesn't block this main process.
+
+
+ event routes
+
+The worker process issues an event-route where we can act on the received payload. The name of the event-route is composed by values extracted from the payload.
+
+
+ NSQ module will try to execute the event route from most significant to less significant.
+ define the event route like event_route[nsq:consumer-event[-payload_key_value[-payload_subkey_value]]]
+
+
+ we can set the key/subkey pair on a subscription base. check the payload on subscribe.
+
+
+ define the event route
+
+...
+modparam("nsq", "consumer_event_key", "Event-Category")
+modparam("nsq", "consumer_event_sub_key", "Event-Name")
+...
+
+event_route[nsq:consumer-event-presence-update]
+{
+# presence is the value extracted from Event-Category field in json payload
+# update is the value extracted from Event-Name field in json payload
+xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{kznsqjson,From})");
+...
+}
+
+event_route[nsq:consumer-event-presence]
+{
+# presence is the value extracted from Event-Category field in json payload
+xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
+...
+}
+
+event_route[nsq:consumer-event-event-category-event-name]
+{
+# event-category is the name of the consumer_event_key parameter
+# event-name is the name of the consumer_event_sub_key parameter
+# this event route is executed if we can't find the previous
+...
+}
+
+event_route[nsq:consumer-event-event-category]
+{
+# event-category is the name of the consumer_event_key parameter
+# this event route is executed if we can't find the previous
+...
+}
+
+event_route[nsq:consumer-event]
+{
+# this event route is executed if we can't find the previous
+}
+
+
+
+
+
+ aknowledge messages
+
+Consumed messages have the option of being acknowledge in two ways:
+
+
+
+immediately when received
+
+
+
+
+after processing by the worker
+
+
+
+
+
+
+
+
+
+
+ Dependencies
+
+ &kamailio; Modules
+
+ The following modules must be loaded before this module:
+
+
+
+ none.
+
+
+
+
+
+
+ External Libraries or Applications
+
+
+
+ libev.
+
+
+
+
+ libjson.
+
+
+
+
+ libevbuffsock.
+
+
+
+
+ libnsq.
+
+
+
+
+
+
+
+
+
+ Parameters
+
+ NSQ related
+
+ lookupd_address(str)
+
+ The nsqlookupd address.
+
+
+ Default value is 127.0.0.1
+
+
+ Set lookupd_address parameter
+
+...
+modparam("nsq", "lookupd_address", "nsqlookupd.mydomain.com")
+...
+
+
+
+
+ lookupd_port(int)
+
+ The nsqlookupd TCP port.
+
+
+ Default value is 4161.
+
+
+ Set lookupd_port parameter
+
+...
+modparam("nsq", "lookupd_port", 4161)
+...
+
+
+
+
+
+ nsqd_address(str)
+
+ The nsqd address. You can specify connecting directly to nsqd instead of using nsqlookupd.
+
+
+ Default value is 127.0.0.1
+
+
+ Set nsqd_address parameter
+
+...
+modparam("nsq", "nsqd_address", "nsqd.mydomain.com")
+...
+
+
+
+
+ nsqd_port(int)
+
+ The nsqd TCP port.
+
+
+ Default value is 4150.
+
+
+ Set nsqd_port parameter
+
+...
+modparam("nsq", "nsqd_port", 4150)
+...
+
+
+
+
+
+ consumer_use_nsqd(int)
+
+ Set to 1 if you'd like to connect to nsqd instead of nsqlookupd.
+
+
+ Default value is 0.
+
+
+ Set consumer_use_nsqd parameter
+
+...
+modparam("nsq", "consumer_use_nsqd", 1)
+...
+
+
+
+
+
+ consumer_event_key(str)
+
+ The default name of the field in json payload to compose the event name 1st part
+
+
+ Default value is Event-Category
.
+
+
+ Set consumer_event_key parameter
+
+...
+modparam("nsq", "consumer_event_key", "My-JSON-Field-Name")
+...
+
+
+
+
+
+ consumer_event_sub_key(str)
+
+ The default name of the field in json payload to compose the event name 2nd part
+
+
+ Default value is Event-Name
.
+
+
+ Set consumer_event_sub_key parameter
+
+...
+modparam("nsq", "consumer_event_sub_key", "My-JSON-SubField-Name")
+...
+
+
+
+
+
+ max_in_flight(int)
+
+ The number of messages the consumer can receive before nsqd expects a response.
+
+
+ Default value is 1.
+
+
+ Set max_in_flight parameter
+
+...
+modparam("nsq", "max_in_flight", 2)
+...
+
+
+
+
+
+ consumer_workers(int)
+
+ Number of consumer connections to NSQ per topic_channel.
+
+
+ Default value is 4.
+
+
+ Set consumer_workers parameter
+
+...
+modparam("nsq", "consumer_workers", 2)
+...
+
+
+
+
+
+ topic_channel(str)
+
+ The NSQ Topic and Channel. Delimiter-separated by :
. It be set multiple times to subscribe to multiple topics and channels. The value of consumer_workers is allocated per topic_channel.
+
+
+ Default value is Kamailio-Topic:Kamailio-Channel
.
+
+
+ Set topic_channel parameter
+
+...
+modparam("nsq", "topic_channel", "My-NSQ-Topic:My-NSQ-Channel")
+modparam("nsq", "topic_channel", "My-NSQ-Topic-2:My-NSQ-Channel-2")
+...
+
+
+
+
+
+
+
+
+
+ presence related
+
+ db_url(str)
+
+ The database for the presentity table.
+
+ If set, the nsq_pua_publish function will update the presentity status in the database.
+
+
+ Default value is NULL
.
+
+
+ Set db_url parameter
+
+...
+modparam("nsq", "db_url", "&defaultdb;")
+...
+
+
+
+
+
+ presentity_table(str)
+
+ The name of the presentity table in the database.
+
+
+ Default value is presentity
.
+
+
+ Set presentity_table parameter
+
+...
+modparam("nsq", "presentity_table", "my_presentity_table")
+...
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Functions
+
+
+ presence related
+
+
+ nsq_pua_publish(json_payload)
+
+
+ The function build presentity state from json_payload and updates presentity table.
+
+
+ This function can be used from ANY ROUTE.
+
+
+
+ nsq_pua_publish usage
+
+...
+event_route[nsq:consumer-event-presence-update]
+{
+ xlog("L_INFO", "received $(nsqE{nsq.json,Event-Package}) update for $(nsqE{nsq.json,From})");
+ nsq_pua_publish($nsqE);
+ pres_refresh_watchers("$(nsqE{nsq.json,From})", "$(nsqE{nsq.json,Event-Package})", 1);
+}
+...
+
+
+
+
+
+
+
+
+
+
+ Exported pseudo-variables
+
+
+
+ $nsqE
+ Contains the payload of a consumed message
+
+
+
+
+
+
+ Transformations
+ The prefix for nsq transformations is nsq.
+
+
+ json
+
+
+ nsq.json usage
+
+...
+#nsq_json("$var(amqp_result)", "Channels[0].switch_url", "$du");
+$du = $nsqE{nsq.json,Channels[0].switch_url};
+if($du != $null) {
+ xlog("L_INFO", "$ci|log|user channels found redirecting call to $du");
+ return;
+}
+...
+
+
+
+
+
+
+
+
+
+
+
diff --git a/modules/nsq/http.h b/modules/nsq/http.h
new file mode 100644
index 00000000000..a8bc40c0a49
--- /dev/null
+++ b/modules/nsq/http.h
@@ -0,0 +1,48 @@
+#ifndef __http_h
+#define __http_h
+
+#include
+#include
+
+struct HttpClient {
+ CURLM *multi;
+ struct ev_loop *loop;
+ struct ev_timer timer_event;
+ int still_running;
+};
+
+struct HttpResponse {
+ int status_code;
+ struct Buffer *data;
+};
+
+struct HttpRequest {
+ CURL *easy;
+ char *url;
+ struct HttpClient *httpc;
+ char error[CURL_ERROR_SIZE];
+ struct Buffer *data;
+ void (*callback)(struct HttpRequest *req, struct HttpResponse *resp, void *arg);
+ void *cb_arg;
+};
+
+struct HttpSocket {
+ curl_socket_t sockfd;
+ CURL *easy;
+ int action;
+ long timeout;
+ struct ev_io ev;
+ int evset;
+ struct HttpClient *httpc;
+};
+
+struct HttpClient *new_http_client(struct ev_loop *loop);
+void free_http_client(struct HttpClient *httpc);
+struct HttpRequest *new_http_request(const char *url,
+ void (*callback)(struct HttpRequest *req, struct HttpResponse *resp, void *arg), void *cb_arg, char *data);
+void free_http_request(struct HttpRequest *req);
+struct HttpResponse *new_http_response(int status_code, void *data);
+void free_http_response(struct HttpResponse *resp);
+int http_client_get(struct HttpClient *httpc, struct HttpRequest *req);
+
+#endif
diff --git a/modules/nsq/nsq.h b/modules/nsq/nsq.h
new file mode 100644
index 00000000000..8473d36f8e7
--- /dev/null
+++ b/modules/nsq/nsq.h
@@ -0,0 +1,91 @@
+#ifndef __nsq_h
+#define __nsq_h
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "utlist.h"
+
+typedef enum {NSQ_FRAME_TYPE_RESPONSE, NSQ_FRAME_TYPE_ERROR, NSQ_FRAME_TYPE_MESSAGE} frame_type;
+struct NSQDConnection;
+struct NSQMessage;
+
+struct NSQReader {
+ char *topic;
+ char *channel;
+ void *ctx; //context for call back
+ int max_in_flight;
+ struct NSQDConnection *conns;
+ struct NSQLookupdEndpoint *lookupd;
+ struct ev_timer lookupd_poll_timer;
+ struct ev_loop *loop;
+ void *httpc;
+ void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn);
+ void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn);
+ void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx);
+};
+
+struct NSQReader *new_nsq_reader(struct ev_loop *loop, const char *topic, const char *channel, void *ctx,
+ void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
+ void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
+ void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx));
+void free_nsq_reader(struct NSQReader *rdr);
+int nsq_reader_connect_to_nsqd(struct NSQReader *rdr, const char *address, int port);
+int nsq_reader_add_nsqlookupd_endpoint(struct NSQReader *rdr, const char *address, int port);
+void nsq_reader_set_loop(struct NSQReader *rdr, struct ev_loop *loop);
+void nsq_run(struct ev_loop *loop);
+
+struct NSQDConnection {
+ struct BufferedSocket *bs;
+ struct Buffer *command_buf;
+ uint32_t current_msg_size;
+ uint32_t current_frame_type;
+ char *current_data;
+ struct ev_loop *loop;
+ void (*connect_callback)(struct NSQDConnection *conn, void *arg);
+ void (*close_callback)(struct NSQDConnection *conn, void *arg);
+ void (*msg_callback)(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg);
+ void *arg;
+ struct NSQDConnection *next;
+};
+
+struct NSQDConnection *new_nsqd_connection(struct ev_loop *loop, const char *address, int port,
+ void (*connect_callback)(struct NSQDConnection *conn, void *arg),
+ void (*close_callback)(struct NSQDConnection *conn, void *arg),
+ void (*msg_callback)(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg),
+ void *arg);
+void free_nsqd_connection(struct NSQDConnection *conn);
+int nsqd_connection_connect(struct NSQDConnection *conn);
+void nsqd_connection_disconnect(struct NSQDConnection *conn);
+
+void nsq_subscribe(struct Buffer *buf, const char *topic, const char *channel);
+void nsq_ready(struct Buffer *buf, int count);
+void nsq_finish(struct Buffer *buf, const char *id);
+void nsq_requeue(struct Buffer *buf, const char *id, int timeout_ms);
+void nsq_nop(struct Buffer *buf);
+
+struct NSQMessage {
+ int64_t timestamp;
+ uint16_t attempts;
+ char id[16+1];
+ size_t body_length;
+ char *body;
+};
+
+struct NSQMessage *nsq_decode_message(const char *data, size_t data_length);
+void free_nsq_message(struct NSQMessage *msg);
+
+struct NSQLookupdEndpoint {
+ char *address;
+ int port;
+ struct NSQLookupdEndpoint *next;
+};
+
+struct NSQLookupdEndpoint *new_nsqlookupd_endpoint(const char *address, int port);
+void free_nsqlookupd_endpoint(struct NSQLookupdEndpoint *nsqlookupd_endpoint);
+
+#endif
diff --git a/modules/nsq/nsq_json.c b/modules/nsq/nsq_json.c
new file mode 100644
index 00000000000..216a868763d
--- /dev/null
+++ b/modules/nsq/nsq_json.c
@@ -0,0 +1,302 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#include "nsq_json.h"
+
+# define json_foreach_key(obj,key) \
+ char *key;\
+ struct lh_entry *entry ## key; \
+ struct lh_entry *entry_next ## key = NULL; \
+ for(entry ## key = json_object_get_object(obj)->head; \
+ (entry ## key ? ( \
+ key = (char*)entry ## key->k, \
+ entry_next ## key = entry ## key->next, \
+ entry ## key) : 0); \
+ entry ## key = entry_next ## key)
+
+
+
+static str nsq_pv_str_empty = {"", 0};
+
+char **str_split(char* a_str, const char a_delim)
+{
+ char** result = 0;
+ size_t count = 0;
+ char* tmp = a_str;
+ char* last_comma = 0;
+ char delim[2];
+ delim[0] = a_delim;
+ delim[1] = 0;
+ int len = 0;
+
+ /* Count how many elements will be extracted. */
+ while (*tmp) {
+ if (a_delim == *tmp) {
+ count++;
+ last_comma = tmp;
+ }
+ tmp++;
+ }
+
+ /* Add space for trailing token. */
+ count += last_comma < (a_str + strlen(a_str) - 1);
+
+ /* Add space for terminating null string so caller
+ knows where the list of returned strings ends. */
+ count++;
+
+ result = pkg_malloc(sizeof(char*) * count);
+
+ if (result) {
+ size_t idx = 0;
+ char* token = strtok(a_str, delim);
+
+ while (token) {
+ assert(idx < count);
+ len = strlen(token);
+ char* ptr = pkg_malloc( (len+1) * sizeof(char));
+ *(result + idx) = ptr;
+ memcpy(ptr, token, len);
+ ptr[len] = '\0';
+ int i = 0;
+ while(i < len) {
+ if (ptr[i] == nsq_json_escape_char)
+ ptr[i] = '.';
+ i++;
+ }
+ token = strtok(0, delim);
+ idx++;
+ }
+ assert(idx == count - 1);
+ *(result + idx) = 0;
+ }
+
+ return result;
+}
+
+struct json_object * nsq_json_get_field_object(str* json, str* field)
+{
+ char** tokens;
+ char* dup;
+ char f1[25], f2[25];//, f3[25];
+ int i;
+
+ dup = pkg_malloc(json->len+1);
+ memcpy(dup, json->s, json->len);
+ dup[json->len] = '\0';
+ struct json_object *j = json_tokener_parse(dup);
+ pkg_free(dup);
+
+ if (is_error(j)) {
+ LM_ERR("empty or invalid JSON\n");
+ return NULL;
+ }
+
+ struct json_object *jtree = NULL;
+ struct json_object *ret = NULL;
+
+ LM_DBG("getting json %.*s\n", field->len, field->s);
+
+ dup = pkg_malloc(field->len+1);
+ memcpy(dup, field->s, field->len);
+ dup[field->len] = '\0';
+ tokens = str_split(dup, '.');
+ pkg_free(dup);
+
+ if (tokens) {
+ jtree = j;
+ for (i = 0; *(tokens + i); i++) {
+ if (jtree != NULL) {
+ str field = str_init(*(tokens + i));
+ // check for idx []
+ int sresult = sscanf(field.s, "%[^[][%[^]]]", f1, f2); //, f3);
+ LM_DBG("CHECK IDX %d - %s , %s, %s\n", sresult, field.s, f1, (sresult > 1? f2 : "(null)"));
+
+ jtree = nsq_json_get_object(jtree, f1);
+ if (jtree != NULL) {
+ char *value = (char*)json_object_get_string(jtree);
+ LM_DBG("JTREE OK %s\n", value);
+ }
+ if (jtree != NULL && sresult > 1 && json_object_is_type(jtree, json_type_array)) {
+ int idx = atoi(f2);
+ jtree = json_object_array_get_idx(jtree, idx);
+ if (jtree != NULL) {
+ char *value = (char*)json_object_get_string(jtree);
+ LM_DBG("JTREE IDX OK %s\n", value);
+ }
+ }
+ }
+ pkg_free(*(tokens + i));
+ }
+ pkg_free(tokens);
+ }
+
+
+ if (jtree != NULL)
+ ret = json_object_get(jtree);
+
+ json_object_put(j);
+
+ return ret;
+}
+
+
+int nsq_json_get_field_ex(str* json, str* field, pv_value_p dst_val)
+{
+ struct json_object *jtree = nsq_json_get_field_object(json, field);
+
+
+ if (jtree != NULL) {
+ char *value = (char*)json_object_get_string(jtree);
+ int len = strlen(value);
+ dst_val->rs.s = pkg_malloc(len+1);
+ memcpy(dst_val->rs.s, value, len);
+ dst_val->rs.s[len] = '\0';
+ dst_val->rs.len = len;
+ dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
+ dst_val->ri = 0;
+ json_object_put(jtree);
+ } else {
+ dst_val->flags = PV_VAL_NULL;
+ dst_val->rs = nsq_pv_str_empty;
+ dst_val->ri = 0;
+ }
+ return 1;
+}
+
+
+int nsq_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst)
+{
+ str json_s;
+ str field_s;
+ pv_spec_t *dst_pv;
+ pv_value_t dst_val;
+
+ if (fixup_get_svalue(msg, (gparam_p)json, &json_s) != 0) {
+ LM_ERR("cannot get json string value\n");
+ return -1;
+ }
+
+ if (fixup_get_svalue(msg, (gparam_p)field, &field_s) != 0) {
+ LM_ERR("cannot get field string value\n");
+ return -1;
+ }
+
+ if (nsq_json_get_field_ex(&json_s, &field_s, &dst_val) != 1)
+ return -1;
+
+ dst_pv = (pv_spec_t *)dst;
+ dst_pv->setf(msg, &dst_pv->pvp, (int)EQ_T, &dst_val);
+ if (dst_val.flags & PV_VAL_PKG) {
+ pkg_free(dst_val.rs.s);
+ } else if (dst_val.flags & PV_VAL_SHM) {
+ shm_free(dst_val.rs.s);
+ }
+
+ return 1;
+}
+
+struct json_object* nsq_json_parse(const char *str)
+{
+ struct json_tokener* tok;
+ struct json_object* obj;
+
+ tok = json_tokener_new();
+ if (!tok) {
+ LM_ERR("Error parsing json: could not allocate tokener\n");
+ return NULL;
+ }
+
+ obj = json_tokener_parse_ex(tok, str, -1);
+ if (tok->err != json_tokener_success) {
+ LM_ERR("Error parsing json: %s\n", json_tokener_error_desc(tok->err));
+ LM_ERR("%s\n", str);
+ if (obj != NULL) {
+ json_object_put(obj);
+ }
+ obj = NULL;
+ }
+
+ json_tokener_free(tok);
+ return obj;
+}
+
+struct json_object* nsq_json_get_object(struct json_object* jso, const char *key)
+{
+ struct json_object *result = NULL;
+ json_object_object_get_ex(jso, key, &result);
+ return result;
+}
+
+int nsq_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst)
+{
+ str json_s;
+ str field_s;
+ int_str keys_avp_name;
+ unsigned short keys_avp_type;
+ pv_spec_t *avp_spec;
+
+ if (fixup_get_svalue(msg, (gparam_p)json, &json_s) != 0) {
+ LM_ERR("cannot get json string value\n");
+ return -1;
+ }
+
+ if (fixup_get_svalue(msg, (gparam_p)field, &field_s) != 0) {
+ LM_ERR("cannot get field string value\n");
+ return -1;
+ }
+
+ if (dst == NULL){
+ LM_ERR("avp spec is null\n");
+ return -1;
+ }
+
+ avp_spec = (pv_spec_t *)dst;
+
+ if (avp_spec->type != PVT_AVP) {
+ LM_ERR("invalid avp spec\n");
+ return -1;
+ }
+
+ if (pv_get_avp_name(0, &avp_spec->pvp, &keys_avp_name, &keys_avp_type)!=0) {
+ LM_ERR("invalid AVP definition\n");
+ return -1;
+ }
+
+ struct json_object *jtree = nsq_json_get_field_object(&json_s, &field_s);
+
+ if (jtree != NULL) {
+ json_foreach_key(jtree, k) {
+ LM_DBG("ITERATING KEY %s\n", k);
+ int_str v1;
+ v1.s.s = k;
+ v1.s.len = strlen(k);
+ if (add_avp(AVP_VAL_STR|keys_avp_type, keys_avp_name, v1) < 0) {
+ LM_ERR("failed to create AVP\n");
+ json_object_put(jtree);
+ return -1;
+ }
+ }
+ json_object_put(jtree);
+ }
+
+ return 1;
+}
+
diff --git a/modules/nsq/nsq_json.h b/modules/nsq/nsq_json.h
new file mode 100644
index 00000000000..db1d56e24d8
--- /dev/null
+++ b/modules/nsq/nsq_json.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#ifndef __NSQ_JSON_H_
+#define __NSQ_JSON_H_
+
+#include "../../mod_fix.h"
+#include "../../lvalue.h"
+#include "../../parser/msg_parser.h"
+#include
+
+#define json_extract_field(json_name, field) do { \
+ struct json_object* obj = nsq_json_get_object(json_obj, json_name); \
+ field.s = (char*)json_object_get_string(obj); \
+ if (field.s == NULL) { \
+ LM_DBG("Json-c error - failed to extract field [%s]\n", json_name); \
+ field.s = ""; \
+ } else { \
+ field.len = strlen(field.s); \
+ } \
+ LM_DBG("%s: [%s]\n", json_name, field.s?field.s:"Empty"); \
+ } while (0);
+
+
+extern char nsq_json_escape_char;
+extern str nsq_event_key;
+extern str nsq_event_sub_key;
+
+int nsq_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst);
+int nsq_json_get_field_ex(str* json, str* field, pv_value_p dst_val);
+int nsq_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst);
+
+struct json_object* nsq_json_parse(const char *str);
+struct json_object* nsq_json_get_object(struct json_object* jso, const char *key);
+
+#endif /* __NSQ_JSON_H_ */
diff --git a/modules/nsq/nsq_mod.c b/modules/nsq/nsq_mod.c
new file mode 100644
index 00000000000..3db563eb26f
--- /dev/null
+++ b/modules/nsq/nsq_mod.c
@@ -0,0 +1,361 @@
+/*
+ * NSQ module interface
+ *
+ * Copyright (C) 2016 Weave Communications
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * History:
+ * --------
+ * 2016-03 first version (Weave Communications)
+ */
+
+#include "nsq_mod.h"
+
+MODULE_VERSION
+
+static tr_export_t mod_trans[] = {
+ { {"nsq", sizeof("nsq")-1}, nsq_tr_parse},
+ { { 0, 0 }, 0 }
+};
+
+static pv_export_t nsq_mod_pvs[] = {
+ {{"nsqE", (sizeof("nsqE")-1)}, PVT_OTHER, nsq_pv_get_event_payload, 0, 0, 0, 0, 0},
+ { {0, 0}, 0, 0, 0, 0, 0, 0, 0 }
+};
+
+static cmd_export_t cmds[] = {
+ {"nsq_pua_publish", (cmd_function) nsq_pua_publish, 1, 0, 0, ANY_ROUTE},
+ {0, 0, 0, 0, 0, 0}
+};
+
+static param_export_t params[]=
+{
+ {"consumer_workers", INT_PARAM, &dbn_consumer_workers},
+ {"max_in_flight", INT_PARAM, &nsq_max_in_flight},
+ {"lookupd_address", PARAM_STR, &nsq_lookupd_address},
+ {"lookupd_port", INT_PARAM, &lookupd_port},
+ {"consumer_use_nsqd", INT_PARAM, &consumer_use_nsqd}, // consume messages from nsqd instead of lookupd
+ {"topic_channel", PARAM_STRING|USE_FUNC_PARAM, (void*)nsq_add_topic_channel},
+ {"nsqd_address", PARAM_STR, &nsqd_address},
+ {"nsqd_port", INT_PARAM, &nsqd_port},
+ {"consumer_event_key", PARAM_STR, &nsq_event_key},
+ {"consumer_event_subkey", PARAM_STR, &nsq_event_sub_key},
+ {"pua_include_entity", INT_PARAM, &dbn_include_entity},
+ {"presentity_table", PARAM_STR, &nsq_presentity_table},
+ {"db_url", PARAM_STR, &nsq_db_url},
+ {"pua_mode", INT_PARAM, &dbn_pua_mode},
+ {"json_escape_char", PARAM_STR, &nsq_json_escape_str},
+ { 0, 0, 0 }
+};
+
+static void free_tc_list(nsq_topic_channel_t *tcl)
+{
+ nsq_topic_channel_t *tc, *prev_tc;
+ tc = tcl;
+ while (tc) {
+ prev_tc = tc;
+ tc = tc->next;
+ free(tc->topic);
+ free(tc->channel);
+ pkg_free(prev_tc);
+ }
+ tcl = NULL;
+}
+
+static int nsq_add_topic_channel(modparam_t type, void *val)
+{
+ nsq_topic_channel_t* tc;
+ size_t size;
+ char *channel = (char*)val;
+ char *topic;
+ char *sep = NULL;
+
+ sep = strchr(channel, ':');
+ if (!sep) {
+ topic = (char*)val;
+ channel = DEFAULT_CHANNEL;
+ LM_ERR("delimiter (\":\") not found inside topic_channel param, using default channel [%s]\n", channel);
+ } else {
+ topic = strsep(&channel, ":");
+ }
+ size = sizeof(nsq_topic_channel_t);
+ tc = (nsq_topic_channel_t*)pkg_malloc(size);
+ if (tc == NULL) {
+ LM_ERR("memory error!\n");
+ free_tc_list(tc_list);
+ return -1;
+ }
+ memset(tc, 0, size);
+ tc->topic = strdup(topic);
+ tc->channel = strdup(channel);
+ ++nsq_topic_channel_counter;
+
+ tc->next = tc_list;
+ tc_list = tc;
+
+ return 0;
+}
+
+struct module_exports exports = {
+ "nsq",
+ DEFAULT_DLFLAGS, /* dlopen flags */
+ cmds, /* Exported functions */
+ params, /* Exported parameters */
+ 0, /* exported statistics */
+ 0, /* exported MI functions */
+ nsq_mod_pvs, /* exported pseudo-variables */
+ 0, /* extra processes */
+ mod_init, /* module initialization function */
+ 0, /* response function*/
+ mod_destroy, /* destroy function */
+ mod_child_init /* per-child init function */
+};
+
+static int fire_init_event(int rank)
+{
+ struct sip_msg *fmsg;
+ struct run_act_ctx ctx;
+ int rtb, rt;
+
+ LM_DBG("rank is (%d)\n", rank);
+ if (rank!=PROC_INIT)
+ return 0;
+
+ rt = route_get(&event_rt, "nsq:mod-init");
+ if (rt>=0 && event_rt.rlist[rt]!=NULL) {
+ LM_DBG("executing event_route[nsq:mod-init] (%d)\n", rt);
+ if (faked_msg_init()<0)
+ return -1;
+ fmsg = faked_msg_next();
+ rtb = get_route_type();
+ set_route_type(REQUEST_ROUTE);
+ init_run_actions_ctx(&ctx);
+ run_top_route(event_rt.rlist[rt], fmsg, &ctx);
+ if (ctx.run_flags&DROP_R_F) {
+ LM_ERR("exit due to 'drop' in event route\n");
+ return -1;
+ }
+ set_route_type(rtb);
+ }
+
+ return 0;
+}
+
+static int mod_init(void)
+{
+ int i;
+ startup_time = (int) time(NULL);
+
+ if (dbn_pua_mode == 1) {
+ nsq_db_url.len = nsq_db_url.s ? strlen(nsq_db_url.s) : 0;
+ LM_DBG("db_url=%s/%d/%p\n", ZSW(nsq_db_url.s), nsq_db_url.len,nsq_db_url.s);
+ nsq_presentity_table.len = strlen(nsq_presentity_table.s);
+
+ if (nsq_db_url.len > 0) {
+
+ /* binding to database module */
+ if (db_bind_mod(&nsq_db_url, &nsq_pa_dbf)) {
+ LM_ERR("Database module not found\n");
+ return -1;
+ }
+
+ if (!DB_CAPABILITY(nsq_pa_dbf, DB_CAP_ALL)) {
+ LM_ERR("Database module does not implement all functions"
+ " needed by NSQ module\n");
+ return -1;
+ }
+
+ nsq_pa_db = nsq_pa_dbf.init(&nsq_db_url);
+ if (!nsq_pa_db) {
+ LM_ERR("Connection to database failed\n");
+ return -1;
+ }
+
+ nsq_pa_dbf.close(nsq_pa_db);
+ nsq_pa_db = NULL;
+ }
+ }
+
+ LM_DBG("NSQ Workers per Topic/Channel: %d\n", dbn_consumer_workers);
+ if (!nsq_topic_channel_counter) {
+ nsq_topic_channel_counter = 1;
+ }
+ LM_DBG("NSQ Total Topic/Channel: %d\n", nsq_topic_channel_counter);
+ dbn_consumer_workers = dbn_consumer_workers * nsq_topic_channel_counter;
+ LM_DBG("NSQ Total Workers: %d\n", dbn_consumer_workers);
+ int total_workers = dbn_consumer_workers + 2;
+
+ register_procs(total_workers);
+ cfg_register_child(total_workers);
+
+ if (pipe(nsq_cmd_pipe_fds) < 0) {
+ LM_ERR("cmd pipe() failed\n");
+ return -1;
+ }
+
+ nsq_worker_pipes_fds = (int*) shm_malloc(sizeof(int) * (dbn_consumer_workers) * 2 );
+ nsq_worker_pipes = (int*) shm_malloc(sizeof(int) * dbn_consumer_workers);
+ for (i=0; i < dbn_consumer_workers; i++) {
+ nsq_worker_pipes_fds[i*2] = nsq_worker_pipes_fds[i*2+1] = -1;
+ if (pipe(&nsq_worker_pipes_fds[i*2]) < 0) {
+ LM_ERR("worker pipe(%d) failed\n", i);
+ return -1;
+ }
+ }
+
+ nsq_cmd_pipe = nsq_cmd_pipe_fds[1];
+ for (i=0; i < dbn_consumer_workers; i++) {
+ nsq_worker_pipes[i] = nsq_worker_pipes_fds[i*2+1];
+ }
+
+ return 0;
+}
+
+int mod_register(char *path, int *dlflags, void *p1, void *p2)
+{
+ if (nsq_tr_init_buffers() < 0) {
+ LM_ERR("failed to initialize transformations buffers\n");
+ return -1;
+ }
+ return register_trans_mod(path, mod_trans);
+}
+
+
+int set_non_blocking(int fd)
+{
+ int flags;
+
+ flags = fcntl(fd, F_GETFL);
+ if (flags < 0)
+ return flags;
+ flags |= O_NONBLOCK;
+ if (fcntl(fd, F_SETFL, flags) < 0)
+ return -1;
+
+ return 0;
+}
+
+/**
+ *
+ */
+int nsq_consumer_worker_proc(int cmd_pipe, char *topic, char *channel)
+{
+ struct ev_loop *loop;
+ loop = ev_default_loop(0);
+ struct NSQReader *rdr;
+ void *ctx = NULL; //(void *)(new TestNsqMsgContext());
+ static char address[128];
+
+ if (loop == NULL) {
+ LM_ERR("cannot get libev loop\n");
+ }
+ set_non_blocking(cmd_pipe);
+
+ LM_DBG("NSQ Worker connecting to NSQ Topic [%s] and NSQ Channel [%s]\n", topic, channel);
+ // setup the reader
+ rdr = new_nsq_reader(loop, topic, channel, (void *)ctx, NULL, NULL, nsq_message_handler);
+
+ if (consumer_use_nsqd == 0) {
+ snprintf(address, 128, "%.*s", nsq_lookupd_address.len, nsq_lookupd_address.s);
+ nsq_reader_add_nsqlookupd_endpoint(rdr, address, lookupd_port);
+ } else {
+ snprintf(address, 128, "%.*s", nsqd_address.len, nsqd_address.s);
+ nsq_reader_connect_to_nsqd(rdr, address, nsqd_port);
+ }
+
+ nsq_run(loop);
+ return 0;
+}
+
+/**
+ * @brief Initialize async module children
+ */
+static int mod_child_init(int rank)
+{
+ int pid;
+ int i;
+ int workers = dbn_consumer_workers / nsq_topic_channel_counter;
+
+ fire_init_event(rank);
+
+ if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
+ return 0;
+
+ if (rank==PROC_MAIN) {
+ nsq_topic_channel_t *tc;
+
+ tc = tc_list;
+ if (tc == NULL) {
+ LM_ERR("topic and channel not set, using defaults\n");
+ for(i = 0; i < workers; i++) {
+ pid=fork_process(i+1, "NSQ Consumer Worker", 1);
+ if (pid<0)
+ return -1; /* error */
+ if (pid==0){
+ close(nsq_worker_pipes_fds[i*2+1]);
+ return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], DEFAULT_TOPIC, DEFAULT_CHANNEL));
+ }
+ }
+ } else {
+ while (tc) {
+ for(i = 0; i < workers; i++) {
+ pid=fork_process(i+1, "NSQ Consumer Worker", 1);
+ if (pid<0)
+ return -1; /* error */
+ if (pid==0){
+ close(nsq_worker_pipes_fds[i*2+1]);
+ return(nsq_consumer_worker_proc(nsq_worker_pipes_fds[i*2], tc->topic, tc->channel));
+ }
+ }
+ tc = tc->next;
+ }
+ }
+
+ return 0;
+ }
+
+ if (dbn_pua_mode == 1) {
+ if (nsq_pa_dbf.init == 0) {
+ LM_CRIT("child_init: database not bound\n");
+ return -1;
+ }
+ nsq_pa_db = nsq_pa_dbf.init(&nsq_db_url);
+ if (!nsq_pa_db) {
+ LM_ERR("child %d: unsuccessful connecting to database\n", rank);
+ return -1;
+ }
+
+ if (nsq_pa_dbf.use_table(nsq_pa_db, &nsq_presentity_table) < 0) {
+ LM_ERR( "child %d:unsuccessful use_table presentity_table\n", rank);
+ return -1;
+ }
+ LM_DBG("child %d: Database connection opened successfully\n", rank);
+ }
+
+ return 0;
+}
+
+
+/**
+ * destroy module function
+ */
+static void mod_destroy(void) {
+ free_tc_list(tc_list);
+ shm_free(nsq_worker_pipes_fds);
+ shm_free(nsq_worker_pipes);
+}
diff --git a/modules/nsq/nsq_mod.h b/modules/nsq/nsq_mod.h
new file mode 100644
index 00000000000..03c204718bb
--- /dev/null
+++ b/modules/nsq/nsq_mod.h
@@ -0,0 +1,68 @@
+#ifndef __NSQ_MOD_H_
+#define __NSQ_MOD_H_
+
+#include
+#include
+
+#include "../../cfg/cfg_struct.h"
+#include "../../lib/srdb1/db.h"
+#include "nsq_reader.h"
+#include "nsq_trans.h"
+#include "nsq_pua.h"
+
+#define DBN_DEFAULT_NO_WORKERS 4
+#define LOOKUPD_ADDRESS "127.0.0.1"
+#define CONSUMER_EVENT_KEY "Event-Category"
+#define CONSUMER_EVENT_SUB_KEY "Event-Name"
+#define DEFAULT_CHANNEL "Kamailio-Channel"
+#define DEFAULT_TOPIC "Kamailio-Topic"
+#define NSQD_ADDRESS "127.0.0.1"
+#define PRESENTITY_TABLE "presentity"
+
+typedef struct nsq_topic_channel
+{
+ char *topic;
+ char *channel;
+ struct nsq_topic_channel *next;
+} nsq_topic_channel_t;
+
+
+int nsq_workers = 1;
+int nsq_max_in_flight = 1;
+int consumer_use_nsqd = 0;
+str nsq_lookupd_address = str_init(LOOKUPD_ADDRESS);
+int lookupd_port = 4161;
+str nsq_event_key = str_init(CONSUMER_EVENT_KEY);
+str nsq_event_sub_key = str_init(CONSUMER_EVENT_SUB_KEY);
+str nsqd_address = str_init(NSQD_ADDRESS);
+int nsqd_port = 4150;
+int dbn_pua_mode = 1;
+int dbn_include_entity = 1;
+
+nsq_topic_channel_t *tc_list = NULL;
+str nsq_json_escape_str = str_init("%");
+char nsq_json_escape_char = '%';
+
+int nsq_topic_channel_counter = 0;
+int dbn_consumer_workers = DBN_DEFAULT_NO_WORKERS;
+int startup_time = 0;
+int *nsq_worker_pipes_fds = NULL;
+int *nsq_worker_pipes = NULL;
+int nsq_cmd_pipe = 0;
+int nsq_cmd_pipe_fds[2] = {-1,-1};
+
+/* database connection */
+db1_con_t *nsq_pa_db = NULL;
+db_func_t nsq_pa_dbf;
+str nsq_presentity_table = str_init(PRESENTITY_TABLE);
+str nsq_db_url = {NULL, 0};
+
+static int mod_init(void);
+static int mod_child_init(int);
+static int nsq_add_topic_channel(modparam_t type, void* val);
+static void free_tc_list(nsq_topic_channel_t *tc_list);
+static void mod_destroy(void);
+
+int nsq_pv_get_event_payload(struct sip_msg*, pv_param_t*, pv_value_t*);
+
+#endif
diff --git a/modules/nsq/nsq_pua.c b/modules/nsq/nsq_pua.c
new file mode 100644
index 00000000000..1fa8826bca2
--- /dev/null
+++ b/modules/nsq/nsq_pua.c
@@ -0,0 +1,521 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#include "../presence/bind_presence.h"
+#include "defs.h"
+#include "nsq_json.h"
+#include "nsq_pua.h"
+
+extern db1_con_t *nsq_pa_db;
+extern db_func_t nsq_pa_dbf;
+extern str nsq_presentity_table;
+extern str nsq_db_url;
+
+extern int dbn_include_entity;
+extern int dbn_pua_mode;
+
+str str_event_message_summary = str_init("message-summary");
+str str_event_dialog = str_init("dialog");
+str str_event_presence = str_init("presence");
+
+str str_username_col = str_init("username");
+str str_domain_col = str_init("domain");
+str str_body_col = str_init("body");
+str str_expires_col = str_init("expires");
+str str_received_time_col = str_init("received_time");
+str str_presentity_uri_col = str_init("presentity_uri");
+str str_priority_col = str_init("priority");
+
+str str_event_col = str_init("event");
+str str_contact_col = str_init("contact");
+str str_callid_col = str_init("callid");
+str str_from_tag_col = str_init("from_tag");
+str str_to_tag_col = str_init("to_tag");
+str str_etag_col = str_init("etag");
+str str_sender_col = str_init("sender");
+
+str str_presence_note_busy = str_init("Busy");
+str str_presence_note_otp = str_init("On the Phone");
+str str_presence_note_idle = str_init("Idle");
+str str_presence_note_offline = str_init("Offline");
+str str_presence_act_busy = str_init("");
+str str_presence_act_otp = str_init("");
+str str_presence_status_offline = str_init("closed");
+str str_presence_status_online = str_init("open");
+
+str str_null_string = str_init("NULL");
+
+int nsq_pua_update_presentity(str* event, str* realm, str* user, str* etag, str* sender, str* body, int expires, int reset)
+{
+ db_key_t query_cols[13];
+ db_op_t query_ops[13];
+ db_val_t query_vals[13];
+ int n_query_cols = 0;
+ int ret = -1;
+ int use_replace = 1;
+
+ query_cols[n_query_cols] = &str_event_col;
+ query_ops[n_query_cols] = OP_EQ;
+ query_vals[n_query_cols].type = DB1_STR;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.str_val = *event;
+ n_query_cols++;
+
+ query_cols[n_query_cols] = &str_domain_col;
+ query_ops[n_query_cols] = OP_EQ;
+ query_vals[n_query_cols].type = DB1_STR;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.str_val = *realm;
+ n_query_cols++;
+
+ query_cols[n_query_cols] = &str_username_col;
+ query_ops[n_query_cols] = OP_EQ;
+ query_vals[n_query_cols].type = DB1_STR;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.str_val = *user;
+ n_query_cols++;
+
+ query_cols[n_query_cols] = &str_etag_col;
+ query_ops[n_query_cols] = OP_EQ;
+ query_vals[n_query_cols].type = DB1_STR;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.str_val = *etag;
+ n_query_cols++;
+
+ query_cols[n_query_cols] = &str_sender_col;
+ query_vals[n_query_cols].type = DB1_STR;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.str_val = *sender;
+ n_query_cols++;
+
+ query_cols[n_query_cols] = &str_body_col;
+ query_vals[n_query_cols].type = DB1_BLOB;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.str_val = *body;
+ n_query_cols++;
+
+ query_cols[n_query_cols] = &str_received_time_col;
+ query_vals[n_query_cols].type = DB1_INT;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.int_val = (int)time(NULL);
+ n_query_cols++;
+
+ query_cols[n_query_cols] = &str_expires_col;
+ query_vals[n_query_cols].type = DB1_INT;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.int_val = expires;
+ n_query_cols++;
+
+ query_cols[n_query_cols] = &str_priority_col;
+ query_vals[n_query_cols].type = DB1_INT;
+ query_vals[n_query_cols].nul = 0;
+ query_vals[n_query_cols].val.int_val = 0;
+ n_query_cols++;
+
+ if (nsq_pa_dbf.use_table(nsq_pa_db, &nsq_presentity_table) < 0) {
+ LM_ERR("unsuccessful use_table [%.*s]\n", nsq_presentity_table.len, nsq_presentity_table.s);
+ goto error;
+ }
+
+ if (nsq_pa_dbf.replace == NULL || reset > 0) {
+ use_replace = 0;
+ LM_DBG("using delete/insert instead of replace\n");
+ }
+
+ if (nsq_pa_dbf.start_transaction) {
+ if (nsq_pa_dbf.start_transaction(nsq_pa_db, DB_LOCKING_WRITE) < 0) {
+ LM_ERR("in start_transaction\n");
+ goto error;
+ }
+ }
+
+ if (use_replace) {
+ if (nsq_pa_dbf.replace(nsq_pa_db, query_cols, query_vals, n_query_cols, 4, 0) < 0) {
+ LM_ERR("replacing record in database\n");
+ if (nsq_pa_dbf.abort_transaction) {
+ if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0) {
+ LM_ERR("in abort_transaction\n");
+ }
+ }
+ goto error;
+ }
+ } else {
+ if (nsq_pa_dbf.delete(nsq_pa_db, query_cols, query_ops, query_vals, 4-reset) < 0) {
+ LM_ERR("deleting record in database\n");
+ if (nsq_pa_dbf.abort_transaction) {
+ if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0)
+ LM_ERR("in abort_transaction\n");
+ }
+ goto error;
+ }
+ if (nsq_pa_dbf.insert(nsq_pa_db, query_cols, query_vals, n_query_cols) < 0) {
+ LM_ERR("replacing record in database\n");
+ if (nsq_pa_dbf.abort_transaction) {
+ if (nsq_pa_dbf.abort_transaction(nsq_pa_db) < 0) {
+ LM_ERR("in abort_transaction\n");
+ }
+ }
+ goto error;
+ }
+ }
+
+ if (nsq_pa_dbf.end_transaction) {
+ if (nsq_pa_dbf.end_transaction(nsq_pa_db) < 0) {
+ LM_ERR("in end_transaction\n");
+ goto error;
+ }
+ }
+
+error:
+
+ return ret;
+}
+
+int nsq_pua_publish_presence_to_presentity(struct json_object *json_obj) {
+ int ret = 1;
+ str from = { 0, 0 }, to = { 0, 0 };
+ str from_user = { 0, 0 }, to_user = { 0, 0 };
+ str from_realm = { 0, 0 }, to_realm = { 0, 0 };
+ str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+ str state = { 0, 0 };
+ str direction = { 0, 0 };
+ str event = str_init("presence");
+ str presence_body = { 0, 0 };
+ str activity = str_init("");
+ str note = str_init("Available");
+ str status = str_presence_status_online;
+ int expires = 0;
+
+ char *body = (char *)pkg_malloc(PRESENCE_BODY_BUFFER_SIZE);
+ if (body == NULL) {
+ LM_ERR("Error allocating buffer for publish\n");
+ ret = -1;
+ goto error;
+ }
+
+ json_extract_field(BLF_JSON_FROM, from);
+ json_extract_field(BLF_JSON_FROM_USER, from_user);
+ json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+ json_extract_field(BLF_JSON_TO, to);
+ json_extract_field(BLF_JSON_TO_USER, to_user);
+ json_extract_field(BLF_JSON_TO_REALM, to_realm);
+ json_extract_field(BLF_JSON_CALLID, callid);
+ json_extract_field(BLF_JSON_FROMTAG, fromtag);
+ json_extract_field(BLF_JSON_TOTAG, totag);
+ json_extract_field(BLF_JSON_DIRECTION, direction);
+ json_extract_field(BLF_JSON_STATE, state);
+
+ struct json_object *ExpiresObj = nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
+ if (ExpiresObj != NULL) {
+ expires = json_object_get_int(ExpiresObj);
+ if (expires > 0)
+ expires += (int)time(NULL);
+ }
+
+ if (!from_user.len || !to_user.len || !state.len) {
+ LM_ERR("missing one of From / To / State\n");
+ goto error;
+ }
+
+ if (!strcmp(state.s, "early")) {
+ note = str_presence_note_busy;
+ activity = str_presence_act_busy;
+
+ } else if (!strcmp(state.s, "confirmed")) {
+ note = str_presence_note_otp;
+ activity = str_presence_act_otp;
+
+ } else if (!strcmp(state.s, "offline")) {
+ note = str_presence_note_offline;
+ status = str_presence_status_offline;
+
+ }; // else {
+ // note = str_presence_note_idle;
+ //}
+
+
+ sprintf(body, PRESENCE_BODY, from_user.s, callid.s, status.s, note.s, activity.s, note.s);
+
+ presence_body.s = body;
+ presence_body.len = strlen(body);
+
+ if (dbn_pua_mode == 1) {
+ nsq_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &presence_body, expires, 1);
+ }
+
+ error:
+
+ if (body)
+ pkg_free(body);
+
+ return ret;
+
+}
+
+int nsq_pua_publish_mwi_to_presentity(struct json_object *json_obj) {
+ int ret = 1;
+ str event = str_init("message-summary");
+ str from = { 0, 0 }, to = { 0, 0 };
+ str from_user = { 0, 0 }, to_user = { 0, 0 };
+ str from_realm = { 0, 0 }, to_realm = { 0, 0 };
+ str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+ str mwi_user = { 0, 0 }, mwi_waiting = { 0, 0 },
+ mwi_new = { 0, 0 }, mwi_saved = { 0, 0 },
+ mwi_urgent = { 0, 0 }, mwi_urgent_saved = { 0, 0 },
+ mwi_account = { 0, 0 }, mwi_body = { 0, 0 };
+ int expires = 0;
+
+ char *body = (char *)pkg_malloc(MWI_BODY_BUFFER_SIZE);
+ if (body == NULL) {
+ LM_ERR("Error allocating buffer for publish\n");
+ ret = -1;
+ goto error;
+ }
+
+ json_extract_field(BLF_JSON_FROM, from);
+ json_extract_field(BLF_JSON_FROM_USER, from_user);
+ json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+ json_extract_field(BLF_JSON_TO, to);
+ json_extract_field(BLF_JSON_TO_USER, to_user);
+ json_extract_field(BLF_JSON_TO_REALM, to_realm);
+ json_extract_field(BLF_JSON_CALLID, callid);
+ json_extract_field(BLF_JSON_FROMTAG, fromtag);
+ json_extract_field(BLF_JSON_TOTAG, totag);
+
+ json_extract_field(MWI_JSON_TO, mwi_user);
+ json_extract_field(MWI_JSON_WAITING, mwi_waiting);
+ json_extract_field(MWI_JSON_NEW, mwi_new);
+ json_extract_field(MWI_JSON_SAVED, mwi_saved);
+ json_extract_field(MWI_JSON_URGENT, mwi_urgent);
+ json_extract_field(MWI_JSON_URGENT_SAVED, mwi_urgent_saved);
+ json_extract_field(MWI_JSON_ACCOUNT, mwi_account);
+
+ struct json_object *ExpiresObj = nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
+ if (ExpiresObj != NULL) {
+ expires = json_object_get_int(ExpiresObj);
+ if (expires > 0)
+ expires += (int)time(NULL);
+ }
+
+ sprintf(body, MWI_BODY, mwi_waiting.len, mwi_waiting.s,
+ mwi_account.len, mwi_account.s, mwi_new.len, mwi_new.s,
+ mwi_saved.len, mwi_saved.s, mwi_urgent.len, mwi_urgent.s,
+ mwi_urgent_saved.len, mwi_urgent_saved.s);
+
+ mwi_body.s = body;
+ mwi_body.len = strlen(body);
+
+ if (dbn_pua_mode == 1) {
+ nsq_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &mwi_body, expires, 1);
+ }
+
+ error:
+
+ if (body)
+ pkg_free(body);
+
+
+ return ret;
+}
+
+
+int nsq_pua_publish_dialoginfo_to_presentity(struct json_object *json_obj) {
+ int ret = 1;
+ str from = { 0, 0 }, to = { 0, 0 }, pres = {0, 0};
+ str from_user = { 0, 0 }, to_user = { 0, 0 }, pres_user = { 0, 0 };
+ str from_realm = { 0, 0 }, to_realm = { 0, 0 }, pres_realm = { 0, 0 };
+ str from_uri = { 0, 0 }, to_uri = { 0, 0 };
+ str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 };
+ str state = { 0, 0 };
+ str direction = { 0, 0 };
+ char sender_buf[1024];
+ str sender = {0, 0};
+ str dialoginfo_body = {0 , 0};
+ int expires = 0;
+ str event = str_init("dialog");
+ int reset = 0;
+ char to_tag_buffer[100];
+ char from_tag_buffer[100];
+
+ char *body = (char *)pkg_malloc(DIALOGINFO_BODY_BUFFER_SIZE);
+ if (body == NULL) {
+ LM_ERR("Error allocating buffer for publish\n");
+ ret = -1;
+ goto error;
+ }
+
+
+ json_extract_field(BLF_JSON_PRES, pres);
+ json_extract_field(BLF_JSON_PRES_USER, pres_user);
+ json_extract_field(BLF_JSON_PRES_REALM, pres_realm);
+ json_extract_field(BLF_JSON_FROM, from);
+ json_extract_field(BLF_JSON_FROM_USER, from_user);
+ json_extract_field(BLF_JSON_FROM_REALM, from_realm);
+ json_extract_field(BLF_JSON_FROM_URI, from_uri);
+ json_extract_field(BLF_JSON_TO, to);
+ json_extract_field(BLF_JSON_TO_USER, to_user);
+ json_extract_field(BLF_JSON_TO_REALM, to_realm);
+ json_extract_field(BLF_JSON_TO_URI, to_uri);
+ json_extract_field(BLF_JSON_CALLID, callid);
+ json_extract_field(BLF_JSON_FROMTAG, fromtag);
+ json_extract_field(BLF_JSON_TOTAG, totag);
+ json_extract_field(BLF_JSON_DIRECTION, direction);
+ json_extract_field(BLF_JSON_STATE, state);
+
+ struct json_object *ExpiresObj = nsq_json_get_object(json_obj, BLF_JSON_EXPIRES);
+ if (ExpiresObj != NULL) {
+ expires = json_object_get_int(ExpiresObj);
+ if (expires > 0)
+ expires += (int)time(NULL);
+ }
+
+ ExpiresObj = nsq_json_get_object(json_obj, "Flush-Level");
+ if (ExpiresObj != NULL) {
+ reset = json_object_get_int(ExpiresObj);
+ }
+
+ if (!from.len || !to.len || !state.len) {
+ LM_ERR("missing one of From / To / State\n");
+ goto error;
+ }
+
+ if (!pres.len || !pres_user.len || !pres_realm.len) {
+ pres = from;
+ pres_user = from_user;
+ pres_realm = from_realm;
+ }
+
+ if (!from_uri.len)
+ from_uri = from;
+
+ if (!to_uri.len)
+ to_uri = to;
+
+ if (fromtag.len > 0) {
+ fromtag.len = sprintf(from_tag_buffer, LOCAL_TAG, fromtag.len, fromtag.s);
+ fromtag.s = from_tag_buffer;
+ }
+
+ if (totag.len > 0) {
+ totag.len = sprintf(to_tag_buffer, REMOTE_TAG, totag.len, totag.s);
+ totag.s = to_tag_buffer;
+ }
+
+ if (callid.len) {
+
+ if (dbn_include_entity) {
+ sprintf(body, DIALOGINFO_BODY,
+ pres.len, pres.s,
+ callid.len, callid.s,
+ callid.len, callid.s,
+ fromtag.len, fromtag.s,
+ totag.len, totag.s,
+ direction.len, direction.s,
+ state.len, state.s,
+ from_user.len, from_user.s,
+ from.len, from.s,
+ from_uri.len, from_uri.s,
+ to_user.len, to_user.s,
+ to.len, to.s,
+ to_uri.len, to_uri.s
+ );
+ } else {
+
+ sprintf(body, DIALOGINFO_BODY_2,
+ pres.len, pres.s,
+ callid.len, callid.s,
+ callid.len, callid.s,
+ fromtag.len, fromtag.s,
+ totag.len, totag.s,
+ direction.len, direction.s,
+ state.len, state.s,
+ from_user.len, from_user.s,
+ from.len, from.s,
+ to_user.len, to_user.s,
+ to.len, to.s
+ );
+ }
+
+ } else {
+ sprintf(body, DIALOGINFO_EMPTY_BODY, pres.len, pres.s);
+ }
+
+
+ sprintf(sender_buf, "sip:%s", callid.s);
+ sender.s = sender_buf;
+ sender.len = strlen(sender_buf);
+
+ dialoginfo_body.s = body;
+ dialoginfo_body.len = strlen(body);
+
+ if (dbn_pua_mode == 1) {
+ nsq_pua_update_presentity(&event, &pres_realm, &pres_user, &callid, &sender, &dialoginfo_body, expires, reset);
+ }
+
+ error:
+
+ if (body)
+ pkg_free(body);
+
+
+ return ret;
+}
+
+
+int nsq_pua_publish(struct sip_msg* msg, char *json) {
+ str event_name = { 0, 0 }, event_package = { 0, 0 };
+ struct json_object *json_obj = NULL;
+ int ret = 1;
+
+ if (dbn_pua_mode != 1) {
+ LM_ERR("pua_mode must be 1 to publish\n");
+ ret = -1;
+ goto error;
+ }
+
+ /* extract info from json and construct xml */
+ json_obj = nsq_json_parse(json);
+ if (json_obj == NULL) {
+ ret = -1;
+ goto error;
+ }
+
+ json_extract_field(BLF_JSON_EVENT_NAME, event_name);
+
+ if (event_name.len == 6 && strncmp(event_name.s, "update", 6) == 0) {
+ json_extract_field(BLF_JSON_EVENT_PKG, event_package);
+ if (event_package.len == str_event_dialog.len
+ && strncmp(event_package.s, str_event_dialog.s, event_package.len) == 0) {
+ ret = nsq_pua_publish_dialoginfo_to_presentity(json_obj);
+ } else if (event_package.len == str_event_message_summary.len
+ && strncmp(event_package.s, str_event_message_summary.s, event_package.len) == 0) {
+ ret = nsq_pua_publish_mwi_to_presentity(json_obj);
+ } else if (event_package.len == str_event_presence.len
+ && strncmp(event_package.s, str_event_presence.s, event_package.len) == 0) {
+ ret = nsq_pua_publish_presence_to_presentity(json_obj);
+ }
+ }
+
+error:
+ if (json_obj)
+ json_object_put(json_obj);
+
+ return ret;
+}
diff --git a/modules/nsq/nsq_pua.h b/modules/nsq/nsq_pua.h
new file mode 100644
index 00000000000..067b0295cbd
--- /dev/null
+++ b/modules/nsq/nsq_pua.h
@@ -0,0 +1,8 @@
+#ifndef __NSQ_PUA_H_
+#define __NSQ_PUA_H_
+
+
+int nsq_pua_publish(struct sip_msg* msg, char *json);
+
+
+#endif
diff --git a/modules/nsq/nsq_reader.c b/modules/nsq/nsq_reader.c
new file mode 100644
index 00000000000..d35b7007966
--- /dev/null
+++ b/modules/nsq/nsq_reader.c
@@ -0,0 +1,147 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#include "nsq_reader.h"
+
+char *eventData = NULL;
+
+typedef struct json_object *json_obj_ptr;
+
+int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
+{
+ return eventData == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, eventData);
+}
+
+int nsq_consumer_fire_event(char *routename)
+{
+ struct sip_msg *fmsg;
+ struct run_act_ctx ctx;
+ int rtb, rt;
+
+ LM_DBG("searching event_route[%s]\n", routename);
+ rt = route_get(&event_rt, routename);
+ if (rt < 0 || event_rt.rlist[rt] == NULL) {
+ LM_DBG("route %s does not exist\n", routename);
+ return -2;
+ }
+ LM_DBG("executing event_route[%s] (%d)\n", routename, rt);
+ if (faked_msg_init()<0) {
+ return -2;
+ }
+ fmsg = faked_msg_next();
+ rtb = get_route_type();
+ set_route_type(REQUEST_ROUTE);
+ init_run_actions_ctx(&ctx);
+ run_top_route(event_rt.rlist[rt], fmsg, 0);
+ set_route_type(rtb);
+
+ return 0;
+}
+
+int nsq_consumer_event(char *payload, char *channel, char *topic)
+{
+ json_obj_ptr json_obj = NULL;
+ int ret = 0;
+ str ev_name = {0, 0}, ev_category = {0, 0};
+ char *k = NULL;
+ char buffer[512];
+ char *p;
+
+ eventData = payload;
+
+ json_obj = nsq_json_parse(payload);
+ if (json_obj == NULL) {
+ return 0;
+ }
+
+ k = pkg_malloc(nsq_event_key.len+1);
+ memcpy(k, nsq_event_key.s, nsq_event_key.len);
+ k[nsq_event_key.len] = '\0';
+ json_extract_field(k, ev_category);
+ pkg_free(k);
+
+ k = pkg_malloc(nsq_event_sub_key.len+1);
+ memcpy(k, nsq_event_sub_key.s, nsq_event_sub_key.len);
+ k[nsq_event_sub_key.len] = '\0';
+ json_extract_field(k, ev_name);
+ pkg_free(k);
+
+ sprintf(buffer, "nsq:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s);
+ for (p=buffer ; *p; ++p) *p = tolower(*p);
+ for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+ if (nsq_consumer_fire_event(buffer) != 0) {
+ sprintf(buffer, "nsq:consumer-event-%.*s", ev_category.len, ev_category.s);
+ for (p=buffer ; *p; ++p) *p = tolower(*p);
+ for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+ if (nsq_consumer_fire_event(buffer) != 0) {
+ sprintf(buffer, "nsq:consumer-event-%.*s-%.*s", nsq_event_key.len, nsq_event_key.s, nsq_event_sub_key.len, nsq_event_sub_key.s);
+ for (p=buffer ; *p; ++p) *p = tolower(*p);
+ for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+ if (nsq_consumer_fire_event(buffer) != 0) {
+ sprintf(buffer, "nsq:consumer-event-%.*s", nsq_event_key.len, nsq_event_key.s);
+ for (p=buffer ; *p; ++p) *p = tolower(*p);
+ for (p=buffer ; *p; ++p) if(*p == '_') *p = '-';
+ if (nsq_consumer_fire_event(buffer) != 0) {
+ sprintf(buffer, "nsq:consumer-event");
+ if (nsq_consumer_fire_event(buffer) != 0) {
+ LM_ERR("nsq:consumer-event not found");
+ }
+ }
+ }
+ }
+ }
+
+ if(json_obj)
+ json_object_put(json_obj);
+
+ eventData = NULL;
+
+ return ret;
+}
+
+void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx)
+{
+ int ret = 0;
+
+ char *payload = (char*)shm_malloc(msg->body_length + 1);
+ if (!payload) {
+ LM_ERR("error allocating shared memory for payload");
+ }
+ strncpy(payload, msg->body, msg->body_length);
+ payload[msg->body_length] = 0;
+
+ ret = nsq_consumer_event(payload, rdr->channel, rdr->topic);
+
+ buffer_reset(conn->command_buf);
+
+ if (ret < 0) {
+ nsq_requeue(conn->command_buf, msg->id, 100);
+ } else {
+ nsq_finish(conn->command_buf, msg->id);
+ }
+ buffered_socket_write_buffer(conn->bs, conn->command_buf);
+
+ buffer_reset(conn->command_buf);
+ nsq_ready(conn->command_buf, rdr->max_in_flight);
+ buffered_socket_write_buffer(conn->bs, conn->command_buf);
+
+ free_nsq_message(msg);
+ shm_free(payload);
+}
diff --git a/modules/nsq/nsq_reader.h b/modules/nsq/nsq_reader.h
new file mode 100644
index 00000000000..ec784320eb6
--- /dev/null
+++ b/modules/nsq/nsq_reader.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+#ifndef __NSQ_READER_H_
+#define __NSQ_READER_H_
+
+#include
+
+#include "../../sr_module.h"
+#include "../../fmsg.h"
+#include "nsq.h"
+#include "nsq_json.h"
+
+
+int nsq_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);
+int nsq_consumer_fire_event(char *routename);
+int nsq_consumer_event(char *payload, char *channel, char *topic);
+
+void nsq_message_handler(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx);
+
+#endif /* __NSQ_READER_H_ */
diff --git a/modules/nsq/nsq_trans.c b/modules/nsq/nsq_trans.c
new file mode 100644
index 00000000000..cc3d84a13a9
--- /dev/null
+++ b/modules/nsq/nsq_trans.c
@@ -0,0 +1,450 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+/*! \file
+ * \brief Support for transformations
+ */
+
+#include "../../trim.h"
+#include "../../mod_fix.h"
+
+#include "nsq_trans.h"
+#include "nsq_json.h"
+
+
+/*! transformation buffer size */
+#define NSQ_TR_BUFFER_SIZE 65536
+#define NSQ_TR_BUFFER_SLOTS 4
+
+/*! transformation buffer */
+static char **_nsq_tr_buffer_list = NULL;
+
+static char *_nsq_tr_buffer = NULL;
+
+static int _nsq_tr_buffer_idx = 0;
+
+#define NSQ_TR_ALLOC_PARSE_SIZE 2048
+
+static pv_spec_t** _nsq_parse_specs = NULL;
+static tr_param_t** _nsq_parse_params = NULL;
+static int _nsq_tr_parse_spec = 0;
+static int _nsq_tr_parse_params = 0;
+
+
+/*!
+ *
+ */
+int nsq_tr_init_buffers(void)
+{
+ int i;
+
+ _nsq_tr_buffer_list = (char**)malloc(NSQ_TR_BUFFER_SLOTS * sizeof(char*));
+
+ if (_nsq_tr_buffer_list==NULL)
+ return -1;
+ for (i=0; irs.len> NSQ_TR_BUFFER_SIZE-1) { \
+ LM_ERR("result is too big\n"); \
+ return -1; \
+ } \
+ strncpy(_nsq_tr_buffer, val->rs.s, val->rs.len); \
+ val->rs.s = _nsq_tr_buffer; \
+ } while(0);
+
+void nsq_destroy_pv_value(pv_value_t *val)
+{
+ if (val->flags & PV_VAL_PKG)
+ pkg_free(val->rs.s);
+ else if (val->flags & PV_VAL_SHM)
+ shm_free(val->rs.s);
+ pkg_free(val);
+}
+
+void nsq_free_pv_value(pv_value_t *val )
+{
+ if (val->flags & PV_VAL_PKG)
+ pkg_free(val->rs.s);
+ else if (val->flags & PV_VAL_SHM)
+ shm_free(val->rs.s);
+}
+
+pv_value_t* nsq_alloc_pv_value() {
+ pv_value_t *v = (pv_value_t*) pkg_malloc(sizeof(pv_value_t));
+ if (v != NULL)
+ memset(v, 0, sizeof(pv_value_t));
+ return v;
+}
+
+#define KEY_SAFE(C) ((C >= 'a' && C <= 'z') || \
+ (C >= 'A' && C <= 'Z') || \
+ (C >= '0' && C <= '9') || \
+ (C == '-' || C == '~' || C == '_'))
+
+#define HI4(C) (C>>4)
+#define LO4(C) (C & 0x0F)
+
+#define hexint(C) (C < 10?('0' + C):('A'+ C - 10))
+
+char *nsq_util_encode(const str * key, char *dest) {
+ if ((key->len == 1) && (key->s[0] == '#' || key->s[0] == '*')) {
+ *dest++ = key->s[0];
+ return dest;
+ }
+ char *p, *end;
+ for (p = key->s, end = key->s + key->len; p < end; p++) {
+ if (KEY_SAFE(*p)) {
+ *dest++ = *p;
+ } else if (*p == '.') {
+ memcpy(dest, "\%2E", 3);
+ dest += 3;
+ } else if (*p == ' ') {
+ *dest++ = '+';
+ } else {
+ *dest++ = '%';
+ sprintf(dest, "%c%c", hexint(HI4(*p)), hexint(LO4(*p)));
+ dest += 2;
+ }
+ }
+ *dest = '\0';
+ return dest;
+}
+
+int nsq_encode_ex(str *unencoded, pv_value_p dst_val)
+{
+ char routing_key_buff[256];
+ memset(routing_key_buff,0, sizeof(routing_key_buff));
+ nsq_util_encode(unencoded, routing_key_buff);
+
+ int len = strlen(routing_key_buff);
+ dst_val->rs.s = pkg_malloc(len + 1);
+ memcpy(dst_val->rs.s, routing_key_buff, len);
+ dst_val->rs.s[len] = '\0';
+ dst_val->rs.len = len;
+ dst_val->flags = PV_VAL_STR | PV_VAL_PKG;
+
+ return 1;
+
+}
+
+/*!
+ * \brief Evaluate NSQ transformations
+ * \param msg SIP message
+ * \param tp transformation
+ * \param subtype transformation type
+ * \param val pseudo-variable
+ * \return 0 on success, -1 on error
+ */
+int nsq_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val)
+{
+
+ str sv;
+ pv_value_t *pv;
+ pv_value_t v;
+ str v2 = {0,0};
+ void* v1 = NULL;
+
+ if (val==NULL || (val->flags&PV_VAL_NULL))
+ return -1;
+
+
+ nsq_tr_set_crt_buffer();
+
+ switch (subtype) {
+ case TR_NSQ_ENCODE:
+ if (!(val->flags&PV_VAL_STR))
+ return -1;
+
+ pv = nsq_alloc_pv_value();
+ if (pv == NULL) {
+ LM_ERR("NSQ encode transform : no more private memory\n");
+ return -1;
+ }
+
+ if (nsq_encode_ex(&val->rs, pv ) != 1) {
+ LM_ERR("error encoding value\n");
+ nsq_destroy_pv_value(pv);
+ return -1;
+ }
+
+ strncpy(_nsq_tr_buffer, pv->rs.s, pv->rs.len);
+ _nsq_tr_buffer[pv->rs.len] = '\0';
+
+ val->flags = PV_VAL_STR;
+ val->ri = 0;
+ val->rs.s = _nsq_tr_buffer;
+ val->rs.len = pv->rs.len;
+
+ nsq_destroy_pv_value(pv);
+ nsq_free_pv_value(val);
+
+ break;
+ case TR_NSQ_JSON:
+ if (!(val->flags&PV_VAL_STR))
+ return -1;
+
+ if (tp == NULL) {
+ LM_ERR("NSQ json transform invalid parameter\n");
+ return -1;
+ }
+
+ pv = nsq_alloc_pv_value();
+ if (pv == NULL) {
+ LM_ERR("NSQ encode transform : no more private memory\n");
+ return -1;
+ }
+
+
+ if (tp->type == TR_PARAM_STRING) {
+ v1 = tp->v.s.s;
+ if (fixup_spve_null(&v1, 1) != 0) {
+ LM_ERR("cannot get spve_value from TR_PARAM_STRING : %.*s\n", tp->v.s.len, tp->v.s.s);
+ return -1;
+ }
+ if (fixup_get_svalue(msg, (gparam_p)v1, &v2) != 0) {
+ LM_ERR("cannot get value from TR_PARAM_STRING\n");
+ fixup_free_spve_null(&v1, 1);
+ return -1;
+ }
+ fixup_free_spve_null(&v1, 1);
+ sv = v2;
+ } else {
+ if (pv_get_spec_value(msg, (pv_spec_p)tp->v.data, &v)!=0
+ || (!(v.flags&PV_VAL_STR)) || v.rs.len<=0) {
+ LM_ERR("value cannot get spec value in json transform\n");
+ nsq_destroy_pv_value(pv);
+ return -1;
+ }
+ sv = v.rs;
+ }
+
+
+ if (nsq_json_get_field_ex(&val->rs, &sv, pv ) != 1) {
+ LM_ERR("error getting json\n");
+ nsq_destroy_pv_value(pv);
+ return -1;
+ }
+
+ strncpy(_nsq_tr_buffer, pv->rs.s, pv->rs.len);
+ _nsq_tr_buffer[pv->rs.len] = '\0';
+
+ val->flags = PV_VAL_STR;
+ val->ri = 0;
+ val->rs.s = _nsq_tr_buffer;
+ val->rs.len = pv->rs.len;
+
+ nsq_destroy_pv_value(pv);
+ nsq_free_pv_value(val);
+
+ break;
+
+ default:
+ LM_ERR("unknown NSQ transformation subtype %d\n", subtype);
+ return -1;
+ }
+ return 0;
+}
+
+#define _nsq_tr_parse_sparam(_p, _p0, _tp, _spec, _ps, _in, _s) \
+ while(is_in_str(_p, _in) && (*_p==' ' || *_p=='\t' || *_p=='\n')) _p++; \
+ if(*_p==PV_MARKER) \
+ { /* pseudo-variable */ \
+ _spec = (pv_spec_t*)malloc(sizeof(pv_spec_t)); \
+ if(_spec==NULL) \
+ { \
+ LM_ERR("no more private memory!\n"); \
+ goto error; \
+ } \
+ _s.s = _p; _s.len = _in->s + _in->len - _p; \
+ _p0 = pv_parse_spec(&_s, _spec); \
+ if(_p0==NULL) \
+ { \
+ LM_ERR("invalid spec in substr transformation: %.*s!\n", \
+ _in->len, _in->s); \
+ goto error; \
+ } \
+ _p = _p0; \
+ _tp = (tr_param_t*)malloc(sizeof(tr_param_t)); \
+ if(_tp==NULL) \
+ { \
+ LM_ERR("no more private memory!\n"); \
+ goto error; \
+ } \
+ memset(_tp, 0, sizeof(tr_param_t)); \
+ _tp->type = TR_PARAM_SPEC; \
+ _tp->v.data = (void*)_spec; \
+ _nsq_parse_specs[_nsq_tr_parse_spec++] = _spec; \
+ _nsq_parse_params[_nsq_tr_parse_params++] = _tp; \
+ } else { /* string */ \
+ _ps = _p; \
+ while(is_in_str(_p, _in) && *_p!='\t' && *_p!='\n' \
+ && *_p!=TR_PARAM_MARKER && *_p!=TR_RBRACKET) \
+ _p++; \
+ if(*_p=='\0') \
+ { \
+ LM_ERR("invalid param in transformation: %.*s!!\n", \
+ _in->len, _in->s); \
+ goto error; \
+ } \
+ _tp = (tr_param_t*)malloc(sizeof(tr_param_t)); \
+ if(_tp==NULL) \
+ { \
+ LM_ERR("no more private memory!\n"); \
+ goto error; \
+ } \
+ memset(_tp, 0, sizeof(tr_param_t)); \
+ _tp->type = TR_PARAM_STRING; \
+ _tp->v.s.len = _p - _ps; \
+ _tp->v.s.s = (char*)malloc((tp->v.s.len+1)*sizeof(char)); \
+ strncpy(_tp->v.s.s, _ps, tp->v.s.len); \
+ _tp->v.s.s[tp->v.s.len] = '\0'; \
+ _nsq_parse_params[_nsq_tr_parse_params++] = _tp; \
+ }
+
+
+/*!
+ * \brief Helper fuction to parse a NSQ transformation
+ * \param in parsed string
+ * \param t transformation
+ * \return pointer to the end of the transformation in the string - '}', null on error
+ */
+char* nsq_tr_parse(str* in, trans_t *t)
+{
+ char *p;
+ char *p0;
+ char *ps;
+ str name;
+ str s;
+ pv_spec_t *spec = NULL;
+ tr_param_t *tp = NULL;
+
+ if(in==NULL || t==NULL)
+ return NULL;
+
+ p = in->s;
+ name.s = in->s;
+ t->type = TR_NSQ;
+ t->trf = nsq_tr_eval;
+
+ /* find next token */
+ while(is_in_str(p, in) && *p!=TR_PARAM_MARKER && *p!=TR_RBRACKET) p++;
+ if (*p=='\0') {
+ LM_ERR("invalid transformation: %.*s\n",
+ in->len, in->s);
+ goto error;
+ }
+ name.len = p - name.s;
+ trim(&name);
+
+ if (name.len==6 && strncasecmp(name.s, "encode", 6)==0) {
+ t->subtype = TR_NSQ_ENCODE;
+ goto done;
+ } else if (name.len==4 && strncasecmp(name.s, "json", 4)==0) {
+ t->subtype = TR_NSQ_JSON;
+ if (*p!=TR_PARAM_MARKER) {
+ LM_ERR("invalid json transformation: %.*s!\n", in->len, in->s);
+ goto error;
+ }
+ p++;
+ _nsq_tr_parse_sparam(p, p0, tp, spec, ps, in, s);
+ t->params = tp;
+ tp = 0;
+ while(*p && (*p==' ' || *p=='\t' || *p=='\n')) p++;
+ if (*p!=TR_RBRACKET) {
+ LM_ERR("invalid json transformation: %.*s!!\n",
+ in->len, in->s);
+ goto error;
+ }
+ goto done;
+ }
+
+ LM_ERR("unknown NSQ transformation: %.*s/%.*s/%d!\n", in->len, in->s,
+ name.len, name.s, name.len);
+error:
+ if(tp)
+ free(tp);
+ if(spec)
+ free(spec);
+ return NULL;
+done:
+ t->name = name;
+ return p;
+}
diff --git a/modules/nsq/nsq_trans.h b/modules/nsq/nsq_trans.h
new file mode 100644
index 00000000000..ff40158e536
--- /dev/null
+++ b/modules/nsq/nsq_trans.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * This file is part of Kamailio, a free SIP server.
+ *
+ * Kamailio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * Kamailio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ */
+
+/*! \file
+ * \brief Transformations support
+ */
+
+#ifndef _NSQ_TRANS_H_
+#define _NSQ_TRANS_H_
+
+#include "../../pvar.h"
+
+
+enum _nsq_tr_type { TR_NONE=0, TR_NSQ };
+enum _nsq_tr_subtype { TR_NSQ_NONE=0, TR_NSQ_ENCODE, TR_NSQ_JSON };
+
+char *nsq_tr_parse(str *in, trans_t *tr);
+
+int nsq_tr_init_buffers(void);
+void nsq_tr_clear_buffers(void);
+
+
+#endif
diff --git a/modules/nsq/reader.c b/modules/nsq/reader.c
new file mode 100644
index 00000000000..43ac23ed988
--- /dev/null
+++ b/modules/nsq/reader.c
@@ -0,0 +1,172 @@
+#include "nsq.h"
+#include "utlist.h"
+#include "http.h"
+#include "../../dprint.h"
+
+extern int nsq_max_in_flight;
+
+static void nsq_reader_connect_cb(struct NSQDConnection *conn, void *arg)
+{
+ struct NSQReader *rdr = (struct NSQReader *)arg;
+
+ if (rdr->connect_callback) {
+ rdr->connect_callback(rdr, conn);
+ }
+
+ // subscribe
+ buffer_reset(conn->command_buf);
+ nsq_subscribe(conn->command_buf, rdr->topic, rdr->channel);
+ buffered_socket_write_buffer(conn->bs, conn->command_buf);
+
+ // send initial RDY
+ buffer_reset(conn->command_buf);
+ nsq_ready(conn->command_buf, rdr->max_in_flight);
+ buffered_socket_write_buffer(conn->bs, conn->command_buf);
+}
+
+static void nsq_reader_msg_cb(struct NSQDConnection *conn, struct NSQMessage *msg, void *arg)
+{
+ struct NSQReader *rdr = (struct NSQReader *)arg;
+
+ //LM_ERR("nsq_reader_msg_cb()!\n");
+ if (rdr->msg_callback) {
+ msg->id[sizeof(msg->id)-1] = '\0';
+ rdr->msg_callback(rdr, conn, msg, rdr->ctx);
+ }
+}
+
+static void nsq_reader_close_cb(struct NSQDConnection *conn, void *arg)
+{
+ struct NSQReader *rdr = (struct NSQReader *)arg;
+
+ //LM_ERR("nsq_reader_close_cb()!\n");
+
+ if (rdr->close_callback) {
+ rdr->close_callback(rdr, conn);
+ }
+
+ LL_DELETE(rdr->conns, conn);
+
+ free_nsqd_connection(conn);
+}
+
+void nsq_lookupd_request_cb(struct HttpRequest *req, struct HttpResponse *resp, void *arg);
+
+static void nsq_reader_lookupd_poll_cb(EV_P_ struct ev_timer *w, int revents)
+{
+ struct NSQReader *rdr = (struct NSQReader *)w->data;
+ struct NSQLookupdEndpoint *nsqlookupd_endpoint;
+ struct HttpRequest *req;
+ int i, idx, count = 0;
+ char buf[256];
+
+ //LM_ERR("nsq_reader_lookupd_poll_cb()!\n");
+
+ LL_FOREACH(rdr->lookupd, nsqlookupd_endpoint) {
+ count++;
+ }
+ if (count == 0)
+ idx = 0;
+ else
+ idx = rand() % count;
+
+
+ i = 0;
+ LL_FOREACH(rdr->lookupd, nsqlookupd_endpoint) {
+ if (i++ == idx) {
+ sprintf(buf, "http://%s:%d/lookup?topic=%s", nsqlookupd_endpoint->address,
+ nsqlookupd_endpoint->port, rdr->topic);
+ //LM_ERR("buf %s\n", buf);
+ req = new_http_request(buf, nsq_lookupd_request_cb, rdr, NULL);
+ http_client_get((struct HttpClient *)rdr->httpc, req);
+ break;
+ }
+ }
+
+ ev_timer_again(rdr->loop, &rdr->lookupd_poll_timer);
+}
+
+struct NSQReader* new_nsq_reader(struct ev_loop *loop, const char *topic, const char *channel, void *ctx,
+ void (*connect_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
+ void (*close_callback)(struct NSQReader *rdr, struct NSQDConnection *conn),
+ void (*msg_callback)(struct NSQReader *rdr, struct NSQDConnection *conn, struct NSQMessage *msg, void *ctx))
+{
+ struct NSQReader *rdr;
+
+ rdr = (struct NSQReader *)malloc(sizeof(struct NSQReader));
+ rdr->topic = strdup(topic);
+ rdr->channel = strdup(channel);
+ rdr->max_in_flight = nsq_max_in_flight;
+ rdr->connect_callback = connect_callback;
+ rdr->close_callback = close_callback;
+ rdr->msg_callback = msg_callback;
+ rdr->ctx = ctx;
+ rdr->conns = NULL;
+ rdr->lookupd = NULL;
+ rdr->loop = loop;
+
+ rdr->httpc = new_http_client(rdr->loop);
+
+ //LM_ERR("new_nsq_reader(), nsq_max_in_flight = %d!\n", nsq_max_in_flight);
+
+ // TODO: configurable interval
+ ev_timer_init(&rdr->lookupd_poll_timer, nsq_reader_lookupd_poll_cb, 0., 5.);
+ rdr->lookupd_poll_timer.data = rdr;
+ ev_timer_again(rdr->loop, &rdr->lookupd_poll_timer);
+
+ return rdr;
+}
+
+void free_nsq_reader(struct NSQReader *rdr)
+{
+ struct NSQDConnection *conn;
+ struct NSQLookupdEndpoint *nsqlookupd_endpoint;
+
+ //LM_ERR("free_nsq_reader()!\n");
+
+ if (rdr) {
+ // TODO: this should probably trigger disconnections and then keep
+ // trying to clean up until everything upstream is finished
+ LL_FOREACH(rdr->conns, conn) {
+ nsqd_connection_disconnect(conn);
+ }
+ LL_FOREACH(rdr->lookupd, nsqlookupd_endpoint) {
+ free_nsqlookupd_endpoint(nsqlookupd_endpoint);
+ }
+ free(rdr->topic);
+ free(rdr->channel);
+ free(rdr);
+ }
+}
+
+int nsq_reader_add_nsqlookupd_endpoint(struct NSQReader *rdr, const char *address, int port)
+{
+ struct NSQLookupdEndpoint *nsqlookupd_endpoint;
+
+ //LM_ERR("nsq_reader_add_nsqlookupd_endpoint(address = %s, port = %d)!\n", address, port);
+ nsqlookupd_endpoint = new_nsqlookupd_endpoint(address, port);
+ LL_APPEND(rdr->lookupd, nsqlookupd_endpoint);
+
+ return 1;
+}
+
+int nsq_reader_connect_to_nsqd(struct NSQReader *rdr, const char *address, int port)
+{
+ struct NSQDConnection *conn;
+ int rc;
+
+ //LM_ERR("nsq_reader_connect_to_nsqd()!\n");
+ conn = new_nsqd_connection(rdr->loop, address, port, nsq_reader_connect_cb, nsq_reader_close_cb, nsq_reader_msg_cb, rdr);
+ rc = nsqd_connection_connect(conn);
+ if (rc > 0) {
+ LL_APPEND(rdr->conns, conn);
+ }
+ return rc;
+}
+
+void nsq_run(struct ev_loop *loop)
+{
+ //LM_ERR("nsq_run()!\n");
+ srand(time(NULL));
+ ev_loop(loop, 0);
+}
diff --git a/modules/nsq/utlist.h b/modules/nsq/utlist.h
new file mode 100644
index 00000000000..6bccec7ada3
--- /dev/null
+++ b/modules/nsq/utlist.h
@@ -0,0 +1,728 @@
+/*
+Copyright (c) 2007-2013, Troy D. Hanson http://troydhanson.github.com/uthash/
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
+IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
+OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*/
+
+#ifndef UTLIST_H
+#define UTLIST_H
+
+#define UTLIST_VERSION 1.9.8
+
+#include
+
+/*
+ * This file contains macros to manipulate singly and doubly-linked lists.
+ *
+ * 1. LL_ macros: singly-linked lists.
+ * 2. DL_ macros: doubly-linked lists.
+ * 3. CDL_ macros: circular doubly-linked lists.
+ *
+ * To use singly-linked lists, your structure must have a "next" pointer.
+ * To use doubly-linked lists, your structure must "prev" and "next" pointers.
+ * Either way, the pointer to the head of the list must be initialized to NULL.
+ *
+ * ----------------.EXAMPLE -------------------------
+ * struct item {
+ * int id;
+ * struct item *prev, *next;
+ * }
+ *
+ * struct item *list = NULL:
+ *
+ * int main() {
+ * struct item *item;
+ * ... allocate and populate item ...
+ * DL_APPEND(list, item);
+ * }
+ * --------------------------------------------------
+ *
+ * For doubly-linked lists, the append and delete macros are O(1)
+ * For singly-linked lists, append and delete are O(n) but prepend is O(1)
+ * The sort macro is O(n log(n)) for all types of single/double/circular lists.
+ */
+
+/* These macros use decltype or the earlier __typeof GNU extension.
+ As decltype is only available in newer compilers (VS2010 or gcc 4.3+
+ when compiling c++ code), this code uses whatever method is needed
+ or, for VS2008 where neither is available, uses casting workarounds. */
+#ifdef _MSC_VER /* MS compiler */
+#if _MSC_VER >= 1600 && defined(__cplusplus) /* VS2010 or newer in C++ mode */
+#define LDECLTYPE(x) decltype(x)
+#else /* VS2008 or older (or VS2010 in C mode) */
+#define NO_DECLTYPE
+#define LDECLTYPE(x) char*
+#endif
+#else /* GNU, Sun and other compilers */
+#define LDECLTYPE(x) __typeof(x)
+#endif
+
+/* for VS2008 we use some workarounds to get around the lack of decltype,
+ * namely, we always reassign our tmp variable to the list head if we need
+ * to dereference its prev/next pointers, and save/restore the real head.*/
+#ifdef NO_DECLTYPE
+#define _SV(elt,list) _tmp = (char*)(list); {char **_alias = (char**)&(list); *_alias = (elt); }
+#define _NEXT(elt,list,next) ((char*)((list)->next))
+#define _NEXTASGN(elt,list,to,next) { char **_alias = (char**)&((list)->next); *_alias=(char*)(to); }
+/* #define _PREV(elt,list,prev) ((char*)((list)->prev)) */
+#define _PREVASGN(elt,list,to,prev) { char **_alias = (char**)&((list)->prev); *_alias=(char*)(to); }
+#define _RS(list) { char **_alias = (char**)&(list); *_alias=_tmp; }
+#define _CASTASGN(a,b) { char **_alias = (char**)&(a); *_alias=(char*)(b); }
+#else
+#define _SV(elt,list)
+#define _NEXT(elt,list,next) ((elt)->next)
+#define _NEXTASGN(elt,list,to,next) ((elt)->next)=(to)
+/* #define _PREV(elt,list,prev) ((elt)->prev) */
+#define _PREVASGN(elt,list,to,prev) ((elt)->prev)=(to)
+#define _RS(list)
+#define _CASTASGN(a,b) (a)=(b)
+#endif
+
+/******************************************************************************
+ * The sort macro is an adaptation of Simon Tatham's O(n log(n)) mergesort *
+ * Unwieldy variable names used here to avoid shadowing passed-in variables. *
+ *****************************************************************************/
+#define LL_SORT(list, cmp) \
+ LL_SORT2(list, cmp, next)
+
+#define LL_SORT2(list, cmp, next) \
+do { \
+ LDECLTYPE(list) _ls_p; \
+ LDECLTYPE(list) _ls_q; \
+ LDECLTYPE(list) _ls_e; \
+ LDECLTYPE(list) _ls_tail; \
+ int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping; \
+ if (list) { \
+ _ls_insize = 1; \
+ _ls_looping = 1; \
+ while (_ls_looping) { \
+ _CASTASGN(_ls_p,list); \
+ list = NULL; \
+ _ls_tail = NULL; \
+ _ls_nmerges = 0; \
+ while (_ls_p) { \
+ _ls_nmerges++; \
+ _ls_q = _ls_p; \
+ _ls_psize = 0; \
+ for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) { \
+ _ls_psize++; \
+ _SV(_ls_q,list); _ls_q = _NEXT(_ls_q,list,next); _RS(list); \
+ if (!_ls_q) break; \
+ } \
+ _ls_qsize = _ls_insize; \
+ while (_ls_psize > 0 || (_ls_qsize > 0 && _ls_q)) { \
+ if (_ls_psize == 0) { \
+ _ls_e = _ls_q; _SV(_ls_q,list); _ls_q = \
+ _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--; \
+ } else if (_ls_qsize == 0 || !_ls_q) { \
+ _ls_e = _ls_p; _SV(_ls_p,list); _ls_p = \
+ _NEXT(_ls_p,list,next); _RS(list); _ls_psize--; \
+ } else if (cmp(_ls_p,_ls_q) <= 0) { \
+ _ls_e = _ls_p; _SV(_ls_p,list); _ls_p = \
+ _NEXT(_ls_p,list,next); _RS(list); _ls_psize--; \
+ } else { \
+ _ls_e = _ls_q; _SV(_ls_q,list); _ls_q = \
+ _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--; \
+ } \
+ if (_ls_tail) { \
+ _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,_ls_e,next); _RS(list); \
+ } else { \
+ _CASTASGN(list,_ls_e); \
+ } \
+ _ls_tail = _ls_e; \
+ } \
+ _ls_p = _ls_q; \
+ } \
+ if (_ls_tail) { \
+ _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,NULL,next); _RS(list); \
+ } \
+ if (_ls_nmerges <= 1) { \
+ _ls_looping=0; \
+ } \
+ _ls_insize *= 2; \
+ } \
+ } \
+} while (0)
+
+
+#define DL_SORT(list, cmp) \
+ DL_SORT2(list, cmp, prev, next)
+
+#define DL_SORT2(list, cmp, prev, next) \
+do { \
+ LDECLTYPE(list) _ls_p; \
+ LDECLTYPE(list) _ls_q; \
+ LDECLTYPE(list) _ls_e; \
+ LDECLTYPE(list) _ls_tail; \
+ int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping; \
+ if (list) { \
+ _ls_insize = 1; \
+ _ls_looping = 1; \
+ while (_ls_looping) { \
+ _CASTASGN(_ls_p,list); \
+ list = NULL; \
+ _ls_tail = NULL; \
+ _ls_nmerges = 0; \
+ while (_ls_p) { \
+ _ls_nmerges++; \
+ _ls_q = _ls_p; \
+ _ls_psize = 0; \
+ for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) { \
+ _ls_psize++; \
+ _SV(_ls_q,list); _ls_q = _NEXT(_ls_q,list,next); _RS(list); \
+ if (!_ls_q) break; \
+ } \
+ _ls_qsize = _ls_insize; \
+ while (_ls_psize > 0 || (_ls_qsize > 0 && _ls_q)) { \
+ if (_ls_psize == 0) { \
+ _ls_e = _ls_q; _SV(_ls_q,list); _ls_q = \
+ _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--; \
+ } else if (_ls_qsize == 0 || !_ls_q) { \
+ _ls_e = _ls_p; _SV(_ls_p,list); _ls_p = \
+ _NEXT(_ls_p,list,next); _RS(list); _ls_psize--; \
+ } else if (cmp(_ls_p,_ls_q) <= 0) { \
+ _ls_e = _ls_p; _SV(_ls_p,list); _ls_p = \
+ _NEXT(_ls_p,list,next); _RS(list); _ls_psize--; \
+ } else { \
+ _ls_e = _ls_q; _SV(_ls_q,list); _ls_q = \
+ _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--; \
+ } \
+ if (_ls_tail) { \
+ _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,_ls_e,next); _RS(list); \
+ } else { \
+ _CASTASGN(list,_ls_e); \
+ } \
+ _SV(_ls_e,list); _PREVASGN(_ls_e,list,_ls_tail,prev); _RS(list); \
+ _ls_tail = _ls_e; \
+ } \
+ _ls_p = _ls_q; \
+ } \
+ _CASTASGN(list->prev, _ls_tail); \
+ _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,NULL,next); _RS(list); \
+ if (_ls_nmerges <= 1) { \
+ _ls_looping=0; \
+ } \
+ _ls_insize *= 2; \
+ } \
+ } \
+} while (0)
+
+#define CDL_SORT(list, cmp) \
+ CDL_SORT2(list, cmp, prev, next)
+
+#define CDL_SORT2(list, cmp, prev, next) \
+do { \
+ LDECLTYPE(list) _ls_p; \
+ LDECLTYPE(list) _ls_q; \
+ LDECLTYPE(list) _ls_e; \
+ LDECLTYPE(list) _ls_tail; \
+ LDECLTYPE(list) _ls_oldhead; \
+ LDECLTYPE(list) _tmp; \
+ int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping; \
+ if (list) { \
+ _ls_insize = 1; \
+ _ls_looping = 1; \
+ while (_ls_looping) { \
+ _CASTASGN(_ls_p,list); \
+ _CASTASGN(_ls_oldhead,list); \
+ list = NULL; \
+ _ls_tail = NULL; \
+ _ls_nmerges = 0; \
+ while (_ls_p) { \
+ _ls_nmerges++; \
+ _ls_q = _ls_p; \
+ _ls_psize = 0; \
+ for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) { \
+ _ls_psize++; \
+ _SV(_ls_q,list); \
+ if (_NEXT(_ls_q,list,next) == _ls_oldhead) { \
+ _ls_q = NULL; \
+ } else { \
+ _ls_q = _NEXT(_ls_q,list,next); \
+ } \
+ _RS(list); \
+ if (!_ls_q) break; \
+ } \
+ _ls_qsize = _ls_insize; \
+ while (_ls_psize > 0 || (_ls_qsize > 0 && _ls_q)) { \
+ if (_ls_psize == 0) { \
+ _ls_e = _ls_q; _SV(_ls_q,list); _ls_q = \
+ _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--; \
+ if (_ls_q == _ls_oldhead) { _ls_q = NULL; } \
+ } else if (_ls_qsize == 0 || !_ls_q) { \
+ _ls_e = _ls_p; _SV(_ls_p,list); _ls_p = \
+ _NEXT(_ls_p,list,next); _RS(list); _ls_psize--; \
+ if (_ls_p == _ls_oldhead) { _ls_p = NULL; } \
+ } else if (cmp(_ls_p,_ls_q) <= 0) { \
+ _ls_e = _ls_p; _SV(_ls_p,list); _ls_p = \
+ _NEXT(_ls_p,list,next); _RS(list); _ls_psize--; \
+ if (_ls_p == _ls_oldhead) { _ls_p = NULL; } \
+ } else { \
+ _ls_e = _ls_q; _SV(_ls_q,list); _ls_q = \
+ _NEXT(_ls_q,list,next); _RS(list); _ls_qsize--; \
+ if (_ls_q == _ls_oldhead) { _ls_q = NULL; } \
+ } \
+ if (_ls_tail) { \
+ _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,_ls_e,next); _RS(list); \
+ } else { \
+ _CASTASGN(list,_ls_e); \
+ } \
+ _SV(_ls_e,list); _PREVASGN(_ls_e,list,_ls_tail,prev); _RS(list); \
+ _ls_tail = _ls_e; \
+ } \
+ _ls_p = _ls_q; \
+ } \
+ _CASTASGN(list->prev,_ls_tail); \
+ _CASTASGN(_tmp,list); \
+ _SV(_ls_tail,list); _NEXTASGN(_ls_tail,list,_tmp,next); _RS(list); \
+ if (_ls_nmerges <= 1) { \
+ _ls_looping=0; \
+ } \
+ _ls_insize *= 2; \
+ } \
+ } \
+} while (0)
+
+/******************************************************************************
+ * singly linked list macros (non-circular) *
+ *****************************************************************************/
+#define LL_PREPEND(head,add) \
+ LL_PREPEND2(head,add,next)
+
+#define LL_PREPEND2(head,add,next) \
+do { \
+ (add)->next = head; \
+ head = add; \
+} while (0)
+
+#define LL_CONCAT(head1,head2) \
+ LL_CONCAT2(head1,head2,next)
+
+#define LL_CONCAT2(head1,head2,next) \
+do { \
+ LDECLTYPE(head1) _tmp; \
+ if (head1) { \
+ _tmp = head1; \
+ while (_tmp->next) { _tmp = _tmp->next; } \
+ _tmp->next=(head2); \
+ } else { \
+ (head1)=(head2); \
+ } \
+} while (0)
+
+#define LL_APPEND(head,add) \
+ LL_APPEND2(head,add,next)
+
+#define LL_APPEND2(head,add,next) \
+do { \
+ LDECLTYPE(head) _tmp; \
+ (add)->next=NULL; \
+ if (head) { \
+ _tmp = head; \
+ while (_tmp->next) { _tmp = _tmp->next; } \
+ _tmp->next=(add); \
+ } else { \
+ (head)=(add); \
+ } \
+} while (0)
+
+#define LL_DELETE(head,del) \
+ LL_DELETE2(head,del,next)
+
+#define LL_DELETE2(head,del,next) \
+do { \
+ LDECLTYPE(head) _tmp; \
+ if ((head) == (del)) { \
+ (head)=(head)->next; \
+ } else { \
+ _tmp = head; \
+ while (_tmp->next && (_tmp->next != (del))) { \
+ _tmp = _tmp->next; \
+ } \
+ if (_tmp->next) { \
+ _tmp->next = ((del)->next); \
+ } \
+ } \
+} while (0)
+
+/* Here are VS2008 replacements for LL_APPEND and LL_DELETE */
+#define LL_APPEND_VS2008(head,add) \
+ LL_APPEND2_VS2008(head,add,next)
+
+#define LL_APPEND2_VS2008(head,add,next) \
+do { \
+ if (head) { \
+ (add)->next = head; /* use add->next as a temp variable */ \
+ while ((add)->next->next) { (add)->next = (add)->next->next; } \
+ (add)->next->next=(add); \
+ } else { \
+ (head)=(add); \
+ } \
+ (add)->next=NULL; \
+} while (0)
+
+#define LL_DELETE_VS2008(head,del) \
+ LL_DELETE2_VS2008(head,del,next)
+
+#define LL_DELETE2_VS2008(head,del,next) \
+do { \
+ if ((head) == (del)) { \
+ (head)=(head)->next; \
+ } else { \
+ char *_tmp = (char*)(head); \
+ while ((head)->next && ((head)->next != (del))) { \
+ head = (head)->next; \
+ } \
+ if ((head)->next) { \
+ (head)->next = ((del)->next); \
+ } \
+ { \
+ char **_head_alias = (char**)&(head); \
+ *_head_alias = _tmp; \
+ } \
+ } \
+} while (0)
+#ifdef NO_DECLTYPE
+#undef LL_APPEND
+#define LL_APPEND LL_APPEND_VS2008
+#undef LL_DELETE
+#define LL_DELETE LL_DELETE_VS2008
+#undef LL_DELETE2
+#define LL_DELETE2_VS2008
+#undef LL_APPEND2
+#define LL_APPEND2 LL_APPEND2_VS2008
+#undef LL_CONCAT /* no LL_CONCAT_VS2008 */
+#undef DL_CONCAT /* no DL_CONCAT_VS2008 */
+#endif
+/* end VS2008 replacements */
+
+#define LL_FOREACH(head,el) \
+ LL_FOREACH2(head,el,next)
+
+#define LL_FOREACH2(head,el,next) \
+ for(el=head;el;el=(el)->next)
+
+#define LL_FOREACH_SAFE(head,el,tmp) \
+ LL_FOREACH_SAFE2(head,el,tmp,next)
+
+#define LL_FOREACH_SAFE2(head,el,tmp,next) \
+ for((el)=(head);(el) && (tmp = (el)->next, 1); (el) = tmp)
+
+#define LL_SEARCH_SCALAR(head,out,field,val) \
+ LL_SEARCH_SCALAR2(head,out,field,val,next)
+
+#define LL_SEARCH_SCALAR2(head,out,field,val,next) \
+do { \
+ LL_FOREACH2(head,out,next) { \
+ if ((out)->field == (val)) break; \
+ } \
+} while(0)
+
+#define LL_SEARCH(head,out,elt,cmp) \
+ LL_SEARCH2(head,out,elt,cmp,next)
+
+#define LL_SEARCH2(head,out,elt,cmp,next) \
+do { \
+ LL_FOREACH2(head,out,next) { \
+ if ((cmp(out,elt))==0) break; \
+ } \
+} while(0)
+
+#define LL_REPLACE_ELEM(head, el, add) \
+do { \
+ LDECLTYPE(head) _tmp; \
+ assert(head != NULL); \
+ assert(el != NULL); \
+ assert(add != NULL); \
+ (add)->next = (el)->next; \
+ if ((head) == (el)) { \
+ (head) = (add); \
+ } else { \
+ _tmp = head; \
+ while (_tmp->next && (_tmp->next != (el))) { \
+ _tmp = _tmp->next; \
+ } \
+ if (_tmp->next) { \
+ _tmp->next = (add); \
+ } \
+ } \
+} while (0)
+
+#define LL_PREPEND_ELEM(head, el, add) \
+do { \
+ LDECLTYPE(head) _tmp; \
+ assert(head != NULL); \
+ assert(el != NULL); \
+ assert(add != NULL); \
+ (add)->next = (el); \
+ if ((head) == (el)) { \
+ (head) = (add); \
+ } else { \
+ _tmp = head; \
+ while (_tmp->next && (_tmp->next != (el))) { \
+ _tmp = _tmp->next; \
+ } \
+ if (_tmp->next) { \
+ _tmp->next = (add); \
+ } \
+ } \
+} while (0) \
+
+
+/******************************************************************************
+ * doubly linked list macros (non-circular) *
+ *****************************************************************************/
+#define DL_PREPEND(head,add) \
+ DL_PREPEND2(head,add,prev,next)
+
+#define DL_PREPEND2(head,add,prev,next) \
+do { \
+ (add)->next = head; \
+ if (head) { \
+ (add)->prev = (head)->prev; \
+ (head)->prev = (add); \
+ } else { \
+ (add)->prev = (add); \
+ } \
+ (head) = (add); \
+} while (0)
+
+#define DL_APPEND(head,add) \
+ DL_APPEND2(head,add,prev,next)
+
+#define DL_APPEND2(head,add,prev,next) \
+do { \
+ if (head) { \
+ (add)->prev = (head)->prev; \
+ (head)->prev->next = (add); \
+ (head)->prev = (add); \
+ (add)->next = NULL; \
+ } else { \
+ (head)=(add); \
+ (head)->prev = (head); \
+ (head)->next = NULL; \
+ } \
+} while (0)
+
+#define DL_CONCAT(head1,head2) \
+ DL_CONCAT2(head1,head2,prev,next)
+
+#define DL_CONCAT2(head1,head2,prev,next) \
+do { \
+ LDECLTYPE(head1) _tmp; \
+ if (head2) { \
+ if (head1) { \
+ _tmp = (head2)->prev; \
+ (head2)->prev = (head1)->prev; \
+ (head1)->prev->next = (head2); \
+ (head1)->prev = _tmp; \
+ } else { \
+ (head1)=(head2); \
+ } \
+ } \
+} while (0)
+
+#define DL_DELETE(head,del) \
+ DL_DELETE2(head,del,prev,next)
+
+#define DL_DELETE2(head,del,prev,next) \
+do { \
+ assert((del)->prev != NULL); \
+ if ((del)->prev == (del)) { \
+ (head)=NULL; \
+ } else if ((del)==(head)) { \
+ (del)->next->prev = (del)->prev; \
+ (head) = (del)->next; \
+ } else { \
+ (del)->prev->next = (del)->next; \
+ if ((del)->next) { \
+ (del)->next->prev = (del)->prev; \
+ } else { \
+ (head)->prev = (del)->prev; \
+ } \
+ } \
+} while (0)
+
+
+#define DL_FOREACH(head,el) \
+ DL_FOREACH2(head,el,next)
+
+#define DL_FOREACH2(head,el,next) \
+ for(el=head;el;el=(el)->next)
+
+/* this version is safe for deleting the elements during iteration */
+#define DL_FOREACH_SAFE(head,el,tmp) \
+ DL_FOREACH_SAFE2(head,el,tmp,next)
+
+#define DL_FOREACH_SAFE2(head,el,tmp,next) \
+ for((el)=(head);(el) && (tmp = (el)->next, 1); (el) = tmp)
+
+/* these are identical to their singly-linked list counterparts */
+#define DL_SEARCH_SCALAR LL_SEARCH_SCALAR
+#define DL_SEARCH LL_SEARCH
+#define DL_SEARCH_SCALAR2 LL_SEARCH_SCALAR2
+#define DL_SEARCH2 LL_SEARCH2
+
+#define DL_REPLACE_ELEM(head, el, add) \
+do { \
+ assert(head != NULL); \
+ assert(el != NULL); \
+ assert(add != NULL); \
+ if ((head) == (el)) { \
+ (head) = (add); \
+ (add)->next = (el)->next; \
+ if ((el)->next == NULL) { \
+ (add)->prev = (add); \
+ } else { \
+ (add)->prev = (el)->prev; \
+ (add)->next->prev = (add); \
+ } \
+ } else { \
+ (add)->next = (el)->next; \
+ (add)->prev = (el)->prev; \
+ (add)->prev->next = (add); \
+ if ((el)->next == NULL) { \
+ (head)->prev = (add); \
+ } else { \
+ (add)->next->prev = (add); \
+ } \
+ } \
+} while (0)
+
+#define DL_PREPEND_ELEM(head, el, add) \
+do { \
+ assert(head != NULL); \
+ assert(el != NULL); \
+ assert(add != NULL); \
+ (add)->next = (el); \
+ (add)->prev = (el)->prev; \
+ (el)->prev = (add); \
+ if ((head) == (el)) { \
+ (head) = (add); \
+ } else { \
+ (add)->prev->next = (add); \
+ } \
+} while (0) \
+
+
+/******************************************************************************
+ * circular doubly linked list macros *
+ *****************************************************************************/
+#define CDL_PREPEND(head,add) \
+ CDL_PREPEND2(head,add,prev,next)
+
+#define CDL_PREPEND2(head,add,prev,next) \
+do { \
+ if (head) { \
+ (add)->prev = (head)->prev; \
+ (add)->next = (head); \
+ (head)->prev = (add); \
+ (add)->prev->next = (add); \
+ } else { \
+ (add)->prev = (add); \
+ (add)->next = (add); \
+ } \
+(head)=(add); \
+} while (0)
+
+#define CDL_DELETE(head,del) \
+ CDL_DELETE2(head,del,prev,next)
+
+#define CDL_DELETE2(head,del,prev,next) \
+do { \
+ if ( ((head)==(del)) && ((head)->next == (head))) { \
+ (head) = 0L; \
+ } else { \
+ (del)->next->prev = (del)->prev; \
+ (del)->prev->next = (del)->next; \
+ if ((del) == (head)) (head)=(del)->next; \
+ } \
+} while (0)
+
+#define CDL_FOREACH(head,el) \
+ CDL_FOREACH2(head,el,next)
+
+#define CDL_FOREACH2(head,el,next) \
+ for(el=head;el;el=((el)->next==head ? 0L : (el)->next))
+
+#define CDL_FOREACH_SAFE(head,el,tmp1,tmp2) \
+ CDL_FOREACH_SAFE2(head,el,tmp1,tmp2,prev,next)
+
+#define CDL_FOREACH_SAFE2(head,el,tmp1,tmp2,prev,next) \
+ for((el)=(head), ((tmp1)=(head)?((head)->prev):NULL); \
+ (el) && ((tmp2)=(el)->next, 1); \
+ ((el) = (((el)==(tmp1)) ? 0L : (tmp2))))
+
+#define CDL_SEARCH_SCALAR(head,out,field,val) \
+ CDL_SEARCH_SCALAR2(head,out,field,val,next)
+
+#define CDL_SEARCH_SCALAR2(head,out,field,val,next) \
+do { \
+ CDL_FOREACH2(head,out,next) { \
+ if ((out)->field == (val)) break; \
+ } \
+} while(0)
+
+#define CDL_SEARCH(head,out,elt,cmp) \
+ CDL_SEARCH2(head,out,elt,cmp,next)
+
+#define CDL_SEARCH2(head,out,elt,cmp,next) \
+do { \
+ CDL_FOREACH2(head,out,next) { \
+ if ((cmp(out,elt))==0) break; \
+ } \
+} while(0)
+
+#define CDL_REPLACE_ELEM(head, el, add) \
+do { \
+ assert(head != NULL); \
+ assert(el != NULL); \
+ assert(add != NULL); \
+ if ((el)->next == (el)) { \
+ (add)->next = (add); \
+ (add)->prev = (add); \
+ (head) = (add); \
+ } else { \
+ (add)->next = (el)->next; \
+ (add)->prev = (el)->prev; \
+ (add)->next->prev = (add); \
+ (add)->prev->next = (add); \
+ if ((head) == (el)) { \
+ (head) = (add); \
+ } \
+ } \
+} while (0)
+
+#define CDL_PREPEND_ELEM(head, el, add) \
+do { \
+ assert(head != NULL); \
+ assert(el != NULL); \
+ assert(add != NULL); \
+ (add)->next = (el); \
+ (add)->prev = (el)->prev; \
+ (el)->prev = (add); \
+ (add)->prev->next = (add); \
+ if ((head) == (el)) { \
+ (head) = (add); \
+ } \
+} while (0) \
+
+#endif /* UTLIST_H */
+