Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ Transport Reconnect Logic #2651

Merged
merged 12 commits into from
May 10, 2021
25 changes: 21 additions & 4 deletions transports/janus_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ static janus_transport_callbacks *gateway = NULL;
static gboolean rmq_janus_api_enabled = FALSE;
static gboolean rmq_admin_api_enabled = FALSE;
static gboolean notify_events = TRUE;
static guint rmq_reconnect_backoff_initial = 100000; /* 100ms */
static guint rmq_reconnect_backoff_max = 5000000; /* 5s */
static gfloat rmq_reconnect_backoff_multiplier = 1.5;

#define JANUS_RABBITMQ_EXCHANGE_TYPE "fanout"

Expand Down Expand Up @@ -922,6 +925,8 @@ void *janus_rmq_in_thread(void *data) {
timeout.tv_sec = 0;
timeout.tv_usec = 20000;
amqp_frame_t frame;
guint rmq_reconnect_backoff = rmq_reconnect_backoff_initial;

while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) {
amqp_maybe_release_buffers(rmq_client->rmq_conn);
atoppi marked this conversation as resolved.
Show resolved Hide resolved
/* Wait for a frame */
Expand All @@ -945,8 +950,14 @@ void *janus_rmq_in_thread(void *data) {
JANUS_LOG(LOG_VERB, "Trying to reconnect with RabbitMQ Server\n");
int result = janus_rabbitmq_connect();
if(result < 0) {
JANUS_LOG(LOG_ERR, "Failed to reconnect to RabbitMQ Server. Retrying in 5s...\n");
g_usleep(5000000);
JANUS_LOG(LOG_WARN, "Failed to reconnect to RabbitMQ Server. Retrying in %fs...\n", (gfloat)rmq_reconnect_backoff/1000000);
g_usleep(rmq_reconnect_backoff);
rmq_reconnect_backoff *= rmq_reconnect_backoff_multiplier;
if (rmq_reconnect_backoff >= rmq_reconnect_backoff_max) {
rmq_reconnect_backoff = rmq_reconnect_backoff_max;
}
chriswiggins marked this conversation as resolved.
Show resolved Hide resolved
} else {
rmq_reconnect_backoff = rmq_reconnect_backoff_initial;
}

continue;
Expand Down Expand Up @@ -1024,13 +1035,19 @@ void *janus_rmq_out_thread(void *data) {
return NULL;
}
JANUS_LOG(LOG_VERB, "Joining RabbitMQ out thread\n");
guint rmq_reconnect_backoff = rmq_reconnect_backoff_initial;
while(!rmq_client->destroy && !g_atomic_int_get(&stopping)) {

while (!rmq_client->connected) {
chriswiggins marked this conversation as resolved.
Show resolved Hide resolved
chriswiggins marked this conversation as resolved.
Show resolved Hide resolved
JANUS_LOG(LOG_WARN, "Waiting for 5s for RabbitMQ to reconnect\n");
g_usleep(5000000);
g_usleep(rmq_reconnect_backoff);
rmq_reconnect_backoff *= rmq_reconnect_backoff_multiplier;
if (rmq_reconnect_backoff >= rmq_reconnect_backoff_max) {
rmq_reconnect_backoff = rmq_reconnect_backoff_max;
}
chriswiggins marked this conversation as resolved.
Show resolved Hide resolved
}

rmq_reconnect_backoff = rmq_reconnect_backoff_initial;

/* We send messages from here as well, not only notifications */
janus_rabbitmq_response *response = g_async_queue_pop(rmq_client->messages);
if(response == &exit_message)
Expand Down