diff --git a/modules/kazoo/kazoo.c b/modules/kazoo/kazoo.c index deaf8ad283c..d110131dec5 100644 --- a/modules/kazoo/kazoo.c +++ b/modules/kazoo/kazoo.c @@ -41,7 +41,7 @@ #include "kz_trans.h" #include "kz_pua.h" -#define DBK_DEFAULT_NO_CONSUMERS 4 +#define DBK_DEFAULT_NO_CONSUMERS 8 static int mod_init(void); static int mod_child_init(int rank); @@ -232,8 +232,10 @@ static int mod_init(void) { return -1; } - kz_amqp_init(); - + if(!kz_amqp_init()) { + return -1; + } + if(dbk_pua_mode == 1) { kz_db_url.len = kz_db_url.s ? strlen(kz_db_url.s) : 0; LM_DBG("db_url=%s/%d/%p\n", ZSW(kz_db_url.s), kz_db_url.len,kz_db_url.s); diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c index e25f0122c45..498347edf39 100644 --- a/modules/kazoo/kz_amqp.c +++ b/modules/kazoo/kz_amqp.c @@ -19,9 +19,9 @@ #define RET_AMQP_ERROR 2 - -kz_amqp_conn_pool_ptr kz_pool = NULL; +kz_amqp_connection_pool_ptr kz_pool = NULL; kz_amqp_bindings_ptr kz_bindings = NULL; +int bindings_count = 0; static unsigned long rpl_query_routing_key_count = 0; @@ -49,7 +49,6 @@ extern pv_spec_t kz_query_timeout_spec; const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL }; const amqp_table_t kz_amqp_empty_table = { 0, NULL }; - static char *kz_amqp_str_dup(str *src) { char *res; @@ -153,7 +152,7 @@ void kz_amqp_free_bind(kz_amqp_bind_ptr bind) shm_free(bind); } -void kz_amqp_free_connection(kz_amqp_conn_ptr conn) +void kz_amqp_free_connection(kz_amqp_connection_ptr conn) { if(!conn) return; @@ -278,12 +277,12 @@ kz_amqp_bind_ptr kz_amqp_bind_alloc(str* exchange, str* exchange_type, str* queu void kz_amqp_init_connection_pool() { if(kz_pool == NULL) { - kz_pool = (kz_amqp_conn_pool_ptr) shm_malloc(sizeof(kz_amqp_conn_pool)); - memset(kz_pool, 0, sizeof(kz_amqp_conn_pool)); + kz_pool = (kz_amqp_connection_pool_ptr) shm_malloc(sizeof(kz_amqp_connection_pool)); + memset(kz_pool, 0, sizeof(kz_amqp_connection_pool)); } } -void kz_amqp_init() { +int kz_amqp_init() { int i; kz_amqp_init_connection_pool(); if(kz_bindings == NULL) { @@ -294,9 +293,14 @@ void kz_amqp_init() { channels = shm_malloc(dbk_channels * sizeof(kz_amqp_channel)); memset(channels, 0, dbk_channels * sizeof(kz_amqp_channel)); for(i=0; i < dbk_channels; i++) { + if(lock_init(&channels[i].lock)==NULL) { + LM_ERR("could not initialize locks for channels\n"); + return 0; + } channels[i].channel = i+1; } } + return 1; } void kz_amqp_destroy() { @@ -325,9 +329,9 @@ void kz_amqp_destroy() { } if(kz_pool != NULL) { - kz_amqp_conn_ptr conn = kz_pool->head; + kz_amqp_connection_ptr conn = kz_pool->head; while(conn != NULL) { - kz_amqp_conn_ptr tofree = conn; + kz_amqp_connection_ptr tofree = conn; conn = conn->next; kz_amqp_free_connection(tofree); } @@ -342,7 +346,7 @@ static char* KZ_URL_ROOT = "/"; int kz_amqp_add_connection(modparam_t type, void* val) { - kz_amqp_init_connection_pool(); // find a better way + kz_amqp_init_connection_pool(); char* url = (char*) val; int len = strlen(url); @@ -351,8 +355,8 @@ int kz_amqp_add_connection(modparam_t type, void* val) return -1; } - kz_amqp_conn_ptr newConn = shm_malloc(sizeof(kz_amqp_conn)); - memset(newConn, 0, sizeof(kz_amqp_conn)); + kz_amqp_connection_ptr newConn = shm_malloc(sizeof(kz_amqp_connection)); + memset(newConn, 0, sizeof(kz_amqp_connection)); newConn->url = shm_malloc( (KZ_URL_MAX_SIZE + 1) * sizeof(char) ); memset(newConn->url, 0, (KZ_URL_MAX_SIZE + 1) * sizeof(char)); @@ -432,19 +436,19 @@ int kz_amqp_connection_open(kz_amqp_conn_ptr rmq) { goto error; } - if (amqp_socket_open(rmq->socket, rmq->info.host, rmq->info.port)) { + if (amqp_socket_open(rmq->socket, rmq->info->info.host, rmq->info->info.port)) { LM_DBG("Failed to open TCP socket to AMQP broker\n"); goto error; } if (kz_amqp_error("Logging in", amqp_login(rmq->conn, - rmq->info.vhost, + rmq->info->info.vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, - rmq->info.user, - rmq->info.password))) { + rmq->info->info.user, + rmq->info->info.password))) { LM_ERR("Login to AMQP broker failed!\n"); goto error; @@ -473,6 +477,9 @@ int kz_amqp_channel_open(kz_amqp_conn_ptr rmq, amqp_channel_t channel) { } kz_amqp_conn_ptr kz_amqp_get_connection() { + return NULL; + + /* kz_amqp_conn_ptr ptr = NULL; if(kz_pool == NULL) { return NULL; @@ -499,9 +506,12 @@ kz_amqp_conn_ptr kz_amqp_get_connection() { // lock_release(&kz_pool->lock); return ptr; + */ } kz_amqp_conn_ptr kz_amqp_get_next_connection() { + return NULL; + /* kz_amqp_conn_ptr ptr = NULL; if(kz_pool == NULL) { return NULL; @@ -525,8 +535,44 @@ kz_amqp_conn_ptr kz_amqp_get_next_connection() { return ptr; + */ } +int kz_amqp_open_next_connection(kz_amqp_conn_ptr ptr) { + if(ptr == NULL) { + return -1; + } + + if(kz_pool == NULL) { + return -2; + } + + if(ptr->info == NULL) { + ptr->info = kz_pool->head; + } else { + ptr->info = ptr->info->next; + if(ptr->info == NULL) { + ptr->info = kz_pool->head; + } + } + + while(ptr->conn == NULL) { + if(kz_amqp_connection_open(ptr) == 0) { + break; + } + ptr->info = ptr->info->next; + if(ptr->info == NULL) { + LM_INFO("all connections tried, restarting from head\n"); + sleep(3); + ptr->info = kz_pool->head; + } + } + + + return 0; +} + + int kz_amqp_consume_error(amqp_connection_state_t conn) { amqp_frame_t frame; @@ -851,7 +897,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha if((pv_val.flags & PV_VAL_INT) && pv_val.ri != 0 ) { kz_timeout.tv_usec = (pv_val.ri % 1000) * 1000; kz_timeout.tv_sec = pv_val.ri / 1000; - LM_DBG("SET TIMEOUT TO %i\n", (int) kz_timeout.tv_sec); + LM_DBG("setting timeout to %i,%i\n", (int) kz_timeout.tv_sec, (int) kz_timeout.tv_usec); } } } @@ -951,6 +997,7 @@ int kz_amqp_subscribe_simple(struct sip_msg* msg, char* exchange, char* exchange kz_bindings->tail = binding; binding->bind = bind; + bindings_count++; return 1; @@ -1058,6 +1105,7 @@ int kz_amqp_subscribe(struct sip_msg* msg, char* payload) kz_bindings->tail = binding; binding->bind = bind; + bindings_count++; if(json_obj != NULL) json_object_put(json_obj); @@ -1160,7 +1208,7 @@ int get_channel_index() { return n; } if(channel_index == 0) { - LM_ERR("max channels (%d) reached. please exit kazoo and change db_kazoo amqp_max_channels param", dbk_channels); + LM_ERR("max channels (%d) reached. please exit kamailio and change kazoo amqp_max_channels param", dbk_channels); return -1; } channel_index = 0; @@ -1241,6 +1289,48 @@ int kz_amqp_bind_targeted_channels(kz_amqp_conn_ptr kz_conn , int loopcount) return 0; } +int kz_amqp_bind_consumer_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind, int idx, kz_amqp_channel_ptr chan) +{ + int ret = -1; + + amqp_queue_declare(kz_conn->conn, chan[idx].channel, bind->queue, bind->passive, bind->durable, bind->exclusive, bind->auto_delete, kz_amqp_empty_table); + if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } + + amqp_exchange_declare(kz_conn->conn, chan[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); + if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } + + LM_DBG("QUEUE BIND\n"); + if (amqp_queue_bind(kz_conn->conn, chan[idx].channel, bind->queue, bind->exchange, bind->routing_key, kz_amqp_empty_table) < 0 + || kz_amqp_error("Binding queue", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } + + LM_DBG("BASIC CONSUME\n"); + if (amqp_basic_consume(kz_conn->conn, chan[idx].channel, bind->queue, kz_amqp_empty_bytes, 0, bind->no_ack, 0, kz_amqp_empty_table) < 0 + || kz_amqp_error("Consuming", amqp_get_rpc_reply(kz_conn->conn))) + { + ret = -RET_AMQP_ERROR; + goto error; + } + + chan[idx].state = KZ_AMQP_CONSUMING; + chan[idx].consumer = bind; + ret = idx; + error: + + return ret; +} + int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind) { @@ -1286,7 +1376,6 @@ int kz_amqp_bind_consumer(kz_amqp_conn_ptr kz_conn, kz_amqp_bind_ptr bind) return ret; } - int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_channel_state state, int idx) { amqp_bytes_t exchange; @@ -1314,7 +1403,7 @@ int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_chann json_obj = kz_json_parse(cmd->payload); if (json_obj == NULL) - goto error; + goto error; if(kz_json_get_object(json_obj, BLF_JSON_SERVERID) == NULL) { json_object_object_add(json_obj, BLF_JSON_SERVERID, json_object_new_string((char*)channels[idx].targeted->routing_key.bytes)); @@ -1322,11 +1411,16 @@ int kz_amqp_send_ex(kz_amqp_conn_ptr kz_conn, kz_amqp_cmd_ptr cmd, kz_amqp_chann payload = amqp_bytes_malloc_dup(amqp_cstring_bytes((char*)json_object_to_json_string(json_obj))); } - - amqp_basic_publish(kz_conn->conn, channels[idx].channel, exchange, routing_key, 0, 0, &props, payload); + int amqpres = amqp_basic_publish(kz_conn->conn, channels[idx].channel, exchange, routing_key, 0, 0, &props, payload); + if ( amqpres != AMQP_STATUS_OK ) { + LM_ERR("Failed to publish\n"); + ret = -1; + goto error; + } if ( kz_amqp_error("Publishing", amqp_get_rpc_reply(kz_conn->conn)) ) { LM_ERR("Failed to publish\n"); + ret = -1; goto error; } gettimeofday(&channels[idx].timer, NULL); @@ -1458,6 +1552,7 @@ void kz_amqp_consumer_loop(int child_no) LM_DBG("starting consumer %d\n", child_no); close(kz_pipe_fds[child_no*2+1]); int data_pipe = kz_pipe_fds[child_no*2]; +// int back_idx = (dbk_consumer_processes+1)*2+1; fd_set fdset; int selret; @@ -1477,6 +1572,17 @@ void kz_amqp_consumer_loop(int child_no) if(read(data_pipe, &ptr, sizeof(ptr)) == sizeof(ptr)) { LM_DBG("consumer %d received payload %s\n", child_no, ptr->payload); kz_amqp_consumer_event(child_no, ptr->payload, ptr->event_key, ptr->event_subkey); + /* + if(ptr->channel > 0 && ptr->delivery_tag > 0) { + kz_amqp_cmd_ptr cmd = kz_amqp_alloc_pipe_cmd(); + cmd->type = KZ_AMQP_ACK; + cmd->channel = ptr->channel; + cmd->delivery_tag = ptr->delivery_tag; + if (write(kz_pipe_fds[back_idx], &cmd, sizeof(cmd)) != sizeof(cmd)) { + LM_ERR("failed to send ack to AMQP Manager in process %d, write to command pipe: %s\n", getpid(), strerror(errno)); + } + } + */ kz_amqp_free_consumer_delivery(ptr); } } @@ -1577,7 +1683,7 @@ void kz_amqp_manager_loop(int child_no) sleep(3); } - kz_amqp_fire_connection_event("open", kzconn->info.host); + kz_amqp_fire_connection_event("open", kzconn->info->info.host); loopcount++; for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) { @@ -1670,7 +1776,7 @@ void kz_amqp_manager_loop(int child_no) } else { cmd->return_code = -1; OK = INTERNAL_READ = CONSUME = 0; - LM_ERR("ERROR SENDING PUBLISH\n"); + LM_ERR("ERROR SENDING PUBLISH"); } channels[idx].state = KZ_AMQP_FREE; channels[idx].cmd = NULL; @@ -1683,8 +1789,10 @@ void kz_amqp_manager_loop(int child_no) channels[idx].cmd = NULL; cmd->return_code = -1; lock_release(&cmd->lock); - LM_ERR("ERROR SENDING QUERY\n"); + LM_ERR("ERROR SENDING QUERY"); OK = INTERNAL_READ = CONSUME = 0; + } else { + gettimeofday(&channels[idx].timer, NULL); } break; default: @@ -1782,17 +1890,18 @@ void kz_amqp_manager_loop(int child_no) cmd->return_code = -1; lock_release(&cmd->lock); // rebind ?? - LM_ERR("QUERY TIMEOUT\n"); + LM_ERR("QUERY TIMEOUT"); } } } firstLoop = 0; } kz_amqp_connection_close(kzconn); - kz_amqp_fire_connection_event("closed", kzconn->info.host); + kz_amqp_fire_connection_event("closed", kzconn->info->info.host); } } + /* check timeouts */ void kz_amqp_timeout_proc(int child_no) { @@ -1800,17 +1909,22 @@ void kz_amqp_timeout_proc(int child_no) int i; while(1) { struct timeval now; - gettimeofday(&now, NULL); for(i=0; i < dbk_channels; i++) { + gettimeofday(&now, NULL); if(channels[i].state == KZ_AMQP_CALLING && channels[i].cmd != NULL && check_timeout(&now, &channels[i].timer, &channels[i].cmd->timeout)) { - cmd = channels[i].cmd; - LM_DBG("Kazoo Query timeout - %s\n", cmd->payload); - channels[i].state = KZ_AMQP_FREE; - channels[i].cmd = NULL; - cmd->return_code = -1; - lock_release(&cmd->lock); + lock_get(&channels[i].lock); + if(channels[i].cmd != NULL) + { + cmd = channels[i].cmd; + LM_DBG("Kazoo Query timeout - %s\n", cmd->payload); + cmd->return_code = -1; + lock_release(&cmd->lock); + channels[i].cmd = NULL; + channels[i].state = KZ_AMQP_FREE; + } + lock_release(&channels[i].lock); } } } @@ -1829,22 +1943,23 @@ void kz_amqp_publisher_proc(int child_no) kz_amqp_cmd_ptr cmd; int channel_res; + kzconn = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn)); + memset(kzconn, 0, sizeof(kz_amqp_conn)); while(1) { OK = 1; - while(1) { - kzconn = kz_amqp_get_next_connection(); - if(kzconn != NULL) - break; - LM_DBG("Connection failed : all servers down?\n"); - sleep(3); - } - - kz_amqp_fire_connection_event("open", kzconn->info.host); + kz_amqp_open_next_connection(kzconn); + kz_amqp_fire_connection_event("open", kzconn->info->info.host); for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) { /* start cleanup */ channels[i].state = KZ_AMQP_CLOSED; + cmd = channels[i].cmd; + if(cmd != NULL) { + channels[i].cmd = NULL; + cmd->return_code = -1; + lock_release(&cmd->lock); + } /* end cleanup */ /* bind targeted channels */ @@ -1873,7 +1988,7 @@ void kz_amqp_publisher_proc(int child_no) } else { cmd->return_code = -1; OK = 0; - LM_ERR("ERROR SENDING PUBLISH\n"); + LM_ERR("ERROR SENDING PUBLISH"); } channels[idx].state = KZ_AMQP_FREE; channels[idx].cmd = NULL; @@ -1886,7 +2001,7 @@ void kz_amqp_publisher_proc(int child_no) channels[idx].cmd = NULL; cmd->return_code = -1; lock_release(&cmd->lock); - LM_ERR("ERROR SENDING QUERY\n"); + LM_ERR("ERROR SENDING QUERY"); OK = 0; } break; @@ -1898,7 +2013,7 @@ void kz_amqp_publisher_proc(int child_no) } } kz_amqp_connection_close(kzconn); - kz_amqp_fire_connection_event("closed", kzconn->info.host); + kz_amqp_fire_connection_event("closed", kzconn->info->info.host); } } @@ -1911,52 +2026,55 @@ void kz_amqp_consumer_proc(int child_no) char* payload; int channel_res; kz_amqp_conn_ptr kzconn; - kz_amqp_cmd_ptr cmd; + kz_amqp_channel_ptr consumer_channels = NULL; + + kzconn = (kz_amqp_conn_ptr)pkg_malloc(sizeof(kz_amqp_conn)); + memset(kzconn, 0, sizeof(kz_amqp_conn)); + + consumer_channels = (kz_amqp_channel_ptr)pkg_malloc(sizeof(kz_amqp_channel)*bindings_count); + for(i=0; i < bindings_count; i++) + consumer_channels[i].channel = dbk_channels + i + 1; while(1) { OK = 1; - while(1) { - kzconn = kz_amqp_get_next_connection(); - if(kzconn != NULL) - break; - LM_DBG("Connection failed : all servers down?\n"); - sleep(3); - } - - kz_amqp_fire_connection_event("open", kzconn->info.host); + kz_amqp_open_next_connection(kzconn); + kz_amqp_fire_connection_event("open", kzconn->info->info.host); /* reset channels */ for(i=0,channel_res=0; i < dbk_channels && channel_res == 0; i++) { /* start cleanup */ - channels[i].state = KZ_AMQP_CLOSED; channels[i].consumer = NULL; if(channels[i].targeted != NULL) { kz_amqp_free_bind(channels[i].targeted); channels[i].targeted = NULL; } - cmd = channels[i].cmd; - if(cmd != NULL) { - channels[i].cmd = NULL; - cmd->return_code = -1; - lock_release(&cmd->lock); - } /* end cleanup */ /* bind targeted channels */ channel_res = kz_amqp_channel_open(kzconn, channels[i].channel); if(channel_res == 0) { kz_amqp_bind_targeted_channel(kzconn, 0, i); - channels[i].state = KZ_AMQP_FREE; } } - channel_index = 0; + + for(i=0,channel_res=0; i < bindings_count && channel_res == 0; i++) { + /* start cleanup */ + consumer_channels[i].consumer = NULL; + /* end cleanup */ + + /* bind targeted channels */ + channel_res = kz_amqp_channel_open(kzconn, consumer_channels[i].channel); + } + + i = 0; /* bind consumers */ if(kz_bindings != NULL) { kz_amqp_binding_ptr binding = kz_bindings->head; while(binding != NULL) { - kz_amqp_bind_consumer(kzconn, binding->bind); + kz_amqp_bind_consumer_ex(kzconn, binding->bind, i, consumer_channels); binding = binding->next; + i++; } } @@ -1986,30 +2104,35 @@ void kz_amqp_consumer_proc(int child_no) case AMQP_RESPONSE_NORMAL: idx = envelope.channel-1; - switch(channels[idx].state) { - case KZ_AMQP_CALLING: - channels[idx].cmd->return_payload = kz_amqp_bytes_dup(envelope.message.body); - channels[idx].cmd->return_code = AMQP_RESPONSE_NORMAL; - lock_release(&channels[idx].cmd->lock); - channels[idx].state = KZ_AMQP_FREE; - channels[idx].cmd = NULL; - break; - case KZ_AMQP_CONSUMING: + if(idx < dbk_channels) { + switch(channels[idx].state) { + case KZ_AMQP_CALLING: + lock_get(&channels[idx].lock); + if(channels[idx].cmd != NULL) { + channels[idx].cmd->return_payload = kz_amqp_bytes_dup(envelope.message.body); + channels[idx].cmd->return_code = AMQP_RESPONSE_NORMAL; + lock_release(&channels[idx].cmd->lock); + channels[idx].cmd = NULL; + channels[idx].state = KZ_AMQP_FREE; + } + lock_release(&channels[idx].lock); + break; + default: + LM_DBG("ignoring received payload on consumer - %.*s\n", (int) envelope.message.body.len, (char*)envelope.message.body.bytes); + break; + } + } else { + idx = idx - dbk_channels; kz_amqp_send_consumer_event_ex(kz_amqp_bytes_dup(envelope.message.body), - kz_amqp_bytes_dup(channels[idx].consumer->event_key), - kz_amqp_bytes_dup(channels[idx].consumer->event_subkey), - channels[idx].consumer->no_ack ? 0 : envelope.channel, - channels[idx].consumer->no_ack ? 0 : envelope.delivery_tag,1); - if(!channels[idx].consumer->no_ack ) { + kz_amqp_bytes_dup(consumer_channels[idx].consumer->event_key), + kz_amqp_bytes_dup(consumer_channels[idx].consumer->event_subkey), + 0, 0, 1); + if(!consumer_channels[idx].consumer->no_ack ) { if(amqp_basic_ack(kzconn->conn, envelope.channel, envelope.delivery_tag, 0 ) < 0) { LM_ERR("AMQP ERROR TRYING TO ACK A MSG\n"); OK = 0; } } - break; - default: - LM_DBG("ignoring received payload on consumer - %.*s\n", (int) envelope.message.body.len, (char*)envelope.message.body.bytes); - break; } break; case AMQP_RESPONSE_SERVER_EXCEPTION: @@ -2026,8 +2149,7 @@ void kz_amqp_consumer_proc(int child_no) } kz_amqp_connection_close(kzconn); - kz_amqp_fire_connection_event("closed", kzconn->info.host); + kz_amqp_fire_connection_event("closed", kzconn->info->info.host); } } - diff --git a/modules/kazoo/kz_amqp.h b/modules/kazoo/kz_amqp.h index 9fcf551d323..88ad8356d1f 100644 --- a/modules/kazoo/kz_amqp.h +++ b/modules/kazoo/kz_amqp.h @@ -25,10 +25,20 @@ extern str dbk_consumer_event_key; extern str dbk_consumer_event_subkey; extern int dbk_consumer_processes; - -typedef struct kz_amqp_conn_t { +typedef struct kz_amqp_connection_t { kz_amqp_connection_info info; char* url; + struct kz_amqp_connection_t* next; +} kz_amqp_connection, *kz_amqp_connection_ptr; + +typedef struct { + kz_amqp_connection_ptr current; + kz_amqp_connection_ptr head; + kz_amqp_connection_ptr tail; +} kz_amqp_connection_pool, *kz_amqp_connection_pool_ptr; + +typedef struct kz_amqp_conn_t { + kz_amqp_connection_ptr info; amqp_connection_state_t conn; amqp_socket_t *socket; amqp_channel_t channel_count; @@ -110,6 +120,7 @@ typedef struct { amqp_channel_t channel; kz_amqp_channel_state state; struct timeval timer; + gen_lock_t lock; } kz_amqp_channel, *kz_amqp_channel_ptr; typedef struct kz_amqp_binding_t { @@ -122,7 +133,7 @@ typedef struct { kz_amqp_binding_ptr tail; } kz_amqp_bindings, *kz_amqp_bindings_ptr; -void kz_amqp_init(); +int kz_amqp_init(); void kz_amqp_destroy(); int kz_amqp_add_connection(modparam_t type, void* val);