From b1a243e166bdcf7d76b071a0964187e6ce714602 Mon Sep 17 00:00:00 2001 From: Luis Azedo Date: Fri, 3 Mar 2017 20:03:09 +0000 Subject: [PATCH] kazoo : producer heartbeats --- src/modules/kazoo/kz_amqp.c | 161 +++++++++++++++++++++++++++--------- src/modules/kazoo/kz_amqp.h | 14 +++- 2 files changed, 135 insertions(+), 40 deletions(-) diff --git a/src/modules/kazoo/kz_amqp.c b/src/modules/kazoo/kz_amqp.c index 2111e892d4f..a962d99c91c 100644 --- a/src/modules/kazoo/kz_amqp.c +++ b/src/modules/kazoo/kz_amqp.c @@ -741,6 +741,9 @@ void kz_amqp_connection_close(kz_amqp_conn_ptr rmq) { if (!rmq) return; + if(rmq->heartbeat) + kz_amqp_timer_destroy(&rmq->heartbeat); + kz_amqp_fire_connection_event("closed", rmq->server->connection->info.host); if (rmq->conn) { @@ -2273,7 +2276,13 @@ int kz_amqp_connect(kz_amqp_conn_ptr rmq) } } - return 0; + if(dbk_use_hearbeats > 0) { + if(kz_amqp_timer_create(&rmq->heartbeat, dbk_use_hearbeats, kz_amqp_heartbeat_proc, rmq) != 0) { + LM_ERR("could not schedule heartbeats for the connection\n"); + } + } + + return 0; error: kz_amqp_handle_server_failure(rmq); @@ -2291,53 +2300,21 @@ void kz_amqp_reconnect_cb(int fd, short event, void *arg) return; } - if (connection->ev != NULL) { - event_del(connection->ev); - pkg_free(connection->ev); - connection->ev = NULL; - } - - close(fd); - pkg_free(connection->timer); - + kz_amqp_timer_destroy(&connection->reconnect); kz_amqp_connect(connection); } int kz_amqp_handle_server_failure(kz_amqp_conn_ptr connection) { + int res = 0; if(connection->state != KZ_AMQP_CONNECTION_CLOSED) connection->state = KZ_AMQP_CONNECTION_FAILURE; - int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); - - if (timerfd == -1) { - LM_ERR("could not create timerfd to reschedule connection. No further attempts will be made to reconnect this server.\n"); - return -1; + if((res = kz_amqp_timer_create(&connection->reconnect, 5, kz_amqp_reconnect_cb, connection)) != 0) { + LM_ERR("could not reschedule connection. No further attempts will be made to reconnect this server.\n"); } - - struct itimerspec *itime = pkg_malloc(sizeof(struct itimerspec)); - itime->it_interval.tv_sec = 0; - itime->it_interval.tv_nsec = 0; - - itime->it_value.tv_sec = 5; - itime->it_value.tv_nsec = 0; - - if (timerfd_settime(timerfd, 0, itime, NULL) == -1) - { - LM_ERR("could not set timer to reschedule connection. No further attempts will be made to reconnect this server.\n"); - return -1; - } - LM_DBG("timerfd value is %d\n", timerfd); - struct event *timer_ev = pkg_malloc(sizeof(struct event)); - event_set(timer_ev, timerfd, EV_READ, kz_amqp_reconnect_cb, connection); - if(event_add(timer_ev, NULL) == -1) { - LM_ERR("event_add failed while rescheduling connection (%s). No further attempts will be made to reconnect this server.\n", strerror(errno)); - return -1; - } - connection->ev = timer_ev; - connection->timer = itime; - return 0; + return res; } int kz_amqp_publisher_send(kz_amqp_cmd_ptr cmd) @@ -2847,3 +2824,111 @@ int kz_amqp_consumer_worker_proc(int cmd_pipe) return 0; } +void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer) +{ + kz_amqp_timer_ptr timer = *pTimer; + if (timer->ev != NULL) { + event_del(timer->ev); + pkg_free(timer->ev); + timer->ev = NULL; + } + close(timer->fd); + pkg_free(timer->timer); + pkg_free(timer); + *pTimer = NULL; +} + +int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback)(int, short, void *), void *data) +{ + kz_amqp_timer_ptr timer = NULL; + struct itimerspec *itime = NULL; + struct event *timer_ev = NULL; + int timerfd = 0; + + timer = (kz_amqp_timer_ptr) pkg_malloc(sizeof(kz_amqp_timer)); + if (!timer) { + LM_ERR("could not allocate timer struct.\n"); + goto error; + } + memset(timer, 0, sizeof(kz_amqp_timer)); + + timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + if (timerfd == -1) { + LM_ERR("could not create timer.\n"); + goto error; + } + + itime = pkg_malloc(sizeof(struct itimerspec)); + if (!itime) { + LM_ERR("could not allocate itimerspec struct.\n"); + goto error; + } + itime->it_interval.tv_sec = 0; + itime->it_interval.tv_nsec = 0; + itime->it_value.tv_sec = seconds; + itime->it_value.tv_nsec = 0; + + if (timerfd_settime(timerfd, 0, itime, NULL) == -1) { + LM_ERR("could not set timer for %i seconds in %i\n", seconds, timerfd); + goto error; + } + + LM_DBG("timerfd value is %d\n", timerfd); + timer_ev = pkg_malloc(sizeof(struct event)); + if (!timer_ev) { + LM_ERR("could not allocate event struct.\n"); + goto error; + } + event_set(timer_ev, timerfd, EV_READ | EV_PERSIST, callback, data); + if (event_add(timer_ev, NULL) == -1) { + LM_ERR("event_add failed while creating timer (%s).\n", strerror(errno)); + goto error; + } + + timer->ev = timer_ev; + timer->timer = itime; + timer->fd = timerfd; + *pTimer = timer; + + return 0; + +error: + + if (timer_ev) + pkg_free(timer_ev); + + if (itime) + pkg_free(itime); + + if (timerfd > 0) + close(timerfd); + + if (timer) + pkg_free(timer); + + *pTimer = NULL; + + return -1; +} + +void kz_amqp_heartbeat_proc(int fd, short event, void *arg) +{ + int res; + amqp_frame_t heartbeat; + kz_amqp_conn_ptr connection = (kz_amqp_conn_ptr) arg; + LM_DBG("sending heartbeat to zone : %s , connection id : %d\n", connection->server->zone->zone, connection->server->id); + if (connection->state != KZ_AMQP_CONNECTION_OPEN) { + kz_amqp_timer_destroy(&connection->heartbeat); + return; + } + heartbeat.channel = 0; + heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; + res = amqp_send_frame(connection->conn, &heartbeat); + if (res != AMQP_STATUS_OK) { + LM_ERR("error sending heartbeat to zone : %s , connection id : %d\n", connection->server->zone->zone, connection->server->id); + kz_amqp_timer_destroy(&connection->heartbeat); + kz_amqp_handle_server_failure(connection); + return; + } + timerfd_settime(connection->heartbeat->fd, 0, connection->heartbeat->timer, NULL); +} diff --git a/src/modules/kazoo/kz_amqp.h b/src/modules/kazoo/kz_amqp.h index 0e5d6688f38..6aa36b875e6 100644 --- a/src/modules/kazoo/kz_amqp.h +++ b/src/modules/kazoo/kz_amqp.h @@ -82,12 +82,18 @@ typedef struct kz_amqp_connection_t { char* url; } kz_amqp_connection, *kz_amqp_connection_ptr; +typedef struct kz_amqp_timer_t { + struct event *ev; + struct itimerspec *timer; + int fd; +} kz_amqp_timer, *kz_amqp_timer_ptr; + typedef struct kz_amqp_conn_t { struct kz_amqp_server_t* server; amqp_connection_state_t conn; kz_amqp_connection_state state; - struct event *ev; - struct itimerspec *timer; + kz_amqp_timer_ptr reconnect; + kz_amqp_timer_ptr heartbeat; amqp_socket_t *socket; amqp_channel_t channel_count; amqp_channel_t channel_counter; @@ -269,6 +275,10 @@ void kz_amqp_fire_connection_event(char *event, char* host); void kz_amqp_free_pipe_cmd(kz_amqp_cmd_ptr cmd); +void kz_amqp_timer_destroy(kz_amqp_timer_ptr* pTimer); +int kz_amqp_timer_create(kz_amqp_timer_ptr* pTimer, int seconds, void (*callback)(int, short, void *), void *data); +void kz_amqp_heartbeat_proc(int fd, short event, void *arg); + static inline int kz_amqp_error(char const *context, amqp_rpc_reply_t x) { amqp_connection_close_t *mconn;