Skip to content

Commit

Permalink
[threads] implement outgoing pckt fragmentation based on PMTUd
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Sep 20, 2015
1 parent a662cd6 commit 31f3f4e
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 41 deletions.
4 changes: 4 additions & 0 deletions libknet/onwire.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ typedef uint16_t seq_num_t;

struct knet_header_payload_data {
seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pkcts */
uint8_t khp_data_frag_num; /* number of fragments of this pckt. 1 is not fragmented */
uint8_t khp_data_frag_seq; /* as above, indicates the frag sequence number */
uint8_t khp_data_userdata[0]; /* pointer to the real user data */
} __attribute__((packed));

Expand Down Expand Up @@ -158,6 +160,8 @@ struct knet_header {
*/

#define khp_data_seq_num kh_payload.khp_data.khp_data_seq_num
#define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num
#define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq
#define khp_data_userdata kh_payload.khp_data.khp_data_userdata

#define khp_ping_link kh_payload.khp_ping.khp_ping_link
Expand Down
171 changes: 130 additions & 41 deletions libknet/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,17 @@ static void _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host,
}

for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
sendto(dst_host->link[dst_host->active_links[link_idx]].listener_sock,
outbuf, outlen, MSG_DONTWAIT,
(struct sockaddr *) &dst_host->link[dst_host->active_links[link_idx]].dst_addr,
sizeof(struct sockaddr_storage));
if (sendto(dst_host->link[dst_host->active_links[link_idx]].listener_sock,
outbuf, outlen, MSG_DONTWAIT,
(struct sockaddr *) &dst_host->link[dst_host->active_links[link_idx]].dst_addr,
sizeof(struct sockaddr_storage)) < 0) {
log_debug(knet_h, KNET_SUB_SEND_T, "Unable to send data packet to host %s (%u) link %s:%s (%u): %s",
dst_host->name, dst_host->host_id,
dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
dst_host->link[dst_host->active_links[link_idx]].link_id,
strerror(errno));
}

if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
(dst_host->active_link_entries > 1)) {
Expand All @@ -67,15 +74,15 @@ static void _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host,

static void _handle_send_to_links(knet_handle_t knet_h, int sockfd)
{
ssize_t inlen = 0, len, outlen;
ssize_t inlen = 0, len, outlen, frag_len;
struct knet_host *dst_host;
uint16_t dst_host_ids[KNET_MAX_HOST];
size_t dst_host_ids_entries = 0;
int bcast = 1;
unsigned char *outbuf = (unsigned char *)knet_h->send_to_links_buf;
struct knet_hostinfo *knet_hostinfo;
struct iovec iov_in;
uint16_t pckt_frag = 0;
unsigned int temp_data_mtu;

if (pthread_rwlock_rdlock(&knet_h->list_rwlock) != 0) {
log_debug(knet_h, KNET_SUB_SEND_T, "Unable to get read lock");
Expand Down Expand Up @@ -105,8 +112,10 @@ static void _handle_send_to_links(knet_handle_t knet_h, int sockfd)
goto out_unlock;
}

outlen = len = inlen + KNET_HEADER_DATA_SIZE;

/*
* move this into a separate function to expand on
* extra switching rules
*/
switch(knet_h->send_to_links_buf->kh_type) {
case KNET_HEADER_TYPE_DATA:
if (knet_h->dst_host_filter_fn) {
Expand Down Expand Up @@ -142,62 +151,123 @@ static void _handle_send_to_links(knet_handle_t knet_h, int sockfd)
}

if (!knet_h->data_mtu) {
/*
* using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
*/
log_debug(knet_h, KNET_SUB_SEND_T,
"Received data packet but data MTU is still unknown. Packet might not be delivered");
"Received data packet but data MTU is still unknown."
" Packet might not be delivered."
" Assuming mininum IPv4 mtu (%d)",
KNET_PMTUD_MIN_MTU_V4);
temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
} else {
/*
* take a copy of the mtu to avoid value changing under
* our feet while we are sending a fragmented pckt
*/
temp_data_mtu = knet_h->data_mtu;
}

if ((knet_h->data_mtu) &&
(inlen > knet_h->data_mtu)) {
log_debug(knet_h, KNET_SUB_SEND_T, "Frag code is still not implemented");
pckt_frag = ceil((float)inlen / knet_h->data_mtu);
log_debug(knet_h, KNET_SUB_SEND_T, "Current inlen: %zu data mtu: %u frags: %d", inlen, knet_h->data_mtu, pckt_frag);
goto out_unlock;
frag_len = inlen;

knet_h->send_to_links_buf->khp_data_frag_seq = 0;
knet_h->send_to_links_buf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);

if (knet_h->send_to_links_buf->khp_data_frag_num > 1) {
log_debug(knet_h, KNET_SUB_SEND_T, "Current inlen: %zu data mtu: %u frags: %d",
inlen, temp_data_mtu, knet_h->send_to_links_buf->khp_data_frag_num);
}

if (!bcast) {
int host_idx;

for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
if (!dst_host) {
log_debug(knet_h, KNET_SUB_SEND_T, "unicast packet, host not found");
continue;
while (knet_h->send_to_links_buf->khp_data_frag_seq < knet_h->send_to_links_buf->khp_data_frag_num) {

knet_h->send_to_links_buf->khp_data_frag_seq++;

if (frag_len > temp_data_mtu) {
outlen = len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
} else {
outlen = len = frag_len + KNET_HEADER_DATA_SIZE;
}

knet_h->send_to_links_buf->khp_data_seq_num = htons(++dst_host->ucast_seq_num_tx);
for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
dst_host = knet_h->host_index[dst_host_ids[host_idx]];
if (!dst_host) {
log_debug(knet_h, KNET_SUB_SEND_T, "unicast packet, host not found");
continue;
}

if (knet_h->send_to_links_buf->khp_data_frag_seq == 1) {
knet_h->send_to_links_buf->khp_data_seq_num = htons(++dst_host->ucast_seq_num_tx);
} else {
knet_h->send_to_links_buf->khp_data_seq_num = htons(dst_host->ucast_seq_num_tx);
}

if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->send_to_links_buf,
len,
knet_h->send_to_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt unicast packet");
goto out_unlock;
}
outbuf = knet_h->send_to_links_buf_crypt;
}

_dispatch_to_links(knet_h, dst_host, outbuf, outlen);

}

if (frag_len > temp_data_mtu) {
frag_len = frag_len - temp_data_mtu;
/*
* we can't verify this memmove till reassembly code is implemented
*/
memmove(knet_h->send_to_links_buf->khp_data_userdata,
knet_h->send_to_links_buf->khp_data_userdata + temp_data_mtu,
temp_data_mtu);
}
}
} else {
knet_h->send_to_links_buf->khp_data_seq_num = htons(++knet_h->bcast_seq_num_tx);

while (knet_h->send_to_links_buf->khp_data_frag_seq < knet_h->send_to_links_buf->khp_data_frag_num) {

knet_h->send_to_links_buf->khp_data_frag_seq++;

if (frag_len > temp_data_mtu) {
outlen = len = temp_data_mtu + KNET_HEADER_DATA_SIZE;
} else {
outlen = len = frag_len + KNET_HEADER_DATA_SIZE;
}

if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->send_to_links_buf,
len,
knet_h->send_to_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt unicast packet");
log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt mcast/bcast packet");
goto out_unlock;
}
outbuf = knet_h->send_to_links_buf_crypt;
}

_dispatch_to_links(knet_h, dst_host, outbuf, outlen);

}
} else {
knet_h->send_to_links_buf->khp_data_seq_num = htons(++knet_h->bcast_seq_num_tx);

if (knet_h->crypto_instance) {
if (crypto_encrypt_and_sign(knet_h,
(const unsigned char *)knet_h->send_to_links_buf,
len,
knet_h->send_to_links_buf_crypt,
&outlen) < 0) {
log_debug(knet_h, KNET_SUB_SEND_T, "Unable to encrypt mcast/bcast packet");
goto out_unlock;
for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
_dispatch_to_links(knet_h, dst_host, outbuf, outlen);
}
outbuf = knet_h->send_to_links_buf_crypt;
}

for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
_dispatch_to_links(knet_h, dst_host, outbuf, outlen);
if (frag_len > temp_data_mtu) {
frag_len = frag_len - temp_data_mtu;
/*
* we can't verify this memmove till reassembly code is implemented
*/
memmove(knet_h->send_to_links_buf->khp_data_userdata,
knet_h->send_to_links_buf->khp_data_userdata + temp_data_mtu,
temp_data_mtu);
}
}
}

Expand Down Expand Up @@ -257,6 +327,22 @@ static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd)
goto exit_unlock;
}

/*
* skip fragmented pckt for now
*/

if (knet_h->recv_from_links_buf->kh_type == KNET_HEADER_TYPE_DATA) {
if (knet_h->recv_from_links_buf->khp_data_frag_num > 1) {
if (knet_h->recv_from_links_buf->khp_data_frag_seq == 1) {
log_warn(knet_h, KNET_SUB_LINK_T, "pckt reassembly code not implemented! seq: %u frag: %u size: %zu",
ntohs(knet_h->recv_from_links_buf->khp_data_seq_num),
knet_h->recv_from_links_buf->khp_data_frag_seq,
len - KNET_HEADER_SIZE);
}
goto exit_unlock;
}
}

knet_h->recv_from_links_buf->kh_node = ntohs(knet_h->recv_from_links_buf->kh_node);
src_host = knet_h->host_index[knet_h->recv_from_links_buf->kh_node];
if (src_host == NULL) { /* host not found */
Expand Down Expand Up @@ -855,7 +941,10 @@ static void _handle_check_pmtud(knet_handle_t knet_h, struct knet_host *dst_host
}

if (found_mtu) {
dst_link->status.mtu = onwire_len;
/*
* account for IP overhead in PMTU calculation
*/
dst_link->status.mtu = onwire_len - overhead_len;
pthread_mutex_unlock(&knet_h->pmtud_mutex);
return;
}
Expand Down

0 comments on commit 31f3f4e

Please sign in to comment.