From 1f124fa5db0b3e1e787cd7b1f454d7a694d34a80 Mon Sep 17 00:00:00 2001 From: Luis Azedo Date: Fri, 26 Jun 2015 19:35:55 +0100 Subject: [PATCH] kazoo : changes in targeted exchanges --- modules/kazoo/kazoo.c | 9 +- modules/kazoo/kz_amqp.c | 430 ++++++++++++++++++++++++++++++++-------- modules/kazoo/kz_amqp.h | 29 ++- modules/kazoo/kz_hash.c | 182 +++++++++++++++++ modules/kazoo/kz_hash.h | 15 ++ 5 files changed, 578 insertions(+), 87 deletions(-) create mode 100644 modules/kazoo/kz_hash.c create mode 100644 modules/kazoo/kz_hash.h diff --git a/modules/kazoo/kazoo.c b/modules/kazoo/kazoo.c index 6366d3b87c0..e9ad264c5d1 100644 --- a/modules/kazoo/kazoo.c +++ b/modules/kazoo/kazoo.c @@ -57,6 +57,7 @@ int kz_zone_counter = 0; int dbk_auth_wait_timeout = 3; int dbk_reconn_retries = 8; int dbk_presentity_phtable_size = 4096; +int dbk_command_table_size = 2048; int dbk_use_federated_exchange = 1; str dbk_federated_exchange = str_init("federation"); @@ -189,6 +190,7 @@ static param_export_t params[] = { {"federated_exchange", STR_PARAM, &dbk_federated_exchange.s}, {"amqp_heartbeats", INT_PARAM, &dbk_use_hearbeats}, {"amqp_primary_zone", STR_PARAM, &dbk_primary_zone_name.s}, + {"amqp_command_hashtable_size", INT_PARAM, &dbk_command_table_size}, {0, 0, 0} }; @@ -347,14 +349,19 @@ static int mod_child_init(int rank) if (rank==PROC_INIT || rank==PROC_TCP_MAIN) return 0; +// if (rank>PROC_MAIN) +// kz_cmd_pipe = kz_cmd_pipe_fds[1]; + if (rank==PROC_MAIN) { + /* pid=fork_process(PROC_NOCHLDINIT, "AMQP Timer", 0); if (pid<0) - return -1; /* error */ + return -1; if(pid==0){ return(kz_amqp_timeout_proc()); } + */ for(i=0; i < dbk_consumer_processes; i++) { pid=fork_process(i+1, "AMQP Consumer Worker", 1); diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c index 5e1836e4ae0..f5152ee1bb3 100644 --- a/modules/kazoo/kz_amqp.c +++ b/modules/kazoo/kz_amqp.c @@ -16,6 +16,7 @@ #include "kz_amqp.h" #include "kz_json.h" +#include "kz_hash.h" #define RET_AMQP_ERROR 2 @@ -58,6 +59,40 @@ int set_non_blocking(int fd) return 0; } +static inline str* kz_str_dup(str* src) +{ + char *dst_char = (char*)shm_malloc(sizeof(str)+src->len+1); + if (!dst_char) { + LM_ERR("error allocating shared memory for str"); + return NULL; + } + str* dst = (str*)dst_char; + dst->s = dst_char+sizeof(str); + + memcpy(dst->s, src->s, src->len); + dst->len = src->len; + dst->s[dst->len] = '\0'; + return dst; +} + +static inline str* kz_str_dup_from_char(char* src) +{ + int len = strlen(src); + char *dst_char = (char*)shm_malloc(sizeof(str)+len+1); + if (!dst_char) { + LM_ERR("error allocating shared memory for str"); + return NULL; + } + str* dst = (str*)dst_char; + dst->s = dst_char+sizeof(str); + + memcpy(dst->s, src, len); + dst->len = len; + dst->s[dst->len] = '\0'; + return dst; +} + + static char *kz_amqp_str_dup(str *src) { char *res; @@ -99,6 +134,7 @@ static char *kz_local_amqp_str_dup(str *src) return res; } +/* static char *kz_local_amqp_string_dup(char *src) { char *res; @@ -113,6 +149,7 @@ static char *kz_local_amqp_string_dup(char *src) res[sz] = 0; return res; } +*/ char *kz_amqp_bytes_dup(amqp_bytes_t bytes) { @@ -257,6 +294,8 @@ void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd) shm_free(cmd->payload); if (cmd->return_payload) shm_free(cmd->return_payload); + if (cmd->message_id) + shm_free(cmd->message_id); lock_release(&cmd->lock); lock_destroy(&cmd->lock); shm_free(cmd); @@ -431,6 +470,10 @@ int kz_amqp_init() { int i; kz_amqp_zone_ptr g; kz_amqp_server_ptr s; + + if(!kz_hash_init()) + return 0; + if(kz_bindings == NULL) { kz_bindings = (kz_amqp_bindings_ptr) shm_malloc(sizeof(kz_amqp_bindings)); memset(kz_bindings, 0, sizeof(kz_amqp_bindings)); @@ -487,6 +530,7 @@ kz_amqp_server_ptr kz_amqp_destroy_server(kz_amqp_server_ptr server_ptr) kz_amqp_server_ptr next = server_ptr->next; kz_amqp_destroy_connection(server_ptr->connection); kz_amqp_destroy_channels(server_ptr); + shm_free(server_ptr->producer); shm_free(server_ptr); return next; } @@ -510,6 +554,7 @@ void kz_amqp_destroy_zones() while(g) { g = kz_amqp_destroy_zone(g); } + shm_free(kz_zones); kz_zones = NULL; kz_primary_zone = NULL; } @@ -528,6 +573,7 @@ void kz_amqp_destroy() { } shm_free(kz_bindings); } + kz_hash_destroy(); } #define KZ_URL_MAX_SIZE 100 @@ -916,7 +962,7 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload) if(lock_init(&cmd->lock)==NULL) { LM_ERR("cannot init the lock for publishing in process %d\n", getpid()); - lock_dealloc(&cmd->lock); +// lock_dealloc(&cmd->lock); goto error; } lock_get(&cmd->lock); @@ -983,6 +1029,7 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_ cmd->routing_key = kz_amqp_str_dup(str_routing_key); cmd->reply_routing_key = kz_amqp_string_dup(serverid); cmd->payload = kz_amqp_string_dup(payload); + cmd->message_id = kz_str_dup(&unique_string); cmd->timeout = *kz_timeout; @@ -1475,76 +1522,82 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int idx ) int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan) { - int ret = -1; - amqp_bytes_t federated_exchange = {0, 0}; - amqp_bytes_t federated_routing_key = {0, 0}; + int ret = -1; + amqp_bytes_t federated_exchange = {0, 0}; + amqp_bytes_t federated_routing_key = {0, 0}; char _federated[100]; - if(bind->federate == 0 - || dbk_use_federated_exchange == 0 - || kz_conn->server->zone == kz_amqp_get_primary_zone()) { - amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); - if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) { - ret = -RET_AMQP_ERROR; - goto error; - } + if(bind->federate == 0 + || dbk_use_federated_exchange == 0 + || kz_conn->server->zone == kz_amqp_get_primary_zone()) { + amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); + if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } } - if(bind->federate == 1 - && dbk_use_federated_exchange == 1 - && kz_conn->server->zone != kz_amqp_get_primary_zone()) { - federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s); - amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); - if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn))) { - ret = -RET_AMQP_ERROR; - goto error; - } + if(bind->federate == 1 + && dbk_use_federated_exchange == 1 + && kz_conn->server->zone != kz_amqp_get_primary_zone()) { + federated_exchange = kz_local_amqp_bytes_dup_from_string(dbk_federated_exchange.s); + amqp_exchange_declare(kz_conn->conn, chan[idx].channel, federated_exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); + if (kz_amqp_error("Declaring federated exchange", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } } amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table); - if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) { + if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) + { ret = -RET_AMQP_ERROR; goto error; } if(bind->federate == 0 - || dbk_use_federated_exchange == 0 - || kz_conn->server->zone == kz_amqp_get_primary_zone()) { - if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0 - || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) { - ret = -RET_AMQP_ERROR; - goto error; - } + || dbk_use_federated_exchange == 0 + || kz_conn->server->zone == kz_amqp_get_primary_zone()) { + if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0 + || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } } - if(bind->federate == 1 - && dbk_use_federated_exchange == 1 - && kz_conn->server->zone != kz_amqp_get_primary_zone()) { + if(bind->federate == 1 + && dbk_use_federated_exchange == 1 + && kz_conn->server->zone != kz_amqp_get_primary_zone()) { sprintf(_federated, "%.*s%s%.*s", (int)bind->exchange.len, (char*)bind->exchange.bytes, - (bind->routing_key.len == 0 ? "" : "."), + (bind->routing_key.len == 0 ? "" : "."), (int)bind->routing_key.len, (char*)bind->routing_key.bytes - ); - federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated); + ); + federated_routing_key = kz_local_amqp_bytes_dup_from_string(_federated); if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, federated_exchange, federated_routing_key, kz_amqp_empty_table) < 0 - || kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn))) { - ret = -RET_AMQP_ERROR; - goto error; - } - } - - if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0 - || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) { + || kz_amqp_error("Binding queue to federated exchange", amqp_get_rpc_reply(kz_conn->conn))) + { ret = -RET_AMQP_ERROR; goto error; - } + } + } + + if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0 + || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } - chan[idx].state = KZ_AMQP_CHANNEL_CONSUMING; + chan[idx].state = KZ_AMQP_CHANNEL_CONSUMING; chan[idx].consumer = bind; - ret = idx; -error: - kz_local_amqp_bytes_free(federated_exchange); - kz_local_amqp_bytes_free(federated_routing_key); - return ret; + ret = idx; + error: + kz_local_amqp_bytes_free(federated_exchange); + kz_local_amqp_bytes_free(federated_routing_key); + return ret; } int kz_amqp_send_ex(kz_amqp_server_ptr srv, kz_amqp_cmd_ptr cmd, kz_amqp_channel_state state, int idx) @@ -1767,6 +1820,78 @@ void kz_amqp_fire_connection_event(char *event, char* host) kz_amqp_send_consumer_event(payload, 1); } +void kz_amqp_cmd_timeout_cb(int fd, short event, void *arg) +{ + kz_amqp_cmd_ptr cmd = (kz_amqp_cmd_ptr) arg; + kz_amqp_cmd_ptr retrieved_cmd = kz_cmd_retrieve(cmd->message_id); + if(retrieved_cmd == NULL) { + LM_ERR("timer what!? "); + } else { + LM_ERR("amqp message timeout for exchange '%s' with routing key '%s' and message id '%.*s'\n" + , cmd->exchange, cmd->routing_key + , cmd->message_id->len, cmd->message_id->s + ); + close(retrieved_cmd->timerfd); + event_del(retrieved_cmd->timer_ev); + pkg_free(retrieved_cmd->timer_ev); + retrieved_cmd->return_code = -1; + lock_release(&retrieved_cmd->lock); + } +} + + +int kz_amqp_start_cmd_timer(kz_amqp_cmd_ptr cmd) +{ + int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + + if (timerfd == -1) { + LM_ERR("Could not create timerfd."); + return 0; + } + + cmd->timerfd = timerfd; + struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec)); + if(itime == NULL){ + LM_ERR("Could not set timer."); + return 0; + } + itime->it_interval.tv_sec = 0; + itime->it_interval.tv_nsec = 0; + + itime->it_value.tv_sec = cmd->timeout.tv_sec; + itime->it_value.tv_nsec = cmd->timeout.tv_usec * 1000; + if (timerfd_settime(timerfd, 0, itime, NULL) == -1) { + LM_ERR("Could not set timer."); + return 0; + } + pkg_free(itime); + struct event *timer_ev = pkg_malloc(sizeof(struct event)); + if(timer_ev == NULL) { + LM_ERR("Could not allocate timer_ev."); + return 0; + } + event_set(timer_ev, timerfd, EV_READ, kz_amqp_cmd_timeout_cb, cmd); + if(event_add(timer_ev, NULL) == -1) { + LM_ERR("event_add failed while setting request timer (%s).", strerror(errno)); + pkg_free(timer_ev); + return 0; + } + cmd->timer_ev = timer_ev; + return 1; +} + +void kz_amqp_stop_cmd_timer(kz_amqp_cmd_ptr cmd) +{ + if (cmd->timer_ev) { + close(cmd->timerfd); + event_del(cmd->timer_ev); + pkg_free(cmd->timer_ev); + } else { + LM_ERR("No timer for message id %.*s\n", cmd->message_id->len, cmd->message_id->s); + } +} + + /* check timeouts */ int kz_amqp_timeout_proc() { @@ -1807,7 +1932,9 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq) { int i,channel_res; kz_amqp_cmd_ptr cmd; - kz_amqp_connection_close(rmq); + if(rmq->state != KZ_AMQP_CONNECTION_CLOSED) { + kz_amqp_connection_close(rmq); + } rmq->state = KZ_AMQP_CONNECTION_CLOSED; rmq->channel_count = rmq->channel_counter = 0; if (!(rmq->conn = amqp_new_connection())) { @@ -1925,32 +2052,9 @@ int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection) return 0; } - -void kz_amqp_publisher_connect() +int kz_amqp_publisher_send(kz_amqp_cmd_ptr cmd) { - kz_amqp_zone_ptr g; - kz_amqp_server_ptr s; - for (g = kz_amqp_get_zones(); g != NULL; g = g->next) { - for (s = g->servers->head; s != NULL; s = s->next) { - if(s->producer == NULL) { - s->producer = (kz_amqp_conn_ptr) shm_malloc(sizeof(kz_amqp_conn)); - memset(s->producer, 0, sizeof(kz_amqp_conn)); - s->producer->server = s; - } - kz_amqp_connect(s->producer); - } - } -} - -void kz_amqp_publisher_proc_cb(int fd, short event, void *arg) -{ - kz_amqp_cmd_ptr cmd; int idx; - if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) { - LM_ERR("failed to read from command pipe: %s\n", strerror(errno)); - return; - } - int sent = 0; kz_amqp_zone_ptr g; kz_amqp_server_ptr s; @@ -1958,39 +2062,123 @@ void kz_amqp_publisher_proc_cb(int fd, short event, void *arg) for (s = g->servers->head; s != NULL && sent == 0; s = s->next) { if(cmd->server_id == s->id || cmd->server_id == 0) { if(s->producer->state == KZ_AMQP_CONNECTION_OPEN) { - if(cmd->type == KZ_AMQP_CMD_PUBLISH) { + if(cmd->type == KZ_AMQP_CMD_PUBLISH || cmd->type == KZ_AMQP_CMD_PUBLISH_BROADCAST) { idx = kz_amqp_send(s, cmd); if(idx >= 0) { cmd->return_code = AMQP_RESPONSE_NORMAL; + s->channels[idx].state = KZ_AMQP_CHANNEL_FREE; sent = 1; } else { cmd->return_code = -1; - LM_ERR("ERROR SENDING PUBLISH"); + s->channels[idx].state = KZ_AMQP_CHANNEL_CLOSED; + LM_ERR("error sending publish to zone : %s , connection id : %d, uri : %s", s->zone->zone, s->id, s->connection->url); kz_amqp_handle_server_failure(s->producer); } s->channels[idx].cmd = NULL; - s->channels[idx].state = KZ_AMQP_CHANNEL_FREE; } else if(cmd->type == KZ_AMQP_CMD_CALL) { idx = kz_amqp_send_receive(s, cmd); if(idx < 0) { s->channels[idx].cmd = NULL; cmd->return_code = -1; - s->channels[idx].state = KZ_AMQP_CHANNEL_FREE; - LM_ERR("ERROR SENDING QUERY"); + s->channels[idx].state = KZ_AMQP_CHANNEL_CLOSED; + LM_ERR("error sending query to zone : %s , connection id : %d, uri : %s", s->zone->zone, s->id, s->connection->url); kz_amqp_handle_server_failure(s->producer); } else { + s->channels[idx].state = KZ_AMQP_CHANNEL_FREE; sent = 1; } } } } } + if(cmd->type == KZ_AMQP_CMD_PUBLISH_BROADCAST) { + sent = 0; + } } - if(sent == 0) { - LM_ERR("ERROR SENDING QUERY"); + return sent; +} + + +void kz_amqp_publisher_connect() +{ + kz_amqp_zone_ptr g; + kz_amqp_server_ptr s; + for (g = kz_amqp_get_zones(); g != NULL; g = g->next) { + for (s = g->servers->head; s != NULL; s = s->next) { + if(s->producer == NULL) { + s->producer = (kz_amqp_conn_ptr) shm_malloc(sizeof(kz_amqp_conn)); + memset(s->producer, 0, sizeof(kz_amqp_conn)); + s->producer->server = s; + } + kz_amqp_connect(s->producer); + } + } +} + +void kz_amqp_publisher_proc_cb(int fd, short event, void *arg) +{ + kz_amqp_cmd_ptr cmd; + kz_amqp_cmd_ptr retrieved_cmd; + if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) { + LM_ERR("failed to read from command pipe: %s\n", strerror(errno)); + return; } - if(sent == 0 || (sent == 1 && cmd->type == KZ_AMQP_CMD_PUBLISH)) + + switch(cmd->type) { + case KZ_AMQP_CMD_PUBLISH: + kz_amqp_publisher_send(cmd); + lock_release(&cmd->lock); + break; + + case KZ_AMQP_CMD_CALL: + if(kz_amqp_publisher_send(cmd) < 0) { + lock_release(&cmd->lock); + } else { + if(!kz_cmd_store(cmd)) { + cmd->return_code = -1; + lock_release(&cmd->lock); + } else { + if(!kz_amqp_start_cmd_timer(cmd)) { + cmd->return_code = -1; + lock_release(&cmd->lock); + } + } + } + break; + + case KZ_AMQP_CMD_TARGETED_CONSUMER: + retrieved_cmd = kz_cmd_retrieve(cmd->message_id); + if(retrieved_cmd == NULL) { + LM_ERR("amqp message id %.*s not found.\n", cmd->message_id->len, cmd->message_id->s); + kz_amqp_free_pipe_cmd(cmd); + } else { + kz_amqp_stop_cmd_timer(retrieved_cmd); + retrieved_cmd->return_code = cmd->return_code; + retrieved_cmd->return_payload = cmd->return_payload; + cmd->return_payload = NULL; + lock_release(&retrieved_cmd->lock); + kz_amqp_free_pipe_cmd(cmd); + } + break; + + case KZ_AMQP_CMD_PUBLISH_BROADCAST: + kz_amqp_publisher_send(cmd); lock_release(&cmd->lock); + break; + + case KZ_AMQP_CMD_ASYNC_CALL: + break; + + case KZ_AMQP_CMD_COLLECT: + break; + + case KZ_AMQP_CMD_ASYNC_COLLECT: + break; + + default: + break; + + } } int kz_amqp_publisher_proc(int cmd_pipe) @@ -2029,6 +2217,73 @@ char* maybe_add_consumer_key(int server_id, amqp_bytes_t body) return payload; } +void kz_send_targeted_cmd(int server_id, amqp_bytes_t body) +{ + char buffer[100]; + char* server_id_str = NULL; + kz_amqp_cmd_ptr cmd = NULL; + json_object* JObj = NULL; + char* payload = kz_local_amqp_bytes_dup(body); + + if(payload == NULL) { + LM_ERR("error allocating message payload\n"); + goto error; + } + + json_obj_ptr json_obj = kz_json_parse(payload ); + if (json_obj == NULL) { + LM_ERR("error parsing json payload\n"); + goto error; + } + + cmd = (kz_amqp_cmd_ptr)shm_malloc(sizeof(kz_amqp_cmd)); + if(cmd == NULL) { + LM_ERR("failed to allocate kz_amqp_cmd in process %d\n", getpid()); + goto error; + } + memset(cmd, 0, sizeof(kz_amqp_cmd)); + if(lock_init(&cmd->lock)==NULL) + { + LM_ERR("cannot init the lock for targeted delivery in process %d\n", getpid()); + goto error; + } + + cmd->type = KZ_AMQP_CMD_TARGETED_CONSUMER; + cmd->return_code = AMQP_RESPONSE_NORMAL; + + JObj = kz_json_get_object(json_obj, BLF_JSON_SERVERID); + if(JObj != NULL) { + server_id_str = (char*) json_object_get_string(JObj); + sprintf(buffer, "consumer://%d/%s", server_id, server_id_str); + json_object_object_del(json_obj, BLF_JSON_SERVERID); + json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string(buffer)); + } + + cmd->return_payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj)); + + JObj = kz_json_get_object(json_obj, BLF_JSON_MSG_ID); + if(JObj != NULL) { + cmd->message_id = kz_str_dup_from_char((char*)json_object_get_string(JObj)); + } + + if (write(kz_cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) { + LM_ERR("failed to publish message to amqp in process %d, write to command pipe: %s\n", getpid(), strerror(errno)); + } else { + cmd = NULL; + } + +error: + if(json_obj) + json_object_put(json_obj); + + if(payload) + pkg_free(payload); + + if(cmd) + kz_amqp_free_pipe_cmd(cmd); + +} + int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) { @@ -2068,7 +2323,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) { /* start cleanup */ - server_ptr->channels[i].consumer = NULL; +// server_ptr->channels[i].consumer = NULL; /* end cleanup */ /* bind targeted channels */ @@ -2133,6 +2388,9 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) case AMQP_RESPONSE_NORMAL: idx = envelope.channel-1; if(idx < dbk_channels) { + kz_send_targeted_cmd(server_ptr->id, envelope.message.body); + + /* switch(server_ptr->channels[idx].state) { case KZ_AMQP_CHANNEL_CALLING: lock_get(&server_ptr->channels[idx].lock); @@ -2149,6 +2407,10 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) LM_INFO("ignoring received payload on consumer - %.*s\n", (int) envelope.message.body.len, (char*)envelope.message.body.bytes); break; } + */ + + + } else { idx = idx - dbk_channels; kz_amqp_send_consumer_event_ex(maybe_add_consumer_key(server_ptr->id, envelope.message.body), diff --git a/modules/kazoo/kz_amqp.h b/modules/kazoo/kz_amqp.h index d1aec44b9d1..d74bb7e5e37 100644 --- a/modules/kazoo/kz_amqp.h +++ b/modules/kazoo/kz_amqp.h @@ -30,7 +30,12 @@ typedef enum { KZ_AMQP_CMD_PUBLISH = 1, KZ_AMQP_CMD_CALL = 2, KZ_AMQP_CMD_CONSUME = 3, - KZ_AMQP_CMD_ACK = 4 + KZ_AMQP_CMD_ACK = 4, + KZ_AMQP_CMD_TARGETED_CONSUMER = 5, + KZ_AMQP_CMD_PUBLISH_BROADCAST = 6, + KZ_AMQP_CMD_COLLECT = 7, + KZ_AMQP_CMD_ASYNC_CALL = 8, + KZ_AMQP_CMD_ASYNC_COLLECT = 9 } kz_amqp_pipe_cmd_type; typedef enum { @@ -83,10 +88,11 @@ typedef struct { } kz_amqp_conn_pool, *kz_amqp_conn_pool_ptr; +/* #define AMQP_KZ_CMD_PUBLISH 1 #define AMQP_KZ_CMD_CALL 2 #define AMQP_KZ_CMD_CONSUME 3 - +*/ typedef struct { gen_lock_t lock; @@ -98,14 +104,31 @@ typedef struct { char* queue; char* payload; char* return_payload; + str* message_id; int return_code; int consumer; int server_id; uint64_t delivery_tag; amqp_channel_t channel; struct timeval timeout; + + /* timer */ + struct event *timer_ev; + int timerfd; + } kz_amqp_cmd, *kz_amqp_cmd_ptr; +typedef struct kz_amqp_cmd_entry_t { + kz_amqp_cmd_ptr cmd; + struct kz_amqp_cmd_entry_t* next; +} kz_amqp_cmd_entry, *kz_amqp_cmd_entry_ptr; + +typedef struct kz_amqp_cmd_table_t { + kz_amqp_cmd_entry_ptr entries; + gen_lock_t lock; +} kz_amqp_cmd_table, *kz_amqp_cmd_table_ptr; + + typedef struct { char* payload; uint64_t delivery_tag; @@ -216,6 +239,8 @@ kz_amqp_zone_ptr kz_amqp_add_zone(char* zone); void kz_amqp_fire_connection_event(char *event, char* host); +void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd); + static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x) { amqp_connection_close_t *mconn; diff --git a/modules/kazoo/kz_hash.c b/modules/kazoo/kz_hash.c new file mode 100644 index 00000000000..29da9bb21b7 --- /dev/null +++ b/modules/kazoo/kz_hash.c @@ -0,0 +1,182 @@ +#include "kz_hash.h" + +#include +#include +#include "../../mem/shm_mem.h" +#include "../../hashes.h" +#include "../../dprint.h" +#include "../../str.h" + +extern int dbk_command_table_size; + +kz_amqp_cmd_table_ptr kz_cmd_htable = NULL; + +int kz_hash_init() +{ + int i, j; + + if(kz_cmd_htable) + { + LM_ERR("already initialized"); + return 1; + } + + i = 0; + kz_cmd_htable = (kz_amqp_cmd_table_ptr)shm_malloc(dbk_command_table_size* sizeof(kz_amqp_cmd_table)); + if(kz_cmd_htable == NULL) + { + LM_ERR("memory error allocating command table"); + return 0; + } + memset(kz_cmd_htable, 0, dbk_command_table_size* sizeof(kz_amqp_cmd_table)); + + for(i= 0; i< dbk_command_table_size; i++) + { + if(lock_init(&kz_cmd_htable[i].lock)== 0) + { + LM_ERR("initializing lock [%d]\n", i); + goto error; + } + kz_cmd_htable[i].entries= (kz_amqp_cmd_entry_ptr)shm_malloc(sizeof(kz_amqp_cmd_entry)); + if(kz_cmd_htable[i].entries== NULL) + { + LM_ERR("memory error allocating command entry"); + return 0; + } + memset(kz_cmd_htable[i].entries, 0, sizeof(kz_amqp_cmd_entry)); + kz_cmd_htable[i].entries->next= NULL; + } + + return 1; + +error: + if(kz_cmd_htable) + { + for(j=0; j< i; j++) + { + if(kz_cmd_htable[i].entries) + shm_free(kz_cmd_htable[i].entries); + else + break; + lock_destroy(&kz_cmd_htable[i].lock); + } + shm_free(kz_cmd_htable); + } + return 0; + +} + +void kz_hash_destroy() +{ + int i; + kz_amqp_cmd_entry_ptr p, prev_p; + + if(kz_cmd_htable== NULL) + return; + + for(i= 0; i< dbk_command_table_size; i++) + { + lock_destroy(&kz_cmd_htable[i].lock); + p= kz_cmd_htable[i].entries; + while(p) + { + prev_p= p; + p= p->next; + kz_amqp_free_pipe_cmd(prev_p->cmd); + shm_free(prev_p); + } + } + shm_free(kz_cmd_htable); +} + +kz_amqp_cmd_entry_ptr kz_search_cmd_table(str* message_id, unsigned int hash_code) +{ + kz_amqp_cmd_entry_ptr p; + + LM_DBG("searching %.*s\n", message_id->len, message_id->s); + p= kz_cmd_htable[hash_code].entries->next; + while(p) + { + if(p->cmd->message_id->len== message_id->len && + strncmp(p->cmd->message_id->s, message_id->s, p->cmd->message_id->len) == 0 ) + return p; + p= p->next; + } + return NULL; +} + +int kz_cmd_store(kz_amqp_cmd_ptr cmd) +{ + unsigned int hash_code; + kz_amqp_cmd_entry_ptr p= NULL; + + hash_code = core_hash(cmd->message_id, NULL, dbk_command_table_size); + + lock_get(&kz_cmd_htable[hash_code].lock); + + p= kz_search_cmd_table(cmd->message_id, hash_code); + if(p) + { + LM_ERR("command already stored\n"); + lock_release(&kz_cmd_htable[hash_code].lock); + return 0; + } + + p = shm_malloc(sizeof(kz_amqp_cmd_entry)); + if(p== NULL) + { + lock_release(&kz_cmd_htable[hash_code].lock); + LM_ERR("memory error allocation command pointer\n"); + return 0; + } + memset(p, 0, sizeof(kz_amqp_cmd_entry)); + + p->cmd = cmd; + p->next= kz_cmd_htable[hash_code].entries->next; + kz_cmd_htable[hash_code].entries->next= p; + + lock_release(&kz_cmd_htable[hash_code].lock); + + return 1; +} + +kz_amqp_cmd_ptr kz_cmd_retrieve(str* message_id) +{ + unsigned int hash_code; + kz_amqp_cmd_entry_ptr p= NULL, prev_p= NULL; + kz_amqp_cmd_ptr cmd = NULL; + + hash_code= core_hash(message_id, NULL, dbk_command_table_size); + + lock_get(&kz_cmd_htable[hash_code].lock); + + p = kz_search_cmd_table(message_id, hash_code); + if(p== NULL) + { + LM_DBG("command pointer hash entry not found\n"); + lock_release(&kz_cmd_htable[hash_code].lock); + return NULL; + } + + prev_p = kz_cmd_htable[hash_code].entries; + while(prev_p->next) + { + if(prev_p->next== p) + break; + prev_p= prev_p->next; + } + if(prev_p->next== NULL) + { + LM_ERR("command pointer not found\n"); + lock_release(&kz_cmd_htable[hash_code].lock); + return NULL; + } + prev_p->next= p->next; + cmd = p->cmd; + shm_free(p); + lock_release(&kz_cmd_htable[hash_code].lock); + + return cmd; +} + + diff --git a/modules/kazoo/kz_hash.h b/modules/kazoo/kz_hash.h new file mode 100644 index 00000000000..18ab2b3072f --- /dev/null +++ b/modules/kazoo/kz_hash.h @@ -0,0 +1,15 @@ +#ifndef KZ_HASH_H +#define KZ_HASH_H + +#include "../../lock_ops.h" +#include "kz_amqp.h" + +int kz_hash_init(); +void kz_hash_destroy(); + +int kz_cmd_store(kz_amqp_cmd_ptr cmd); +kz_amqp_cmd_ptr kz_cmd_retrieve(str* message_id); + + +#endif +