Skip to content

Commit

Permalink
rabbitmq: renamed global variable, same name being used in some funct…
Browse files Browse the repository at this point in the history
…ions

- global variables made static
  • Loading branch information
miconda committed Nov 5, 2018
1 parent 3afc035 commit 4f303b6
Showing 1 changed file with 35 additions and 35 deletions.
70 changes: 35 additions & 35 deletions src/modules/rabbitmq/rabbitmq.c
Expand Up @@ -68,16 +68,16 @@ static int rabbitmq_publish_consume(
static int mod_init(void);
static int mod_child_init(int);

amqp_socket_t *amqp_sock = NULL;
amqp_connection_state_t conn = NULL;
static amqp_socket_t *amqp_sock = NULL;
static amqp_connection_state_t amqp_conn = NULL;

/* module parameters */
struct amqp_connection_info amqp_info;
char *amqp_url = RABBITMQ_DEFAULT_AMQP_URL;
int max_reconnect_attempts = 1;
int timeout_sec = 1;
int timeout_usec = 0;
int direct_reply_to = 0;
static struct amqp_connection_info amqp_info;
static char *amqp_url = RABBITMQ_DEFAULT_AMQP_URL;
static int max_reconnect_attempts = 1;
static int timeout_sec = 1;
static int timeout_usec = 0;
static int direct_reply_to = 0;

/* module helper functions */
static int rabbitmq_connect(amqp_connection_state_t *conn);
Expand Down Expand Up @@ -170,7 +170,7 @@ static int mod_child_init(int rank)
}

// routing process
if(rabbitmq_connect(&conn) != RABBITMQ_OK) {
if(rabbitmq_connect(&amqp_conn) != RABBITMQ_OK) {
LM_ERR("FAIL rabbitmq_connect()");
return -1;
}
Expand All @@ -188,9 +188,9 @@ static int ki_rabbitmq_publish(sip_msg_t *msg, str *exchange, str *routingkey,

reconnect:
// open channel
amqp_channel_open(conn, 1);
amqp_channel_open(amqp_conn, 1);
log_ret =
log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_channel_open()");
log_on_amqp_error(amqp_get_rpc_reply(amqp_conn), "amqp_channel_open()");

// open channel - failed
if(log_ret != AMQP_RESPONSE_NORMAL) {
Expand All @@ -203,7 +203,7 @@ static int ki_rabbitmq_publish(sip_msg_t *msg, str *exchange, str *routingkey,
LM_ERR("RETRY: rabbitmq_reconnect()\n");

// reconnect - success
if(rabbitmq_reconnect(&conn) == RABBITMQ_OK) {
if(rabbitmq_reconnect(&amqp_conn) == RABBITMQ_OK) {
// reconnect - debug
LM_ERR("SUCCESS: rabbitmq_reconnect()\n");
}
Expand All @@ -214,7 +214,7 @@ static int ki_rabbitmq_publish(sip_msg_t *msg, str *exchange, str *routingkey,
}

// reconnect - close channel
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);

// reconnect - return error
return RABBITMQ_ERR_CHANNEL;
Expand All @@ -229,7 +229,7 @@ static int ki_rabbitmq_publish(sip_msg_t *msg, str *exchange, str *routingkey,
props.correlation_id = amqp_cstring_bytes("1");

// publish
if(log_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange->s),
if(log_on_error(amqp_basic_publish(amqp_conn, 1, amqp_cstring_bytes(exchange->s),
amqp_cstring_bytes(routingkey->s), 0, 0, &props,
amqp_cstring_bytes(messagebody->s)),
"amqp_basic_publish()")
Expand All @@ -238,7 +238,7 @@ static int ki_rabbitmq_publish(sip_msg_t *msg, str *exchange, str *routingkey,
LM_ERR("FAIL: amqp_basic_publish()\n");

// cleanup
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);

// error
return RABBITMQ_ERR_PUBLISH;
Expand All @@ -248,7 +248,7 @@ static int ki_rabbitmq_publish(sip_msg_t *msg, str *exchange, str *routingkey,
LM_DBG("SUCCESS: amqp_basic_publish()\n");

// cleanup
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);

// success
return RABBITMQ_OK;
Expand Down Expand Up @@ -311,9 +311,9 @@ static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,

reconnect:
// open channel
amqp_channel_open(conn, 1);
amqp_channel_open(amqp_conn, 1);
log_ret =
log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_channel_open()");
log_on_amqp_error(amqp_get_rpc_reply(amqp_conn), "amqp_channel_open()");

// open channel - failed
if(log_ret != AMQP_RESPONSE_NORMAL) {
Expand All @@ -326,7 +326,7 @@ static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,
LM_ERR("RETRY: rabbitmq_reconnect()\n");

// reconnect - success
if(rabbitmq_reconnect(&conn) == RABBITMQ_OK) {
if(rabbitmq_reconnect(&amqp_conn) == RABBITMQ_OK) {
// reconnect - debug
LM_ERR("SUCCESS: rabbitmq_reconnect()\n");
}
Expand All @@ -337,15 +337,15 @@ static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,
}

// reconnect - close channel
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);

// reconnect - return error
return RABBITMQ_ERR_CHANNEL;
}

// alloc reply_to queue
if(direct_reply_to == 1) {
reply_to = amqp_queue_declare(conn, 1,
reply_to = amqp_queue_declare(amqp_conn, 1,
amqp_cstring_bytes("amq.rabbitmq.reply-to"), 0, 0, 1, 1,
amqp_empty_table);
} else {
Expand All @@ -356,14 +356,14 @@ static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,
strcat(reply_to_buffer, uuid_buffer);

reply_to =
amqp_queue_declare(conn, 1, amqp_cstring_bytes(reply_to_buffer),
amqp_queue_declare(amqp_conn, 1, amqp_cstring_bytes(reply_to_buffer),
0, 0, 1, 1, amqp_empty_table);
}

if(log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_queue_declare()")
if(log_on_amqp_error(amqp_get_rpc_reply(amqp_conn), "amqp_queue_declare()")
!= AMQP_RESPONSE_NORMAL) {
LM_ERR("FAIL: amqp_queue_declare()\n");
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);
return RABBITMQ_ERR_QUEUE;
}

Expand All @@ -379,36 +379,36 @@ static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,
props.reply_to = reply_to->queue;
if(props.reply_to.bytes == NULL) {
LM_ERR("Out of memory while copying queue name");
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);
return -1;
}
props.correlation_id = amqp_cstring_bytes("1");

// start consume
amqp_basic_consume(conn, 1, reply_to->queue, amqp_empty_bytes, 0, 1, 0,
amqp_basic_consume(amqp_conn, 1, reply_to->queue, amqp_empty_bytes, 0, 1, 0,
amqp_empty_table);
if(log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_basic_consume()")
if(log_on_amqp_error(amqp_get_rpc_reply(amqp_conn), "amqp_basic_consume()")
!= AMQP_RESPONSE_NORMAL) {
LM_ERR("FAIL: amqp_basic_consume()\n");
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);
return RABBITMQ_ERR_CONSUME;
}

// publish
if(log_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange->s),
if(log_on_error(amqp_basic_publish(amqp_conn, 1, amqp_cstring_bytes(exchange->s),
amqp_cstring_bytes(routingkey->s), 0, 0, &props,
amqp_cstring_bytes(messagebody->s)),
"amqp_basic_publish()")
!= AMQP_RESPONSE_NORMAL) {
LM_ERR("FAIL: amqp_basic_publish()\n");
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);
return RABBITMQ_ERR_PUBLISH;
}

// consume frame
for(;;) {
amqp_maybe_release_buffers(conn);
result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
amqp_maybe_release_buffers(amqp_conn);
result = amqp_simple_wait_frame_noblock(amqp_conn, &frame, &tv);
if(result < 0) {
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
result = -1;
Expand All @@ -433,7 +433,7 @@ static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,
(char *)d->exchange.bytes, (int)d->routing_key.len,
(char *)d->routing_key.bytes);

result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
result = amqp_simple_wait_frame_noblock(amqp_conn, &frame, &tv);
if(result < 0) {
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
result = -1;
Expand All @@ -458,7 +458,7 @@ static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,
body_received = 0;

while(body_received < body_target) {
result = amqp_simple_wait_frame_noblock(conn, &frame, &tv);
result = amqp_simple_wait_frame_noblock(amqp_conn, &frame, &tv);
if(result < 0) {
LM_ERR("amqp_simple_wait_frame_noblock() error: %d\n", result);
result = -1;
Expand Down Expand Up @@ -495,7 +495,7 @@ static int rabbitmq_publish_consume_helper(sip_msg_t *msg, str *exchange,
break;
}

amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_channel_close(amqp_conn, 1, AMQP_REPLY_SUCCESS);

return result;
}
Expand Down

0 comments on commit 4f303b6

Please sign in to comment.