From 0b54fb31be17a3e03200545bd3ad5896437ec771 Mon Sep 17 00:00:00 2001 From: Luis Azedo Date: Thu, 9 Jul 2015 20:54:06 +0100 Subject: [PATCH] kazoo : fix null pointer usage also some code cleanup --- modules/kazoo/kazoo.c | 2 +- modules/kazoo/kz_amqp.c | 194 +++++----------------------------------- 2 files changed, 25 insertions(+), 171 deletions(-) diff --git a/modules/kazoo/kazoo.c b/modules/kazoo/kazoo.c index 2515582fa1b..23cbb83f6dc 100644 --- a/modules/kazoo/kazoo.c +++ b/modules/kazoo/kazoo.c @@ -60,7 +60,7 @@ int dbk_reconn_retries = 8; int dbk_presentity_phtable_size = 4096; int dbk_command_table_size = 2048; -int dbk_use_federated_exchange = 1; +int dbk_use_federated_exchange = 0; str dbk_federated_exchange = str_init("federation"); str dbk_primary_zone_name = str_init("local"); diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c index 6afaccf67e1..0086e6aaef0 100644 --- a/modules/kazoo/kz_amqp.c +++ b/modules/kazoo/kz_amqp.c @@ -23,8 +23,6 @@ struct tm_binds tmb; -//kz_amqp_connection_pool_ptr kz_pool = NULL; - kz_amqp_bindings_ptr kz_bindings = NULL; int bindings_count = 0; @@ -34,7 +32,6 @@ typedef struct json_object *json_obj_ptr; extern pv_spec_t kz_query_result_spec; -//kz_amqp_channel_ptr channels = NULL; extern int *kz_worker_pipes; extern int kz_cmd_pipe; @@ -156,23 +153,6 @@ static inline str* kz_local_str_dup(str* src) return dst; } -/* -static char *kz_local_amqp_string_dup(char *src) -{ - char *res; - int sz; - if (!src ) - return NULL; - - sz = strlen(src); - if (!(res = (char *) pkg_malloc(sz + 1))) - return NULL; - strncpy(res, src, sz); - res[sz] = 0; - return res; -} -*/ - char *kz_amqp_bytes_dup(amqp_bytes_t bytes) { char *res; @@ -789,68 +769,6 @@ int kz_amqp_channel_open(kz_amqp_conn_ptr rmq, amqp_channel_t channel) { return 0; } -kz_amqp_conn_ptr kz_amqp_get_connection() { - return NULL; - - /* - kz_amqp_conn_ptr ptr = NULL; - if(kz_pool == NULL) { - return NULL; - } -// lock_get(&kz_pool->lock); - - ptr = kz_pool->head; - - if(kz_pool->current != NULL) { - ptr = kz_pool->current; - } - - if(ptr->socket == NULL ) - { - while(ptr != NULL) { - if(kz_amqp_connection_open(ptr) == 0) { - kz_pool->current = ptr; - break; - } - ptr = ptr->next; - } - } - -// lock_release(&kz_pool->lock); - - return ptr; - */ -} - -kz_amqp_conn_ptr kz_amqp_get_next_connection() { - return NULL; - /* - kz_amqp_conn_ptr ptr = NULL; - if(kz_pool == NULL) { - return NULL; - } - - if(kz_pool->current != NULL) { - ptr = kz_pool->current->next; - } - - if(ptr == NULL) { - ptr = kz_pool->head; - } - - while(ptr != NULL) { - if(kz_amqp_connection_open(ptr) == 0) { - kz_pool->current = ptr; - break; - } - ptr = ptr->next; - } - - - return ptr; - */ -} - int kz_amqp_consume_error(kz_amqp_conn_ptr ptr) { amqp_connection_state_t conn = ptr->conn; @@ -1008,7 +926,6 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload) if(lock_init(&cmd->lock)==NULL) { LM_ERR("cannot init the lock for publishing in process %d\n", getpid()); -// lock_dealloc(&cmd->lock); goto error; } lock_get(&cmd->lock); @@ -1175,7 +1092,6 @@ int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key int ret = -1; json_obj_ptr json_obj = NULL; kz_amqp_cmd_ptr cmd = NULL; -// json_obj_ptr json_body = NULL; unsigned int hash_index = 0; unsigned int label = 0; tm_cell_t *t = 0; @@ -1281,6 +1197,7 @@ int kz_amqp_async_query(struct sip_msg* msg, char* _exchange, char* _routing_key lock_dealloc(&cmd->lock); goto error; } + lock_get(&cmd->lock); cmd->type = KZ_AMQP_CMD_ASYNC_CALL; cmd->consumer = getpid(); if (write(kz_cmd_pipe, &cmd, sizeof(cmd)) != sizeof(cmd)) { @@ -2146,18 +2063,6 @@ int kz_amqp_start_cmd_timer(kz_amqp_cmd_ptr cmd) return 0; } -/* -void kz_amqp_stop_cmd_timer(kz_amqp_cmd_ptr cmd) -{ - if (cmd->timer_ev) { - close(cmd->timerfd); - event_del(cmd->timer_ev); - pkg_free(cmd->timer_ev); - } else { - LM_ERR("No timer for message id %.*s\n", cmd->message_id->len, cmd->message_id->s); - } -} -*/ /* check timeouts */ int kz_amqp_timeout_proc() @@ -2284,7 +2189,6 @@ void kz_amqp_reconnect_cb(int fd, short event, void *arg) int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection) { -// LM_INFO("Setting timer to reconnect to %s on port %d in %d seconds.\n", server->host, server->port, JSONRPC_RECONNECT_INTERVAL); connection->state = KZ_AMQP_CONNECTION_FAILURE; int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); @@ -2295,7 +2199,6 @@ int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection) } struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec)); -// CHECK_MALLOC(itime); itime->it_interval.tv_sec = 0; itime->it_interval.tv_nsec = 0; @@ -2422,7 +2325,6 @@ void kz_amqp_publisher_proc_cb(int fd, short event, void *arg) LM_DBG("amqp message id %.*s not found.\n", cmd->message_id->len, cmd->message_id->s); kz_amqp_free_pipe_cmd(cmd); } else { -// kz_amqp_stop_cmd_timer(retrieved_cmd); retrieved_cmd->return_code = cmd->return_code; retrieved_cmd->return_payload = cmd->return_payload; cmd->return_payload = NULL; @@ -2572,13 +2474,6 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_ kz_amqp_consumer_delivery_ptr ptr = NULL; str* message_id = NULL; int idx = envelope->channel-1; - /* - char* payload = kz_amqp_bytes_dup(envelope->message.body); - if(!payload) { - LM_ERR("error alocating memory\n"); - return; - } - */ json_obj_ptr json_obj = kz_json_parse((char*)envelope->message.body.bytes); if (json_obj == NULL) { @@ -2605,13 +2500,6 @@ void kz_amqp_send_worker_event(int _kz_server_id, amqp_envelope_t* envelope, kz_ cmd = kz_cmd_retrieve(message_id); if(cmd) cmd->return_code = AMQP_RESPONSE_NORMAL; - - /* - if(cmd != NULL) { - cmd->return_code = 0; - cmd->return_payload = kz_amqp_string_dup((char*)json_object_to_json_string(json_obj)); - } - */ } } @@ -2669,34 +2557,26 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) if(server_ptr->zone == kz_amqp_get_primary_zone()) channel_base = dbk_channels; -// server_ptr->consumer = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn)); -// if(server_ptr->consumer == NULL) consumer = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn)); if(consumer == NULL) { LM_ERR("NO MORE PACKAGE MEMORY\n"); return 1; } -// memset(server_ptr->consumer, 0, sizeof(kz_amqp_conn)); -// server_ptr->consumer->server = server_ptr; memset(consumer, 0, sizeof(kz_amqp_conn)); consumer->server = server_ptr; consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count); -// server_ptr->consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count); -// if(server_ptr->consumer_channels == NULL) if(consumer_channels == NULL) { LM_ERR("NO MORE PACKAGE MEMORY\n"); return 1; } for(i=0; i < bindings_count; i++) -// server_ptr->consumer_channels[i].channel = dbk_channels + i + 1; consumer_channels[i].channel = channel_base + i + 1; while(1) { OK = 1; -// if(kz_amqp_connection_open(server_ptr->consumer)) { if(kz_amqp_connection_open(consumer)) { LM_ERR("Error opening connection\n"); sleep(3); @@ -2706,25 +2586,17 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) /* reset channels */ + /* bind targeted channels */ for(i=0,channel_res=0; i < channel_base && channel_res == 0; i++) { - /* start cleanup */ -// server_ptr->channels[i].consumer = NULL; - /* end cleanup */ - - /* bind targeted channels */ -// channel_res = kz_amqp_channel_open(server_ptr->consumer, server_ptr->channels[i].channel); channel_res = kz_amqp_channel_open(consumer, server_ptr->channels[i].channel); if(channel_res == 0) { -// kz_amqp_bind_targeted_channel(server_ptr->consumer, i); kz_amqp_bind_targeted_channel(consumer, i); } } + /* cleanup consumer channels */ for(i=0,channel_res=0; i < bindings_count && channel_res == 0; i++) { - /* start cleanup */ -// server_ptr->consumer_channels[i].consumer = NULL; consumer_channels[i].consumer = NULL; - /* end cleanup */ } i = 0; @@ -2733,12 +2605,9 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) kz_amqp_binding_ptr binding = kz_bindings->head; while(binding != NULL && OK) { if(binding->bind->federate || server_ptr->zone == kz_amqp_get_primary_zone()) { -// channel_res = kz_amqp_channel_open(server_ptr->consumer, server_ptr->consumer_channels[i].channel); channel_res = kz_amqp_channel_open(consumer, consumer_channels[i].channel); if(channel_res == 0) { -// kz_amqp_bind_consumer(server_ptr->consumer, binding->bind, i, server_ptr->consumer_channels); kz_amqp_bind_consumer(consumer, binding->bind, i, consumer_channels); -// server_ptr->consumer_channels[i].state = KZ_AMQP_CHANNEL_BINDED; consumer_channels[i].state = KZ_AMQP_CHANNEL_BINDED; i++; } else { @@ -2755,9 +2624,7 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) while(OK) { payload = NULL; amqp_envelope_t envelope; -// amqp_maybe_release_buffers(server_ptr->consumer->conn); amqp_maybe_release_buffers(consumer->conn); -// amqp_rpc_reply_t reply = amqp_consume_message(server_ptr->consumer->conn, &envelope, NULL, 0); amqp_rpc_reply_t reply = amqp_consume_message(consumer->conn, &envelope, NULL, 0); switch(reply.reply_type) { case AMQP_RESPONSE_LIBRARY_EXCEPTION: @@ -2770,7 +2637,6 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) break; case AMQP_STATUS_UNEXPECTED_STATE: LM_DBG("AMQP_STATUS_UNEXPECTED_STATE\n"); -// OK = kz_amqp_consume_error(server_ptr->consumer); OK = kz_amqp_consume_error(consumer); break; default: @@ -2785,36 +2651,15 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) kz_amqp_send_worker_event(server_ptr->id, &envelope, NULL); } else { idx = idx - channel_base; -// if(!server_ptr->consumer_channels[idx].consumer->no_ack ) { if(!consumer_channels[idx].consumer->no_ack ) { -// if(amqp_basic_ack(server_ptr->consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) { if(amqp_basic_ack(consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) { LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n"); OK = 0; } } if(OK) -// kz_amqp_send_worker_event(server_ptr->id, &envelope, server_ptr->consumer_channels[idx].consumer); kz_amqp_send_worker_event(server_ptr->id, &envelope, consumer_channels[idx].consumer); } - /* - idx = envelope.channel-1; - if(idx < dbk_channels) { - kz_send_targeted_cmd(server_ptr->id, envelope.message.body); - } else { - idx = idx - dbk_channels; - kz_amqp_send_consumer_event_ex(maybe_add_consumer_key(server_ptr->id, envelope.message.body), - kz_amqp_bytes_dup(server_ptr->consumer_channels[idx].consumer->event_key), - kz_amqp_bytes_dup(server_ptr->consumer_channels[idx].consumer->event_subkey), - 0, 0, 1); - if(!server_ptr->consumer_channels[idx].consumer->no_ack ) { - if(amqp_basic_ack(server_ptr->consumer->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) { - LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n"); - OK = 0; - } - } - } - */ break; case AMQP_RESPONSE_SERVER_EXCEPTION: LM_ERR("AMQP_RESPONSE_SERVER_EXCEPTION in consume\n"); @@ -2829,7 +2674,6 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) amqp_destroy_envelope(&envelope); } -// kz_amqp_connection_close(server_ptr->consumer); kz_amqp_connection_close(consumer); } @@ -2838,27 +2682,37 @@ int kz_amqp_consumer_proc(kz_amqp_server_ptr server_ptr) void kz_amqp_consumer_worker_cb(int fd, short event, void *arg) { - kz_amqp_consumer_delivery_ptr cmd; - if (read(fd, &cmd, sizeof(cmd)) != sizeof(cmd)) { + kz_amqp_cmd_ptr cmd = NULL; + kz_amqp_consumer_delivery_ptr Evt; + if (read(fd, &Evt, sizeof(Evt)) != sizeof(Evt)) { LM_ERR("failed to read from command pipe: %s\n", strerror(errno)); return; } - LM_DBG("consumer %d received payload %s\n", my_pid(), cmd->payload); + LM_DBG("consumer %d received payload %s\n", my_pid(), Evt->payload); - if(cmd->cmd) { - if(cmd->cmd->return_code == AMQP_RESPONSE_NORMAL) { - kz_amqp_set_last_result(cmd->payload); - kz_amqp_cb_ok(cmd->cmd); + if(Evt->cmd) { + cmd =Evt->cmd; + if(cmd->type == KZ_AMQP_CMD_ASYNC_CALL ) { + if(cmd->return_code == AMQP_RESPONSE_NORMAL) { + kz_amqp_set_last_result(Evt->payload); + kz_amqp_cb_ok(cmd); + } else { + kz_amqp_reset_last_result(); + kz_amqp_cb_error(cmd); + LM_DBG("run error exiting consumer %d\n", my_pid()); + } } else { - kz_amqp_reset_last_result(); - kz_amqp_cb_error(cmd->cmd); + cmd->return_payload = Evt->payload; + Evt->payload = NULL; + Evt->cmd = NULL; + lock_release(&cmd->lock); } } else { - kz_amqp_consumer_event(cmd->payload, cmd->event_key, cmd->event_subkey); + kz_amqp_consumer_event(Evt->payload, Evt->event_key, Evt->event_subkey); } - kz_amqp_free_consumer_delivery(cmd); + kz_amqp_free_consumer_delivery(Evt); LM_DBG("exiting consumer %d\n", my_pid()); }