diff --git a/modules/kazoo/doc/kazoo_admin.xml b/modules/kazoo/doc/kazoo_admin.xml index 5f2dac8868b..93846986f87 100644 --- a/modules/kazoo/doc/kazoo_admin.xml +++ b/modules/kazoo/doc/kazoo_admin.xml @@ -451,7 +451,7 @@ modparam("kazoo", "amqp_interprocess_timeout_micro", 200000)
- <varname>amqp_waitframe_timout</varname>(str) + <varname>amqp_waitframe_tiemout</varname>(str) Timeout when checking for messages from rabbitmq. @@ -459,18 +459,18 @@ modparam("kazoo", "amqp_interprocess_timeout_micro", 200000) Default value is 100000 micro. - Set <varname>amqp_waitframe_timout</varname> parameter + Set <varname>amqp_waitframe_timeout</varname> parameter ... -modparam("kazoo", "amqp_waitframe_timout_sec", 1) -modparam("kazoo", "amqp_waitframe_timout_micro", 200000) +modparam("kazoo", "amqp_waitframe_timeout_sec", 1) +modparam("kazoo", "amqp_waitframe_timeout_micro", 200000) ...
- <varname>amqp_query_timout</varname>(str) + <varname>amqp_query_timeout</varname>(str) Timeout when checking for reply messages from rabbitmq for kazoo_query commands. @@ -478,16 +478,42 @@ modparam("kazoo", "amqp_waitframe_timout_micro", 200000) Default value is 2 sec. - Set <varname>amqp_query_timout</varname> parameter + Set <varname>amqp_query_timeout</varname> parameter ... -modparam("kazoo", "amqp_query_timout_sec", 1) -modparam("kazoo", "amqp_query_timout_micro", 200000) +modparam("kazoo", "amqp_query_timeout_sec", 1) +modparam("kazoo", "amqp_query_timeout_micro", 200000) ...
+
+ <varname>amqp_query_timeout_avp</varname>(str) + + avp holding the value in seconds for Timeout when checking for reply messages from rabbitmq for kazoo_query commands. + + + Default value is NULL (no value). + + + >Set <varname>amqp_query_timeout_avp</varname> parameter + +... +modparam("kazoo", "amqp_query_timeout_avp", "$var(kz_timeout)") + +route[SOME_ROUTE] +{ + $var(kz_timeout) = 12; + kazoo_query(exchange, routingkey, payload); +} + +... + + +
+ +
diff --git a/modules/kazoo/kazoo.c b/modules/kazoo/kazoo.c index 0c671f74432..2f86e5ecc26 100644 --- a/modules/kazoo/kazoo.c +++ b/modules/kazoo/kazoo.c @@ -96,6 +96,9 @@ db_func_t kz_pa_dbf; str kz_presentity_table = str_init("presentity"); str kz_db_url = {0,0}; +str kz_query_timeout_avp = {0,0}; +pv_spec_t kz_query_timeout_spec; + MODULE_VERSION static tr_export_t mod_trans[] = { @@ -149,13 +152,13 @@ static param_export_t params[] = { {"amqp_consumer_ack_timeout_sec", INT_PARAM, &kz_ack_tv.tv_sec}, {"amqp_interprocess_timeout_micro", INT_PARAM, &kz_sock_tv.tv_usec}, {"amqp_interprocess_timeout_sec", INT_PARAM, &kz_sock_tv.tv_sec}, - {"amqp_waitframe_timout_micro", INT_PARAM, &kz_amqp_tv.tv_usec}, - {"amqp_waitframe_timout_sec", INT_PARAM, &kz_amqp_tv.tv_sec}, + {"amqp_waitframe_timeout_micro", INT_PARAM, &kz_amqp_tv.tv_usec}, + {"amqp_waitframe_timeout_sec", INT_PARAM, &kz_amqp_tv.tv_sec}, {"amqp_consumer_processes", INT_PARAM, &dbk_consumer_processes}, {"amqp_consumer_event_key", STR_PARAM, &dbk_consumer_event_key.s}, {"amqp_consumer_event_subkey", STR_PARAM, &dbk_consumer_event_subkey.s}, - {"amqp_query_timout_micro", INT_PARAM, &kz_qtimeout_tv.tv_usec}, - {"amqp_query_timout_sec", INT_PARAM, &kz_qtimeout_tv.tv_sec}, + {"amqp_query_timeout_micro", INT_PARAM, &kz_qtimeout_tv.tv_usec}, + {"amqp_query_timeout_sec", INT_PARAM, &kz_qtimeout_tv.tv_sec}, {"amqp_internal_loop_count", INT_PARAM, &dbk_internal_loop_count}, {"amqp_consumer_loop_count", INT_PARAM, &dbk_consumer_loop_count}, {"amqp_consumer_ack_loop_count", INT_PARAM, &dbk_consumer_ack_loop_count}, @@ -165,6 +168,7 @@ static param_export_t params[] = { {"pua_mode", INT_PARAM, &dbk_pua_mode}, {"single_consumer_on_reconnect", INT_PARAM, &dbk_single_consumer_on_reconnect}, {"consume_messages_on_reconnect", INT_PARAM, &dbk_consume_messages_on_reconnect}, + {"amqp_query_timeout_avp", STR_PARAM, &kz_query_timeout_avp.s}, {0, 0, 0} }; @@ -184,6 +188,30 @@ struct module_exports exports = { mod_child_init /* per-child init function */ }; +inline static int kz_parse_avp( str *avp_spec, pv_spec_t *avp, char *txt) +{ + if (pv_parse_spec(avp_spec, avp)==NULL) { + LM_ERR("malformed or non AVP %s AVP definition\n",txt); + return -1; + } + return 0; +} + +static int kz_init_avp(void) { + if(kz_query_timeout_avp.s) + kz_query_timeout_avp.len = strlen(kz_query_timeout_avp.s); + + if ( kz_query_timeout_avp.s ) { + if ( kz_parse_avp(&kz_query_timeout_avp, &kz_query_timeout_spec, "amqp_query_timeout_avp") <0) { + return -1; + } + } else { + memset( &kz_query_timeout_spec, 0, sizeof(pv_spec_t)); + } + + return 0; +} + static int mod_init(void) { int i; startup_time = (int) time(NULL); @@ -198,13 +226,13 @@ static int mod_init(void) { dbk_consumer_event_key.len = strlen(dbk_consumer_event_key.s); dbk_consumer_event_subkey.len = strlen(dbk_consumer_event_subkey.s); - kz_amqp_init(); - if (kz_callid_init() < 0) { - LOG(L_CRIT, "Error while initializing Call-ID generator\n"); - return -1; - } + if(kz_init_avp()) { + LM_ERR("Error in avp params\n"); + return -1; + } + kz_amqp_init(); if(dbk_pua_mode == 1) { kz_db_url.len = kz_db_url.s ? strlen(kz_db_url.s) : 0; @@ -277,15 +305,6 @@ static int mod_child_init(int rank) fire_init_event(rank); - if (rank != PROC_INIT) { - if (kz_callid_child_init(rank) < 0) { - /* don't init callid for PROC_INIT*/ - LOG(L_ERR, "ERROR: child_init: Error while initializing Call-ID" - " generator\n"); - return -2; - } - } - if (rank==PROC_INIT || rank==PROC_TCP_MAIN) return 0; @@ -376,7 +395,6 @@ static int fire_init_event(int rank) static void mod_destroy(void) { kz_amqp_destroy(); shm_free(kz_pipe_fds); - kz_tr_clear_buffers(); } diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c index 4f7661f21e0..1c109bb63ac 100644 --- a/modules/kazoo/kz_amqp.c +++ b/modules/kazoo/kz_amqp.c @@ -5,6 +5,7 @@ #include #include #include +#include #include "../../mem/mem.h" #include "../../timer_proc.h" #include "../../sr_module.h" @@ -43,11 +44,11 @@ extern int dbk_consumer_ack_loop_count; extern int dbk_single_consumer_on_reconnect; extern int dbk_consume_messages_on_reconnect; +extern pv_spec_t kz_query_timeout_spec; + const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL }; const amqp_table_t kz_amqp_empty_table = { 0, NULL }; -char* last_payload_result = NULL; - static char *kz_amqp_str_dup(str *src) { @@ -333,8 +334,6 @@ void kz_amqp_destroy() { shm_free(kz_pool); } - if(last_payload_result != NULL) - free(last_payload_result); } @@ -621,7 +620,6 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload) str unique_string = { 0, 0 }; char serverid[512]; - /* uuid_t id; char uuid_buffer[40]; @@ -629,8 +627,6 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload) uuid_unparse_lower(id, uuid_buffer); unique_string.s = uuid_buffer; unique_string.len = strlen(unique_string.s); - */ - kz_generate_callid(&unique_string); sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++); @@ -684,7 +680,7 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload) return ret; } -int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_payload, json_obj_ptr* json_ret ) +int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_payload, struct timeval* kz_timeout, json_obj_ptr* json_ret ) { int ret = 1; json_obj_ptr json_obj = NULL; @@ -694,7 +690,6 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_ str unique_string = { 0, 0 }; char serverid[512]; - /* uuid_t id; char uuid_buffer[40]; @@ -702,9 +697,6 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_ uuid_unparse_lower(id, uuid_buffer); unique_string.s = uuid_buffer; unique_string.len = strlen(unique_string.s); - */ - kz_generate_callid(&unique_string); - sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++); @@ -728,7 +720,9 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_ cmd->routing_key = kz_amqp_str_dup(str_routing_key); cmd->reply_routing_key = kz_amqp_string_dup(serverid); cmd->payload = kz_amqp_string_dup(payload); - cmd->timeout = kz_qtimeout_tv; + + cmd->timeout = *kz_timeout; + if(cmd->payload == NULL || cmd->routing_key == NULL || cmd->exchange == NULL) { LM_ERR("failed to allocate kz_amqp_cmd parameters in process %d\n", getpid()); goto error; @@ -806,6 +800,9 @@ int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char }; + +char* last_payload_result = NULL; + int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) { return last_payload_result == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, last_payload_result); @@ -817,9 +814,10 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha str json_s; str exchange_s; str routing_key_s; + struct timeval kz_timeout = kz_qtimeout_tv; if(last_payload_result) - free(last_payload_result); + pkg_free(last_payload_result); last_payload_result = NULL; @@ -847,8 +845,19 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha json_object_put(j); + if(kz_query_timeout_spec.type != PVT_NONE) { + pv_value_t pv_val; + if(pv_get_spec_value( msg, &kz_query_timeout_spec, &pv_val) == 0) { + if((pv_val.flags & PV_VAL_INT) && pv_val.ri != 0 ) { + kz_timeout.tv_usec = 0; + kz_timeout.tv_sec = pv_val.ri; + LM_INFO("SET TIMEOUT TO %i\n", kz_timeout.tv_sec); + } + } + } + json_obj_ptr ret = NULL; - int res = kz_amqp_pipe_send_receive(&exchange_s, &routing_key_s, &json_s, &ret ); + int res = kz_amqp_pipe_send_receive(&exchange_s, &routing_key_s, &json_s, &kz_timeout, &ret ); if(res != 0) { return -1; @@ -856,7 +865,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha char* strjson = (char*)json_object_to_json_string(ret); int len = strlen(strjson); - char* value = malloc(len+1); + char* value = pkg_malloc(len+1); memcpy(value, strjson, len); value[len] = '\0'; last_payload_result = value; @@ -1161,7 +1170,7 @@ int get_channel_index() { int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int idx ) { kz_amqp_bind_ptr bind = NULL; -// amqp_queue_declare_ok_t *r = NULL; + amqp_queue_declare_ok_t *r = NULL; str rpl_exch = str_init("targeted"); str rpl_exch_type = str_init("direct"); int ret = -1; @@ -1189,13 +1198,13 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int i goto error; } - amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table); + r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table); if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) { goto error; } - amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); + amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) { ret = -RET_AMQP_ERROR; @@ -1554,7 +1563,7 @@ void kz_amqp_manager_loop(int child_no) int INTERNAL_READ_COUNT , INTERNAL_READ_MAX_LOOP; int CONSUMER_READ_COUNT , CONSUMER_READ_MAX_LOOP; int ACK_READ_COUNT , ACK_READ_MAX_LOOP; -// char* payload; + char* payload; int channel_res; kz_amqp_conn_ptr kzconn; kz_amqp_cmd_ptr cmd; @@ -1701,7 +1710,7 @@ void kz_amqp_manager_loop(int child_no) CONSUMER_READ_COUNT = 0; while(CONSUME && (CONSUMER_READ_COUNT < CONSUMER_READ_MAX_LOOP || firstLoop)) { -// payload = NULL; + payload = NULL; CONSUMER_READ_COUNT++; amqp_envelope_t envelope; amqp_maybe_release_buffers(kzconn->conn); @@ -1795,135 +1804,3 @@ void kz_amqp_manager_loop(int child_no) kz_amqp_fire_connection_event("closed", kzconn->info.host); } } - - -/** - * \brief Length of a Call-ID in TM - */ -#define CALLID_NR_LEN 20 - -/** - * \brief Length of the Call-ID suffix - */ -#define CALLID_SUFFIX_LEN ( 1 /* - */ + \ - 5 /* pid */ + \ - 42 /* embedded v4inv6 address can be looong '128.' */ + \ - 2 /* parenthesis [] */ + \ - 1 /* ZT 0 */ + \ - 16 /* one never knows ;-) */ \ - ) - - -static unsigned long callid_nr; -static char callid_buf[CALLID_NR_LEN + CALLID_SUFFIX_LEN]; - -static str callid_prefix; -static str callid_suffix; - - -/** - * \brief Initialize the Call-ID generator, generates random prefix - * \return 0 on success, -1 on error - */ -int kz_callid_init(void) -{ - int rand_bits, i; - - /* calculate the initial call-id */ - /* how many bits and chars do we need to display the - * whole ULONG number */ - callid_prefix.len = sizeof(unsigned long) * 2; - callid_prefix.s = callid_buf; - - if (callid_prefix.len > CALLID_NR_LEN) { - LOG(L_ERR, "ERROR: Too small callid buffer\n"); - return -1; - } - - for(rand_bits = 1, i = RAND_MAX; i; i >>= 1, rand_bits++); /* how long are the rand()s ? */ - i = callid_prefix.len * 4 / rand_bits; /* how many rands() fit in the ULONG ? */ - - /* now fill in the callid with as many random - * numbers as you can + 1 */ - callid_nr = rand(); /* this is the + 1 */ - - while(i--) { - callid_nr <<= rand_bits; - callid_nr |= rand(); - } - - i = snprintf(callid_prefix.s, callid_prefix.len + 1, "%0*lx", callid_prefix.len, callid_nr); - if ((i == -1) || (i > callid_prefix.len)) { - LOG(L_CRIT, "BUG: SORRY, callid calculation failed\n"); - return -2; - } - - DBG("Call-ID initialization: '%.*s'\n", callid_prefix.len, callid_prefix.s); - return 0; -} - - -/** - * \brief Child initialization, generates suffix - * \param rank not used - * \return 0 on success, -1 on error - */ -int kz_callid_child_init(int rank) -{ - struct socket_info *si; - - /* on tcp/tls bind_address is 0 so try to get the first address we listen - * on no matter the protocol */ - si=bind_address?bind_address:get_first_socket(); - if (si==0){ - LOG(L_CRIT, "BUG: child_init_callid: null socket list\n"); - return -1; - } - callid_suffix.s = callid_buf + callid_prefix.len; - - callid_suffix.len = snprintf(callid_suffix.s, CALLID_SUFFIX_LEN, - "%c%d@%.*s", '-', my_pid(), - si->address_str.len, - si->address_str.s); - if ((callid_suffix.len == -1) || (callid_suffix.len > CALLID_SUFFIX_LEN)) { - LOG(L_ERR, "ERROR: child_init_callid: buffer too small\n"); - return -1; - } - - DBG("DEBUG: callid: '%.*s'\n", callid_prefix.len + callid_suffix.len, callid_prefix.s); - return 0; -} - - -/** - * \brief Increment a character in hex, return the carry flag - * \param _c input character - * \return carry flag - */ -static inline int inc_hexchar(char* _c) -{ - if (*_c == '9') { - *_c = 'a'; - return 0; - } - - if (*_c == 'f') { - *_c = '0'; - return 1; - } - - (*_c)++; - return 0; -} - - -void kz_generate_callid(str* callid) -{ - int i; - - for(i = callid_prefix.len; i; i--) { - if (!inc_hexchar(callid_prefix.s + i - 1)) break; - } - callid->s = callid_prefix.s; - callid->len = callid_prefix.len + callid_suffix.len; -}