diff --git a/src/modules/kazoo/defs.h b/src/modules/kazoo/defs.h index ab23fbacecb..616d8ace3e5 100644 --- a/src/modules/kazoo/defs.h +++ b/src/modules/kazoo/defs.h @@ -30,6 +30,14 @@ #define DBK_DEFS_H_ #define BLF_MAX_DIALOGS 8 + +#define BLF_JSON_AMQP_RECEIVED "AMQP-Received" +#define BLF_JSON_AMQP_CONSUMER "AMQP-Consumer" +#define BLF_JSON_AMQP_PUBLISHED "AMQP-Published" +#define BLF_JSON_AMQP_SENT "AMQP-Sent" +#define BLF_JSON_AMQP_PIPE_IN "AMQP-Pipe-In" +#define BLF_JSON_AMQP_PIPE_OUT "AMQP-Pipe-Out" + #define BLF_JSON_PRES "Presentity" #define BLF_JSON_PRES_USER "Presentity-User" #define BLF_JSON_PRES_REALM "Presentity-Realm" @@ -42,6 +50,10 @@ #define BLF_JSON_TO_REALM "To-Realm" #define BLF_JSON_TO_URI "To-URI" #define BLF_JSON_CALLID "Call-ID" +#define BLF_JSON_DIALOGID "Dialog-ID" +#define BLF_JSON_SENDER "Sender-URI" +#define BLF_JSON_SWITCH_URI "Switch-URI" +#define BLF_JSON_ETAG "ETag" #define BLF_JSON_TOTAG "To-Tag" #define BLF_JSON_FROMTAG "From-Tag" #define BLF_JSON_STATE "State" @@ -57,6 +69,7 @@ #define BLF_JSON_TYPE "Type" #define BLF_JSON_MSG_ID "Msg-ID" #define BLF_JSON_DIRECTION "Direction" +#define BLF_JSON_BROKER_ZONE "AMQP-Broker-Zone" #define BLF_JSON_CONTACT "Contact" #define BLF_JSON_EVENT_PKG "Event-Package" diff --git a/src/modules/kazoo/kazoo.c b/src/modules/kazoo/kazoo.c index ee4213f436d..d3078fea989 100644 --- a/src/modules/kazoo/kazoo.c +++ b/src/modules/kazoo/kazoo.c @@ -41,6 +41,8 @@ #define DBK_DEFAULT_NO_CONSUMERS 1 #define DBK_DEFAULT_NO_WORKERS 8 +#define AMQP_WORKERS_RANKING PROC_XWORKER + static int mod_init(void); static int mod_child_init(int rank); static int fire_init_event(int rank); @@ -86,6 +88,7 @@ int dbk_consumer_loop_count = 10; int dbk_consumer_ack_loop_count = 20; int dbk_include_entity = 1; int dbk_pua_mode = 1; +db_locking_t kz_pua_lock_type = DB_LOCKING_WRITE; int dbk_use_hearbeats = 0; int dbk_single_consumer_on_reconnect = 1; int dbk_consume_messages_on_reconnect = 1; @@ -138,6 +141,9 @@ static cmd_export_t cmds[] = { {"kazoo_query", (cmd_function) kz_amqp_query, 4, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE}, {"kazoo_query", (cmd_function) kz_amqp_query_ex, 3, fixup_kz_amqp, fixup_kz_amqp_free, ANY_ROUTE}, {"kazoo_pua_publish", (cmd_function) kz_pua_publish, 1, 0, 0, ANY_ROUTE}, + {"kazoo_pua_publish_mwi", (cmd_function) kz_pua_publish_mwi, 1, 0, 0, ANY_ROUTE}, + {"kazoo_pua_publish_presence", (cmd_function) kz_pua_publish_presence, 1, 0, 0, ANY_ROUTE}, + {"kazoo_pua_publish_dialoginfo", (cmd_function) kz_pua_publish_dialoginfo, 1, 0, 0, ANY_ROUTE}, {"kazoo_subscribe", (cmd_function) kz_amqp_subscribe, 1, fixup_kz_amqp4, fixup_kz_amqp4_free, ANY_ROUTE}, {"kazoo_subscribe", (cmd_function) kz_amqp_subscribe_simple, 4, fixup_kz_amqp4, fixup_kz_amqp4_free, ANY_ROUTE}, @@ -192,6 +198,7 @@ static param_export_t params[] = { {"amqps_key", STR_PARAM, &kz_amqps_key.s}, {"amqps_verify_peer", INT_PARAM, &kz_amqps_verify_peer}, {"amqps_verify_hostname", INT_PARAM, &kz_amqps_verify_hostname}, + {"pua_lock_type", INT_PARAM, &kz_pua_lock_type}, {0, 0, 0} }; @@ -349,32 +356,21 @@ static int mod_child_init(int rank) kz_amqp_zone_ptr g; kz_amqp_server_ptr s; - fire_init_event(rank); + if (rank==PROC_INIT) + fire_init_event(rank); if (rank==PROC_INIT || rank==PROC_TCP_MAIN) return 0; -// if (rank>PROC_MAIN) -// kz_cmd_pipe = kz_cmd_pipe_fds[1]; - - if (rank==PROC_MAIN) { - /* - pid=fork_process(PROC_NOCHLDINIT, "AMQP Timer", 0); - if (pid<0) - return -1; - if(pid==0){ - return(kz_amqp_timeout_proc()); - } - */ - for(i=0; i < dbk_consumer_workers; i++) { - pid=fork_process(PROC_XWORKER, "AMQP Consumer Worker", 1); + pid=fork_process(AMQP_WORKERS_RANKING, "AMQP Consumer Worker", 1); if (pid<0) return -1; /* error */ if(pid==0){ if (cfg_child_init()) return -1; close(kz_worker_pipes_fds[i*2+1]); + cfg_update(); return(kz_amqp_consumer_worker_proc(kz_worker_pipes_fds[i*2])); } } @@ -388,6 +384,7 @@ static int mod_child_init(int rank) return -1; /* error */ if(pid==0){ if (cfg_child_init()) return -1; + cfg_update(); return(kz_amqp_consumer_proc(s)); } } @@ -400,12 +397,13 @@ static int mod_child_init(int rank) if(pid==0){ if (cfg_child_init()) return -1; close(kz_cmd_pipe_fds[1]); + cfg_update(); kz_amqp_publisher_proc(kz_cmd_pipe_fds[0]); } return 0; } - if(dbk_pua_mode == 1) { + if(rank == AMQP_WORKERS_RANKING && dbk_pua_mode == 1) { if (kz_pa_dbf.init==0) { LM_CRIT("child_init: database not bound\n"); diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c index 2111e892d4f..f7fc8c1ff13 100644 --- a/src/modules/kazoo/kz_amqp.c +++ b/src/modules/kazoo/kz_amqp.c @@ -83,16 +83,40 @@ kz_amqp_zones_ptr kz_zones = NULL; kz_amqp_zone_ptr kz_primary_zone = NULL; -amqp_exchange_declare_ok_t * AMQP_CALL kz_amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, - amqp_bytes_t exchange, amqp_bytes_t type, - amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments) { +amqp_exchange_declare_ok_t * AMQP_CALL kz_amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, amqp_table_t arguments) +{ + LM_DBG("declare exchange %.*s , %.*s\n", + (int)exchange->name.len, (char*)exchange->name.bytes, + (int)exchange->type.len, (char*)exchange->type.bytes); + #if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 6 - return amqp_exchange_declare(state, channel, exchange, type, passive, durable, arguments); + return amqp_exchange_declare(state, channel, exchange->name, exchange->type, + exchange->passive, exchange->durable, + arguments); #else - return amqp_exchange_declare(state, channel, exchange, type, passive, durable, 0, 0, arguments); + return amqp_exchange_declare(state, channel, exchange->name, exchange->type, + exchange->passive, exchange->durable, + exchange->auto_delete, exchange->internal, + arguments); #endif } +amqp_queue_declare_ok_t * AMQP_CALL kz_amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_queue_ptr queue, amqp_table_t arguments) +{ + return amqp_queue_declare(state, channel, queue->name, queue->passive, queue->durable, queue->exclusive, queue->auto_delete, arguments); +} + +amqp_queue_bind_ok_t * AMQP_CALL kz_amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, kz_amqp_queue_ptr queue, kz_amqp_routings_ptr queue_bindings, amqp_table_t arguments) +{ + amqp_queue_bind_ok_t *ret = amqp_queue_bind(state, channel, queue->name, exchange->name, queue_bindings->routing, arguments); + + if(ret >= 0 && queue_bindings->next) { + return kz_amqp_queue_bind(state, channel, exchange, queue, queue_bindings->next, arguments); + } else { + return ret; + } +} + int set_non_blocking(int fd) { int flags; @@ -214,6 +238,22 @@ char *kz_amqp_bytes_dup(amqp_bytes_t bytes) return res; } +static inline str* kz_str_from_amqp_bytes(amqp_bytes_t src) +{ + char *dst_char = (char*)shm_malloc(sizeof(str)+src.len+1); + if (!dst_char) { + LM_ERR("error allocating shared memory for str"); + return NULL; + } + str* dst = (str*)dst_char; + dst->s = dst_char+sizeof(str); + + memcpy(dst->s, src.bytes, src.len); + dst->len = src.len; + dst->s[dst->len] = '\0'; + return dst; +} + char *kz_local_amqp_bytes_dup(amqp_bytes_t bytes) { char *res; @@ -321,6 +361,8 @@ void kz_amqp_free_consumer_delivery(kz_amqp_consumer_delivery_ptr ptr) shm_free(ptr->event_subkey); if(ptr->message_id) shm_free(ptr->message_id); + if(ptr->routing_key) + shm_free(ptr->routing_key); if(ptr->cmd) kz_amqp_free_pipe_cmd(ptr->cmd); shm_free(ptr); @@ -330,18 +372,20 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind) { if(bind == NULL) return; - if(bind->exchange.bytes) - kz_amqp_bytes_free(bind->exchange); - if(bind->exchange_type.bytes) - kz_amqp_bytes_free(bind->exchange_type); - if(bind->queue.bytes) - kz_amqp_bytes_free(bind->queue); - if(bind->routing_key.bytes) - kz_amqp_bytes_free(bind->routing_key); + if(bind->exchange) + kz_amqp_exchange_free(bind->exchange); + if(bind->exchange_bindings) + kz_amqp_exchange_bindings_free(bind->exchange_bindings); + if(bind->queue) + kz_amqp_queue_free(bind->queue); + if(bind->queue_bindings) + kz_amqp_routing_free(bind->queue_bindings); if(bind->event_key.bytes) kz_amqp_bytes_free(bind->event_key); if(bind->event_subkey.bytes) kz_amqp_bytes_free(bind->event_subkey); + if(bind->consistent_worker_key) + shm_free(bind->consistent_worker_key); shm_free(bind); } @@ -373,48 +417,21 @@ kz_amqp_cmd_ptr kz_amqp_alloc_pipe_cmd() return cmd; } -kz_amqp_bind_ptr kz_amqp_bind_alloc_ex(str* exchange, str* exchange_type, str* queue, str* routing_key, str* event_key, str* event_subkey ) +kz_amqp_bind_ptr kz_amqp_bind_alloc(kz_amqp_exchange_ptr exchange, kz_amqp_exchange_binding_ptr exchange_bindings, kz_amqp_queue_ptr queue, kz_amqp_routings_ptr queue_bindings, str* event_key, str* event_subkey ) { - kz_amqp_bind_ptr bind = NULL; + kz_amqp_bind_ptr bind = NULL; - bind = (kz_amqp_bind_ptr)shm_malloc(sizeof(kz_amqp_bind)); + bind = (kz_amqp_bind_ptr)shm_malloc(sizeof(kz_amqp_bind)); if(bind == NULL) { LM_ERR("error allocation memory for bind alloc\n"); goto error; } memset(bind, 0, sizeof(kz_amqp_bind)); - if(exchange != NULL) { - bind->exchange = kz_amqp_bytes_dup_from_str(exchange); - if (bind->exchange.bytes == NULL) { - LM_ERR("Out of memory allocating for exchange\n"); - goto error; - } - } - - if(exchange_type != NULL) { - bind->exchange_type = kz_amqp_bytes_dup_from_str(exchange_type); - if (bind->exchange_type.bytes == NULL) { - LM_ERR("Out of memory allocating for exchange type\n"); - goto error; - } - } - - if(queue != NULL) { - bind->queue = kz_amqp_bytes_dup_from_str(queue); - if (bind->queue.bytes == NULL) { - LM_ERR("Out of memory allocating for queue\n"); - goto error; - } - } - - if(routing_key != NULL) { - bind->routing_key = kz_amqp_bytes_dup_from_str(routing_key); - if (bind->routing_key.bytes == NULL) { - LM_ERR("Out of memory allocating for routing key\n"); - goto error; - } - } + bind->exchange = exchange; + bind->queue = queue; + bind->exchange_bindings = exchange_bindings; + bind->queue_bindings = queue_bindings; if(event_key != NULL) { bind->event_key = kz_amqp_bytes_dup_from_str(event_key); @@ -439,11 +456,6 @@ kz_amqp_bind_ptr kz_amqp_bind_alloc_ex(str* exchange, str* exchange_type, str* q return NULL; } -kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queue, str* routing_key ) -{ - return kz_amqp_bind_alloc_ex(exchange, exchange_type, queue, routing_key, NULL, NULL ); -} - kz_amqp_zone_ptr kz_amqp_get_primary_zone() { if(kz_primary_zone == NULL) { kz_primary_zone = (kz_amqp_zone_ptr) shm_malloc(sizeof(kz_amqp_zone)); @@ -486,6 +498,12 @@ kz_amqp_zone_ptr kz_amqp_add_zone(char* zone) { return zone_ptr; } +kz_amqp_queue_ptr kz_amqp_targeted_queue(char *name) +{ + str queue = str_init(name); + return kz_amqp_queue_new(&queue); +} + int kz_amqp_bind_init_targeted_channel(kz_amqp_server_ptr server, int idx ) { kz_amqp_bind_ptr bind = NULL; @@ -501,16 +519,23 @@ int kz_amqp_bind_init_targeted_channel(kz_amqp_server_ptr server, int idx ) } memset(bind, 0, sizeof(kz_amqp_bind)); - bind->exchange = kz_amqp_bytes_dup_from_str(&rpl_exch); - bind->exchange_type = kz_amqp_bytes_dup_from_str(&rpl_exch_type); + bind->exchange = kz_amqp_exchange_new(&rpl_exch, &rpl_exch_type); + if(bind->exchange == NULL) { + LM_ERR("error allocation memory for reply\n"); + goto error; + } sprintf(serverid, "kamailio@%.*s-<%d-%d>", dbk_node_hostname.len, dbk_node_hostname.s, server->id, idx); - bind->queue = kz_amqp_bytes_dup_from_string(serverid); + bind->queue = kz_amqp_targeted_queue(serverid); + if(bind->queue == NULL) { + LM_ERR("error allocation memory for reply\n"); + goto error; + } sprintf(serverid, "kamailio@%.*s-<%d>-targeted-%d", dbk_node_hostname.len, dbk_node_hostname.s, server->id, idx); - bind->routing_key = kz_amqp_bytes_dup_from_string(serverid); + bind->queue_bindings = kz_amqp_routing_new(serverid); - if (bind->exchange.bytes == NULL || bind->routing_key.bytes == NULL || bind->queue.bytes == NULL) { + if (bind->queue_bindings == NULL) { LM_ERR("Out of memory allocating for exchange/routing_key\n"); goto error; } @@ -712,8 +737,10 @@ int kz_amqp_add_connection(modparam_t type, void* val) if(newConn->info.vhost == NULL) { newConn->info.vhost = KZ_URL_ROOT; +#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 6 } else if(newConn->info.vhost[0] == '/' && strlen(newConn->info.vhost) == 1) { // bug in amqp_parse_url ? newConn->info.vhost++; +#endif } kz_amqp_server_ptr server_ptr = (kz_amqp_server_ptr)shm_malloc(sizeof(kz_amqp_server)); @@ -741,7 +768,10 @@ void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) { if (!rmq) return; - kz_amqp_fire_connection_event("closed", rmq->server->connection->info.host); + if(rmq->heartbeat) + kz_amqp_timer_destroy(&rmq->heartbeat); + + kz_amqp_fire_connection_event("closed", rmq->server->connection->info.host, rmq->server->zone->zone); if (rmq->conn) { LM_DBG("close connection: %d rmq(%p)->conn(%p)\n", getpid(), (void *)rmq, rmq->conn); @@ -1469,41 +1499,379 @@ int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* return 1; }; -int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue, char* routing_key) +void kz_amqp_queue_free(kz_amqp_queue_ptr queue) { - str exchange_s; - str exchange_type_s; - str queue_s; - str routing_key_s; + if(queue->name.bytes) + kz_amqp_bytes_free(queue->name); - if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) { - LM_ERR("cannot get exchange string value\n"); - return -1; + shm_free(queue); +} + +kz_amqp_queue_ptr kz_amqp_queue_new(str *name) +{ + kz_amqp_queue_ptr queue = (kz_amqp_queue_ptr) shm_malloc(sizeof(kz_amqp_queue)); + if(queue == NULL) { + LM_ERR("NO MORE SHARED MEMORY!"); + return NULL; } + memset(queue, 0, sizeof(kz_amqp_queue)); + queue->auto_delete = 1; - if (fixup_get_svalue(msg, (gparam_p)exchange_type, &exchange_type_s) != 0) { - LM_ERR("cannot get exchange type string value\n"); - return -1; + if(name != NULL) { + queue->name = kz_amqp_bytes_dup_from_str(name); + if (queue->name.bytes == NULL) { + LM_ERR("Out of memory allocating for exchange\n"); + goto error; + } + } + + return queue; + +error: + kz_amqp_queue_free(queue); + return NULL; +} + +kz_amqp_queue_ptr kz_amqp_queue_from_json(str *name, json_object* json_obj) +{ + json_object* tmpObj = NULL; + kz_amqp_queue_ptr queue = kz_amqp_queue_new(name); + + if(queue == NULL) { + LM_ERR("NO MORE SHARED MEMORY!"); + return NULL; } - if (fixup_get_svalue(msg, (gparam_p)queue, &queue_s) != 0) { - LM_ERR("cannot get queue string value\n"); + tmpObj = kz_json_get_object(json_obj, "passive"); + if(tmpObj != NULL) { + queue->passive = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "durable"); + if(tmpObj != NULL) { + queue->durable = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "exclusive"); + if(tmpObj != NULL) { + queue->exclusive = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "auto_delete"); + if(tmpObj != NULL) { + queue->auto_delete = json_object_get_int(tmpObj); + } + + return queue; + +} + +void kz_amqp_exchange_free(kz_amqp_exchange_ptr exchange) +{ + if(exchange->name.bytes) + kz_amqp_bytes_free(exchange->name); + + if(exchange->type.bytes) + kz_amqp_bytes_free(exchange->type); + + shm_free(exchange); +} + +kz_amqp_exchange_ptr kz_amqp_exchange_new(str *name, str* type) +{ + kz_amqp_exchange_ptr exchange = (kz_amqp_exchange_ptr) shm_malloc(sizeof(kz_amqp_exchange)); + if(exchange == NULL) { + LM_ERR("NO MORE SHARED MEMORY!"); + return NULL; + } + memset(exchange, 0, sizeof(kz_amqp_exchange)); + exchange->name = kz_amqp_bytes_dup_from_str(name); + if (exchange->name.bytes == NULL) { + LM_ERR("Out of memory allocating for exchange\n"); + goto error; + } + + exchange->type = kz_amqp_bytes_dup_from_str(type); + if (exchange->type.bytes == NULL) { + LM_ERR("Out of memory allocating for exchange type\n"); + goto error; + } + + LM_DBG("NEW exchange %.*s : %.*s : %.*s : %.*s\n", + (int)name->len, (char*)name->s, + (int)type->len, (char*)type->s, + (int)exchange->name.len, (char*)exchange->name.bytes, + (int)exchange->type.len, (char*)exchange->type.bytes); + + return exchange; + +error: + + kz_amqp_exchange_free(exchange); + return NULL; +} + +kz_amqp_exchange_ptr kz_amqp_exchange_from_json(str *name, json_object* json_obj) +{ + str type = STR_NULL; + kz_amqp_exchange_ptr exchange = NULL; + json_object* tmpObj = NULL; + + json_extract_field("type", type); + + LM_DBG("NEW JSON exchange %.*s : %.*s\n", + (int)name->len, (char*)name->s, + (int)type.len, (char*)type.s); + + + exchange = kz_amqp_exchange_new(name, &type); + if(exchange == NULL) { + LM_ERR("NO MORE SHARED MEMORY!"); + return NULL; + } + + tmpObj = kz_json_get_object(json_obj, "passive"); + if(tmpObj != NULL) { + exchange->passive = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "durable"); + if(tmpObj != NULL) { + exchange->durable = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "auto_delete"); + if(tmpObj != NULL) { + exchange->auto_delete = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "internal"); + if(tmpObj != NULL) { + exchange->internal = json_object_get_int(tmpObj); + } + + return exchange; + +} + +void kz_amqp_routing_free(kz_amqp_routings_ptr routing) +{ + if(routing) { + if(routing->next) + kz_amqp_routing_free(routing->next); + shm_free(routing); + } +} + +kz_amqp_routings_ptr kz_amqp_routing_new(char* routing) +{ + kz_amqp_routings_ptr ptr = (kz_amqp_routings_ptr) shm_malloc(sizeof(kz_amqp_routings)); + memset(ptr, 0, sizeof(kz_amqp_routings)); + + ptr->routing = kz_amqp_bytes_dup_from_string(routing); + return ptr; +} + +kz_amqp_routings_ptr kz_amqp_routing_from_json(json_object* json_obj) +{ + kz_amqp_routings_ptr r, r1 = NULL, ret = NULL; + char *routing; + int len, n; + + if(json_obj == NULL) + return ret; + + switch(kz_json_get_type(json_obj)) + { + case json_type_string: + routing = (char*)json_object_get_string(json_obj); + ret = kz_amqp_routing_new(routing); + break; + + case json_type_array: + len = json_object_array_length(json_obj); + for(n=0; n < len; n++) { + routing = (char*)json_object_get_string(json_object_array_get_idx(json_obj, n)); + r = kz_amqp_routing_new(routing); + if(r1 != NULL) { + r1->next = r; + } else { + ret = r; + } + r1 = r; + } + break; + + default: + LM_DBG("type not handled in routing"); + break; + } + return ret; +} + +void kz_amqp_exchange_bindings_free(kz_amqp_exchange_binding_ptr binding) +{ + if(binding) { + if(binding->next) + kz_amqp_exchange_bindings_free(binding->next); + kz_amqp_exchange_free(binding->from_exchange); + kz_amqp_routing_free(binding->routing); + shm_free(binding); + } + +} + +kz_amqp_exchange_binding_ptr kz_amqp_exchange_binding_from_json(json_object* JObj) +{ +// struct json_object* tmpObj = NULL; + struct json_object* routingObj = NULL; + kz_amqp_exchange_ptr exchange; + kz_amqp_exchange_binding_ptr prv = NULL; + kz_amqp_exchange_binding_ptr ret = NULL; + if(JObj != NULL) { + json_foreach(JObj, k, v) { + str name = {k, strlen(k)}; + LM_DBG("exchange binding1 %s, %i , %s, %i : %.*s\n", k, (int) strlen(k), name.s, name.len, name.len, name.s); + exchange = kz_amqp_exchange_from_json(&name, v); + LM_DBG("exchange binding2 %s, %i : %.*s\n", k, (int) strlen(k), name.len, name.s); + LM_DBG("exchange binding3 %.*s : %.*s\n", + (int)exchange->name.len, (char*)exchange->name.bytes, + (int)exchange->type.len, (char*)exchange->type.bytes); + + routingObj = kz_json_get_object(v, "routing"); + if(routingObj != NULL) { + kz_amqp_exchange_binding_ptr binding = (kz_amqp_exchange_binding_ptr) shm_malloc(sizeof(kz_amqp_exchange_binding)); + memset(binding, 0, sizeof(kz_amqp_exchange_binding)); + binding->from_exchange = exchange; + binding->routing = kz_amqp_routing_from_json(routingObj); + if(binding->routing == NULL) { + LM_DBG("invalid routing"); + kz_amqp_exchange_bindings_free(binding); + binding = NULL; + } else { + if(ret == NULL) + ret = binding; + if(prv != NULL) + prv->next = binding; + } + } else { + kz_amqp_exchange_free(exchange); + } + } + } + + return ret; +} + +int kz_amqp_subscribe(struct sip_msg* msg, char* payload) +{ + str exchange_s = STR_NULL; + str queue_s = STR_NULL; + str payload_s = STR_NULL; + str key_s = STR_NULL; + str subkey_s = STR_NULL; + int no_ack = 1; + int federate = 0; + int consistent_worker = 0; + str* consistent_worker_key = NULL; + int wait_for_consumer_ack = 1; + kz_amqp_queue_ptr queue = NULL; + kz_amqp_exchange_ptr exchange = NULL; + kz_amqp_exchange_binding_ptr exchange_binding = NULL; + kz_amqp_routings_ptr routing = NULL; + str* event_key = NULL; + str* event_subkey = NULL; + + + + json_obj_ptr json_obj = NULL; + struct json_object* tmpObj = NULL; + + if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) { + LM_ERR("cannot get payload value\n"); return -1; } - if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) { - LM_ERR("cannot get routing_key string value\n"); + json_obj = kz_json_parse(payload_s.s); + if (json_obj == NULL) return -1; + + + json_extract_field("exchange", exchange_s); + json_extract_field("queue", queue_s); + json_extract_field("event_key", key_s); + json_extract_field("event_subkey", subkey_s); + + if(key_s.len != 0) + event_key = &key_s; + + if(subkey_s.len != 0) + event_subkey = &subkey_s; + + tmpObj = kz_json_get_object(json_obj, "no_ack"); + if(tmpObj != NULL) { + no_ack = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "wait_for_consumer_ack"); + if(tmpObj != NULL) { + wait_for_consumer_ack = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "federate"); + if(tmpObj != NULL) { + federate = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "consistent-worker"); + if(tmpObj != NULL) { + consistent_worker = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "consistent-worker-key"); + if(tmpObj != NULL) { + consistent_worker_key = kz_str_dup_from_char((char*)json_object_get_string(tmpObj)); + } + + tmpObj = kz_json_get_object(json_obj, "exchange-bindings"); + if(tmpObj != NULL) { + exchange_binding = kz_amqp_exchange_binding_from_json(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "routing"); + if(tmpObj != NULL) { + routing = kz_amqp_routing_from_json(tmpObj); + } + + if(routing == NULL) { + LM_INFO("creating empty routing key : %s\n", payload_s.s); + routing = kz_amqp_routing_new(""); + } + + tmpObj = kz_json_get_object(json_obj, "exchange-def"); + if(tmpObj == NULL) { + tmpObj = json_obj; } + exchange = kz_amqp_exchange_from_json(&exchange_s, tmpObj); - kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s); + tmpObj = kz_json_get_object(json_obj, "queue-def"); + if(tmpObj == NULL) { + tmpObj = json_obj; + } + queue = kz_amqp_queue_from_json(&queue_s, tmpObj); + + kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(exchange, exchange_binding, queue, routing, event_key, event_subkey); if(bind == NULL) { LM_ERR("Could not allocate bind struct\n"); goto error; } - bind->auto_delete = 1; - bind->no_ack = 1; + bind->no_ack = no_ack; + bind->wait_for_consumer_ack = wait_for_consumer_ack; + bind->federate = federate; + bind->consistent_worker = consistent_worker; + bind->consistent_worker_key = consistent_worker_key; + kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding)); if(binding == NULL) { @@ -1522,102 +1890,64 @@ int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange binding->bind = bind; bindings_count++; - return 1; + if(json_obj != NULL) + json_object_put(json_obj); + + return 1; error: - if(binding != NULL) - shm_free(binding); + if(binding != NULL) + shm_free(binding); + + if(json_obj != NULL) + json_object_put(json_obj); return -1; } -int kz_amqp_subscribe(struct sip_msg* msg, char* payload) +int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue, char* routing_key) { str exchange_s; str exchange_type_s; str queue_s; str routing_key_s; - str payload_s; - str key_s; - str subkey_s; - int passive = 0; - int durable = 0; - int exclusive = 0; - int auto_delete = 1; - int no_ack = 1; - int federate = 0; - int wait_for_consumer_ack = 1; + kz_amqp_exchange_ptr exchange_ptr = NULL; + kz_amqp_queue_ptr queue_ptr = NULL; + kz_amqp_routings_ptr routing_ptr = NULL; - json_obj_ptr json_obj = NULL; - struct json_object* tmpObj = NULL; - if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) { - LM_ERR("cannot get payload value\n"); + if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) { + LM_ERR("cannot get exchange string value\n"); return -1; } - json_obj = kz_json_parse(payload_s.s); - if (json_obj == NULL) - return -1; - - - json_extract_field("exchange", exchange_s); - json_extract_field("type", exchange_type_s); - json_extract_field("queue", queue_s); - json_extract_field("routing", routing_key_s); - json_extract_field("event_key", key_s); - json_extract_field("event_subkey", subkey_s); - - tmpObj = kz_json_get_object(json_obj, "passive"); - if(tmpObj != NULL) { - passive = json_object_get_int(tmpObj); - } - - tmpObj = kz_json_get_object(json_obj, "durable"); - if(tmpObj != NULL) { - durable = json_object_get_int(tmpObj); - } - - tmpObj = kz_json_get_object(json_obj, "exclusive"); - if(tmpObj != NULL) { - exclusive = json_object_get_int(tmpObj); - } - - tmpObj = kz_json_get_object(json_obj, "auto_delete"); - if(tmpObj != NULL) { - auto_delete = json_object_get_int(tmpObj); - } + if (fixup_get_svalue(msg, (gparam_p)exchange_type, &exchange_type_s) != 0) { + LM_ERR("cannot get exchange type string value\n"); + return -1; + } - tmpObj = kz_json_get_object(json_obj, "no_ack"); - if(tmpObj != NULL) { - no_ack = json_object_get_int(tmpObj); - } + if (fixup_get_svalue(msg, (gparam_p)queue, &queue_s) != 0) { + LM_ERR("cannot get queue string value\n"); + return -1; + } - tmpObj = kz_json_get_object(json_obj, "wait_for_consumer_ack"); - if(tmpObj != NULL) { - wait_for_consumer_ack = json_object_get_int(tmpObj); - } + if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) { + LM_ERR("cannot get routing_key string value\n"); + return -1; + } - tmpObj = kz_json_get_object(json_obj, "federate"); - if(tmpObj != NULL) { - federate = json_object_get_int(tmpObj); - } + exchange_ptr = kz_amqp_exchange_new(&exchange_s, &exchange_type_s); + queue_ptr = kz_amqp_queue_new(&queue_s); + routing_ptr = kz_amqp_routing_new(routing_key_s.s); - kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s); + kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(exchange_ptr, NULL, queue_ptr, routing_ptr, NULL, NULL); if(bind == NULL) { LM_ERR("Could not allocate bind struct\n"); goto error; } - bind->durable = durable; - bind->passive = passive; - bind->exclusive = exclusive; - bind->auto_delete = auto_delete; - bind->no_ack = no_ack; - bind->wait_for_consumer_ack = wait_for_consumer_ack; - bind->federate = federate; - + bind->no_ack = 1; kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding)); if(binding == NULL) { @@ -1636,24 +1966,18 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) binding->bind = bind; bindings_count++; - if(json_obj != NULL) - json_object_put(json_obj); - return 1; error: if(binding != NULL) shm_free(binding); - if(json_obj != NULL){ - json_object_put(json_obj); - } - return -1; } + #define KEY_SAFE(C) ((C >= 'a' && C <= 'z') || \ (C >= 'A' && C <= 'Z') || \ (C >= '0' && C <= '9') || \ @@ -1750,27 +2074,27 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx ) kz_amqp_bind_ptr bind = kz_conn->server->channels[idx].targeted; int ret = -1; - kz_amqp_exchange_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); - if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) + kz_amqp_exchange_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, kz_amqp_empty_table); + if (kz_amqp_error("Declaring targeted exchange", amqp_get_rpc_reply(kz_conn->conn))) { ret = -RET_AMQP_ERROR; goto error; } - amqp_queue_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, 0, 0, 0, 1, kz_amqp_empty_table); - if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) + kz_amqp_queue_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, kz_amqp_empty_table); + if (kz_amqp_error("Declaring targeted queue", amqp_get_rpc_reply(kz_conn->conn))) { goto error; } - if (amqp_queue_bind(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0 - || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) + if (kz_amqp_queue_bind(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, bind->queue, bind->queue_bindings, kz_amqp_empty_table) < 0 + || kz_amqp_error("Binding targeted queue", amqp_get_rpc_reply(kz_conn->conn))) { goto error; } - if (amqp_basic_consume(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, 1, 0, kz_amqp_empty_table) < 0 - || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) + if (amqp_basic_consume(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue->name, kz_amqp_empty_bytes, 0, 1, 0, kz_amqp_empty_table) < 0 + || kz_amqp_error("Consuming targeted queue", amqp_get_rpc_reply(kz_conn->conn))) { goto error; } @@ -1780,71 +2104,59 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx ) return ret; } -int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan) +int kz_amqp_bind_exchange(kz_amqp_conn_ptr kz_conn, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, kz_amqp_exchange_binding_ptr bindings) { - int ret = -1; - amqp_bytes_t federated_exchange = {0, 0}; - amqp_bytes_t federated_routing_key = {0, 0}; - char _federated[100]; - - if(bind->federate == 0 - || dbk_use_federated_exchange == 0 - || kz_conn->server->zone == kz_amqp_get_primary_zone()) { - kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); - if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) - { - ret = -RET_AMQP_ERROR; - goto error; - } - } + while(bindings != NULL) { + LM_DBG("DECLARE EXH BIND FOR %.*s\n", (int)exchange->name.len, (char*)exchange->name.bytes); + LM_DBG("DECLARE EXH BIND TO %.*s\n", (int)bindings->from_exchange->name.len, (char*)bindings->from_exchange->name.bytes); - if(bind->federate == 1 - && dbk_use_federated_exchange == 1 - && kz_conn->server->zone != kz_amqp_get_primary_zone()) { - federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s); - kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); - if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn))) - { - ret = -RET_AMQP_ERROR; - goto error; + kz_amqp_exchange_declare(kz_conn->conn, channel, bindings->from_exchange, kz_amqp_empty_table); + if (kz_amqp_error("Declaring binded exchange", amqp_get_rpc_reply(kz_conn->conn))) + return -RET_AMQP_ERROR; + + kz_amqp_routings_ptr routings = bindings->routing; + while(routings) { + if (amqp_exchange_bind(kz_conn->conn, channel, exchange->name, bindings->from_exchange->name, routings->routing, kz_amqp_empty_table) < 0 + || kz_amqp_error("Binding exchange", amqp_get_rpc_reply(kz_conn->conn))) + return -RET_AMQP_ERROR; + routings = routings->next; } - } + bindings = bindings->next; + } + return 0; - amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table); - if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) +} + +int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan) +{ + int ret = -1; + + LM_DBG("BINDING CONSUMER %i TO %.*s\n", idx, (int)bind->exchange->name.len, (char*)bind->exchange->name.bytes); + kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, kz_amqp_empty_table); + if (kz_amqp_error("Declaring consumer exchange", amqp_get_rpc_reply(kz_conn->conn))) { ret = -RET_AMQP_ERROR; goto error; } - if(bind->federate == 0 - || dbk_use_federated_exchange == 0 - || kz_conn->server->zone == kz_amqp_get_primary_zone()) { - if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0 - || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) - { - ret = -RET_AMQP_ERROR; - goto error; - } - } + if((ret = kz_amqp_bind_exchange(kz_conn, chan[idx].channel, bind->exchange, bind->exchange_bindings)) != 0) + goto error; - if(bind->federate == 1 - && dbk_use_federated_exchange == 1 - && kz_conn->server->zone != kz_amqp_get_primary_zone()) { - sprintf(_federated, "%.*s%s%.*s", (int)bind->exchange.len, (char*)bind->exchange.bytes, - (bind->routing_key.len == 0 ? "" : "."), - (int)bind->routing_key.len, (char*)bind->routing_key.bytes - ); - federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated); - if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, federated_exchange, federated_routing_key, kz_amqp_empty_table) < 0 - || kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn))) - { - ret = -RET_AMQP_ERROR; - goto error; - } + kz_amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_table); + if (kz_amqp_error("Declaring consumer queue", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } + + if (kz_amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->exchange, bind->queue, bind->queue_bindings, kz_amqp_empty_table) < 0 + || kz_amqp_error("Binding consumer queue", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; } - if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0 + if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue->name, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0 || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) { ret = -RET_AMQP_ERROR; @@ -1855,8 +2167,6 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int i chan[idx].consumer = bind; ret = idx; error: - kz_local_amqp_bytes_free(federated_exchange); - kz_local_amqp_bytes_free(federated_routing_key); return ret; } @@ -1892,14 +2202,14 @@ int kz_amqp_send_ex(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd, kz_amqp_channel } if(kz_json_get_object(json_obj, BLF_JSON_SERVERID) == NULL) { - json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)srv->channels[idx].targeted->routing_key.bytes)); + json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)srv->channels[idx].targeted->queue_bindings->routing.bytes)); amqp_bytes_free(payload); payload = amqp_bytes_malloc_dup(amqp_cstring_bytes((char*)json_object_to_json_string(json_obj))); } int amqpres = amqp_basic_publish(srv->producer->conn, srv->channels[idx].channel, exchange, routing_key, 0, 0, &props, payload); if ( amqpres != AMQP_STATUS_OK ) { - LM_ERR("Failed to publish\n"); + LM_ERR("Failed to publish %i : %s\n", amqpres, amqp_error_string2(amqpres)); ret = -1; goto error; } @@ -1955,7 +2265,7 @@ int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t * int kz_amqp_consumer_fire_event(char *eventkey) { - struct sip_msg *fmsg; + sip_msg_t *fmsg; struct run_act_ctx ctx; int rtb, rt; @@ -1967,9 +2277,7 @@ int kz_amqp_consumer_fire_event(char *eventkey) return -2; } LM_DBG("executing event_route[%s] (%d)\n", eventkey, rt); - if(faked_msg_init()<0) - return -2; - fmsg = faked_msg_next(); + fmsg = faked_msg_get_next(); rtb = get_route_type(); set_route_type(REQUEST_ROUTE); init_run_actions_ctx(&ctx); @@ -1996,7 +2304,16 @@ void kz_amqp_consumer_event(char *payload, char* event_key, char* event_subkey) char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey); json_extract_field(key, ev_category); + if(ev_category.len == 0 && event_key) { + ev_category.s = event_key; + ev_category.len = strlen(event_key); + } + json_extract_field(subkey, ev_name); + if(ev_name.len == 0 && event_subkey) { + ev_name.s = event_subkey; + ev_name.len = strlen(event_subkey); + } sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s); for (p=buffer ; *p; ++p) *p = tolower(*p); @@ -2071,13 +2388,13 @@ void kz_amqp_send_consumer_event(char* payload, int nextConsumer) kz_amqp_send_consumer_event_ex(payload, NULL, NULL, 0, 0, nextConsumer); } -void kz_amqp_fire_connection_event(char *event, char* host) +void kz_amqp_fire_connection_event(char *event, char* host, char* zone) { char* payload = (char*)shm_malloc(512); - sprintf(payload, "{ \"%.*s\" : \"connection\", \"%.*s\" : \"%s\", \"host\" : \"%s\" }", + sprintf(payload, "{ \"%.*s\" : \"connection\", \"%.*s\" : \"%s\", \"host\" : \"%s\", \"zone\" : \"%s\" }", dbk_consumer_event_key.len, dbk_consumer_event_key.s, dbk_consumer_event_subkey.len, dbk_consumer_event_subkey.s, - event, host + event, host, zone ); kz_amqp_send_consumer_event(payload, 1); } @@ -2254,7 +2571,7 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq) if(kz_amqp_connection_open(rmq) != 0) goto error; - kz_amqp_fire_connection_event("open", rmq->server->connection->info.host); + kz_amqp_fire_connection_event("open", rmq->server->connection->info.host, rmq->server->zone->zone); for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) { /* start cleanup */ rmq->server->channels[i].state = KZ_AMQP_CHANNEL_CLOSED; @@ -2273,7 +2590,13 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq) } } - return 0; + if(dbk_use_hearbeats > 0) { + if(kz_amqp_timer_create(&rmq->heartbeat, dbk_use_hearbeats, kz_amqp_heartbeat_proc, rmq) != 0) { + LM_ERR("could not schedule heartbeats for the connection\n"); + } + } + + return 0; error: kz_amqp_handle_server_failure(rmq); @@ -2291,53 +2614,21 @@ void kz_amqp_reconnect_cb(int fd, short event, void *arg) return; } - if (connection->ev != NULL) { - event_del(connection->ev); - pkg_free(connection->ev); - connection->ev = NULL; - } - - close(fd); - pkg_free(connection->timer); - + kz_amqp_timer_destroy(&connection->reconnect); kz_amqp_connect(connection); } int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection) { + int res = 0; if(connection->state != KZ_AMQP_CONNECTION_CLOSED) connection->state = KZ_AMQP_CONNECTION_FAILURE; - int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); - - if (timerfd == -1) { - LM_ERR("could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.\n"); - return -1; - } - - struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec)); - itime->it_interval.tv_sec = 0; - itime->it_interval.tv_nsec = 0; - - itime->it_value.tv_sec = 5; - itime->it_value.tv_nsec = 0; - - if (timerfd_settime(timerfd, 0, itime, NULL) == -1) - { - LM_ERR("could not set timer to reschedule connection. No further attempts will be made to reconnect this server.\n"); - return -1; + if((res = kz_amqp_timer_create(&connection->reconnect, 5, kz_amqp_reconnect_cb, connection)) != 0) { + LM_ERR("could not reschedule connection. No further attempts will be made to reconnect this server.\n"); } - LM_DBG("timerfd value is %d\n", timerfd); - struct event *timer_ev = pkg_malloc(sizeof(struct event)); - event_set(timer_ev, timerfd, EV_READ, kz_amqp_reconnect_cb, connection); - if(event_add(timer_ev, NULL) == -1) { - LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.\n", strerror(errno)); - return -1; - } - connection->ev = timer_ev; - connection->timer = itime; - return 0; + return res; } int kz_amqp_publisher_send(kz_amqp_cmd_ptr cmd) @@ -2526,14 +2817,13 @@ void kz_send_targeted_cmd(int server_id, amqp_bytes_t body) kz_amqp_cmd_ptr cmd = NULL; json_object* JObj = NULL; char* payload = kz_local_amqp_bytes_dup(body); - json_obj_ptr json_obj = NULL; if(payload == NULL) { LM_ERR("error allocating message payload\n"); goto error; } - json_obj = kz_json_parse(payload ); + json_obj_ptr json_obj = kz_json_parse(payload ); if (json_obj == NULL) { LM_ERR("error parsing json payload\n"); goto error; @@ -2587,21 +2877,37 @@ void kz_send_targeted_cmd(int server_id, amqp_bytes_t body) } -void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_amqp_bind_ptr bind) +void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* envelope, kz_amqp_bind_ptr bind) { char buffer[100]; kz_amqp_cmd_ptr cmd = NULL; kz_amqp_consumer_delivery_ptr ptr = NULL; + json_obj_ptr json_obj = NULL; + json_object* JObj = NULL; str* message_id = NULL; int idx = envelope->channel-1; - - json_obj_ptr json_obj = kz_json_parse((char*)envelope->message.body.bytes); + int worker = 0; + int _kz_server_id = server_ptr->id; + int msg_size = envelope->message.body.len; + char *json_data = pkg_malloc(msg_size + 1); + if(!json_data) { + LM_ERR("no more package memory available. needed %d\n", msg_size + 1); + return; + } + memset(json_data, 0, msg_size + 1); + memcpy(json_data, (char*)envelope->message.body.bytes, msg_size); + json_obj = kz_json_parse(json_data); + pkg_free(json_data); if (json_obj == NULL) { LM_ERR("error parsing json body\n"); return; } - json_object* JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID); + json_object_object_add(json_obj, BLF_JSON_BROKER_ZONE, json_object_new_string(server_ptr->zone->zone)); + json_object_object_add(json_obj, BLF_JSON_AMQP_RECEIVED, json_object_new_int(time(NULL))); + + + JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID); if(JObj != NULL) { const char* _kz_server_id_str = json_object_get_string(JObj); sprintf(buffer, "consumer://%d/%s", _kz_server_id, _kz_server_id_str); @@ -2609,6 +2915,8 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_ json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string(buffer)); } + json_object_object_add(json_obj, BLF_JSON_BROKER_ZONE, json_object_new_string(server_ptr->zone->zone)); + JObj = kz_json_get_object(json_obj, BLF_JSON_MSG_ID); if(JObj != NULL) { message_id = kz_str_dup_from_char((char*)json_object_get_string(JObj)); @@ -2634,18 +2942,34 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_ ptr->payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj)); ptr->cmd = cmd; ptr->message_id = message_id; + ptr->routing_key = kz_str_from_amqp_bytes(envelope->routing_key); if(bind) { ptr->event_key = kz_amqp_bytes_dup(bind->event_key); ptr->event_subkey = kz_amqp_bytes_dup(bind->event_subkey); } - consumer++; - if(consumer >= dbk_consumer_workers) { - consumer = 0; - } + if(bind && bind->consistent_worker) { + str rk; + if(bind->consistent_worker_key != NULL && + (JObj = kz_json_get_object(json_obj, bind->consistent_worker_key->s)) != NULL) { + rk.s = (char*)json_object_get_string(JObj); + rk.len = strlen(rk.s); + } else { + rk.s = (char*)envelope->routing_key.bytes; + rk.len = (int)envelope->routing_key.len; + } + worker = core_hash(&rk, NULL, dbk_consumer_workers); + LM_DBG("computed worker for %.*s is %d\n", rk.len, rk.s, worker); + } else { + consumer++; + if(consumer >= dbk_consumer_workers) { + consumer = 0; + } + worker = consumer; + } - if (write(kz_worker_pipes[consumer], &ptr, sizeof(ptr)) != sizeof(ptr)) { + if (write(kz_worker_pipes[worker], &ptr, sizeof(ptr)) != sizeof(ptr)) { LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), ptr->payload); goto error; } @@ -2701,7 +3025,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) sleep(3); continue; } - kz_amqp_fire_connection_event("open", server_ptr->connection->info.host); + kz_amqp_fire_connection_event("open", server_ptr->connection->info.host, server_ptr->zone->zone); /* reset channels */ @@ -2768,7 +3092,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) case AMQP_RESPONSE_NORMAL: idx = envelope.channel-1; if(idx < channel_base) { - kz_amqp_send_worker_event(server_ptr->id, &envelope, NULL); + kz_amqp_send_worker_event(server_ptr, &envelope, NULL); } else { idx = idx - channel_base; if(!consumer_channels[idx].consumer->no_ack ) { @@ -2778,7 +3102,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) } } if(OK) - kz_amqp_send_worker_event(server_ptr->id, &envelope, consumer_channels[idx].consumer); + kz_amqp_send_worker_event(server_ptr, &envelope, consumer_channels[idx].consumer); } break; case AMQP_RESPONSE_SERVER_EXCEPTION: @@ -2847,3 +3171,111 @@ int kz_amqp_consumer_worker_proc(int cmd_pipe) return 0; } +void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer) +{ + kz_amqp_timer_ptr timer = *pTimer; + if (timer->ev != NULL) { + event_del(timer->ev); + pkg_free(timer->ev); + timer->ev = NULL; + } + close(timer->fd); + pkg_free(timer->timer); + pkg_free(timer); + *pTimer = NULL; +} + +int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback)(int, short, void *), void *data) +{ + kz_amqp_timer_ptr timer = NULL; + struct itimerspec *itime = NULL; + struct event *timer_ev = NULL; + int timerfd = 0; + + timer = (kz_amqp_timer_ptr) pkg_malloc(sizeof(kz_amqp_timer)); + if (!timer) { + LM_ERR("could not allocate timer struct.\n"); + goto error; + } + memset(timer, 0, sizeof(kz_amqp_timer)); + + timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + if (timerfd == -1) { + LM_ERR("could not create timer.\n"); + goto error; + } + + itime = pkg_malloc(sizeof(struct itimerspec)); + if (!itime) { + LM_ERR("could not allocate itimerspec struct.\n"); + goto error; + } + itime->it_interval.tv_sec = 0; + itime->it_interval.tv_nsec = 0; + itime->it_value.tv_sec = seconds; + itime->it_value.tv_nsec = 0; + + if (timerfd_settime(timerfd, 0, itime, NULL) == -1) { + LM_ERR("could not set timer for %i seconds in %i\n", seconds, timerfd); + goto error; + } + + LM_DBG("timerfd value is %d\n", timerfd); + timer_ev = pkg_malloc(sizeof(struct event)); + if (!timer_ev) { + LM_ERR("could not allocate event struct.\n"); + goto error; + } + event_set(timer_ev, timerfd, EV_READ | EV_PERSIST, callback, data); + if (event_add(timer_ev, NULL) == -1) { + LM_ERR("event_add failed while creating timer (%s).\n", strerror(errno)); + goto error; + } + + timer->ev = timer_ev; + timer->timer = itime; + timer->fd = timerfd; + *pTimer = timer; + + return 0; + +error: + + if (timer_ev) + pkg_free(timer_ev); + + if (itime) + pkg_free(itime); + + if (timerfd > 0) + close(timerfd); + + if (timer) + pkg_free(timer); + + *pTimer = NULL; + + return -1; +} + +void kz_amqp_heartbeat_proc(int fd, short event, void *arg) +{ + int res; + amqp_frame_t heartbeat; + kz_amqp_conn_ptr connection = (kz_amqp_conn_ptr) arg; + LM_DBG("sending heartbeat to zone : %s , connection id : %d\n", connection->server->zone->zone, connection->server->id); + if (connection->state != KZ_AMQP_CONNECTION_OPEN) { + kz_amqp_timer_destroy(&connection->heartbeat); + return; + } + heartbeat.channel = 0; + heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; + res = amqp_send_frame(connection->conn, &heartbeat); + if (res != AMQP_STATUS_OK) { + LM_ERR("error sending heartbeat to zone : %s , connection id : %d\n", connection->server->zone->zone, connection->server->id); + kz_amqp_timer_destroy(&connection->heartbeat); + kz_amqp_handle_server_failure(connection); + return; + } + timerfd_settime(connection->heartbeat->fd, 0, connection->heartbeat->timer, NULL); +} diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h index aa07640914b..b4a88fffb70 100644 --- a/src/modules/kazoo/kz_amqp.h +++ b/src/modules/kazoo/kz_amqp.h @@ -80,26 +80,23 @@ extern int dbk_consumer_workers; typedef struct kz_amqp_connection_t { kz_amqp_connection_info info; char* url; -// struct kz_amqp_connection_t* next; } kz_amqp_connection, *kz_amqp_connection_ptr; -/* -typedef struct { - kz_amqp_connection_ptr current; - kz_amqp_connection_ptr head; - kz_amqp_connection_ptr tail; -} kz_amqp_connection_pool, *kz_amqp_connection_pool_ptr; -*/ +typedef struct kz_amqp_timer_t { + struct event *ev; + struct itimerspec *timer; + int fd; +} kz_amqp_timer, *kz_amqp_timer_ptr; + typedef struct kz_amqp_conn_t { struct kz_amqp_server_t* server; amqp_connection_state_t conn; kz_amqp_connection_state state; - struct event *ev; - struct itimerspec *timer; + kz_amqp_timer_ptr reconnect; + kz_amqp_timer_ptr heartbeat; amqp_socket_t *socket; amqp_channel_t channel_count; amqp_channel_t channel_counter; -// struct kz_amqp_conn_t* next; } kz_amqp_conn, *kz_amqp_conn_ptr; typedef struct { @@ -133,10 +130,6 @@ typedef struct { amqp_channel_t channel; struct timeval timeout; - /* timer */ -// struct event *timer_ev; -// int timerfd; - /* async */ char *cb_route; char *err_route; @@ -173,23 +166,58 @@ typedef struct { char* event_key; char* event_subkey; str* message_id; + str* routing_key; kz_amqp_cmd_ptr cmd; } kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr; typedef struct { - amqp_bytes_t exchange; - amqp_bytes_t exchange_type; - amqp_bytes_t routing_key; - amqp_bytes_t queue; - amqp_bytes_t event_key; - amqp_bytes_t event_subkey; + amqp_bytes_t name; + amqp_bytes_t type; + amqp_boolean_t passive; + amqp_boolean_t durable; + amqp_boolean_t auto_delete; + amqp_boolean_t internal; +} kz_amqp_exchange, *kz_amqp_exchange_ptr; + +typedef struct { + amqp_bytes_t name; amqp_boolean_t passive; amqp_boolean_t durable; amqp_boolean_t exclusive; amqp_boolean_t auto_delete; +} kz_amqp_queue, *kz_amqp_queue_ptr; + +typedef struct kz_amqp_routings_t { + amqp_bytes_t routing; + struct kz_amqp_routings_t* next; +} kz_amqp_routings, *kz_amqp_routings_ptr; + +typedef struct kz_amqp_exchange_binding_t { + kz_amqp_exchange_ptr from_exchange; + kz_amqp_routings_ptr routing; + struct kz_amqp_exchange_binding_t* next; +} kz_amqp_exchange_binding, *kz_amqp_exchange_binding_ptr; + +typedef struct { +// amqp_bytes_t exchange; +// amqp_bytes_t exchange_type; + kz_amqp_exchange_ptr exchange; + kz_amqp_exchange_binding_ptr exchange_bindings; + kz_amqp_queue_ptr queue; + kz_amqp_routings_ptr queue_bindings; +// amqp_bytes_t routing_key; +// amqp_bytes_t queue; + amqp_bytes_t event_key; + amqp_bytes_t event_subkey; +// amqp_boolean_t passive; +// amqp_boolean_t durable; +// amqp_boolean_t exclusive; +// amqp_boolean_t auto_delete; amqp_boolean_t no_ack; amqp_boolean_t wait_for_consumer_ack; amqp_boolean_t federate; + amqp_boolean_t consistent_worker; + str* consistent_worker_key; } kz_amqp_bind, *kz_amqp_bind_ptr; typedef struct { @@ -278,10 +306,22 @@ kz_amqp_zone_ptr kz_amqp_get_zones(); kz_amqp_zone_ptr kz_amqp_get_zone(char* zone); kz_amqp_zone_ptr kz_amqp_add_zone(char* zone); -void kz_amqp_fire_connection_event(char *event, char* host); +void kz_amqp_fire_connection_event(char *event, char* host, char* zone); void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd); +void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer); +int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback)(int, short, void *), void *data); +void kz_amqp_heartbeat_proc(int fd, short event, void *arg); + +void kz_amqp_queue_free(kz_amqp_queue_ptr exchange); +void kz_amqp_exchange_free(kz_amqp_exchange_ptr exchange); +void kz_amqp_exchange_bindings_free(kz_amqp_exchange_binding_ptr binding); +void kz_amqp_routing_free(kz_amqp_routings_ptr routing); +kz_amqp_queue_ptr kz_amqp_queue_new(str *name); +kz_amqp_exchange_ptr kz_amqp_exchange_new(str *name, str* type); +kz_amqp_routings_ptr kz_amqp_routing_new(char* routing); + static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x) { amqp_connection_close_t *mconn; @@ -327,3 +367,4 @@ static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x) #endif /* KZ_AMQP_H_ */ + diff --git a/src/modules/kazoo/kz_json.c b/src/modules/kazoo/kz_json.c index df3e0f0582b..3f51cbf574d 100644 --- a/src/modules/kazoo/kz_json.c +++ b/src/modules/kazoo/kz_json.c @@ -37,24 +37,22 @@ #include "../../core/pvar.h" #include "../../core/usr_avp.h" -# define json_foreach_key(obj,key) \ - char *key;\ - struct lh_entry *entry ## key; \ - struct lh_entry *entry_next ## key = NULL; \ - for(entry ## key = json_object_get_object(obj)->head; \ - (entry ## key ? ( \ - key = (char*)entry ## key->k, \ - entry_next ## key = entry ## key->next, \ - entry ## key) : 0); \ - entry ## key = entry_next ## key) static str kz_pv_str_empty = {"", 0}; -char** str_split(char* a_str, const char a_delim) +enum json_type kz_json_get_type(struct json_object *jso) { - char** result = 0; - size_t count = 0; + return json_object_get_type(jso); +} + +typedef str* json_key; +typedef json_key* json_keys; + +json_keys str_split(char* a_str, const char a_delim, int* c) +{ + json_keys result = 0; + int count = 0; char* tmp = a_str; char* last_comma = 0; char delim[2]; @@ -78,34 +76,43 @@ char** str_split(char* a_str, const char a_delim) /* Add space for terminating null string so caller knows where the list of returned strings ends. */ - count++; +// count++; + *c = count; + LM_DBG("COUNT %d\n", count); - result = pkg_malloc(sizeof(char*) * count); + result = pkg_malloc(sizeof(json_key) * count); + memset(result, 0, sizeof(json_key) * count); if (result) { - size_t idx = 0; + int idx = 0; char* token = strtok(a_str, delim); while (token) { + LM_DBG("TOKEN %d : %s\n", idx, token); + assert(idx < count); + + result[idx] = pkg_malloc(sizeof(str)); len = strlen(token); - char* ptr = pkg_malloc( (len+1) * sizeof(char)); - *(result + idx) = ptr; - memcpy(ptr, token, len); - ptr[len] = '\0'; + + result[idx]->len = len; + result[idx]->s = pkg_malloc((len + 1) * sizeof(char)); + strncpy(result[idx]->s, token, len); + result[idx]->s[len] = '\0'; + int i = 0; while(i < len) { - if(ptr[i] == kz_json_escape_char) - ptr[i] = '.'; + if(result[idx]->s[i] == kz_json_escape_char) + result[idx]->s[i] = '.'; i++; } + LM_DBG("TOKEN2 %d : %s\n", idx, result[idx]->s); token = strtok(0, delim); idx++; } - assert(idx == count - 1); - *(result + idx) = 0; + assert(idx == count); } return result; @@ -113,10 +120,12 @@ char** str_split(char* a_str, const char a_delim) struct json_object * kz_json_get_field_object(str* json, str* field) { - char** tokens; + json_keys keys; + json_key key; char* dup; - char f1[25], f2[25];//, f3[25]; - int i; + char* token; + char f1[250], f2[250];//, f3[25]; + int i, parts; dup = pkg_malloc(json->len+1); memcpy(dup, json->s, json->len); @@ -137,19 +146,22 @@ struct json_object * kz_json_get_field_object(str* json, str* field) dup = pkg_malloc(field->len+1); memcpy(dup, field->s, field->len); dup[field->len] = '\0'; - tokens = str_split(dup, '.'); + keys = str_split(dup, '.', &parts); pkg_free(dup); - if (tokens) + if (keys) { jtree = j; - for (i = 0; *(tokens + i); i++) + for (i = 0; i < parts; i++) { + key = keys[i]; + LM_DBG("TOKEN %d , %p, %p : %s\n", i, keys[i], key->s, key->s); + if(jtree != NULL) { - str field = str_init(*(tokens + i)); + //str field1 = str_init(token); // check for idx [] - int sresult = sscanf(field.s, "%[^[][%[^]]]", f1, f2); //, f3); - LM_DBG("CHECK IDX %d - %s , %s, %s\n", sresult, field.s, f1, (sresult > 1? f2 : "(null)")); + int sresult = sscanf(key->s, "%[^[][%[^]]]", f1, f2); //, f3); + LM_DBG("CHECK IDX %d - %s , %s, %s\n", sresult, key->s, f1, (sresult > 1? f2 : "(null)")); jtree = kz_json_get_object(jtree, f1); if(jtree != NULL) { @@ -165,9 +177,15 @@ struct json_object * kz_json_get_field_object(str* json, str* field) } } } - pkg_free(*(tokens + i)); } - pkg_free(tokens); + + for(i = 0;i < parts; i++) { + LM_DBG("FREE %d\n", i); + pkg_free(keys[i]->s); + pkg_free(keys[i]); + } + + pkg_free(keys); } @@ -322,3 +340,4 @@ int kz_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst) return 1; } + diff --git a/src/modules/kazoo/kz_json.h b/src/modules/kazoo/kz_json.h index 2239d977da1..c53d0a041af 100644 --- a/src/modules/kazoo/kz_json.h +++ b/src/modules/kazoo/kz_json.h @@ -36,8 +36,61 @@ int kz_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst); int kz_json_get_field_ex(str* json, str* field, pv_value_p dst_val); int kz_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst); +enum json_type kz_json_get_type(struct json_object *jso); struct json_object* kz_json_parse(const char *str); struct json_object* kz_json_get_object(struct json_object* jso, const char *key); +#if defined(__GNUC__) && !defined(__STRICT_ANSI__) && __STDC_VERSION__ >= 199901L + +# define json_foreach(obj,key,val) \ + char *key; \ + struct json_object *val __attribute__((__unused__)); \ + for(struct lh_entry *entry ## key = json_object_get_object(obj)->head, *entry_next ## key = NULL; \ + ({ if(entry ## key) { \ + key = (char*)entry ## key->k; \ + val = (struct json_object*)entry ## key->v; \ + entry_next ## key = entry ## key->next; \ + } ; entry ## key; }); \ + entry ## key = entry_next ## key ) + +# define json_foreach_key(obj,key) \ + char *key; \ + for(struct lh_entry *entry ## key = json_object_get_object(obj)->head, *entry_next ## key = NULL; \ + ({ if(entry ## key) { \ + key = (char*)entry ## key->k; \ + entry_next ## key = entry ## key->next; \ + } ; entry ## key; }); \ + entry ## key = entry_next ## key ) + +#else /* ANSI C or MSC */ + +# define json_foreach(obj,key,val) \ + char *key;\ + struct json_object *val; \ + struct lh_entry *entry ## key; \ + struct lh_entry *entry_next ## key = NULL; \ + for(entry ## key = json_object_get_object(obj)->head; \ + (entry ## key ? ( \ + key = (char*)entry ## key->k, \ + val = (struct json_object*)entry ## key->v, \ + entry_next ## key = entry ## key->next, \ + entry ## key) : 0); \ + entry ## key = entry_next ## key) + +# define json_foreach_key(obj,key) \ + char *key;\ + struct lh_entry *entry ## key; \ + struct lh_entry *entry_next ## key = NULL; \ + for(entry ## key = json_object_get_object(obj)->head; \ + (entry ## key ? ( \ + key = (char*)entry ## key->k, \ + entry_next ## key = entry ## key->next, \ + entry ## key) : 0); \ + entry ## key = entry_next ## key) + +#endif /* defined(__GNUC__) && !defined(__STRICT_ANSI__) && __STDC_VERSION__ >= 199901L */ + + + #endif /* KZ_JSON_H_ */ diff --git a/src/modules/kazoo/kz_pua.c b/src/modules/kazoo/kz_pua.c index a16106ea3bf..f512efe0f0e 100644 --- a/src/modules/kazoo/kz_pua.c +++ b/src/modules/kazoo/kz_pua.c @@ -43,6 +43,7 @@ extern db1_con_t *kz_pa_db; extern db_func_t kz_pa_dbf; extern str kz_presentity_table; +extern db_locking_t kz_pua_lock_type; int kz_pua_update_presentity(str* event, str* realm, str* user, str* etag, str* sender, str* body, int expires, int reset) { @@ -125,7 +126,7 @@ int kz_pua_update_presentity(str* event, str* realm, str* user, str* etag, str* if (kz_pa_dbf.start_transaction) { - if (kz_pa_dbf.start_transaction(kz_pa_db, DB_LOCKING_WRITE) < 0) + if (kz_pa_dbf.start_transaction(kz_pa_db, kz_pua_lock_type) < 0) { LM_ERR("in start_transaction\n"); goto error; @@ -195,6 +196,8 @@ int kz_pua_publish_presence_to_presentity(struct json_object *json_obj) { str note = str_init("Available"); str status = str_presence_status_online; int expires = 0; + str sender = {0, 0}, + etag = { 0, 0 }; char *body = (char *)pkg_malloc(PRESENCE_BODY_BUFFER_SIZE); if(body == NULL) { @@ -203,311 +206,395 @@ int kz_pua_publish_presence_to_presentity(struct json_object *json_obj) { goto error; } - json_extract_field(BLF_JSON_FROM, from); - json_extract_field(BLF_JSON_FROM_USER, from_user); - json_extract_field(BLF_JSON_FROM_REALM, from_realm); - json_extract_field(BLF_JSON_TO, to); - json_extract_field(BLF_JSON_TO_USER, to_user); - json_extract_field(BLF_JSON_TO_REALM, to_realm); - json_extract_field(BLF_JSON_CALLID, callid); - json_extract_field(BLF_JSON_FROMTAG, fromtag); - json_extract_field(BLF_JSON_TOTAG, totag); - json_extract_field(BLF_JSON_DIRECTION, direction); - json_extract_field(BLF_JSON_STATE, state); - - struct json_object* ExpiresObj = kz_json_get_object(json_obj, BLF_JSON_EXPIRES); - if(ExpiresObj != NULL) { - expires = json_object_get_int(ExpiresObj); - if(expires > 0) - expires += (int)time(NULL); - } + json_extract_field(BLF_JSON_FROM, from); + json_extract_field(BLF_JSON_FROM_USER, from_user); + json_extract_field(BLF_JSON_FROM_REALM, from_realm); + json_extract_field(BLF_JSON_TO, to); + json_extract_field(BLF_JSON_TO_USER, to_user); + json_extract_field(BLF_JSON_TO_REALM, to_realm); + json_extract_field(BLF_JSON_CALLID, callid); + json_extract_field(BLF_JSON_FROMTAG, fromtag); + json_extract_field(BLF_JSON_TOTAG, totag); + json_extract_field(BLF_JSON_DIRECTION, direction); + json_extract_field(BLF_JSON_STATE, state); + + json_extract_field(BLF_JSON_ETAG, etag); + json_extract_field(BLF_JSON_SENDER, sender); + + if (sender.len == 0) { + json_extract_field(BLF_JSON_SWITCH_URI, sender); + } - if (!from_user.len || !to_user.len || !state.len) { - LM_ERR("missing one of From / To / State\n"); - goto error; - } + struct json_object* ExpiresObj = kz_json_get_object(json_obj, BLF_JSON_EXPIRES); + if (ExpiresObj != NULL) { + expires = json_object_get_int(ExpiresObj); + if (expires > 0) + expires += (int) time(NULL); + } - if (!strcmp(state.s, "early")) { - note = str_presence_note_busy; - activity = str_presence_act_busy; + if (!from_user.len || !to_user.len || !state.len) { + LM_ERR("missing one of From / To / State\n"); + goto error; + } - } else if (!strcmp(state.s, "confirmed")) { - note = str_presence_note_otp; - activity = str_presence_act_otp; + if (!strcmp(state.s, "early")) { + note = str_presence_note_busy; + activity = str_presence_act_busy; - } else if (!strcmp(state.s, "offline")) { - note = str_presence_note_offline; - status = str_presence_status_offline; + } else if (!strcmp(state.s, "confirmed")) { + note = str_presence_note_otp; + activity = str_presence_act_otp; - }; // else { - // note = str_presence_note_idle; -// } + } else if (!strcmp(state.s, "offline")) { + note = str_presence_note_offline; + status = str_presence_status_offline; + }; - sprintf(body, PRESENCE_BODY, from_user.s, callid.s, status.s, note.s, activity.s, note.s); + sprintf(body, PRESENCE_BODY, from_user.s, callid.s, status.s, note.s, activity.s, note.s); - presence_body.s = body; - presence_body.len = strlen(body); + presence_body.s = body; + presence_body.len = strlen(body); - if(dbk_pua_mode == 1) { - kz_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &presence_body, expires, 1); - } + if (sender.len == 0) { + sender = from; + } - error: + if (etag.len == 0) { + etag = callid; + } + + kz_pua_update_presentity(&event, &from_realm, &from_user, &etag, &sender, &presence_body, expires, 1); + +error: - if(body) - pkg_free(body); + if (body) + pkg_free(body); - return ret; + return ret; } -int kz_pua_publish_mwi_to_presentity(struct json_object *json_obj) { - int ret = 1; - str event = str_init("message-summary"); - str from = { 0, 0 }, to = { 0, 0 }; - str from_user = { 0, 0 }, to_user = { 0, 0 }; - str from_realm = { 0, 0 }, to_realm = { 0, 0 }; - str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 }; - str mwi_user = { 0, 0 }, mwi_waiting = { 0, 0 }, - mwi_new = { 0, 0 }, mwi_saved = { 0, 0 }, - mwi_urgent = { 0, 0 }, mwi_urgent_saved = { 0, 0 }, - mwi_account = { 0, 0 }, mwi_body = { 0, 0 }; - int expires = 0; +int kz_pua_publish_mwi_to_presentity(struct json_object *json_obj) +{ + int ret = 1; + str event = str_init("message-summary"); + str from = {0, 0}, + to = { 0, 0 }; + str from_user = { 0, 0 }, to_user = { 0, 0 }; + str from_realm = { 0, 0 }, to_realm = { 0, 0 }; + str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 }; + str mwi_user = { 0, 0 }, mwi_waiting = { 0, 0 }, mwi_new = { 0, 0 }, mwi_saved = { 0, 0 }, mwi_urgent = { 0, 0 }, mwi_urgent_saved = { 0, 0 }, mwi_account = { 0, 0 }, + mwi_body = { 0, 0 }; + int expires = 0; + str sender = { 0, 0 }, etag = { 0, 0 }; + + char *body = (char *) pkg_malloc(MWI_BODY_BUFFER_SIZE); + if (body == NULL) { + LM_ERR("Error allocating buffer for publish\n"); + ret = -1; + goto error; + } - char *body = (char *)pkg_malloc(MWI_BODY_BUFFER_SIZE); - if(body == NULL) { - LM_ERR("Error allocating buffer for publish\n"); - ret = -1; - goto error; - } + json_extract_field(BLF_JSON_FROM, from); + json_extract_field(BLF_JSON_FROM_USER, from_user); + json_extract_field(BLF_JSON_FROM_REALM, from_realm); + json_extract_field(BLF_JSON_TO, to); + json_extract_field(BLF_JSON_TO_USER, to_user); + json_extract_field(BLF_JSON_TO_REALM, to_realm); + json_extract_field(BLF_JSON_CALLID, callid); + json_extract_field(BLF_JSON_FROMTAG, fromtag); + json_extract_field(BLF_JSON_TOTAG, totag); + + json_extract_field(MWI_JSON_TO, mwi_user); + json_extract_field(MWI_JSON_WAITING, mwi_waiting); + json_extract_field(MWI_JSON_NEW, mwi_new); + json_extract_field(MWI_JSON_SAVED, mwi_saved); + json_extract_field(MWI_JSON_URGENT, mwi_urgent); + json_extract_field(MWI_JSON_URGENT_SAVED, mwi_urgent_saved); + json_extract_field(MWI_JSON_ACCOUNT, mwi_account); + + json_extract_field(BLF_JSON_ETAG, etag); + json_extract_field(BLF_JSON_SENDER, sender); + + struct json_object* ExpiresObj = kz_json_get_object(json_obj, BLF_JSON_EXPIRES); + if (ExpiresObj != NULL) { + expires = json_object_get_int(ExpiresObj); + if (expires > 0) + expires += (int) time(NULL); + } - json_extract_field(BLF_JSON_FROM, from); - json_extract_field(BLF_JSON_FROM_USER, from_user); - json_extract_field(BLF_JSON_FROM_REALM, from_realm); - json_extract_field(BLF_JSON_TO, to); - json_extract_field(BLF_JSON_TO_USER, to_user); - json_extract_field(BLF_JSON_TO_REALM, to_realm); - json_extract_field(BLF_JSON_CALLID, callid); - json_extract_field(BLF_JSON_FROMTAG, fromtag); - json_extract_field(BLF_JSON_TOTAG, totag); - - json_extract_field(MWI_JSON_TO, mwi_user); - json_extract_field(MWI_JSON_WAITING, mwi_waiting); - json_extract_field(MWI_JSON_NEW, mwi_new); - json_extract_field(MWI_JSON_SAVED, mwi_saved); - json_extract_field(MWI_JSON_URGENT, mwi_urgent); - json_extract_field(MWI_JSON_URGENT_SAVED, mwi_urgent_saved); - json_extract_field(MWI_JSON_ACCOUNT, mwi_account); - - struct json_object* ExpiresObj = kz_json_get_object(json_obj, BLF_JSON_EXPIRES); - if(ExpiresObj != NULL) { - expires = json_object_get_int(ExpiresObj); - if(expires > 0) - expires += (int)time(NULL); - } + sprintf(body, MWI_BODY, mwi_waiting.len, mwi_waiting.s, mwi_account.len, mwi_account.s, mwi_new.len, mwi_new.s, mwi_saved.len, mwi_saved.s, mwi_urgent.len, + mwi_urgent.s, mwi_urgent_saved.len, mwi_urgent_saved.s); - sprintf(body, MWI_BODY, mwi_waiting.len, mwi_waiting.s, - mwi_account.len, mwi_account.s, mwi_new.len, mwi_new.s, - mwi_saved.len, mwi_saved.s, mwi_urgent.len, mwi_urgent.s, - mwi_urgent_saved.len, mwi_urgent_saved.s); + mwi_body.s = body; + mwi_body.len = strlen(body); - mwi_body.s = body; - mwi_body.len = strlen(body); + if (sender.len == 0) { + sender = from; + } - if(dbk_pua_mode == 1) { - kz_pua_update_presentity(&event, &from_realm, &from_user, &callid, &from, &mwi_body, expires, 1); - } + if (etag.len == 0) { + etag = callid; + } - error: + kz_pua_update_presentity(&event, &from_realm, &from_user, &etag, &from, &mwi_body, expires, 1); - if(body) - pkg_free(body); + error: + if (body) + pkg_free(body); - return ret; + return ret; } -int kz_pua_publish_dialoginfo_to_presentity(struct json_object *json_obj) { - int ret = 1; - str from = { 0, 0 }, to = { 0, 0 }, pres = {0, 0}; - str from_user = { 0, 0 }, to_user = { 0, 0 }, pres_user = { 0, 0 }; - str from_realm = { 0, 0 }, to_realm = { 0, 0 }, pres_realm = { 0, 0 }; - str from_uri = { 0, 0 }, to_uri = { 0, 0 }; - str callid = { 0, 0 }, fromtag = { 0, 0 }, totag = { 0, 0 }; - str state = { 0, 0 }; - str direction = { 0, 0 }; - char sender_buf[1024]; - str sender = {0, 0}; - str dialoginfo_body = {0 , 0}; - int expires = 0; - str event = str_init("dialog"); - int reset = 0; - char to_tag_buffer[100]; - char from_tag_buffer[100]; - - char *body = (char *)pkg_malloc(DIALOGINFO_BODY_BUFFER_SIZE); - if(body == NULL) { - LM_ERR("Error allocating buffer for publish\n"); - ret = -1; - goto error; - } +int kz_pua_publish_dialoginfo_to_presentity(struct json_object *json_obj) +{ + int ret = 1; + str from = { 0, 0 }, to = { 0, 0 }, pres = { 0, 0 }; + str from_user = { 0, 0 }, to_user = { 0, 0 }, pres_user = { 0, 0 }; + str from_realm = { 0, 0 }, to_realm = { 0, 0 }, pres_realm = { 0, 0 }; + str from_uri = { 0, 0 }, to_uri = { 0, 0 }; + str callid = { 0, 0 }, dialogid = { 0, 0 }; + str fromtag = { 0, 0 }, totag = { 0, 0 }; + str state = { 0, 0 }; + str direction = { 0, 0 }; + str dialoginfo_body = { 0, 0 }; + int expires = 0; + str event = str_init("dialog"); + int reset = 0; + char to_tag_buffer[100]; + char from_tag_buffer[100]; + char sender_buf[1024]; + str sender = { 0, 0 }, etag = { 0, 0 }; + + char *body = (char *) pkg_malloc(DIALOGINFO_BODY_BUFFER_SIZE); + if (body == NULL) { + LM_ERR("Error allocating buffer for publish\n"); + ret = -1; + goto error; + } + json_extract_field(BLF_JSON_PRES, pres); + json_extract_field(BLF_JSON_PRES_USER, pres_user); + json_extract_field(BLF_JSON_PRES_REALM, pres_realm); + json_extract_field(BLF_JSON_FROM, from); + json_extract_field(BLF_JSON_FROM_USER, from_user); + json_extract_field(BLF_JSON_FROM_REALM, from_realm); + json_extract_field(BLF_JSON_FROM_URI, from_uri); + json_extract_field(BLF_JSON_TO, to); + json_extract_field(BLF_JSON_TO_USER, to_user); + json_extract_field(BLF_JSON_TO_REALM, to_realm); + json_extract_field(BLF_JSON_TO_URI, to_uri); + json_extract_field(BLF_JSON_CALLID, callid); + json_extract_field(BLF_JSON_DIALOGID, dialogid); + json_extract_field(BLF_JSON_FROMTAG, fromtag); + json_extract_field(BLF_JSON_TOTAG, totag); + json_extract_field(BLF_JSON_DIRECTION, direction); + json_extract_field(BLF_JSON_STATE, state); + + json_extract_field(BLF_JSON_ETAG, etag); + json_extract_field(BLF_JSON_SENDER, sender); + + if (sender.len == 0) { + json_extract_field(BLF_JSON_SWITCH_URI, sender); + } - json_extract_field(BLF_JSON_PRES, pres); - json_extract_field(BLF_JSON_PRES_USER, pres_user); - json_extract_field(BLF_JSON_PRES_REALM, pres_realm); - json_extract_field(BLF_JSON_FROM, from); - json_extract_field(BLF_JSON_FROM_USER, from_user); - json_extract_field(BLF_JSON_FROM_REALM, from_realm); - json_extract_field(BLF_JSON_FROM_URI, from_uri); - json_extract_field(BLF_JSON_TO, to); - json_extract_field(BLF_JSON_TO_USER, to_user); - json_extract_field(BLF_JSON_TO_REALM, to_realm); - json_extract_field(BLF_JSON_TO_URI, to_uri); - json_extract_field(BLF_JSON_CALLID, callid); - json_extract_field(BLF_JSON_FROMTAG, fromtag); - json_extract_field(BLF_JSON_TOTAG, totag); - json_extract_field(BLF_JSON_DIRECTION, direction); - json_extract_field(BLF_JSON_STATE, state); - - struct json_object* ExpiresObj = kz_json_get_object(json_obj, BLF_JSON_EXPIRES); - if(ExpiresObj != NULL) { - expires = json_object_get_int(ExpiresObj); - if(expires > 0) - expires += (int)time(NULL); - } + struct json_object* ExpiresObj = kz_json_get_object(json_obj, BLF_JSON_EXPIRES); + if (ExpiresObj != NULL) { + expires = json_object_get_int(ExpiresObj); + if (expires > 0) + expires += (int) time(NULL); + } - ExpiresObj = kz_json_get_object(json_obj, "Flush-Level"); - if(ExpiresObj != NULL) { - reset = json_object_get_int(ExpiresObj); - } + ExpiresObj = kz_json_get_object(json_obj, "Flush-Level"); + if (ExpiresObj != NULL) { + reset = json_object_get_int(ExpiresObj); + } - if (!from.len || !to.len || !state.len) { - LM_ERR("missing one of From / To / State\n"); + if (!from.len || !to.len || !state.len) { + LM_ERR("missing one of From / To / State\n"); goto error; - } + } - if(!pres.len || !pres_user.len || !pres_realm.len) { - pres = from; - pres_user = from_user; - pres_realm = from_realm; - } + if (!pres.len || !pres_user.len || !pres_realm.len) { + pres = from; + pres_user = from_user; + pres_realm = from_realm; + } - if(!from_uri.len) - from_uri = from; + if (!from_uri.len) + from_uri = from; - if(!to_uri.len) - to_uri = to; + if (!to_uri.len) + to_uri = to; - if(fromtag.len > 0) { - fromtag.len = sprintf(from_tag_buffer, LOCAL_TAG, fromtag.len, fromtag.s); - fromtag.s = from_tag_buffer; - } + if (fromtag.len > 0) { + fromtag.len = sprintf(from_tag_buffer, LOCAL_TAG, fromtag.len, fromtag.s); + fromtag.s = from_tag_buffer; + } - if(totag.len > 0) { - totag.len = sprintf(to_tag_buffer, REMOTE_TAG, totag.len, totag.s); - totag.s = to_tag_buffer; - } + if (totag.len > 0) { + totag.len = sprintf(to_tag_buffer, REMOTE_TAG, totag.len, totag.s); + totag.s = to_tag_buffer; + } - if(callid.len) { - - if(dbk_include_entity) { - sprintf(body, DIALOGINFO_BODY, - pres.len, pres.s, - callid.len, callid.s, - callid.len, callid.s, - fromtag.len, fromtag.s, - totag.len, totag.s, - direction.len, direction.s, - state.len, state.s, - from_user.len, from_user.s, - from.len, from.s, - from_uri.len, from_uri.s, - to_user.len, to_user.s, - to.len, to.s, - to_uri.len, to_uri.s - ); - } else { - - sprintf(body, DIALOGINFO_BODY_2, - pres.len, pres.s, - callid.len, callid.s, - callid.len, callid.s, - fromtag.len, fromtag.s, - totag.len, totag.s, - direction.len, direction.s, - state.len, state.s, - from_user.len, from_user.s, - from.len, from.s, - to_user.len, to_user.s, - to.len, to.s - ); - } - - } else { - sprintf(body, DIALOGINFO_EMPTY_BODY, pres.len, pres.s); - } + if (dialogid.len == 0) { + dialogid = callid; + } - sprintf(sender_buf, "sip:%s",callid.s); - sender.s = sender_buf; - sender.len = strlen(sender_buf); + if (callid.len) { - dialoginfo_body.s = body; - dialoginfo_body.len = strlen(body); + if (dbk_include_entity) { + sprintf(body, DIALOGINFO_BODY, pres.len, pres.s, dialogid.len, dialogid.s, callid.len, callid.s, fromtag.len, fromtag.s, totag.len, totag.s, direction.len, + direction.s, state.len, state.s, from_user.len, from_user.s, from.len, from.s, from_uri.len, from_uri.s, to_user.len, to_user.s, to.len, to.s, + to_uri.len, to_uri.s); + } else { - if(dbk_pua_mode == 1) { - kz_pua_update_presentity(&event, &pres_realm, &pres_user, &callid, &sender, &dialoginfo_body, expires, reset); - } + sprintf(body, DIALOGINFO_BODY_2, pres.len, pres.s, dialogid.len, dialogid.s, callid.len, callid.s, fromtag.len, fromtag.s, totag.len, totag.s, direction.len, + direction.s, state.len, state.s, from_user.len, from_user.s, from.len, from.s, to_user.len, to_user.s, to.len, to.s); + } + + } else { + sprintf(body, DIALOGINFO_EMPTY_BODY, pres.len, pres.s); + } - error: + if (sender.len == 0) { + sprintf(sender_buf, "sip:%s", callid.s); + sender.s = sender_buf; + sender.len = strlen(sender_buf); + } + + if (etag.len == 0) { + etag = callid; + } - if(body) - pkg_free(body); + dialoginfo_body.s = body; + dialoginfo_body.len = strlen(body); + kz_pua_update_presentity(&event, &pres_realm, &pres_user, &etag, &sender, &dialoginfo_body, expires, reset); - return ret; + error: + + if (body) + pkg_free(body); + + return ret; } -int kz_pua_publish(struct sip_msg* msg, char *json) { - str event_name = { 0, 0 }, event_package = { 0, 0 }; - struct json_object *json_obj = NULL; - int ret = 1; +int kz_pua_publish(struct sip_msg* msg, char *json) +{ + str event_name = { 0, 0 }, event_package = { 0, 0 }; + struct json_object *json_obj = NULL; + int ret = 1; - if(dbk_pua_mode != 1) { - LM_ERR("pua_mode must be 1 to publish\n"); - ret = -1; - goto error; - } + if (dbk_pua_mode != 1) { + LM_ERR("pua_mode must be 1 to publish\n"); + ret = -1; + goto error; + } - /* extract info from json and construct xml */ - json_obj = kz_json_parse(json); - if (json_obj == NULL) { - ret = -1; - goto error; - } + /* extract info from json and construct xml */ + json_obj = kz_json_parse(json); + if (json_obj == NULL) { + ret = -1; + goto error; + } - json_extract_field(BLF_JSON_EVENT_NAME, event_name); - - if (event_name.len == 6 && strncmp(event_name.s, "update", 6) == 0) { - json_extract_field(BLF_JSON_EVENT_PKG, event_package); - if (event_package.len == str_event_dialog.len - && strncmp(event_package.s, str_event_dialog.s, event_package.len) == 0) { - ret = kz_pua_publish_dialoginfo_to_presentity(json_obj); - } else if (event_package.len == str_event_message_summary.len - && strncmp(event_package.s, str_event_message_summary.s, event_package.len) == 0) { - ret = kz_pua_publish_mwi_to_presentity(json_obj); - } else if (event_package.len == str_event_presence.len - && strncmp(event_package.s, str_event_presence.s, event_package.len) == 0) { - ret = kz_pua_publish_presence_to_presentity(json_obj); - } - } + json_extract_field(BLF_JSON_EVENT_NAME, event_name); -error: - if(json_obj) + if (event_name.len == 6 && strncmp(event_name.s, "update", 6) == 0) { + json_extract_field(BLF_JSON_EVENT_PKG, event_package); + if (event_package.len == str_event_dialog.len && strncmp(event_package.s, str_event_dialog.s, event_package.len) == 0) { + ret = kz_pua_publish_dialoginfo_to_presentity(json_obj); + } else if (event_package.len == str_event_message_summary.len && strncmp(event_package.s, str_event_message_summary.s, event_package.len) == 0) { + ret = kz_pua_publish_mwi_to_presentity(json_obj); + } else if (event_package.len == str_event_presence.len && strncmp(event_package.s, str_event_presence.s, event_package.len) == 0) { + ret = kz_pua_publish_presence_to_presentity(json_obj); + } + } + + error: if (json_obj) + json_object_put(json_obj); + + return ret; +} + +int kz_pua_publish_mwi(struct sip_msg* msg, char *json) +{ + struct json_object *json_obj = NULL; + int ret = 1; + + if (dbk_pua_mode != 1) { + LM_ERR("pua_mode must be 1 to publish\n"); + ret = -1; + goto error; + } + + /* extract info from json and construct xml */ + json_obj = kz_json_parse(json); + if (json_obj == NULL) { + ret = -1; + goto error; + } + + ret = kz_pua_publish_mwi_to_presentity(json_obj); + + error: if (json_obj) json_object_put(json_obj); return ret; } +int kz_pua_publish_presence(struct sip_msg* msg, char *json) +{ + struct json_object *json_obj = NULL; + int ret = 1; + + if (dbk_pua_mode != 1) { + LM_ERR("pua_mode must be 1 to publish\n"); + ret = -1; + goto error; + } + + /* extract info from json and construct xml */ + json_obj = kz_json_parse(json); + if (json_obj == NULL) { + ret = -1; + goto error; + } + + ret = kz_pua_publish_presence_to_presentity(json_obj); + + error: if (json_obj) + json_object_put(json_obj); + + return ret; +} + +int kz_pua_publish_dialoginfo(struct sip_msg* msg, char *json) +{ + struct json_object *json_obj = NULL; + int ret = 1; + + if (dbk_pua_mode != 1) { + LM_ERR("pua_mode must be 1 to publish\n"); + ret = -1; + goto error; + } + + /* extract info from json and construct xml */ + json_obj = kz_json_parse(json); + if (json_obj == NULL) { + ret = -1; + goto error; + } + + ret = kz_pua_publish_dialoginfo_to_presentity(json_obj); + + error: if (json_obj) + json_object_put(json_obj); + + return ret; +} diff --git a/src/modules/kazoo/kz_pua.h b/src/modules/kazoo/kz_pua.h index 2e6fbddb986..47e16f6234c 100644 --- a/src/modules/kazoo/kz_pua.h +++ b/src/modules/kazoo/kz_pua.h @@ -27,6 +27,9 @@ int kz_initialize_pua(); int kz_pua_publish(struct sip_msg* msg, char *json); +int kz_pua_publish_mwi(struct sip_msg* msg, char *json); +int kz_pua_publish_presence(struct sip_msg* msg, char *json); +int kz_pua_publish_dialoginfo(struct sip_msg* msg, char *json); #endif