From 3240dd328db1f3552feae036a0b97ee4f95b2744 Mon Sep 17 00:00:00 2001 From: lazedo Date: Fri, 7 Apr 2017 15:06:32 +0000 Subject: [PATCH] kazoo: add exchange bindings --- src/modules/kazoo/kz_amqp.c | 746 +++++++++++++++++++++++++----------- src/modules/kazoo/kz_amqp.h | 54 ++- src/modules/kazoo/kz_json.c | 16 +- src/modules/kazoo/kz_json.h | 53 +++ 4 files changed, 626 insertions(+), 243 deletions(-) diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c index 8dd1350ee42..46064aec795 100644 --- a/src/modules/kazoo/kz_amqp.c +++ b/src/modules/kazoo/kz_amqp.c @@ -83,16 +83,40 @@ kz_amqp_zones_ptr kz_zones = NULL; kz_amqp_zone_ptr kz_primary_zone = NULL; -amqp_exchange_declare_ok_t * AMQP_CALL kz_amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, - amqp_bytes_t exchange, amqp_bytes_t type, - amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments) { +amqp_exchange_declare_ok_t * AMQP_CALL kz_amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, amqp_table_t arguments) +{ + LM_DBG("declare exchange %.*s , %.*s\n", + (int)exchange->name.len, (char*)exchange->name.bytes, + (int)exchange->type.len, (char*)exchange->type.bytes); + #if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 6 - return amqp_exchange_declare(state, channel, exchange, type, passive, durable, arguments); + return amqp_exchange_declare(state, channel, exchange->name, exchange->type, + exchange->passive, exchange->durable, + arguments); #else - return amqp_exchange_declare(state, channel, exchange, type, passive, durable, 0, 0, arguments); + return amqp_exchange_declare(state, channel, exchange->name, exchange->type, + exchange->passive, exchange->durable, + exchange->auto_delete, exchange->internal, + arguments); #endif } +amqp_queue_declare_ok_t * AMQP_CALL kz_amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_queue_ptr queue, amqp_table_t arguments) +{ + return amqp_queue_declare(state, channel, queue->name, queue->passive, queue->durable, queue->exclusive, queue->auto_delete, arguments); +} + +amqp_queue_bind_ok_t * AMQP_CALL kz_amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, kz_amqp_queue_ptr queue, kz_amqp_routings_ptr queue_bindings, amqp_table_t arguments) +{ + amqp_queue_bind_ok_t *ret = amqp_queue_bind(state, channel, queue->name, exchange->name, queue_bindings->routing, arguments); + + if(ret >= 0 && queue_bindings->next) { + return kz_amqp_queue_bind(state, channel, exchange, queue, queue_bindings->next, arguments); + } else { + return ret; + } +} + int set_non_blocking(int fd) { int flags; @@ -214,6 +238,22 @@ char *kz_amqp_bytes_dup(amqp_bytes_t bytes) return res; } +static inline str* kz_str_from_amqp_bytes(amqp_bytes_t src) +{ + char *dst_char = (char*)shm_malloc(sizeof(str)+src.len+1); + if (!dst_char) { + LM_ERR("error allocating shared memory for str"); + return NULL; + } + str* dst = (str*)dst_char; + dst->s = dst_char+sizeof(str); + + memcpy(dst->s, src.bytes, src.len); + dst->len = src.len; + dst->s[dst->len] = '\0'; + return dst; +} + char *kz_local_amqp_bytes_dup(amqp_bytes_t bytes) { char *res; @@ -321,6 +361,8 @@ void kz_amqp_free_consumer_delivery(kz_amqp_consumer_delivery_ptr ptr) shm_free(ptr->event_subkey); if(ptr->message_id) shm_free(ptr->message_id); + if(ptr->routing_key) + shm_free(ptr->routing_key); if(ptr->cmd) kz_amqp_free_pipe_cmd(ptr->cmd); shm_free(ptr); @@ -330,14 +372,14 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind) { if(bind == NULL) return; - if(bind->exchange.bytes) - kz_amqp_bytes_free(bind->exchange); - if(bind->exchange_type.bytes) - kz_amqp_bytes_free(bind->exchange_type); - if(bind->queue.bytes) - kz_amqp_bytes_free(bind->queue); - if(bind->routing_key.bytes) - kz_amqp_bytes_free(bind->routing_key); + if(bind->exchange) + kz_amqp_exchange_free(bind->exchange); + if(bind->exchange_bindings) + kz_amqp_exchange_bindings_free(bind->exchange_bindings); + if(bind->queue) + kz_amqp_queue_free(bind->queue); + if(bind->queue_bindings) + kz_amqp_routing_free(bind->queue_bindings); if(bind->event_key.bytes) kz_amqp_bytes_free(bind->event_key); if(bind->event_subkey.bytes) @@ -373,48 +415,21 @@ kz_amqp_cmd_ptr kz_amqp_alloc_pipe_cmd() return cmd; } -kz_amqp_bind_ptr kz_amqp_bind_alloc_ex(str* exchange, str* exchange_type, str* queue, str* routing_key, str* event_key, str* event_subkey ) +kz_amqp_bind_ptr kz_amqp_bind_alloc(kz_amqp_exchange_ptr exchange, kz_amqp_exchange_binding_ptr exchange_bindings, kz_amqp_queue_ptr queue, kz_amqp_routings_ptr queue_bindings, str* event_key, str* event_subkey ) { - kz_amqp_bind_ptr bind = NULL; + kz_amqp_bind_ptr bind = NULL; - bind = (kz_amqp_bind_ptr)shm_malloc(sizeof(kz_amqp_bind)); + bind = (kz_amqp_bind_ptr)shm_malloc(sizeof(kz_amqp_bind)); if(bind == NULL) { LM_ERR("error allocation memory for bind alloc\n"); goto error; } memset(bind, 0, sizeof(kz_amqp_bind)); - if(exchange != NULL) { - bind->exchange = kz_amqp_bytes_dup_from_str(exchange); - if (bind->exchange.bytes == NULL) { - LM_ERR("Out of memory allocating for exchange\n"); - goto error; - } - } - - if(exchange_type != NULL) { - bind->exchange_type = kz_amqp_bytes_dup_from_str(exchange_type); - if (bind->exchange_type.bytes == NULL) { - LM_ERR("Out of memory allocating for exchange type\n"); - goto error; - } - } - - if(queue != NULL) { - bind->queue = kz_amqp_bytes_dup_from_str(queue); - if (bind->queue.bytes == NULL) { - LM_ERR("Out of memory allocating for queue\n"); - goto error; - } - } - - if(routing_key != NULL) { - bind->routing_key = kz_amqp_bytes_dup_from_str(routing_key); - if (bind->routing_key.bytes == NULL) { - LM_ERR("Out of memory allocating for routing key\n"); - goto error; - } - } + bind->exchange = exchange; + bind->queue = queue; + bind->exchange_bindings = exchange_bindings; + bind->queue_bindings = queue_bindings; if(event_key != NULL) { bind->event_key = kz_amqp_bytes_dup_from_str(event_key); @@ -439,11 +454,6 @@ kz_amqp_bind_ptr kz_amqp_bind_alloc_ex(str* exchange, str* exchange_type, str* q return NULL; } -kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queue, str* routing_key ) -{ - return kz_amqp_bind_alloc_ex(exchange, exchange_type, queue, routing_key, NULL, NULL ); -} - kz_amqp_zone_ptr kz_amqp_get_primary_zone() { if(kz_primary_zone == NULL) { kz_primary_zone = (kz_amqp_zone_ptr) shm_malloc(sizeof(kz_amqp_zone)); @@ -486,6 +496,12 @@ kz_amqp_zone_ptr kz_amqp_add_zone(char* zone) { return zone_ptr; } +kz_amqp_queue_ptr kz_amqp_targeted_queue(char *name) +{ + str queue = str_init(name); + return kz_amqp_queue_new(&queue); +} + int kz_amqp_bind_init_targeted_channel(kz_amqp_server_ptr server, int idx ) { kz_amqp_bind_ptr bind = NULL; @@ -501,16 +517,23 @@ int kz_amqp_bind_init_targeted_channel(kz_amqp_server_ptr server, int idx ) } memset(bind, 0, sizeof(kz_amqp_bind)); - bind->exchange = kz_amqp_bytes_dup_from_str(&rpl_exch); - bind->exchange_type = kz_amqp_bytes_dup_from_str(&rpl_exch_type); + bind->exchange = kz_amqp_exchange_new(&rpl_exch, &rpl_exch_type); + if(bind->exchange == NULL) { + LM_ERR("error allocation memory for reply\n"); + goto error; + } sprintf(serverid, "kamailio@%.*s-<%d-%d>", dbk_node_hostname.len, dbk_node_hostname.s, server->id, idx); - bind->queue = kz_amqp_bytes_dup_from_string(serverid); + bind->queue = kz_amqp_targeted_queue(serverid); + if(bind->queue == NULL) { + LM_ERR("error allocation memory for reply\n"); + goto error; + } sprintf(serverid, "kamailio@%.*s-<%d>-targeted-%d", dbk_node_hostname.len, dbk_node_hostname.s, server->id, idx); - bind->routing_key = kz_amqp_bytes_dup_from_string(serverid); + bind->queue_bindings = kz_amqp_routing_new(serverid); - if (bind->exchange.bytes == NULL || bind->routing_key.bytes == NULL || bind->queue.bytes == NULL) { + if (bind->queue_bindings == NULL) { LM_ERR("Out of memory allocating for exchange/routing_key\n"); goto error; } @@ -1472,41 +1495,363 @@ int kz_amqp_query(struct sip_msg* msg, char* exchange, char* routing_key, char* return 1; }; -int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue, char* routing_key) +void kz_amqp_queue_free(kz_amqp_queue_ptr queue) { - str exchange_s; - str exchange_type_s; - str queue_s; - str routing_key_s; + if(queue->name.bytes) + kz_amqp_bytes_free(queue->name); - if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) { - LM_ERR("cannot get exchange string value\n"); - return -1; + shm_free(queue); +} + +kz_amqp_queue_ptr kz_amqp_queue_new(str *name) +{ + kz_amqp_queue_ptr queue = (kz_amqp_queue_ptr) shm_malloc(sizeof(kz_amqp_queue)); + if(queue == NULL) { + LM_ERR("NO MORE SHARED MEMORY!"); + return NULL; } + memset(queue, 0, sizeof(kz_amqp_queue)); + queue->auto_delete = 1; - if (fixup_get_svalue(msg, (gparam_p)exchange_type, &exchange_type_s) != 0) { - LM_ERR("cannot get exchange type string value\n"); - return -1; + if(name != NULL) { + queue->name = kz_amqp_bytes_dup_from_str(name); + if (queue->name.bytes == NULL) { + LM_ERR("Out of memory allocating for exchange\n"); + goto error; + } + } + + return queue; + +error: + kz_amqp_queue_free(queue); + return NULL; +} + +kz_amqp_queue_ptr kz_amqp_queue_from_json(str *name, json_object* json_obj) +{ + json_object* tmpObj = NULL; + kz_amqp_queue_ptr queue = kz_amqp_queue_new(name); + + if(queue == NULL) { + LM_ERR("NO MORE SHARED MEMORY!"); + return NULL; } - if (fixup_get_svalue(msg, (gparam_p)queue, &queue_s) != 0) { - LM_ERR("cannot get queue string value\n"); + tmpObj = kz_json_get_object(json_obj, "passive"); + if(tmpObj != NULL) { + queue->passive = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "durable"); + if(tmpObj != NULL) { + queue->durable = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "exclusive"); + if(tmpObj != NULL) { + queue->exclusive = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "auto_delete"); + if(tmpObj != NULL) { + queue->auto_delete = json_object_get_int(tmpObj); + } + + return queue; + +} + +void kz_amqp_exchange_free(kz_amqp_exchange_ptr exchange) +{ + if(exchange->name.bytes) + kz_amqp_bytes_free(exchange->name); + + if(exchange->type.bytes) + kz_amqp_bytes_free(exchange->type); + + shm_free(exchange); +} + +kz_amqp_exchange_ptr kz_amqp_exchange_new(str *name, str* type) +{ + kz_amqp_exchange_ptr exchange = (kz_amqp_exchange_ptr) shm_malloc(sizeof(kz_amqp_exchange)); + if(exchange == NULL) { + LM_ERR("NO MORE SHARED MEMORY!"); + return NULL; + } + memset(exchange, 0, sizeof(kz_amqp_exchange)); + exchange->name = kz_amqp_bytes_dup_from_str(name); + if (exchange->name.bytes == NULL) { + LM_ERR("Out of memory allocating for exchange\n"); + goto error; + } + + exchange->type = kz_amqp_bytes_dup_from_str(type); + if (exchange->type.bytes == NULL) { + LM_ERR("Out of memory allocating for exchange type\n"); + goto error; + } + + LM_DBG("NEW exchange %.*s : %.*s : %.*s : %.*s\n", + (int)name->len, (char*)name->s, + (int)type->len, (char*)type->s, + (int)exchange->name.len, (char*)exchange->name.bytes, + (int)exchange->type.len, (char*)exchange->type.bytes); + + return exchange; + +error: + + kz_amqp_exchange_free(exchange); + return NULL; +} + +kz_amqp_exchange_ptr kz_amqp_exchange_from_json(str *name, json_object* json_obj) +{ + str type = STR_NULL; + kz_amqp_exchange_ptr exchange = NULL; + json_object* tmpObj = NULL; + + json_extract_field("type", type); + + LM_DBG("NEW JSON exchange %.*s : %.*s\n", + (int)name->len, (char*)name->s, + (int)type.len, (char*)type.s); + + + exchange = kz_amqp_exchange_new(name, &type); + if(exchange == NULL) { + LM_ERR("NO MORE SHARED MEMORY!"); + return NULL; + } + + tmpObj = kz_json_get_object(json_obj, "passive"); + if(tmpObj != NULL) { + exchange->passive = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "durable"); + if(tmpObj != NULL) { + exchange->durable = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "auto_delete"); + if(tmpObj != NULL) { + exchange->auto_delete = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "internal"); + if(tmpObj != NULL) { + exchange->internal = json_object_get_int(tmpObj); + } + + return exchange; + +} + +void kz_amqp_routing_free(kz_amqp_routings_ptr routing) +{ + if(routing) { + if(routing->next) + kz_amqp_routing_free(routing->next); + shm_free(routing); + } +} + +kz_amqp_routings_ptr kz_amqp_routing_new(char* routing) +{ + kz_amqp_routings_ptr ptr = (kz_amqp_routings_ptr) shm_malloc(sizeof(kz_amqp_routings)); + memset(ptr, 0, sizeof(kz_amqp_routings)); + + ptr->routing = kz_amqp_bytes_dup_from_string(routing); + return ptr; +} + +kz_amqp_routings_ptr kz_amqp_routing_from_json(json_object* json_obj) +{ + kz_amqp_routings_ptr r, r1 = NULL, ret = NULL; + char *routing; + int len, n; + + if(json_obj == NULL) + return ret; + + switch(kz_json_get_type(json_obj)) + { + case json_type_string: + routing = (char*)json_object_get_string(json_obj); + ret = kz_amqp_routing_new(routing); + break; + + case json_type_array: + len = json_object_array_length(json_obj); + for(n=0; n < len; n++) { + routing = (char*)json_object_get_string(json_object_array_get_idx(json_obj, n)); + r = kz_amqp_routing_new(routing); + if(r1 != NULL) { + r1->next = r; + } else { + ret = r; + } + r1 = r; + } + break; + + default: + LM_DBG("type not handled in routing"); + break; + } + return ret; +} + +void kz_amqp_exchange_bindings_free(kz_amqp_exchange_binding_ptr binding) +{ + if(binding) { + if(binding->next) + kz_amqp_exchange_bindings_free(binding->next); + kz_amqp_exchange_free(binding->from_exchange); + kz_amqp_routing_free(binding->routing); + shm_free(binding); + } + +} + +kz_amqp_exchange_binding_ptr kz_amqp_exchange_binding_from_json(json_object* JObj) +{ +// struct json_object* tmpObj = NULL; + struct json_object* routingObj = NULL; + kz_amqp_exchange_ptr exchange; + kz_amqp_exchange_binding_ptr prv = NULL; + kz_amqp_exchange_binding_ptr ret = NULL; + if(JObj != NULL) { + json_foreach(JObj, k, v) { + str name = {k, strlen(k)}; + LM_DBG("exchange binding1 %s, %i , %s, %i : %.*s\n", k, (int) strlen(k), name.s, name.len, name.len, name.s); + exchange = kz_amqp_exchange_from_json(&name, v); + LM_DBG("exchange binding2 %s, %i : %.*s\n", k, (int) strlen(k), name.len, name.s); + LM_DBG("exchange binding3 %.*s : %.*s\n", + (int)exchange->name.len, (char*)exchange->name.bytes, + (int)exchange->type.len, (char*)exchange->type.bytes); + + routingObj = kz_json_get_object(v, "routing"); + if(routingObj != NULL) { + kz_amqp_exchange_binding_ptr binding = (kz_amqp_exchange_binding_ptr) shm_malloc(sizeof(kz_amqp_exchange_binding)); + memset(binding, 0, sizeof(kz_amqp_exchange_binding)); + binding->from_exchange = exchange; + binding->routing = kz_amqp_routing_from_json(routingObj); + if(binding->routing == NULL) { + LM_DBG("invalid routing"); + kz_amqp_exchange_bindings_free(binding); + binding = NULL; + } else { + if(ret == NULL) + ret = binding; + if(prv != NULL) + prv->next = binding; + } + } else { + kz_amqp_exchange_free(exchange); + } + } + } + + return ret; +} + +int kz_amqp_subscribe(struct sip_msg* msg, char* payload) +{ + str exchange_s = STR_NULL; + str queue_s = STR_NULL; + str payload_s = STR_NULL; + str key_s = STR_NULL; + str subkey_s = STR_NULL; + int no_ack = 1; + int federate = 0; + int consistent_worker = 0; + int wait_for_consumer_ack = 1; + kz_amqp_queue_ptr queue = NULL; + kz_amqp_exchange_ptr exchange = NULL; + kz_amqp_exchange_binding_ptr exchange_binding = NULL; + kz_amqp_routings_ptr routing = NULL; + str* event_key = NULL; + str* event_subkey = NULL; + + + + json_obj_ptr json_obj = NULL; + struct json_object* tmpObj = NULL; + + if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) { + LM_ERR("cannot get payload value\n"); return -1; } - if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) { - LM_ERR("cannot get routing_key string value\n"); + json_obj = kz_json_parse(payload_s.s); + if (json_obj == NULL) return -1; + + + json_extract_field("exchange", exchange_s); + json_extract_field("queue", queue_s); + json_extract_field("event_key", key_s); + json_extract_field("event_subkey", subkey_s); + + if(key_s.len != 0) + event_key = &key_s; + + if(subkey_s.len != 0) + event_subkey = &subkey_s; + + tmpObj = kz_json_get_object(json_obj, "no_ack"); + if(tmpObj != NULL) { + no_ack = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "wait_for_consumer_ack"); + if(tmpObj != NULL) { + wait_for_consumer_ack = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "federate"); + if(tmpObj != NULL) { + federate = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "consistent-worker"); + if(tmpObj != NULL) { + consistent_worker = json_object_get_int(tmpObj); + } + + tmpObj = kz_json_get_object(json_obj, "exchange-bindings"); + if(tmpObj != NULL) { + exchange_binding = kz_amqp_exchange_binding_from_json(tmpObj); } - kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s); + tmpObj = kz_json_get_object(json_obj, "routing"); + if(tmpObj != NULL) { + routing = kz_amqp_routing_from_json(tmpObj); + } + + if(routing == NULL) { + LM_ERR("invalid routing\n"); + goto error; + } + + exchange = kz_amqp_exchange_from_json(&exchange_s, json_obj); + queue = kz_amqp_queue_from_json(&queue_s, json_obj); + + kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(exchange, exchange_binding, queue, routing, event_key, event_subkey); if(bind == NULL) { LM_ERR("Could not allocate bind struct\n"); goto error; } - bind->auto_delete = 1; - bind->no_ack = 1; + bind->no_ack = no_ack; + bind->wait_for_consumer_ack = wait_for_consumer_ack; + bind->federate = federate; + bind->consistent_worker = consistent_worker; + kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding)); if(binding == NULL) { @@ -1525,109 +1870,64 @@ int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange binding->bind = bind; bindings_count++; - return 1; + if(json_obj != NULL) + json_object_put(json_obj); + + return 1; error: - if(binding != NULL) - shm_free(binding); + if(binding != NULL) + shm_free(binding); + + if(json_obj != NULL) + json_object_put(json_obj); return -1; } -int kz_amqp_subscribe(struct sip_msg* msg, char* payload) +int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange_type, char* queue, char* routing_key) { str exchange_s; str exchange_type_s; str queue_s; str routing_key_s; - str payload_s; - str key_s; - str subkey_s; - int passive = 0; - int durable = 0; - int exclusive = 0; - int auto_delete = 1; - int no_ack = 1; - int federate = 0; - int wait_for_consumer_ack = 1; - int consistent_worker = 0; + kz_amqp_exchange_ptr exchange_ptr = NULL; + kz_amqp_queue_ptr queue_ptr = NULL; + kz_amqp_routings_ptr routing_ptr = NULL; - json_obj_ptr json_obj = NULL; - struct json_object* tmpObj = NULL; - if (fixup_get_svalue(msg, (gparam_p)payload, &payload_s) != 0) { - LM_ERR("cannot get payload value\n"); + if (fixup_get_svalue(msg, (gparam_p)exchange, &exchange_s) != 0) { + LM_ERR("cannot get exchange string value\n"); return -1; } - json_obj = kz_json_parse(payload_s.s); - if (json_obj == NULL) - return -1; - - - json_extract_field("exchange", exchange_s); - json_extract_field("type", exchange_type_s); - json_extract_field("queue", queue_s); - json_extract_field("routing", routing_key_s); - json_extract_field("event_key", key_s); - json_extract_field("event_subkey", subkey_s); - - tmpObj = kz_json_get_object(json_obj, "passive"); - if(tmpObj != NULL) { - passive = json_object_get_int(tmpObj); - } - - tmpObj = kz_json_get_object(json_obj, "durable"); - if(tmpObj != NULL) { - durable = json_object_get_int(tmpObj); - } - - tmpObj = kz_json_get_object(json_obj, "exclusive"); - if(tmpObj != NULL) { - exclusive = json_object_get_int(tmpObj); - } - - tmpObj = kz_json_get_object(json_obj, "auto_delete"); - if(tmpObj != NULL) { - auto_delete = json_object_get_int(tmpObj); - } + if (fixup_get_svalue(msg, (gparam_p)exchange_type, &exchange_type_s) != 0) { + LM_ERR("cannot get exchange type string value\n"); + return -1; + } - tmpObj = kz_json_get_object(json_obj, "no_ack"); - if(tmpObj != NULL) { - no_ack = json_object_get_int(tmpObj); - } + if (fixup_get_svalue(msg, (gparam_p)queue, &queue_s) != 0) { + LM_ERR("cannot get queue string value\n"); + return -1; + } - tmpObj = kz_json_get_object(json_obj, "wait_for_consumer_ack"); - if(tmpObj != NULL) { - wait_for_consumer_ack = json_object_get_int(tmpObj); - } + if (fixup_get_svalue(msg, (gparam_p)routing_key, &routing_key_s) != 0) { + LM_ERR("cannot get routing_key string value\n"); + return -1; + } - tmpObj = kz_json_get_object(json_obj, "federate"); - if(tmpObj != NULL) { - federate = json_object_get_int(tmpObj); - } + exchange_ptr = kz_amqp_exchange_new(&exchange_s, &exchange_type_s); + queue_ptr = kz_amqp_queue_new(&queue_s); + routing_ptr = kz_amqp_routing_new(routing_key_s.s); - tmpObj = kz_json_get_object(json_obj, "consistent-worker"); - if(tmpObj != NULL) { - consistent_worker = json_object_get_int(tmpObj); - } - - kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(&exchange_s, &exchange_type_s, &queue_s, &routing_key_s); + kz_amqp_bind_ptr bind = kz_amqp_bind_alloc(exchange_ptr, NULL, queue_ptr, routing_ptr, NULL, NULL); if(bind == NULL) { LM_ERR("Could not allocate bind struct\n"); goto error; } - bind->durable = durable; - bind->passive = passive; - bind->exclusive = exclusive; - bind->auto_delete = auto_delete; - bind->no_ack = no_ack; - bind->wait_for_consumer_ack = wait_for_consumer_ack; - bind->federate = federate; - bind->consistent_worker = consistent_worker; - + bind->no_ack = 1; kz_amqp_binding_ptr binding = shm_malloc(sizeof(kz_amqp_binding)); if(binding == NULL) { @@ -1646,24 +1946,18 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) binding->bind = bind; bindings_count++; - if(json_obj != NULL) - json_object_put(json_obj); - return 1; error: if(binding != NULL) shm_free(binding); - if(json_obj != NULL){ - json_object_put(json_obj); - } - return -1; } + #define KEY_SAFE(C) ((C >= 'a' && C <= 'z') || \ (C >= 'A' && C <= 'Z') || \ (C >= '0' && C <= '9') || \ @@ -1760,27 +2054,27 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx ) kz_amqp_bind_ptr bind = kz_conn->server->channels[idx].targeted; int ret = -1; - kz_amqp_exchange_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); - if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) + kz_amqp_exchange_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, kz_amqp_empty_table); + if (kz_amqp_error("Declaring targeted exchange", amqp_get_rpc_reply(kz_conn->conn))) { ret = -RET_AMQP_ERROR; goto error; } - amqp_queue_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, 0, 0, 0, 1, kz_amqp_empty_table); - if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) + kz_amqp_queue_declare(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, kz_amqp_empty_table); + if (kz_amqp_error("Declaring targeted queue", amqp_get_rpc_reply(kz_conn->conn))) { goto error; } - if (amqp_queue_bind(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0 - || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) + if (kz_amqp_queue_bind(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->exchange, bind->queue, bind->queue_bindings, kz_amqp_empty_table) < 0 + || kz_amqp_error("Binding targeted queue", amqp_get_rpc_reply(kz_conn->conn))) { goto error; } - if (amqp_basic_consume(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, 1, 0, kz_amqp_empty_table) < 0 - || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) + if (amqp_basic_consume(kz_conn->conn, kz_conn->server->channels[idx].channel, bind->queue->name, kz_amqp_empty_bytes, 0, 1, 0, kz_amqp_empty_table) < 0 + || kz_amqp_error("Consuming targeted queue", amqp_get_rpc_reply(kz_conn->conn))) { goto error; } @@ -1790,71 +2084,59 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx ) return ret; } -int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan) +int kz_amqp_bind_exchange(kz_amqp_conn_ptr kz_conn, amqp_channel_t channel, kz_amqp_exchange_ptr exchange, kz_amqp_exchange_binding_ptr bindings) { - int ret = -1; - amqp_bytes_t federated_exchange = {0, 0}; - amqp_bytes_t federated_routing_key = {0, 0}; - char _federated[100]; - - if(bind->federate == 0 - || dbk_use_federated_exchange == 0 - || kz_conn->server->zone == kz_amqp_get_primary_zone()) { - kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); - if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) - { - ret = -RET_AMQP_ERROR; - goto error; - } - } + while(bindings != NULL) { + LM_DBG("DECLARE EXH BIND FOR %.*s\n", (int)exchange->name.len, (char*)exchange->name.bytes); + LM_DBG("DECLARE EXH BIND TO %.*s\n", (int)bindings->from_exchange->name.len, (char*)bindings->from_exchange->name.bytes); - if(bind->federate == 1 - && dbk_use_federated_exchange == 1 - && kz_conn->server->zone != kz_amqp_get_primary_zone()) { - federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s); - kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); - if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn))) - { - ret = -RET_AMQP_ERROR; - goto error; + kz_amqp_exchange_declare(kz_conn->conn, channel, bindings->from_exchange, kz_amqp_empty_table); + if (kz_amqp_error("Declaring binded exchange", amqp_get_rpc_reply(kz_conn->conn))) + return -RET_AMQP_ERROR; + + kz_amqp_routings_ptr routings = bindings->routing; + while(routings) { + if (amqp_exchange_bind(kz_conn->conn, channel, exchange->name, bindings->from_exchange->name, routings->routing, kz_amqp_empty_table) < 0 + || kz_amqp_error("Binding exchange", amqp_get_rpc_reply(kz_conn->conn))) + return -RET_AMQP_ERROR; + routings = routings->next; } - } + bindings = bindings->next; + } + return 0; + +} - amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table); - if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) +int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan) +{ + int ret = -1; + + LM_DBG("BINDING CONSUMER %i TO %.*s\n", idx, (int)bind->exchange->name.len, (char*)bind->exchange->name.bytes); + kz_amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, kz_amqp_empty_table); + if (kz_amqp_error("Declaring consumer exchange", amqp_get_rpc_reply(kz_conn->conn))) { ret = -RET_AMQP_ERROR; goto error; } - if(bind->federate == 0 - || dbk_use_federated_exchange == 0 - || kz_conn->server->zone == kz_amqp_get_primary_zone()) { - if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0 - || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) - { - ret = -RET_AMQP_ERROR; - goto error; - } - } + if((ret = kz_amqp_bind_exchange(kz_conn, chan[idx].channel, bind->exchange, bind->exchange_bindings)) != 0) + goto error; - if(bind->federate == 1 - && dbk_use_federated_exchange == 1 - && kz_conn->server->zone != kz_amqp_get_primary_zone()) { - sprintf(_federated, "%.*s%s%.*s", (int)bind->exchange.len, (char*)bind->exchange.bytes, - (bind->routing_key.len == 0 ? "" : "."), - (int)bind->routing_key.len, (char*)bind->routing_key.bytes - ); - federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated); - if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, federated_exchange, federated_routing_key, kz_amqp_empty_table) < 0 - || kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn))) - { - ret = -RET_AMQP_ERROR; - goto error; - } + kz_amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_table); + if (kz_amqp_error("Declaring consumer queue", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } + + if (kz_amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->exchange, bind->queue, bind->queue_bindings, kz_amqp_empty_table) < 0 + || kz_amqp_error("Binding consumer queue", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; } - if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0 + if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue->name, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0 || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) { ret = -RET_AMQP_ERROR; @@ -1865,8 +2147,6 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int i chan[idx].consumer = bind; ret = idx; error: - kz_local_amqp_bytes_free(federated_exchange); - kz_local_amqp_bytes_free(federated_routing_key); return ret; } @@ -1902,14 +2182,14 @@ int kz_amqp_send_ex(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd, kz_amqp_channel } if(kz_json_get_object(json_obj, BLF_JSON_SERVERID) == NULL) { - json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)srv->channels[idx].targeted->routing_key.bytes)); + json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)srv->channels[idx].targeted->queue_bindings->routing.bytes)); amqp_bytes_free(payload); payload = amqp_bytes_malloc_dup(amqp_cstring_bytes((char*)json_object_to_json_string(json_obj))); } int amqpres = amqp_basic_publish(srv->producer->conn, srv->channels[idx].channel, exchange, routing_key, 0, 0, &props, payload); if ( amqpres != AMQP_STATUS_OK ) { - LM_ERR("Failed to publish\n"); + LM_ERR("Failed to publish %i : %s\n", amqpres, amqp_error_string2(amqpres)); ret = -1; goto error; } @@ -2006,7 +2286,16 @@ void kz_amqp_consumer_event(char *payload, char* event_key, char* event_subkey) char* subkey = (event_subkey == NULL ? dbk_consumer_event_subkey.s : event_subkey); json_extract_field(key, ev_category); + if(ev_category.len == 0 && event_key) { + ev_category.s = event_key; + ev_category.len = strlen(event_key); + } + json_extract_field(subkey, ev_name); + if(ev_name.len == 0 && event_subkey) { + ev_name.s = event_subkey; + ev_name.len = strlen(event_subkey); + } sprintf(buffer, "kazoo:consumer-event-%.*s-%.*s",ev_category.len, ev_category.s, ev_name.len, ev_name.s); for (p=buffer ; *p; ++p) *p = tolower(*p); @@ -2510,14 +2799,13 @@ void kz_send_targeted_cmd(int server_id, amqp_bytes_t body) kz_amqp_cmd_ptr cmd = NULL; json_object* JObj = NULL; char* payload = kz_local_amqp_bytes_dup(body); - json_obj_ptr json_obj = NULL; if(payload == NULL) { LM_ERR("error allocating message payload\n"); goto error; } - json_obj = kz_json_parse(payload ); + json_obj_ptr json_obj = kz_json_parse(payload ); if (json_obj == NULL) { LM_ERR("error parsing json payload\n"); goto error; @@ -2587,6 +2875,9 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e return; } + json_object_object_add(json_obj, BLF_JSON_BROKER_ZONE, json_object_new_string(server_ptr->zone->zone)); + + json_object* JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID); if(JObj != NULL) { const char* _kz_server_id_str = json_object_get_string(JObj); @@ -2622,6 +2913,7 @@ void kz_amqp_send_worker_event(kz_amqp_server_ptr server_ptr, amqp_envelope_t* e ptr->payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj)); ptr->cmd = cmd; ptr->message_id = message_id; + ptr->routing_key = kz_str_from_amqp_bytes(envelope->routing_key); if(bind) { ptr->event_key = kz_amqp_bytes_dup(bind->event_key); @@ -2912,7 +3204,7 @@ int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback return 0; -error: +error: if (timer_ev) pkg_free(timer_ev); diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h index f5d13459883..c41215696cf 100644 --- a/src/modules/kazoo/kz_amqp.h +++ b/src/modules/kazoo/kz_amqp.h @@ -166,20 +166,53 @@ typedef struct { char* event_key; char* event_subkey; str* message_id; + str* routing_key; kz_amqp_cmd_ptr cmd; } kz_amqp_consumer_delivery, *kz_amqp_consumer_delivery_ptr; typedef struct { - amqp_bytes_t exchange; - amqp_bytes_t exchange_type; - amqp_bytes_t routing_key; - amqp_bytes_t queue; - amqp_bytes_t event_key; - amqp_bytes_t event_subkey; + amqp_bytes_t name; + amqp_bytes_t type; + amqp_boolean_t passive; + amqp_boolean_t durable; + amqp_boolean_t auto_delete; + amqp_boolean_t internal; +} kz_amqp_exchange, *kz_amqp_exchange_ptr; + +typedef struct { + amqp_bytes_t name; amqp_boolean_t passive; amqp_boolean_t durable; amqp_boolean_t exclusive; amqp_boolean_t auto_delete; +} kz_amqp_queue, *kz_amqp_queue_ptr; + +typedef struct kz_amqp_routings_t { + amqp_bytes_t routing; + struct kz_amqp_routings_t* next; +} kz_amqp_routings, *kz_amqp_routings_ptr; + +typedef struct kz_amqp_exchange_binding_t { + kz_amqp_exchange_ptr from_exchange; + kz_amqp_routings_ptr routing; + struct kz_amqp_exchange_binding_t* next; +} kz_amqp_exchange_binding, *kz_amqp_exchange_binding_ptr; + +typedef struct { +// amqp_bytes_t exchange; +// amqp_bytes_t exchange_type; + kz_amqp_exchange_ptr exchange; + kz_amqp_exchange_binding_ptr exchange_bindings; + kz_amqp_queue_ptr queue; + kz_amqp_routings_ptr queue_bindings; +// amqp_bytes_t routing_key; +// amqp_bytes_t queue; + amqp_bytes_t event_key; + amqp_bytes_t event_subkey; +// amqp_boolean_t passive; +// amqp_boolean_t durable; +// amqp_boolean_t exclusive; +// amqp_boolean_t auto_delete; amqp_boolean_t no_ack; amqp_boolean_t wait_for_consumer_ack; amqp_boolean_t federate; @@ -280,6 +313,14 @@ void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer); int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback)(int, short, void *), void *data); void kz_amqp_heartbeat_proc(int fd, short event, void *arg); +void kz_amqp_queue_free(kz_amqp_queue_ptr exchange); +void kz_amqp_exchange_free(kz_amqp_exchange_ptr exchange); +void kz_amqp_exchange_bindings_free(kz_amqp_exchange_binding_ptr binding); +void kz_amqp_routing_free(kz_amqp_routings_ptr routing); +kz_amqp_queue_ptr kz_amqp_queue_new(str *name); +kz_amqp_exchange_ptr kz_amqp_exchange_new(str *name, str* type); +kz_amqp_routings_ptr kz_amqp_routing_new(char* routing); + static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x) { amqp_connection_close_t *mconn; @@ -325,3 +366,4 @@ static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x) #endif /* KZ_AMQP_H_ */ + diff --git a/src/modules/kazoo/kz_json.c b/src/modules/kazoo/kz_json.c index df3e0f0582b..65d864bbd92 100644 --- a/src/modules/kazoo/kz_json.c +++ b/src/modules/kazoo/kz_json.c @@ -37,20 +37,15 @@ #include "../../core/pvar.h" #include "../../core/usr_avp.h" -# define json_foreach_key(obj,key) \ - char *key;\ - struct lh_entry *entry ## key; \ - struct lh_entry *entry_next ## key = NULL; \ - for(entry ## key = json_object_get_object(obj)->head; \ - (entry ## key ? ( \ - key = (char*)entry ## key->k, \ - entry_next ## key = entry ## key->next, \ - entry ## key) : 0); \ - entry ## key = entry_next ## key) static str kz_pv_str_empty = {"", 0}; +enum json_type kz_json_get_type(struct json_object *jso) +{ + return json_object_get_type(jso); +} + char** str_split(char* a_str, const char a_delim) { char** result = 0; @@ -322,3 +317,4 @@ int kz_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst) return 1; } + diff --git a/src/modules/kazoo/kz_json.h b/src/modules/kazoo/kz_json.h index 2239d977da1..c53d0a041af 100644 --- a/src/modules/kazoo/kz_json.h +++ b/src/modules/kazoo/kz_json.h @@ -36,8 +36,61 @@ int kz_json_get_field(struct sip_msg* msg, char* json, char* field, char* dst); int kz_json_get_field_ex(str* json, str* field, pv_value_p dst_val); int kz_json_get_keys(struct sip_msg* msg, char* json, char* field, char* dst); +enum json_type kz_json_get_type(struct json_object *jso); struct json_object* kz_json_parse(const char *str); struct json_object* kz_json_get_object(struct json_object* jso, const char *key); +#if defined(__GNUC__) && !defined(__STRICT_ANSI__) && __STDC_VERSION__ >= 199901L + +# define json_foreach(obj,key,val) \ + char *key; \ + struct json_object *val __attribute__((__unused__)); \ + for(struct lh_entry *entry ## key = json_object_get_object(obj)->head, *entry_next ## key = NULL; \ + ({ if(entry ## key) { \ + key = (char*)entry ## key->k; \ + val = (struct json_object*)entry ## key->v; \ + entry_next ## key = entry ## key->next; \ + } ; entry ## key; }); \ + entry ## key = entry_next ## key ) + +# define json_foreach_key(obj,key) \ + char *key; \ + for(struct lh_entry *entry ## key = json_object_get_object(obj)->head, *entry_next ## key = NULL; \ + ({ if(entry ## key) { \ + key = (char*)entry ## key->k; \ + entry_next ## key = entry ## key->next; \ + } ; entry ## key; }); \ + entry ## key = entry_next ## key ) + +#else /* ANSI C or MSC */ + +# define json_foreach(obj,key,val) \ + char *key;\ + struct json_object *val; \ + struct lh_entry *entry ## key; \ + struct lh_entry *entry_next ## key = NULL; \ + for(entry ## key = json_object_get_object(obj)->head; \ + (entry ## key ? ( \ + key = (char*)entry ## key->k, \ + val = (struct json_object*)entry ## key->v, \ + entry_next ## key = entry ## key->next, \ + entry ## key) : 0); \ + entry ## key = entry_next ## key) + +# define json_foreach_key(obj,key) \ + char *key;\ + struct lh_entry *entry ## key; \ + struct lh_entry *entry_next ## key = NULL; \ + for(entry ## key = json_object_get_object(obj)->head; \ + (entry ## key ? ( \ + key = (char*)entry ## key->k, \ + entry_next ## key = entry ## key->next, \ + entry ## key) : 0); \ + entry ## key = entry_next ## key) + +#endif /* defined(__GNUC__) && !defined(__STRICT_ANSI__) && __STDC_VERSION__ >= 199901L */ + + + #endif /* KZ_JSON_H_ */