Skip to content

Commit

Permalink
[WIP] re organize tx and rx
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Sep 16, 2020
1 parent b0aad78 commit 3b00ec9
Show file tree
Hide file tree
Showing 4 changed files with 387 additions and 273 deletions.
20 changes: 10 additions & 10 deletions libknet/onwire.h
Expand Up @@ -47,7 +47,7 @@
typedef uint16_t seq_num_t; /* data sequence number required to deduplicate pckts */
#define SEQ_MAX UINT16_MAX

struct knet_header_payload_data {
struct knet_header_payload_data_v1 {
seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pckts */
uint8_t khp_data_compress; /* identify if user data are compressed */
uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */
Expand All @@ -58,13 +58,13 @@ struct knet_header_payload_data {
uint8_t khp_data_userdata[0]; /* pointer to the real user data */
} __attribute__((packed));

#define khp_data_seq_num kh_payload.khp_data.khp_data_seq_num
#define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num
#define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq
#define khp_data_userdata kh_payload.khp_data.khp_data_userdata
#define khp_data_bcast kh_payload.khp_data.khp_data_bcast
#define khp_data_channel kh_payload.khp_data.khp_data_channel
#define khp_data_compress kh_payload.khp_data.khp_data_compress
#define khp_data_v1_seq_num kh_payload.khp_data_v1.khp_data_seq_num
#define khp_data_v1_frag_num kh_payload.khp_data_v1.khp_data_frag_num
#define khp_data_v1_frag_seq kh_payload.khp_data_v1.khp_data_frag_seq
#define khp_data_v1_userdata kh_payload.khp_data_v1.khp_data_userdata
#define khp_data_v1_bcast kh_payload.khp_data_v1.khp_data_bcast
#define khp_data_v1_channel kh_payload.khp_data_v1.khp_data_channel
#define khp_data_v1_compress kh_payload.khp_data_v1.khp_data_compress

/*
* KNET_HEADER_TYPE_PING / KNET_HEADER_TYPE_PONG
Expand Down Expand Up @@ -125,7 +125,7 @@ size_t calc_min_mtu(knet_handle_t knet_h);
*/

union knet_header_payload {
struct knet_header_payload_data khp_data; /* pure data packet struct */
struct knet_header_payload_data_v1 khp_data_v1; /* pure data packet struct */
struct knet_header_payload_ping_v1 khp_ping_v1; /* heartbeat packet struct */
struct knet_header_payload_pmtud_v1 khp_pmtud_v1; /* Path MTU discovery packet struct */
} __attribute__((packed));
Expand All @@ -151,6 +151,6 @@ struct knet_header {
#define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload))
#define KNET_HEADER_PING_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping_v1))
#define KNET_HEADER_PMTUD_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud_v1))
#define KNET_HEADER_DATA_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data))
#define KNET_HEADER_DATA_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data_v1))

#endif
2 changes: 1 addition & 1 deletion libknet/tests/pckt_test.c
Expand Up @@ -17,7 +17,7 @@ int main(void)
printf("KNET_HEADER_SIZE: %zu\n", KNET_HEADER_SIZE);
printf("KNET_HEADER_PING_V1_SIZE: %zu (%zu)\n", KNET_HEADER_PING_V1_SIZE, sizeof(struct knet_header_payload_ping_v1));
printf("KNET_HEADER_PMTUD_V1_SIZE: %zu (%zu)\n", KNET_HEADER_PMTUD_V1_SIZE, sizeof(struct knet_header_payload_pmtud_v1));
printf("KNET_HEADER_DATA_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_SIZE, sizeof(struct knet_header_payload_data));
printf("KNET_HEADER_DATA_V1_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_V1_SIZE, sizeof(struct knet_header_payload_data_v1));

return 0;
}
66 changes: 35 additions & 31 deletions libknet/threads_rx.c
Expand Up @@ -73,7 +73,7 @@ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
*/
for (i = 0; i < KNET_MAX_LINK; i++) {
if (src_host->defrag_buf[i].in_use) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_v1_seq_num) {
return i;
}
}
Expand All @@ -86,15 +86,15 @@ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
* buffer. If the pckt has been seen before, the buffer expired (ETIME)
* and there is no point to try to defrag it again.
*/
if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
if (!_seq_num_lookup(src_host, inbuf->khp_data_v1_seq_num, 1, 0)) {
errno = ETIME;
return -1;
}

/*
* register the pckt as seen
*/
_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
_seq_num_set(src_host, inbuf->khp_data_v1_seq_num, 1);

/*
* see if there is a free buffer
Expand Down Expand Up @@ -140,7 +140,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t
if (!defrag_buf->in_use) {
memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
defrag_buf->in_use = 1;
defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
defrag_buf->pckt_seq = inbuf->khp_data_v1_seq_num;
}

/*
Expand All @@ -151,7 +151,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t
/*
* check if we already received this fragment
*/
if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
if (defrag_buf->frag_map[inbuf->khp_data_v1_frag_seq]) {
/*
* if we have received this fragment and we didn't clear the buffer
* it means that we don't have all fragments yet
Expand All @@ -163,7 +163,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t
* we need to handle the last packet with gloves due to its different size
*/

if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
if (inbuf->khp_data_v1_frag_seq == inbuf->khp_data_v1_frag_num) {
defrag_buf->last_frag_size = *len;

/*
Expand All @@ -177,31 +177,31 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t
if (!defrag_buf->frag_size) {
defrag_buf->last_first = 1;
memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
inbuf->khp_data_userdata,
inbuf->khp_data_v1_userdata,
*len);
}
} else {
defrag_buf->frag_size = *len;
}

if (defrag_buf->frag_size) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_userdata, *len);
memmove(defrag_buf->buf + ((inbuf->khp_data_v1_frag_seq - 1) * defrag_buf->frag_size),
inbuf->khp_data_v1_userdata, *len);
}

defrag_buf->frag_recv++;
defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;
defrag_buf->frag_map[inbuf->khp_data_v1_frag_seq] = 1;

/*
* check if we received all the fragments
*/
if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
if (defrag_buf->frag_recv == inbuf->khp_data_v1_frag_num) {
/*
* special case the last pckt
*/

if (defrag_buf->last_first) {
memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
memmove(defrag_buf->buf + ((inbuf->khp_data_v1_frag_num - 1) * defrag_buf->frag_size),
defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
defrag_buf->last_frag_size);
}
Expand All @@ -210,12 +210,12 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t
* recalculate packet lenght
*/

*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
*len = ((inbuf->khp_data_v1_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;

/*
* copy the pckt back in the user data
*/
memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);
memmove(inbuf->khp_data_v1_userdata, defrag_buf->buf, *len);

/*
* free this buffer
Expand Down Expand Up @@ -262,41 +262,41 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
}

inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
channel = inbuf->khp_data_channel;
inbuf->khp_data_v1_seq_num = ntohs(inbuf->khp_data_v1_seq_num);
channel = inbuf->khp_data_v1_channel;
src_host->got_data = 1;

if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
if (!_seq_num_lookup(src_host, inbuf->khp_data_v1_seq_num, 0, 0)) {
if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
}
return;
}

if (inbuf->khp_data_frag_num > 1) {
if (inbuf->khp_data_v1_frag_num > 1) {
/*
* len as received from the socket also includes extra stuff
* that the defrag code doesn't care about. So strip it
* here and readd only for repadding once we are done
* defragging
*/
len = len - KNET_HEADER_DATA_SIZE;
len = len - KNET_HEADER_DATA_V1_SIZE;
if (pckt_defrag(knet_h, inbuf, &len)) {
return;
}
len = len + KNET_HEADER_DATA_SIZE;
len = len + KNET_HEADER_DATA_V1_SIZE;
}

if (inbuf->khp_data_compress) {
if (inbuf->khp_data_v1_compress) {
ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
struct timespec start_time;
struct timespec end_time;
uint64_t compress_time;

clock_gettime(CLOCK_MONOTONIC, &start_time);
err = decompress(knet_h, inbuf->khp_data_compress,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
err = decompress(knet_h, inbuf->khp_data_v1_compress,
(const unsigned char *)inbuf->khp_data_v1_userdata,
len - KNET_HEADER_DATA_V1_SIZE,
knet_h->recv_from_links_buf_decompress,
&decmp_outlen);

Expand Down Expand Up @@ -325,8 +325,8 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc
knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;

memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_SIZE;
memmove(inbuf->khp_data_v1_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
len = decmp_outlen + KNET_HEADER_DATA_V1_SIZE;
} else {
knet_h->stats.rx_failed_to_decompress++;
pthread_mutex_unlock(&knet_h->handle_stats_mutex);
Expand All @@ -351,8 +351,8 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc

bcast = knet_h->dst_host_filter_fn(
knet_h->dst_host_filter_fn_private_data,
(const unsigned char *)inbuf->khp_data_userdata,
len - KNET_HEADER_DATA_SIZE,
(const unsigned char *)inbuf->khp_data_v1_userdata,
len - KNET_HEADER_DATA_V1_SIZE,
KNET_NOTIFY_RX,
knet_h->host_id,
inbuf->kh_node,
Expand Down Expand Up @@ -399,8 +399,8 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc
memset(iov_out, 0, sizeof(iov_out));

retry:
iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
iov_out[0].iov_base = (void *) inbuf->khp_data_v1_userdata + outlen;
iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_V1_SIZE);

outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
Expand All @@ -420,7 +420,7 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc
return;
}
if ((size_t)outlen == iov_out[0].iov_len) {
_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
_seq_num_set(src_host, inbuf->khp_data_v1_seq_num, 0);
}
}

Expand Down Expand Up @@ -505,6 +505,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
default:
log_warn(knet_h, KNET_SUB_RX, "Parsing ping onwire version %u not supported", inbuf->kh_version);
return;
break;
}

if (src_link->dynamic == KNET_LINK_DYNIP) {
Expand Down Expand Up @@ -569,16 +570,19 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
pthread_mutex_unlock(&src_link->link_stats_mutex);
process_pmtud(knet_h, src_link, inbuf);
return; /* Don't need to unlock link_stats_mutex */
break;
case KNET_HEADER_TYPE_PMTUD_REPLY:
src_link->status.stats.rx_pmtu_packets++;
src_link->status.stats.rx_pmtu_bytes += len;
/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
pthread_mutex_unlock(&src_link->link_stats_mutex);
process_pmtud_reply(knet_h, src_link, inbuf);
return;
break;
default:
pthread_mutex_unlock(&src_link->link_stats_mutex);
return;
break;
}
pthread_mutex_unlock(&src_link->link_stats_mutex);
}
Expand Down

0 comments on commit 3b00ec9

Please sign in to comment.