Skip to content

Commit

Permalink
[tx] fill in header data closer to where they are currently used
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Sep 9, 2020
1 parent 7e99d86 commit c198f7a
Showing 1 changed file with 24 additions and 27 deletions.
51 changes: 24 additions & 27 deletions libknet/threads_tx.c
Expand Up @@ -171,6 +171,9 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan

inbuf = knet_h->recv_from_sock_buf;
inbuf->kh_type = KNET_HEADER_TYPE_DATA;
inbuf->kh_version = KNET_HEADER_VERSION;
inbuf->khp_data_frag_seq = 0;
inbuf->kh_node = htons(knet_h->host_id);

if (knet_h->enabled != 1) {
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
Expand Down Expand Up @@ -465,9 +468,12 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan
/*
* copy the frag info on all buffers
*/
knet_h->send_to_links_buf[frag_idx]->kh_version = KNET_HEADER_VERSION;
knet_h->send_to_links_buf[frag_idx]->kh_node = htons(knet_h->host_id);
knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
knet_h->send_to_links_buf[frag_idx]->khp_data_frag_seq = frag_idx + 1;
knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
Expand Down Expand Up @@ -576,18 +582,31 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan
return err;
}

static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel)
static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel)
{
ssize_t inlen = 0;
int savederrno = 0, docallback = 0;
struct iovec iov_in;
struct msghdr msg;
struct sockaddr_storage address;

memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
iov_in.iov_len = KNET_MAX_PACKET_SIZE;

memset(&msg, 0, sizeof(struct msghdr));
msg.msg_name = &address;
msg.msg_namelen = sizeof(struct sockaddr_storage);
msg.msg_iov = &iov_in;
msg.msg_iovlen = 1;

if ((channel >= 0) &&
(channel < KNET_DATAFD_MAX) &&
(!knet_h->sockfd[channel].is_socket)) {
inlen = readv(sockfd, msg->msg_iov, 1);
inlen = readv(sockfd, msg.msg_iov, 1);
} else {
inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
if (msg->msg_flags & MSG_TRUNC) {
inlen = recvmsg(sockfd, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
if (msg.msg_flags & MSG_TRUNC) {
log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd);
return;
}
Expand Down Expand Up @@ -631,32 +650,10 @@ void *_handle_send_to_links_thread(void *data)
int i, nev;
int flush, flush_queue_limit;
int8_t channel;
struct iovec iov_in;
struct msghdr msg;
struct sockaddr_storage address;

set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);

memset(&events, 0, sizeof(events));
memset(&iov_in, 0, sizeof(iov_in));
iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
iov_in.iov_len = KNET_MAX_PACKET_SIZE;

memset(&msg, 0, sizeof(struct msghdr));
msg.msg_name = &address;
msg.msg_namelen = sizeof(struct sockaddr_storage);
msg.msg_iov = &iov_in;
msg.msg_iovlen = 1;

knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);

for (i = 0; i < PCKT_FRAG_MAX; i++) {
knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
}

flush_queue_limit = 0;

Expand Down Expand Up @@ -721,7 +718,7 @@ void *_handle_send_to_links_thread(void *data)
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;
}
_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel);
_handle_send_to_links(knet_h, events[i].data.fd, channel);
pthread_mutex_unlock(&knet_h->tx_mutex);
}

Expand Down

0 comments on commit c198f7a

Please sign in to comment.