From 96c44ab31df9cc1a4400ad16515191332ef767fc Mon Sep 17 00:00:00 2001 From: Luis Azedo Date: Fri, 23 Jan 2015 23:52:57 +0000 Subject: [PATCH] kazoo : fix memory allocation errors (cherry picked from commit fa5468c24283c8ff0a1b2254e927a128d03137fd) --- modules/kazoo/kazoo.c | 16 ++++ modules/kazoo/kz_amqp.c | 161 ++++++++++++++++++++++++++++++--- modules/kazoo/kz_trans.c | 188 +++++++++++++++++++++++++++++---------- modules/kazoo/kz_trans.h | 2 + 4 files changed, 307 insertions(+), 60 deletions(-) diff --git a/modules/kazoo/kazoo.c b/modules/kazoo/kazoo.c index 466919d0378..0c671f74432 100644 --- a/modules/kazoo/kazoo.c +++ b/modules/kazoo/kazoo.c @@ -200,6 +200,12 @@ static int mod_init(void) { kz_amqp_init(); + if (kz_callid_init() < 0) { + LOG(L_CRIT, "Error while initializing Call-ID generator\n"); + return -1; + } + + if(dbk_pua_mode == 1) { kz_db_url.len = kz_db_url.s ? strlen(kz_db_url.s) : 0; LM_DBG("db_url=%s/%d/%p\n", ZSW(kz_db_url.s), kz_db_url.len,kz_db_url.s); @@ -271,6 +277,15 @@ static int mod_child_init(int rank) fire_init_event(rank); + if (rank != PROC_INIT) { + if (kz_callid_child_init(rank) < 0) { + /* don't init callid for PROC_INIT*/ + LOG(L_ERR, "ERROR: child_init: Error while initializing Call-ID" + " generator\n"); + return -2; + } + } + if (rank==PROC_INIT || rank==PROC_TCP_MAIN) return 0; @@ -361,6 +376,7 @@ static int fire_init_event(int rank) static void mod_destroy(void) { kz_amqp_destroy(); shm_free(kz_pipe_fds); + kz_tr_clear_buffers(); } diff --git a/modules/kazoo/kz_amqp.c b/modules/kazoo/kz_amqp.c index ef90dfba6be..4f7661f21e0 100644 --- a/modules/kazoo/kz_amqp.c +++ b/modules/kazoo/kz_amqp.c @@ -5,7 +5,6 @@ #include #include #include -#include #include "../../mem/mem.h" #include "../../timer_proc.h" #include "../../sr_module.h" @@ -47,6 +46,8 @@ extern int dbk_consume_messages_on_reconnect; const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL }; const amqp_table_t kz_amqp_empty_table = { 0, NULL }; +char* last_payload_result = NULL; + static char *kz_amqp_str_dup(str *src) { @@ -332,6 +333,8 @@ void kz_amqp_destroy() { shm_free(kz_pool); } + if(last_payload_result != NULL) + free(last_payload_result); } @@ -618,6 +621,7 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload) str unique_string = { 0, 0 }; char serverid[512]; + /* uuid_t id; char uuid_buffer[40]; @@ -625,6 +629,8 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload) uuid_unparse_lower(id, uuid_buffer); unique_string.s = uuid_buffer; unique_string.len = strlen(unique_string.s); + */ + kz_generate_callid(&unique_string); sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++); @@ -688,6 +694,7 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_ str unique_string = { 0, 0 }; char serverid[512]; + /* uuid_t id; char uuid_buffer[40]; @@ -695,6 +702,9 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_ uuid_unparse_lower(id, uuid_buffer); unique_string.s = uuid_buffer; unique_string.len = strlen(unique_string.s); + */ + kz_generate_callid(&unique_string); + sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++); @@ -796,9 +806,6 @@ int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char }; - -char* last_payload_result = NULL; - int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param, pv_value_t *res) { return last_payload_result == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, last_payload_result); @@ -812,7 +819,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha str routing_key_s; if(last_payload_result) - pkg_free(last_payload_result); + free(last_payload_result); last_payload_result = NULL; @@ -849,7 +856,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha char* strjson = (char*)json_object_to_json_string(ret); int len = strlen(strjson); - char* value = pkg_malloc(len+1); + char* value = malloc(len+1); memcpy(value, strjson, len); value[len] = '\0'; last_payload_result = value; @@ -1154,7 +1161,7 @@ int get_channel_index() { int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int idx ) { kz_amqp_bind_ptr bind = NULL; - amqp_queue_declare_ok_t *r = NULL; +// amqp_queue_declare_ok_t *r = NULL; str rpl_exch = str_init("targeted"); str rpl_exch_type = str_init("direct"); int ret = -1; @@ -1182,13 +1189,13 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int i goto error; } - r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table); + amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table); if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn))) { goto error; } - amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); + amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table); if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn))) { ret = -RET_AMQP_ERROR; @@ -1547,7 +1554,7 @@ void kz_amqp_manager_loop(int child_no) int INTERNAL_READ_COUNT , INTERNAL_READ_MAX_LOOP; int CONSUMER_READ_COUNT , CONSUMER_READ_MAX_LOOP; int ACK_READ_COUNT , ACK_READ_MAX_LOOP; - char* payload; +// char* payload; int channel_res; kz_amqp_conn_ptr kzconn; kz_amqp_cmd_ptr cmd; @@ -1694,7 +1701,7 @@ void kz_amqp_manager_loop(int child_no) CONSUMER_READ_COUNT = 0; while(CONSUME && (CONSUMER_READ_COUNT < CONSUMER_READ_MAX_LOOP || firstLoop)) { - payload = NULL; +// payload = NULL; CONSUMER_READ_COUNT++; amqp_envelope_t envelope; amqp_maybe_release_buffers(kzconn->conn); @@ -1788,3 +1795,135 @@ void kz_amqp_manager_loop(int child_no) kz_amqp_fire_connection_event("closed", kzconn->info.host); } } + + +/** + * \brief Length of a Call-ID in TM + */ +#define CALLID_NR_LEN 20 + +/** + * \brief Length of the Call-ID suffix + */ +#define CALLID_SUFFIX_LEN ( 1 /* - */ + \ + 5 /* pid */ + \ + 42 /* embedded v4inv6 address can be looong '128.' */ + \ + 2 /* parenthesis [] */ + \ + 1 /* ZT 0 */ + \ + 16 /* one never knows ;-) */ \ + ) + + +static unsigned long callid_nr; +static char callid_buf[CALLID_NR_LEN + CALLID_SUFFIX_LEN]; + +static str callid_prefix; +static str callid_suffix; + + +/** + * \brief Initialize the Call-ID generator, generates random prefix + * \return 0 on success, -1 on error + */ +int kz_callid_init(void) +{ + int rand_bits, i; + + /* calculate the initial call-id */ + /* how many bits and chars do we need to display the + * whole ULONG number */ + callid_prefix.len = sizeof(unsigned long) * 2; + callid_prefix.s = callid_buf; + + if (callid_prefix.len > CALLID_NR_LEN) { + LOG(L_ERR, "ERROR: Too small callid buffer\n"); + return -1; + } + + for(rand_bits = 1, i = RAND_MAX; i; i >>= 1, rand_bits++); /* how long are the rand()s ? */ + i = callid_prefix.len * 4 / rand_bits; /* how many rands() fit in the ULONG ? */ + + /* now fill in the callid with as many random + * numbers as you can + 1 */ + callid_nr = rand(); /* this is the + 1 */ + + while(i--) { + callid_nr <<= rand_bits; + callid_nr |= rand(); + } + + i = snprintf(callid_prefix.s, callid_prefix.len + 1, "%0*lx", callid_prefix.len, callid_nr); + if ((i == -1) || (i > callid_prefix.len)) { + LOG(L_CRIT, "BUG: SORRY, callid calculation failed\n"); + return -2; + } + + DBG("Call-ID initialization: '%.*s'\n", callid_prefix.len, callid_prefix.s); + return 0; +} + + +/** + * \brief Child initialization, generates suffix + * \param rank not used + * \return 0 on success, -1 on error + */ +int kz_callid_child_init(int rank) +{ + struct socket_info *si; + + /* on tcp/tls bind_address is 0 so try to get the first address we listen + * on no matter the protocol */ + si=bind_address?bind_address:get_first_socket(); + if (si==0){ + LOG(L_CRIT, "BUG: child_init_callid: null socket list\n"); + return -1; + } + callid_suffix.s = callid_buf + callid_prefix.len; + + callid_suffix.len = snprintf(callid_suffix.s, CALLID_SUFFIX_LEN, + "%c%d@%.*s", '-', my_pid(), + si->address_str.len, + si->address_str.s); + if ((callid_suffix.len == -1) || (callid_suffix.len > CALLID_SUFFIX_LEN)) { + LOG(L_ERR, "ERROR: child_init_callid: buffer too small\n"); + return -1; + } + + DBG("DEBUG: callid: '%.*s'\n", callid_prefix.len + callid_suffix.len, callid_prefix.s); + return 0; +} + + +/** + * \brief Increment a character in hex, return the carry flag + * \param _c input character + * \return carry flag + */ +static inline int inc_hexchar(char* _c) +{ + if (*_c == '9') { + *_c = 'a'; + return 0; + } + + if (*_c == 'f') { + *_c = '0'; + return 1; + } + + (*_c)++; + return 0; +} + + +void kz_generate_callid(str* callid) +{ + int i; + + for(i = callid_prefix.len; i; i--) { + if (!inc_hexchar(callid_prefix.s + i - 1)) break; + } + callid->s = callid_prefix.s; + callid->len = callid_prefix.len + callid_suffix.len; +} diff --git a/modules/kazoo/kz_trans.c b/modules/kazoo/kz_trans.c index 32e38e68f40..eb2ea643727 100644 --- a/modules/kazoo/kz_trans.c +++ b/modules/kazoo/kz_trans.c @@ -54,7 +54,7 @@ /*! transformation buffer size */ #define KZ_TR_BUFFER_SIZE 65536 -#define KZ_TR_BUFFER_SLOTS 8 +#define KZ_TR_BUFFER_SLOTS 4 /*! transformation buffer */ static char **_kz_tr_buffer_list = NULL; @@ -63,6 +63,14 @@ static char *_kz_tr_buffer = NULL; static int _kz_tr_buffer_idx = 0; +#define KZ_TR_ALLOC_PARSE_SIZE 2048 + +static pv_spec_t** _kz_parse_specs = NULL; +static tr_param_t** _kz_parse_params = NULL; +static int _kz_tr_parse_spec = 0; +static int _kz_tr_parse_params = 0; + + /*! * */ @@ -71,6 +79,7 @@ int kz_tr_init_buffers(void) int i; _kz_tr_buffer_list = (char**)malloc(KZ_TR_BUFFER_SLOTS * sizeof(char*)); + if(_kz_tr_buffer_list==NULL) return -1; for(i=0; irs.s = _kz_tr_buffer; \ } while(0); +void kz_destroy_pv_value(pv_value_t *val) { + + if(val->flags & PV_VAL_PKG) + pkg_free(val->rs.s); + else if(val->flags & PV_VAL_SHM) + shm_free(val->rs.s); + pkg_free(val); +} + +void kz_free_pv_value(pv_value_t *val ) { + if(val->flags & PV_VAL_PKG) + pkg_free(val->rs.s); + else if(val->flags & PV_VAL_SHM) + shm_free(val->rs.s); +} + +pv_value_t* kz_alloc_pv_value() { + pv_value_t* v = (pv_value_t*) pkg_malloc(sizeof(pv_value_t)); + if(v != NULL) + memset(v, 0, sizeof(pv_value_t*)); + return v; +} + + /*! * \brief Evaluate kazoo transformations * \param msg SIP message @@ -111,11 +188,13 @@ char *kz_tr_set_crt_buffer(void) int kz_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val) { + str sv; + pv_value_t* pv; + pv_value_t v; + if(val==NULL || (val->flags&PV_VAL_NULL)) return -1; - char* tofree = NULL; - int oldflags = 0; kz_tr_set_crt_buffer(); @@ -125,72 +204,80 @@ int kz_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val if(!(val->flags&PV_VAL_STR)) return -1; - oldflags = val->flags; - tofree = val->rs.s; + pv = kz_alloc_pv_value(); + if(pv == NULL) + { + LM_ERR("kazoo encode transform : no more private memory\n"); + return -1; + } - if( kz_amqp_encode_ex(&val->rs, val ) != 1) { + if( kz_amqp_encode_ex(&val->rs, pv ) != 1) { LM_ERR("error encoding value\n"); + kz_destroy_pv_value(pv); return -1; } - /* - // it seems that val memory is not freed - // even with flag set to PV_VAL_PKG + strncpy(_kz_tr_buffer, pv->rs.s, pv->rs.len); + _kz_tr_buffer[pv->rs.len] = '\0'; - strncpy(_kz_tr_buffer, val->rs.s, val->rs.len); - if(val->flags & PV_VAL_PKG) - pkg_free(val->rs.s); - else if(val->flags & PV_VAL_SHM) - shm_free(val->rs.s); - _kz_tr_buffer[val->rs.len] = '\0'; val->flags = PV_VAL_STR; val->ri = 0; val->rs.s = _kz_tr_buffer; - */ - - if(oldflags & PV_VAL_PKG) { - pkg_free(tofree); - } else if(oldflags & PV_VAL_SHM) { - shm_free(tofree); - } + val->rs.len = pv->rs.len; + kz_destroy_pv_value(pv); + kz_free_pv_value(val); break; case TR_KAZOO_JSON: + if(!(val->flags&PV_VAL_STR)) + return -1; + if(tp==NULL) { LM_ERR("kazoo json transform invalid parameter\n"); return -1; } - oldflags = val->flags; - tofree = val->rs.s; + pv = kz_alloc_pv_value(); + if(pv == NULL) + { + LM_ERR("kazoo encode transform : no more private memory\n"); + return -1; + } + - if(kz_json_get_field_ex(&val->rs, &tp->v.s, val ) != 1) { + if(tp->type == TR_PARAM_STRING) + { + sv = tp->v.s; + } else { + if(pv_get_spec_value(msg, (pv_spec_p)tp->v.data, &v)!=0 + || (!(v.flags&PV_VAL_STR)) || v.rs.len<=0) + { + LM_ERR("value cannot get spec value in json transform\n"); + kz_destroy_pv_value(pv); + return -1; + } + sv = v.rs; + } + + + if(kz_json_get_field_ex(&val->rs, &sv, pv ) != 1) { LM_ERR("error getting json\n"); + kz_destroy_pv_value(pv); return -1; } - /* - // it seems that val memory is not freed - // even with flag set to PV_VAL_PKG - - strncpy(_kz_tr_buffer, val->rs.s, val->rs.len); - if(val->flags & PV_VAL_PKG) - pkg_free(val->rs.s); - else if(val->flags & PV_VAL_SHM) - shm_free(val->rs.s); - _kz_tr_buffer[val->rs.len] = '\0'; + strncpy(_kz_tr_buffer, pv->rs.s, pv->rs.len); + _kz_tr_buffer[pv->rs.len] = '\0'; + val->flags = PV_VAL_STR; val->ri = 0; val->rs.s = _kz_tr_buffer; - */ + val->rs.len = pv->rs.len; - if(oldflags & PV_VAL_PKG) { - pkg_free(tofree); - } else if(oldflags & PV_VAL_SHM) { - shm_free(tofree); - } + kz_destroy_pv_value(pv); + kz_free_pv_value(val); break; @@ -205,7 +292,7 @@ int kz_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val while(is_in_str(_p, _in) && (*_p==' ' || *_p=='\t' || *_p=='\n')) _p++; \ if(*_p==PV_MARKER) \ { /* pseudo-variable */ \ - _spec = (pv_spec_t*)pkg_malloc(sizeof(pv_spec_t)); \ + _spec = (pv_spec_t*)malloc(sizeof(pv_spec_t)); \ if(_spec==NULL) \ { \ LM_ERR("no more private memory!\n"); \ @@ -220,7 +307,7 @@ int kz_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val goto error; \ } \ _p = _p0; \ - _tp = (tr_param_t*)pkg_malloc(sizeof(tr_param_t)); \ + _tp = (tr_param_t*)malloc(sizeof(tr_param_t)); \ if(_tp==NULL) \ { \ LM_ERR("no more private memory!\n"); \ @@ -229,6 +316,8 @@ int kz_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val memset(_tp, 0, sizeof(tr_param_t)); \ _tp->type = TR_PARAM_SPEC; \ _tp->v.data = (void*)_spec; \ + _kz_parse_specs[_kz_tr_parse_spec++] = _spec; \ + _kz_parse_params[_kz_tr_parse_params++] = _tp; \ } else { /* string */ \ _ps = _p; \ while(is_in_str(_p, _in) && *_p!='\t' && *_p!='\n' \ @@ -240,7 +329,7 @@ int kz_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val _in->len, _in->s); \ goto error; \ } \ - _tp = (tr_param_t*)pkg_malloc(sizeof(tr_param_t)); \ + _tp = (tr_param_t*)malloc(sizeof(tr_param_t)); \ if(_tp==NULL) \ { \ LM_ERR("no more private memory!\n"); \ @@ -250,6 +339,7 @@ int kz_tr_eval(struct sip_msg *msg, tr_param_t *tp, int subtype, pv_value_t *val _tp->type = TR_PARAM_STRING; \ _tp->v.s.s = _ps; \ _tp->v.s.len = _p - _ps; \ + _kz_parse_params[_kz_tr_parse_params++] = _tp; \ } @@ -317,9 +407,9 @@ char* kz_tr_parse(str* in, trans_t *t) name.len, name.s, name.len); error: if(tp) - tr_param_free(tp); + free(tp); if(spec) - pv_spec_free(spec); + free(spec); return NULL; done: t->name = name; diff --git a/modules/kazoo/kz_trans.h b/modules/kazoo/kz_trans.h index e5d45ff91d2..d765357737f 100644 --- a/modules/kazoo/kz_trans.h +++ b/modules/kazoo/kz_trans.h @@ -38,5 +38,7 @@ enum _kz_tr_subtype { TR_KAZOO_NONE=0, TR_KAZOO_ENCODE, TR_KAZOO_JSON }; char* kz_tr_parse(str *in, trans_t *tr); int kz_tr_init_buffers(void); +void kz_tr_clear_buffers(void); + #endif