Skip to content

Commit

Permalink
[rx/heartbeat] move pmtu / pmtu_reply to pmtud.[c|h]
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Sep 9, 2020
1 parent 2b7a643 commit c09e87b
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 79 deletions.
91 changes: 91 additions & 0 deletions libknet/threads_pmtud.c
Expand Up @@ -641,6 +641,97 @@ void *_handle_pmtud_link_thread(void *data)
return NULL;
}

static void send_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf)
{
int err = 0, savederrno = 0, stats_err = 0;
unsigned char *outbuf = (unsigned char *)inbuf;
ssize_t len, outlen;

outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);

if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
return;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}

savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
return;
}
retry_pmtud:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
return;
}
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);

src_link->status.stats.tx_pmtu_errors++;
break;
case 0: /* ignore error and continue */
src_link->status.stats.tx_pmtu_errors++;
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pmtu_retries++;
pthread_mutex_unlock(&src_link->link_stats_mutex);
goto retry_pmtud;
break;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
}
pthread_mutex_unlock(&knet_h->tx_mutex);
}

void process_pmtud(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf)
{
send_pmtud_reply(knet_h, src_link, inbuf);
}

void process_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf)
{
if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
return;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
}

int knet_handle_pmtud_getfreq(knet_handle_t knet_h, unsigned int *interval)
{
int savederrno = 0;
Expand Down
3 changes: 3 additions & 0 deletions libknet/threads_pmtud.h
Expand Up @@ -12,4 +12,7 @@

void *_handle_pmtud_link_thread(void *data);

void process_pmtud(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf);
void process_pmtud_reply(knet_handle_t knet_h, struct knet_link *src_link, struct knet_header *inbuf);

#endif
82 changes: 3 additions & 79 deletions libknet/threads_rx.c
Expand Up @@ -26,6 +26,7 @@
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_pmtud.h"
#include "threads_rx.h"
#include "netutils.h"

Expand Down Expand Up @@ -237,7 +238,6 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
int bcast = 1;
uint64_t decrypt_time = 0;
struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
ssize_t len = msg->msg_len;
struct iovec iov_out[1];
int8_t channel;
Expand Down Expand Up @@ -556,92 +556,16 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
case KNET_HEADER_TYPE_PMTUD:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
outlen = KNET_HEADER_PMTUD_SIZE;
inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
inbuf->kh_node = htons(knet_h->host_id);

if (knet_h->crypto_in_use_config) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)inbuf,
outlen,
knet_h->recv_from_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
break;
}
outbuf = knet_h->recv_from_links_buf_crypt;
stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}

/* Unlock so we don't deadlock with tx_mutex */
pthread_mutex_unlock(&src_link->link_stats_mutex);

savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
if (savederrno) {
log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
goto out_pmtud;
}
retry_pmtud:
if (src_link->transport_connected) {
if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
(struct sockaddr *) &src_link->dst_addr, sizeof(struct sockaddr_storage));
} else {
len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
}
savederrno = errno;
if (len != outlen) {
err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, len, savederrno);
stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
if (stats_err < 0) {
log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
break;
}
switch(err) {
case -1: /* unrecoverable error */
log_debug(knet_h, KNET_SUB_RX,
"Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
src_link->outsock, errno, strerror(errno),
src_link->status.src_ipaddr, src_link->status.src_port,
src_link->status.dst_ipaddr, src_link->status.dst_port);

src_link->status.stats.tx_pmtu_errors++;
break;
case 0: /* ignore error and continue */
src_link->status.stats.tx_pmtu_errors++;
break;
case 1: /* retry to send those same data */
src_link->status.stats.tx_pmtu_retries++;
pthread_mutex_unlock(&src_link->link_stats_mutex);
goto retry_pmtud;
break;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
}
pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
process_pmtud(knet_h, src_link, inbuf);
return; /* Don't need to unlock link_stats_mutex */
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;

/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
pthread_mutex_unlock(&src_link->link_stats_mutex);

if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
break;
}
src_link->last_recv_mtu = inbuf->khp_pmtud_size;
pthread_cond_signal(&knet_h->pmtud_cond);
pthread_mutex_unlock(&knet_h->pmtud_mutex);
process_pmtud_reply(knet_h, src_link, inbuf);
return;
default:
pthread_mutex_unlock(&src_link->link_stats_mutex);
Expand Down

0 comments on commit c09e87b

Please sign in to comment.