From b9e5b9181c0f9c315e0f27ad96f69d5ca8cafba3 Mon Sep 17 00:00:00 2001
From: Luis Azedo
Date: Mon, 23 Feb 2015 20:55:57 +0000
Subject: [PATCH] kazoo - fix timeouts

timeout should be ms not sec
set time before state so timeout check doesn't set timeout immediately
separate proc for handling timeouts
---
Review notes (not part of the commit message):
 - kz_amqp_timeout_proc: sleep between sweeps instead of busy-looping,
   read the command pointer once per slot
 - kz_amqp_publisher_proc: never index channels[] with a negative idx,
   pass select() a copy of kz_sock_tv
 - kz_amqp_consumer_proc: bounds-check the channel index, NULL-check the
   pending command, destroy the envelope only after a successful consume,
   drop the unused payload variable
 - indentation and blank context lines were reconstructed; verify against
   the tree before applying

 modules/kazoo/kazoo.c   |  67 ++++++----
 modules/kazoo/kz_amqp.c | 308 +++++++++++++++++++++++++++++++++++++---
 modules/kazoo/kz_amqp.h |   4 +
 3 files changed, 333 insertions(+), 46 deletions(-)

diff --git a/modules/kazoo/kazoo.c b/modules/kazoo/kazoo.c
index 2f86e5ecc26..deaf8ad283c 100644
--- a/modules/kazoo/kazoo.c
+++ b/modules/kazoo/kazoo.c
@@ -269,8 +269,8 @@ static int mod_init(void) {
 
 	}
 
-	int total_workers = dbk_consumer_processes + 1;
-	int total_pipes = total_workers + 1;
+	int total_workers = dbk_consumer_processes + 3;
+	int total_pipes = total_workers;
 	kz_pipe_fds = (int*) shm_malloc(sizeof(int) * (total_pipes) * 2 );
 
 	for(i=0; i < total_pipes; i++) {
@@ -310,45 +310,60 @@ static int mod_child_init(int rank)
 
 	if (rank==PROC_MAIN)
 	{
-		pid=fork_process(1, "AMQP Manager", 1);
+		pid=fork_process(1, "AMQP Publisher", 1);
 		if (pid<0)
 			return -1; /* error */
 		if(pid==0){
-			kz_amqp_manager_loop(0);
+			kz_amqp_publisher_proc(0);
 		}
 		else {
-			for(i=0; i < dbk_consumer_processes; i++) {
-				pid=fork_process(i+2, "AMQP Consumer", 1);
+			pid=fork_process(2, "AMQP Consumer", 1);
+			if (pid<0)
+				return -1; /* error */
+			if(pid==0){
+				kz_amqp_consumer_proc(1);
+			}
+			else {
+				pid=fork_process(3, "AMQP Timer", 1);
 				if (pid<0)
 					return -1; /* error */
 				if(pid==0){
-					mod_consumer_proc(i+1);
+					kz_amqp_timeout_proc(2);
+				}
+				else {
+					for(i=0; i < dbk_consumer_processes; i++) {
+						pid=fork_process(i+4, "AMQP Consumer Worker", 1);
+						if (pid<0)
+							return -1; /* error */
+						if(pid==0){
+							mod_consumer_proc(i+3);
+						}
+					}
 				}
 			}
 		}
-
 		return 0;
 	}
 
-    if(dbk_pua_mode == 1) {
-        if (kz_pa_dbf.init==0)
-        {
-            LM_CRIT("child_init: database not bound\n");
-            return -1;
-        }
-        kz_pa_db = kz_pa_dbf.init(&kz_db_url);
-        if (!kz_pa_db)
-        {
-            LM_ERR("child %d: unsuccessful connecting to database\n", rank);
-            return -1;
-        }
+	if(dbk_pua_mode == 1) {
+		if (kz_pa_dbf.init==0)
+		{
+			LM_CRIT("child_init: database not bound\n");
+			return -1;
+		}
+		kz_pa_db = kz_pa_dbf.init(&kz_db_url);
+		if (!kz_pa_db)
+		{
+			LM_ERR("child %d: unsuccessful connecting to database\n", rank);
+			return -1;
+		}
 
-        if (kz_pa_dbf.use_table(kz_pa_db, &kz_presentity_table) < 0)
-        {
-            LM_ERR( "child %d:unsuccessful use_table presentity_table\n", rank);
-            return -1;
-        }
-        LM_DBG("child %d: Database connection opened successfully\n", rank);
+		if (kz_pa_dbf.use_table(kz_pa_db, &kz_presentity_table) < 0)
+		{
+			LM_ERR( "child %d:unsuccessful use_table presentity_table\n", rank);
+			return -1;
+		}
+		LM_DBG("child %d: Database connection opened successfully\n", rank);
 	}
 
 	return 0;
diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c
index 29028d4c12d..e25f0122c45 100644
--- a/modules/kazoo/kz_amqp.c
+++ b/modules/kazoo/kz_amqp.c
@@ -849,8 +849,8 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
 	pv_value_t pv_val;
 	if(pv_get_spec_value( msg, &kz_query_timeout_spec, &pv_val) == 0) {
 		if((pv_val.flags & PV_VAL_INT) && pv_val.ri != 0 ) {
-			kz_timeout.tv_usec = 0;
-			kz_timeout.tv_sec = pv_val.ri;
+			kz_timeout.tv_usec = (pv_val.ri % 1000) * 1000;
+			kz_timeout.tv_sec = pv_val.ri / 1000;
 			LM_DBG("SET TIMEOUT TO %i\n", (int) kz_timeout.tv_sec);
 		}
 	}
@@ -1292,7 +1292,7 @@ int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_chann
 	amqp_bytes_t exchange;
 	amqp_bytes_t routing_key;
 	amqp_bytes_t payload;
-	int ret = 1;
+	int ret = -1;
 	json_obj_ptr json_obj = NULL;
 	amqp_basic_properties_t props;
 
@@ -1327,9 +1327,9 @@ int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_chann
 
 	if ( kz_amqp_error("Publishing", amqp_get_rpc_reply(kz_conn->conn)) ) {
 		LM_ERR("Failed to publish\n");
-		ret = -1;
 		goto error;
 	}
+	gettimeofday(&channels[idx].timer, NULL);
 	channels[idx].state = state;
 	channels[idx].cmd = cmd;
 
@@ -1458,7 +1458,6 @@ void kz_amqp_consumer_loop(int child_no)
 	LM_DBG("starting consumer %d\n", child_no);
 	close(kz_pipe_fds[child_no*2+1]);
 	int data_pipe = kz_pipe_fds[child_no*2];
-	int back_idx = (dbk_consumer_processes+1)*2+1;
 	fd_set fdset;
 	int selret;
 
@@ -1478,15 +1477,6 @@ void kz_amqp_consumer_loop(int child_no)
 			if(read(data_pipe, &ptr, sizeof(ptr)) == sizeof(ptr)) {
 				LM_DBG("consumer %d received payload %s\n", child_no, ptr->payload);
 				kz_amqp_consumer_event(child_no, ptr->payload, ptr->event_key, ptr->event_subkey);
-				if(ptr->channel > 0 && ptr->delivery_tag > 0) {
-					kz_amqp_cmd_ptr cmd = kz_amqp_alloc_pipe_cmd();
-					cmd->type = KZ_AMQP_ACK;
-					cmd->channel = ptr->channel;
-					cmd->delivery_tag = ptr->delivery_tag;
-					if (write(kz_pipe_fds[back_idx], &cmd, sizeof(cmd)) != sizeof(cmd)) {
-						LM_ERR("failed to send ack to AMQP Manager in process %d, write to command pipe: %s\n", getpid(), strerror(errno));
-					}
-				}
 				kz_amqp_free_consumer_delivery(ptr);
 			}
 		}
@@ -1521,7 +1511,7 @@ void kz_amqp_send_consumer_event_ex(char* payload, char* event_key, char* event_
 	ptr->payload = payload;
 	ptr->event_key = event_key;
 	ptr->event_subkey = event_subkey;
-	if (write(kz_pipe_fds[consumer*2+1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
+	if (write(kz_pipe_fds[(consumer+2)*2+1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
 		LM_ERR("failed to send payload to consumer %d : %s\nPayload %s\n", consumer, strerror(errno), payload);
 	}
 
@@ -1680,7 +1670,7 @@ void kz_amqp_manager_loop(int child_no)
 						} else {
 							cmd->return_code = -1;
 							OK = INTERNAL_READ = CONSUME = 0;
-							LM_ERR("ERROR SENDING PUBLISH");
+							LM_ERR("ERROR SENDING PUBLISH\n");
 						}
 						channels[idx].state = KZ_AMQP_FREE;
 						channels[idx].cmd = NULL;
@@ -1693,10 +1683,8 @@ void kz_amqp_manager_loop(int child_no)
 							channels[idx].cmd = NULL;
 							cmd->return_code = -1;
 							lock_release(&cmd->lock);
-							LM_ERR("ERROR SENDING QUERY");
+							LM_ERR("ERROR SENDING QUERY\n");
 							OK = INTERNAL_READ = CONSUME = 0;
-						} else {
-							gettimeofday(&channels[idx].timer, NULL);
 						}
 						break;
 					default:
@@ -1794,7 +1782,7 @@ void kz_amqp_manager_loop(int child_no)
 					cmd->return_code = -1;
 					lock_release(&cmd->lock);
 					// rebind ??
-					LM_ERR("QUERY TIMEOUT");
+					LM_ERR("QUERY TIMEOUT\n");
 				}
 			}
 		}
@@ -1804,3 +1792,283 @@ void kz_amqp_manager_loop(int child_no)
 		kz_amqp_fire_connection_event("closed", kzconn->info.host);
 	}
 }
+
+/* pause between two timeout sweeps, in microseconds */
+#define KZ_AMQP_TIMEOUT_POLL_USEC 10000
+
+/*
+ * Timer process: sweeps channels[] forever and fails every query (state
+ * KZ_AMQP_CALLING) whose per-command timeout has expired: the slot is set
+ * back to KZ_AMQP_FREE, cmd->return_code becomes -1 and cmd->lock is
+ * released. child_no is not used here.
+ *
+ * NOTE(review): channels[] is also written by the publisher and consumer
+ * processes and no lock around it is visible in this patch; the command
+ * pointer is read once per slot to narrow that window - confirm whether
+ * stronger synchronization is needed.
+ */
+void kz_amqp_timeout_proc(int child_no)
+{
+	kz_amqp_cmd_ptr cmd;
+	int i;
+	while(1) {
+		struct timeval now;
+		/* sleep between sweeps instead of spinning on a CPU core */
+		usleep(KZ_AMQP_TIMEOUT_POLL_USEC);
+		gettimeofday(&now, NULL);
+		for(i=0; i < dbk_channels; i++) {
+			cmd = channels[i].cmd;
+			if(channels[i].state == KZ_AMQP_CALLING
+					&& cmd != NULL
+					&& check_timeout(&now, &channels[i].timer, &cmd->timeout)) {
+				LM_DBG("Kazoo Query timeout - %s\n", cmd->payload);
+				channels[i].state = KZ_AMQP_FREE;
+				channels[i].cmd = NULL;
+				cmd->return_code = -1;
+				lock_release(&cmd->lock);
+			}
+		}
+	}
+}
+
+/*
+ * Publisher process: connects to the next available AMQP server, opens
+ * every channel in channels[], then executes the commands read from its
+ * pipe (kz_pipe_fds[child_no*2]). A failed send drops the connection and
+ * the outer loop reconnects.
+ */
+void kz_amqp_publisher_proc(int child_no)
+{
+	LM_DBG("starting manager %d\n", child_no);
+	close(kz_pipe_fds[child_no*2+1]);
+	int data_pipe = kz_pipe_fds[child_no*2];
+	fd_set fdset;
+	struct timeval tv;
+	int idx, i;
+	int selret;
+	int OK;
+	kz_amqp_conn_ptr kzconn;
+	kz_amqp_cmd_ptr cmd;
+	int channel_res;
+
+
+	while(1) {
+		OK = 1;
+		while(1) {
+			kzconn = kz_amqp_get_next_connection();
+			if(kzconn != NULL)
+				break;
+			LM_DBG("Connection failed : all servers down?\n");
+			sleep(3);
+		}
+
+		kz_amqp_fire_connection_event("open", kzconn->info.host);
+
+		for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
+			/* start cleanup */
+			channels[i].state = KZ_AMQP_CLOSED;
+			/* end cleanup */
+
+			/* bind targeted channels */
+			channel_res = kz_amqp_channel_open(kzconn, channels[i].channel);
+			if(channel_res == 0) {
+				channels[i].state = KZ_AMQP_FREE;
+			}
+
+		}
+		while(OK) {
+			FD_ZERO(&fdset);
+			FD_SET(data_pipe, &fdset);
+			/* select() may modify its timeout argument: pass a fresh copy */
+			tv = kz_sock_tv;
+			selret = select(FD_SETSIZE, &fdset, NULL, NULL, &tv);
+			if (selret < 0) {
+				LM_ERR("select() failed: %s\n", strerror(errno));
+				continue;
+			} else if (!selret) {
+				continue;
+			} else {
+				if(FD_ISSET(data_pipe, &fdset) && read(data_pipe, &cmd, sizeof(cmd)) == sizeof(cmd)) {
+					switch (cmd->type) {
+					case KZ_AMQP_PUBLISH:
+						idx = kz_amqp_send(kzconn, cmd);
+						if(idx >= 0) {
+							cmd->return_code = AMQP_RESPONSE_NORMAL;
+							channels[idx].state = KZ_AMQP_FREE;
+							channels[idx].cmd = NULL;
+						} else {
+							/* idx is an error code here, not a slot in channels[] */
+							cmd->return_code = -1;
+							OK = 0;
+							LM_ERR("ERROR SENDING PUBLISH\n");
+						}
+						lock_release(&cmd->lock);
+						break;
+					case KZ_AMQP_CALL:
+						idx = kz_amqp_send_receive(kzconn, cmd);
+						if(idx < 0) {
+							/* idx is an error code here, not a slot in channels[] */
+							cmd->return_code = -1;
+							lock_release(&cmd->lock);
+							LM_ERR("ERROR SENDING QUERY\n");
+							OK = 0;
+						}
+						break;
+					default:
+						LM_DBG("unknown pipe cmd %d\n", cmd->type);
+						break;
+					}
+				}
+			}
+		}
+		kz_amqp_connection_close(kzconn);
+		kz_amqp_fire_connection_event("closed", kzconn->info.host);
+	}
+}
+
+/*
+ * Consumer process: connects to the next available AMQP server, resets
+ * and reopens every channel, binds the configured consumers and then
+ * blocks in amqp_consume_message(). Replies to pending queries are handed
+ * to the waiting command; consumed events are forwarded through
+ * kz_amqp_send_consumer_event_ex(). Any unrecoverable AMQP error drops the
+ * connection and the outer loop reconnects.
+ */
+void kz_amqp_consumer_proc(int child_no)
+{
+	LM_DBG("starting consumer %d\n", child_no);
+	close(kz_pipe_fds[child_no*2+1]);
+	int i, idx;
+	int OK;
+	int channel_res;
+	kz_amqp_conn_ptr kzconn;
+	kz_amqp_cmd_ptr cmd;
+
+	while(1) {
+		OK = 1;
+		while(1) {
+			kzconn = kz_amqp_get_next_connection();
+			if(kzconn != NULL)
+				break;
+			LM_DBG("Connection failed : all servers down?\n");
+			sleep(3);
+		}
+
+		kz_amqp_fire_connection_event("open", kzconn->info.host);
+
+		/* reset channels */
+
+		for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) {
+			/* start cleanup */
+			channels[i].state = KZ_AMQP_CLOSED;
+			channels[i].consumer = NULL;
+			if(channels[i].targeted != NULL) {
+				kz_amqp_free_bind(channels[i].targeted);
+				channels[i].targeted = NULL;
+			}
+			cmd = channels[i].cmd;
+			if(cmd != NULL) {
+				channels[i].cmd = NULL;
+				cmd->return_code = -1;
+				lock_release(&cmd->lock);
+			}
+			/* end cleanup */
+
+			/* bind targeted channels */
+			channel_res = kz_amqp_channel_open(kzconn, channels[i].channel);
+			if(channel_res == 0) {
+				kz_amqp_bind_targeted_channel(kzconn, 0, i);
+				channels[i].state = KZ_AMQP_FREE;
+			}
+		}
+		channel_index = 0;
+		/* bind consumers */
+		if(kz_bindings != NULL) {
+			kz_amqp_binding_ptr binding = kz_bindings->head;
+			while(binding != NULL) {
+				kz_amqp_bind_consumer(kzconn, binding->bind);
+				binding = binding->next;
+			}
+		}
+
+		while(OK) {
+			amqp_envelope_t envelope;
+			amqp_maybe_release_buffers(kzconn->conn);
+			amqp_rpc_reply_t reply = amqp_consume_message(kzconn->conn, &envelope, NULL, 0);
+			switch(reply.reply_type) {
+			case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+				switch(reply.library_error) {
+				case AMQP_STATUS_HEARTBEAT_TIMEOUT:
+					LM_ERR("AMQP_STATUS_HEARTBEAT_TIMEOUT\n");
+					OK=0;
+					break;
+				case AMQP_STATUS_TIMEOUT:
+					break;
+				case AMQP_STATUS_UNEXPECTED_STATE:
+					LM_DBG("AMQP_STATUS_UNEXPECTED_STATE\n");
+					OK = kz_amqp_consume_error(kzconn->conn);
+					break;
+				default:
+					OK = 0;
+					break;
+				};
+				break;
+
+			case AMQP_RESPONSE_NORMAL:
+				idx = envelope.channel-1;
+				if(idx < 0 || idx >= dbk_channels) {
+					/* never index channels[] with a channel we did not open */
+					LM_ERR("message received on unexpected channel %d\n", (int)envelope.channel);
+					amqp_destroy_envelope(&envelope);
+					break;
+				}
+				switch(channels[idx].state) {
+				case KZ_AMQP_CALLING:
+					/* detach the command first: the timeout process may already have claimed it */
+					cmd = channels[idx].cmd;
+					channels[idx].state = KZ_AMQP_FREE;
+					channels[idx].cmd = NULL;
+					if(cmd != NULL) {
+						cmd->return_payload = kz_amqp_bytes_dup(envelope.message.body);
+						cmd->return_code = AMQP_RESPONSE_NORMAL;
+						lock_release(&cmd->lock);
+					}
+					break;
+				case KZ_AMQP_CONSUMING:
+					kz_amqp_send_consumer_event_ex(kz_amqp_bytes_dup(envelope.message.body),
+							kz_amqp_bytes_dup(channels[idx].consumer->event_key),
+							kz_amqp_bytes_dup(channels[idx].consumer->event_subkey),
+							channels[idx].consumer->no_ack ? 0 : envelope.channel,
+							channels[idx].consumer->no_ack ? 0 : envelope.delivery_tag,1);
+					if(!channels[idx].consumer->no_ack ) {
+						if(amqp_basic_ack(kzconn->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) {
+							LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n");
+							OK = 0;
+						}
+					}
+					break;
+				default:
+					LM_DBG("ignoring received payload on consumer - %.*s\n", (int) envelope.message.body.len, (char*)envelope.message.body.bytes);
+					break;
+				}
+				/* destroy only after a successful consume; on failure the library is expected to have released or never filled the envelope - confirm against rabbitmq-c */
+				amqp_destroy_envelope(&envelope);
+				break;
+			case AMQP_RESPONSE_SERVER_EXCEPTION:
+				LM_ERR("AMQP_RESPONSE_SERVER_EXCEPTION in consume\n");
+				OK = 0;
+				break;
+
+			default:
+				LM_ERR("UNHANDLED AMQP_RESPONSE in consume\n");
+				OK = 0;
+				break;
+			};
+		}
+
+		kz_amqp_connection_close(kzconn);
+		kz_amqp_fire_connection_event("closed", kzconn->info.host);
+
+	}
+}
+
diff --git a/modules/kazoo/kz_amqp.h b/modules/kazoo/kz_amqp.h
index 2761c852d14..9fcf551d323 100644
--- a/modules/kazoo/kz_amqp.h
+++ b/modules/kazoo/kz_amqp.h
@@ -139,6 +139,10 @@ void kz_amqp_consumer_loop(int child_no);
 //void kz_amqp_generic_consumer_loop(int child_no);
 void kz_amqp_manager_loop(int child_no);
 
+void kz_amqp_consumer_proc(int child_no);
+void kz_amqp_publisher_proc(int child_no);
+void kz_amqp_timeout_proc(int child_no);
+
 int kz_pv_get_event_payload(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);
 int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);
 int kz_pv_get_connection_host(struct sip_msg *msg, pv_param_t *param, pv_value_t *res);