Skip to content

Commit

Permalink
[heartbeat] move all packet data filling in the same chuck of code
Browse files Browse the repository at this point in the history
and re-org code in the file a bit (no functional changes)

Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Aug 25, 2020
1 parent 4189893 commit fa0b4ce
Showing 1 changed file with 98 additions and 98 deletions.
196 changes: 98 additions & 98 deletions libknet/threads_heartbeat.c
Expand Up @@ -36,7 +36,7 @@ static void _link_down(knet_handle_t knet_h, struct knet_host *dst_host, struct
}
}

static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed)
static void send_ping(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed)
{
int err = 0, savederrno = 0, stats_err = 0;
int len;
Expand All @@ -61,6 +61,11 @@ static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host,
timespec_diff(dst_link->ping_last, clock_now, &diff_ping);

if ((diff_ping >= (dst_link->ping_interval * 1000llu)) || (!timed)) {
/* preparing ping buffer */
knet_h->pingbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
knet_h->pingbuf->kh_node = htons(knet_h->host_id);

memmove(&knet_h->pingbuf->khp_ping_time[0], &clock_now, sizeof(struct timespec));
knet_h->pingbuf->khp_ping_link = dst_link->link_id;
if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
Expand Down Expand Up @@ -141,103 +146,6 @@ static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host,
}
}

void _send_pings(knet_handle_t knet_h, int timed)
{
struct knet_host *dst_host;
int link_idx;

if (pthread_mutex_lock(&knet_h->hb_mutex)) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get hb mutex lock");
return;
}

for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((dst_host->link[link_idx].status.enabled != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) ||
((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(dst_host->link[link_idx].status.dynconnected != 1)))
continue;

_handle_check_each(knet_h, dst_host, &dst_host->link[link_idx], timed);
}
}

pthread_mutex_unlock(&knet_h->hb_mutex);
}

static void _adjust_pong_timeouts(knet_handle_t knet_h)
{
struct knet_host *dst_host;
struct knet_link *dst_link;
int link_idx;

if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get backoff_mutex");
return;
}

for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((dst_host->link[link_idx].status.enabled != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) ||
((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(dst_host->link[link_idx].status.dynconnected != 1)))
continue;

dst_link = &dst_host->link[link_idx];

if (dst_link->pong_timeout_backoff > 1) {
dst_link->pong_timeout_backoff--;
}

dst_link->pong_timeout_adj = (dst_link->pong_timeout * dst_link->pong_timeout_backoff) + (dst_link->status.stats.latency_ave * KNET_LINK_PONG_TIMEOUT_LAT_MUL);
}
}

pthread_mutex_unlock(&knet_h->backoff_mutex);
}

void *_handle_heartbt_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
int i = 1;

set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STARTED);

/* preparing ping buffer */
knet_h->pingbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
knet_h->pingbuf->kh_node = htons(knet_h->host_id);

while (!shutdown_in_progress(knet_h)) {
usleep(knet_h->threads_timer_res);

if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get read lock");
continue;
}

/*
* _adjust_pong_timeouts should execute approx once a second.
*/
if ((i % (1000000 / knet_h->threads_timer_res)) == 0) {
_adjust_pong_timeouts(knet_h);
i = 1;
} else {
i++;
}

_send_pings(knet_h, 1);

pthread_rwlock_unlock(&knet_h->global_rwlock);
}

set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STOPPED);

return NULL;
}

static void send_pong(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link, struct knet_header *inbuf) {
int err = 0, savederrno = 0, stats_err = 0;
unsigned char *outbuf = (unsigned char *)inbuf;
Expand Down Expand Up @@ -401,3 +309,95 @@ void process_pong(knet_handle_t knet_h, struct knet_host *src_host, struct knet_
}
}
}

void _send_pings(knet_handle_t knet_h, int timed)
{
struct knet_host *dst_host;
int link_idx;

if (pthread_mutex_lock(&knet_h->hb_mutex)) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get hb mutex lock");
return;
}

for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((dst_host->link[link_idx].status.enabled != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) ||
((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(dst_host->link[link_idx].status.dynconnected != 1)))
continue;

send_ping(knet_h, dst_host, &dst_host->link[link_idx], timed);
}
}

pthread_mutex_unlock(&knet_h->hb_mutex);
}

static void _adjust_pong_timeouts(knet_handle_t knet_h)
{
struct knet_host *dst_host;
struct knet_link *dst_link;
int link_idx;

if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get backoff_mutex");
return;
}

for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
if ((dst_host->link[link_idx].status.enabled != 1) ||
(dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) ||
((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
(dst_host->link[link_idx].status.dynconnected != 1)))
continue;

dst_link = &dst_host->link[link_idx];

if (dst_link->pong_timeout_backoff > 1) {
dst_link->pong_timeout_backoff--;
}

dst_link->pong_timeout_adj = (dst_link->pong_timeout * dst_link->pong_timeout_backoff) + (dst_link->status.stats.latency_ave * KNET_LINK_PONG_TIMEOUT_LAT_MUL);
}
}

pthread_mutex_unlock(&knet_h->backoff_mutex);
}

void *_handle_heartbt_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
int i = 1;

set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STARTED);

while (!shutdown_in_progress(knet_h)) {
usleep(knet_h->threads_timer_res);

if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get read lock");
continue;
}

/*
* _adjust_pong_timeouts should execute approx once a second.
*/
if ((i % (1000000 / knet_h->threads_timer_res)) == 0) {
_adjust_pong_timeouts(knet_h);
i = 1;
} else {
i++;
}

_send_pings(knet_h, 1);

pthread_rwlock_unlock(&knet_h->global_rwlock);
}

set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STOPPED);

return NULL;
}

0 comments on commit fa0b4ce

Please sign in to comment.