Skip to content

Commit

Permalink
kazoo : fix memory allocation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
lazedo committed Jan 23, 2015
1 parent 836e693 commit fa5468c
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 60 deletions.
16 changes: 16 additions & 0 deletions modules/kazoo/kazoo.c
Expand Up @@ -200,6 +200,12 @@ static int mod_init(void) {

kz_amqp_init();

if (kz_callid_init() < 0) {
LOG(L_CRIT, "Error while initializing Call-ID generator\n");
return -1;
}


if(dbk_pua_mode == 1) {
kz_db_url.len = kz_db_url.s ? strlen(kz_db_url.s) : 0;
LM_DBG("db_url=%s/%d/%p\n", ZSW(kz_db_url.s), kz_db_url.len,kz_db_url.s);
Expand Down Expand Up @@ -271,6 +277,15 @@ static int mod_child_init(int rank)

fire_init_event(rank);

if (rank != PROC_INIT) {
if (kz_callid_child_init(rank) < 0) {
/* don't init callid for PROC_INIT*/
LOG(L_ERR, "ERROR: child_init: Error while initializing Call-ID"
" generator\n");
return -2;
}
}

if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
return 0;

Expand Down Expand Up @@ -361,6 +376,7 @@ static int fire_init_event(int rank)
static void mod_destroy(void) {
kz_amqp_destroy();
shm_free(kz_pipe_fds);
kz_tr_clear_buffers();
}


161 changes: 150 additions & 11 deletions modules/kazoo/kz_amqp.c
Expand Up @@ -5,7 +5,6 @@
#include <amqp_framing.h>
#include <amqp_tcp_socket.h>
#include <json.h>
#include <uuid/uuid.h>
#include "../../mem/mem.h"
#include "../../timer_proc.h"
#include "../../sr_module.h"
Expand Down Expand Up @@ -47,6 +46,8 @@ extern int dbk_consume_messages_on_reconnect;
const amqp_bytes_t kz_amqp_empty_bytes = { 0, NULL };
const amqp_table_t kz_amqp_empty_table = { 0, NULL };

char* last_payload_result = NULL;


static char *kz_amqp_str_dup(str *src)
{
Expand Down Expand Up @@ -332,6 +333,8 @@ void kz_amqp_destroy() {
shm_free(kz_pool);
}

if(last_payload_result != NULL)
free(last_payload_result);

}

Expand Down Expand Up @@ -618,13 +621,16 @@ int kz_amqp_pipe_send(str *str_exchange, str *str_routing_key, str *str_payload)
str unique_string = { 0, 0 };
char serverid[512];

/*
uuid_t id;
char uuid_buffer[40];
uuid_generate_random(id);
uuid_unparse_lower(id, uuid_buffer);
unique_string.s = uuid_buffer;
unique_string.len = strlen(unique_string.s);
*/
kz_generate_callid(&unique_string);

sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);

Expand Down Expand Up @@ -688,13 +694,17 @@ int kz_amqp_pipe_send_receive(str *str_exchange, str *str_routing_key, str *str_
str unique_string = { 0, 0 };
char serverid[512];

/*
uuid_t id;
char uuid_buffer[40];
uuid_generate_random(id);
uuid_unparse_lower(id, uuid_buffer);
unique_string.s = uuid_buffer;
unique_string.len = strlen(unique_string.s);
*/
kz_generate_callid(&unique_string);


sprintf(serverid, "kamailio@%.*s-<%d>-script-%lu", dbk_node_hostname.len, dbk_node_hostname.s, my_pid(), rpl_query_routing_key_count++);

Expand Down Expand Up @@ -796,9 +806,6 @@ int kz_amqp_publish(struct sip_msg* msg, char* exchange, char* routing_key, char

};


char* last_payload_result = NULL;

int kz_pv_get_last_query_result(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
{
return last_payload_result == NULL ? pv_get_null(msg, param, res) : pv_get_strzval(msg, param, res, last_payload_result);
Expand All @@ -812,7 +819,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha
str routing_key_s;

if(last_payload_result)
pkg_free(last_payload_result);
free(last_payload_result);

last_payload_result = NULL;

Expand Down Expand Up @@ -849,7 +856,7 @@ int kz_amqp_query_ex(struct sip_msg* msg, char* exchange, char* routing_key, cha

char* strjson = (char*)json_object_to_json_string(ret);
int len = strlen(strjson);
char* value = pkg_malloc(len+1);
char* value = malloc(len+1);
memcpy(value, strjson, len);
value[len] = '\0';
last_payload_result = value;
Expand Down Expand Up @@ -1154,7 +1161,7 @@ int get_channel_index() {
int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int idx )
{
kz_amqp_bind_ptr bind = NULL;
amqp_queue_declare_ok_t *r = NULL;
// amqp_queue_declare_ok_t *r = NULL;
str rpl_exch = str_init("targeted");
str rpl_exch_type = str_init("direct");
int ret = -1;
Expand Down Expand Up @@ -1182,13 +1189,13 @@ int kz_amqp_bind_targeted_channel(kz_amqp_conn_ptr kz_conn, int loopcount, int i
goto error;
}

r = amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
amqp_queue_declare(kz_conn->conn, channels[idx].channel, bind->queue, 0, 0, 1, 1, kz_amqp_empty_table);
if (kz_amqp_error("Declaring queue", amqp_get_rpc_reply(kz_conn->conn)))
{
goto error;
}

amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
amqp_exchange_declare(kz_conn->conn, channels[idx].channel, bind->exchange, bind->exchange_type, 0, 0, kz_amqp_empty_table);
if (kz_amqp_error("Declaring exchange", amqp_get_rpc_reply(kz_conn->conn)))
{
ret = -RET_AMQP_ERROR;
Expand Down Expand Up @@ -1547,7 +1554,7 @@ void kz_amqp_manager_loop(int child_no)
int INTERNAL_READ_COUNT , INTERNAL_READ_MAX_LOOP;
int CONSUMER_READ_COUNT , CONSUMER_READ_MAX_LOOP;
int ACK_READ_COUNT , ACK_READ_MAX_LOOP;
char* payload;
// char* payload;
int channel_res;
kz_amqp_conn_ptr kzconn;
kz_amqp_cmd_ptr cmd;
Expand Down Expand Up @@ -1694,7 +1701,7 @@ void kz_amqp_manager_loop(int child_no)

CONSUMER_READ_COUNT = 0;
while(CONSUME && (CONSUMER_READ_COUNT < CONSUMER_READ_MAX_LOOP || firstLoop)) {
payload = NULL;
// payload = NULL;
CONSUMER_READ_COUNT++;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(kzconn->conn);
Expand Down Expand Up @@ -1788,3 +1795,135 @@ void kz_amqp_manager_loop(int child_no)
kz_amqp_fire_connection_event("closed", kzconn->info.host);
}
}


/**
* \brief Length of a Call-ID in TM
*/
#define CALLID_NR_LEN 20

/**
* \brief Length of the Call-ID suffix
*/
#define CALLID_SUFFIX_LEN ( 1 /* - */ + \
5 /* pid */ + \
42 /* embedded v4inv6 address can be looong '128.' */ + \
2 /* parenthesis [] */ + \
1 /* ZT 0 */ + \
16 /* one never knows ;-) */ \
)


static unsigned long callid_nr;
static char callid_buf[CALLID_NR_LEN + CALLID_SUFFIX_LEN];

static str callid_prefix;
static str callid_suffix;


/**
* \brief Initialize the Call-ID generator, generates random prefix
* \return 0 on success, -1 on error
*/
int kz_callid_init(void)
{
int rand_bits, i;

/* calculate the initial call-id */
/* how many bits and chars do we need to display the
* whole ULONG number */
callid_prefix.len = sizeof(unsigned long) * 2;
callid_prefix.s = callid_buf;

if (callid_prefix.len > CALLID_NR_LEN) {
LOG(L_ERR, "ERROR: Too small callid buffer\n");
return -1;
}

for(rand_bits = 1, i = RAND_MAX; i; i >>= 1, rand_bits++); /* how long are the rand()s ? */
i = callid_prefix.len * 4 / rand_bits; /* how many rands() fit in the ULONG ? */

/* now fill in the callid with as many random
* numbers as you can + 1 */
callid_nr = rand(); /* this is the + 1 */

while(i--) {
callid_nr <<= rand_bits;
callid_nr |= rand();
}

i = snprintf(callid_prefix.s, callid_prefix.len + 1, "%0*lx", callid_prefix.len, callid_nr);
if ((i == -1) || (i > callid_prefix.len)) {
LOG(L_CRIT, "BUG: SORRY, callid calculation failed\n");
return -2;
}

DBG("Call-ID initialization: '%.*s'\n", callid_prefix.len, callid_prefix.s);
return 0;
}


/**
* \brief Child initialization, generates suffix
* \param rank not used
* \return 0 on success, -1 on error
*/
int kz_callid_child_init(int rank)
{
struct socket_info *si;

/* on tcp/tls bind_address is 0 so try to get the first address we listen
* on no matter the protocol */
si=bind_address?bind_address:get_first_socket();
if (si==0){
LOG(L_CRIT, "BUG: child_init_callid: null socket list\n");
return -1;
}
callid_suffix.s = callid_buf + callid_prefix.len;

callid_suffix.len = snprintf(callid_suffix.s, CALLID_SUFFIX_LEN,
"%c%d@%.*s", '-', my_pid(),
si->address_str.len,
si->address_str.s);
if ((callid_suffix.len == -1) || (callid_suffix.len > CALLID_SUFFIX_LEN)) {
LOG(L_ERR, "ERROR: child_init_callid: buffer too small\n");
return -1;
}

DBG("DEBUG: callid: '%.*s'\n", callid_prefix.len + callid_suffix.len, callid_prefix.s);
return 0;
}


/**
* \brief Increment a character in hex, return the carry flag
* \param _c input character
* \return carry flag
*/
static inline int inc_hexchar(char* _c)
{
if (*_c == '9') {
*_c = 'a';
return 0;
}

if (*_c == 'f') {
*_c = '0';
return 1;
}

(*_c)++;
return 0;
}


void kz_generate_callid(str* callid)
{
int i;

for(i = callid_prefix.len; i; i--) {
if (!inc_hexchar(callid_prefix.s + i - 1)) break;
}
callid->s = callid_prefix.s;
callid->len = callid_prefix.len + callid_suffix.len;
}

0 comments on commit fa5468c

Please sign in to comment.