Skip to content

Commit

Permalink
rabbitmq: don't create reply-to queue on publish
Browse files Browse the repository at this point in the history
When using the rabbitmq_publish function, there is no need to create
a reply to queue, because it will never be read. And since there is
never a real consumer, so the queue will never be deleted. This
will eventually cloak up the RabbitMQ server with millions of
generic reply queues.
This bug has been fixed in master already, so this is basically a
backport.
  • Loading branch information
Sebastian Damm committed May 12, 2017
1 parent 40d26ea commit 9abd1e0
Showing 1 changed file with 0 additions and 35 deletions.
35 changes: 0 additions & 35 deletions src/modules/rabbitmq/rabbitmq.c
Expand Up @@ -175,7 +175,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou
int reconnect_attempts = 0;
int log_ret;
str exchange, routingkey, messagebody, contenttype;
amqp_bytes_t reply_to_queue;

// sanity checks
if (get_str_fparam(&exchange, msg, (fparam_t*)in_exchange) < 0) {
Expand Down Expand Up @@ -231,44 +230,13 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou
return RABBITMQ_ERR_CHANNEL;
}

// alloc queue
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
if (log_on_amqp_error(amqp_get_rpc_reply(conn), "amqp_queue_declare()") != AMQP_RESPONSE_NORMAL) {
LM_ERR("FAIL: amqp_queue_declare()\n");
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
return RABBITMQ_ERR_QUEUE;
}

// alloc bytes
reply_to_queue = amqp_bytes_malloc_dup(r->queue);
LM_DBG("%.*s\n", (int)reply_to_queue.len, (char*)reply_to_queue.bytes);
if (reply_to_queue.bytes == NULL) {
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_bytes_free(reply_to_queue);
LM_ERR("Out of memory while copying queue name");
return -1;
}

// alloc properties
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
AMQP_BASIC_REPLY_TO_FLAG |
AMQP_BASIC_CORRELATION_ID_FLAG;
props.content_type = amqp_cstring_bytes(contenttype.s);
props.delivery_mode = 2; /* persistent delivery mode */
props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
if (props.reply_to.bytes == NULL) {
// debug
LM_ERR("Out of memory while copying queue name");

// cleanup
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_bytes_free(reply_to_queue);

// error
return -1;
}
props.correlation_id = amqp_cstring_bytes("1");

// publish
Expand All @@ -285,7 +253,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou

// cleanup
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
amqp_bytes_free(reply_to_queue);

// error
return RABBITMQ_ERR_PUBLISH;
Expand All @@ -295,8 +262,6 @@ static int rabbitmq_publish(struct sip_msg* msg, char* in_exchange, char* in_rou
LM_DBG("SUCCESS: amqp_basic_publish()\n");

// cleanup
amqp_bytes_free(props.reply_to);
amqp_bytes_free(reply_to_queue);
amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);

// success
Expand Down

0 comments on commit 9abd1e0

Please sign in to comment.