From 3b00ec9a98fa08666dada7247c82895bd5651056 Mon Sep 17 00:00:00 2001 From: "Fabio M. Di Nitto" Date: Wed, 16 Sep 2020 14:14:01 +0200 Subject: [PATCH] [WIP] re organize tx and rx Signed-off-by: Fabio M. Di Nitto --- libknet/onwire.h | 20 +- libknet/tests/pckt_test.c | 2 +- libknet/threads_rx.c | 66 ++--- libknet/threads_tx.c | 572 +++++++++++++++++++++++--------------- 4 files changed, 387 insertions(+), 273 deletions(-) diff --git a/libknet/onwire.h b/libknet/onwire.h index f9fb218db..8ac4e4620 100644 --- a/libknet/onwire.h +++ b/libknet/onwire.h @@ -47,7 +47,7 @@ typedef uint16_t seq_num_t; /* data sequence number required to deduplicate pckts */ #define SEQ_MAX UINT16_MAX -struct knet_header_payload_data { +struct knet_header_payload_data_v1 { seq_num_t khp_data_seq_num; /* pckt seq number used to deduplicate pckts */ uint8_t khp_data_compress; /* identify if user data are compressed */ uint8_t khp_data_pad1; /* make sure to have space in the header to grow features */ @@ -58,13 +58,13 @@ struct knet_header_payload_data { uint8_t khp_data_userdata[0]; /* pointer to the real user data */ } __attribute__((packed)); -#define khp_data_seq_num kh_payload.khp_data.khp_data_seq_num -#define khp_data_frag_num kh_payload.khp_data.khp_data_frag_num -#define khp_data_frag_seq kh_payload.khp_data.khp_data_frag_seq -#define khp_data_userdata kh_payload.khp_data.khp_data_userdata -#define khp_data_bcast kh_payload.khp_data.khp_data_bcast -#define khp_data_channel kh_payload.khp_data.khp_data_channel -#define khp_data_compress kh_payload.khp_data.khp_data_compress +#define khp_data_v1_seq_num kh_payload.khp_data_v1.khp_data_seq_num +#define khp_data_v1_frag_num kh_payload.khp_data_v1.khp_data_frag_num +#define khp_data_v1_frag_seq kh_payload.khp_data_v1.khp_data_frag_seq +#define khp_data_v1_userdata kh_payload.khp_data_v1.khp_data_userdata +#define khp_data_v1_bcast kh_payload.khp_data_v1.khp_data_bcast +#define khp_data_v1_channel kh_payload.khp_data_v1.khp_data_channel 
+#define khp_data_v1_compress kh_payload.khp_data_v1.khp_data_compress /* * KNET_HEADER_TYPE_PING / KNET_HEADER_TYPE_PONG @@ -125,7 +125,7 @@ size_t calc_min_mtu(knet_handle_t knet_h); */ union knet_header_payload { - struct knet_header_payload_data khp_data; /* pure data packet struct */ + struct knet_header_payload_data_v1 khp_data_v1; /* pure data packet struct */ struct knet_header_payload_ping_v1 khp_ping_v1; /* heartbeat packet struct */ struct knet_header_payload_pmtud_v1 khp_pmtud_v1; /* Path MTU discovery packet struct */ } __attribute__((packed)); @@ -151,6 +151,6 @@ struct knet_header { #define KNET_HEADER_SIZE (KNET_HEADER_ALL_SIZE - sizeof(union knet_header_payload)) #define KNET_HEADER_PING_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_ping_v1)) #define KNET_HEADER_PMTUD_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_pmtud_v1)) -#define KNET_HEADER_DATA_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data)) +#define KNET_HEADER_DATA_V1_SIZE (KNET_HEADER_SIZE + sizeof(struct knet_header_payload_data_v1)) #endif diff --git a/libknet/tests/pckt_test.c b/libknet/tests/pckt_test.c index e9b736904..35e73c37e 100644 --- a/libknet/tests/pckt_test.c +++ b/libknet/tests/pckt_test.c @@ -17,7 +17,7 @@ int main(void) printf("KNET_HEADER_SIZE: %zu\n", KNET_HEADER_SIZE); printf("KNET_HEADER_PING_V1_SIZE: %zu (%zu)\n", KNET_HEADER_PING_V1_SIZE, sizeof(struct knet_header_payload_ping_v1)); printf("KNET_HEADER_PMTUD_V1_SIZE: %zu (%zu)\n", KNET_HEADER_PMTUD_V1_SIZE, sizeof(struct knet_header_payload_pmtud_v1)); - printf("KNET_HEADER_DATA_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_SIZE, sizeof(struct knet_header_payload_data)); + printf("KNET_HEADER_DATA_V1_SIZE: %zu (%zu)\n", KNET_HEADER_DATA_V1_SIZE, sizeof(struct knet_header_payload_data_v1)); return 0; } diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index 6655c60bf..949551e73 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -73,7 +73,7 @@ static int 
find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) */ for (i = 0; i < KNET_MAX_LINK; i++) { if (src_host->defrag_buf[i].in_use) { - if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) { + if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_v1_seq_num) { return i; } } @@ -86,7 +86,7 @@ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) * buffer. If the pckt has been seen before, the buffer expired (ETIME) * and there is no point to try to defrag it again. */ - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) { + if (!_seq_num_lookup(src_host, inbuf->khp_data_v1_seq_num, 1, 0)) { errno = ETIME; return -1; } @@ -94,7 +94,7 @@ static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf) /* * register the pckt as seen */ - _seq_num_set(src_host, inbuf->khp_data_seq_num, 1); + _seq_num_set(src_host, inbuf->khp_data_v1_seq_num, 1); /* * see if there is a free buffer @@ -140,7 +140,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t if (!defrag_buf->in_use) { memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf)); defrag_buf->in_use = 1; - defrag_buf->pckt_seq = inbuf->khp_data_seq_num; + defrag_buf->pckt_seq = inbuf->khp_data_v1_seq_num; } /* @@ -151,7 +151,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t /* * check if we already received this fragment */ - if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) { + if (defrag_buf->frag_map[inbuf->khp_data_v1_frag_seq]) { /* * if we have received this fragment and we didn't clear the buffer * it means that we don't have all fragments yet @@ -163,7 +163,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t * we need to handle the last packet with gloves due to its different size */ - if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) { + if (inbuf->khp_data_v1_frag_seq == inbuf->khp_data_v1_frag_num) { 
defrag_buf->last_frag_size = *len; /* @@ -177,7 +177,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t if (!defrag_buf->frag_size) { defrag_buf->last_first = 1; memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len), - inbuf->khp_data_userdata, + inbuf->khp_data_v1_userdata, *len); } } else { @@ -185,23 +185,23 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t } if (defrag_buf->frag_size) { - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), - inbuf->khp_data_userdata, *len); + memmove(defrag_buf->buf + ((inbuf->khp_data_v1_frag_seq - 1) * defrag_buf->frag_size), + inbuf->khp_data_v1_userdata, *len); } defrag_buf->frag_recv++; - defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; + defrag_buf->frag_map[inbuf->khp_data_v1_frag_seq] = 1; /* * check if we received all the fragments */ - if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) { + if (defrag_buf->frag_recv == inbuf->khp_data_v1_frag_num) { /* * special case the last pckt */ if (defrag_buf->last_first) { - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size), + memmove(defrag_buf->buf + ((inbuf->khp_data_v1_frag_num - 1) * defrag_buf->frag_size), defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size), defrag_buf->last_frag_size); } @@ -210,12 +210,12 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t * recalculate packet lenght */ - *len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; + *len = ((inbuf->khp_data_v1_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size; /* * copy the pckt back in the user data */ - memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len); + memmove(inbuf->khp_data_v1_userdata, defrag_buf->buf, *len); /* * free this buffer @@ -262,41 +262,41 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc 
pthread_mutex_unlock(&knet_h->handle_stats_mutex); } - inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num); - channel = inbuf->khp_data_channel; + inbuf->khp_data_v1_seq_num = ntohs(inbuf->khp_data_v1_seq_num); + channel = inbuf->khp_data_v1_channel; src_host->got_data = 1; - if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) { + if (!_seq_num_lookup(src_host, inbuf->khp_data_v1_seq_num, 0, 0)) { if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) { log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered"); } return; } - if (inbuf->khp_data_frag_num > 1) { + if (inbuf->khp_data_v1_frag_num > 1) { /* * len as received from the socket also includes extra stuff * that the defrag code doesn't care about. So strip it * here and readd only for repadding once we are done * defragging */ - len = len - KNET_HEADER_DATA_SIZE; + len = len - KNET_HEADER_DATA_V1_SIZE; if (pckt_defrag(knet_h, inbuf, &len)) { return; } - len = len + KNET_HEADER_DATA_SIZE; + len = len + KNET_HEADER_DATA_V1_SIZE; } - if (inbuf->khp_data_compress) { + if (inbuf->khp_data_v1_compress) { ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; uint64_t compress_time; clock_gettime(CLOCK_MONOTONIC, &start_time); - err = decompress(knet_h, inbuf->khp_data_compress, - (const unsigned char *)inbuf->khp_data_userdata, - len - KNET_HEADER_DATA_SIZE, + err = decompress(knet_h, inbuf->khp_data_v1_compress, + (const unsigned char *)inbuf->khp_data_v1_userdata, + len - KNET_HEADER_DATA_V1_SIZE, knet_h->recv_from_links_buf_decompress, &decmp_outlen); @@ -325,8 +325,8 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc knet_h->stats.rx_compressed_original_bytes += decmp_outlen; knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE; - memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen); - len = decmp_outlen + KNET_HEADER_DATA_SIZE; + 
memmove(inbuf->khp_data_v1_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen); + len = decmp_outlen + KNET_HEADER_DATA_V1_SIZE; } else { knet_h->stats.rx_failed_to_decompress++; pthread_mutex_unlock(&knet_h->handle_stats_mutex); @@ -351,8 +351,8 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc bcast = knet_h->dst_host_filter_fn( knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - len - KNET_HEADER_DATA_SIZE, + (const unsigned char *)inbuf->khp_data_v1_userdata, + len - KNET_HEADER_DATA_V1_SIZE, KNET_NOTIFY_RX, knet_h->host_id, inbuf->kh_node, @@ -399,8 +399,8 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc memset(iov_out, 0, sizeof(iov_out)); retry: - iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen; - iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE); + iov_out[0].iov_base = (void *) inbuf->khp_data_v1_userdata + outlen; + iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_V1_SIZE); outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1); if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) { @@ -420,7 +420,7 @@ static void process_data(knet_handle_t knet_h, struct knet_host *src_host, struc return; } if ((size_t)outlen == iov_out[0].iov_len) { - _seq_num_set(src_host, inbuf->khp_data_seq_num, 0); + _seq_num_set(src_host, inbuf->khp_data_v1_seq_num, 0); } } @@ -505,6 +505,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc default: log_warn(knet_h, KNET_SUB_RX, "Parsing ping onwire version %u not supported", inbuf->kh_version); return; + break; } if (src_link->dynamic == KNET_LINK_DYNIP) { @@ -569,6 +570,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud(knet_h, src_link, inbuf); return; /* Don't need to unlock link_stats_mutex */ + 
break; case KNET_HEADER_TYPE_PMTUD_REPLY: src_link->status.stats.rx_pmtu_packets++; src_link->status.stats.rx_pmtu_bytes += len; @@ -576,9 +578,11 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc pthread_mutex_unlock(&src_link->link_stats_mutex); process_pmtud_reply(knet_h, src_link, inbuf); return; + break; default: pthread_mutex_unlock(&src_link->link_stats_mutex); return; + break; } pthread_mutex_unlock(&src_link->link_stats_mutex); } diff --git a/libknet/threads_tx.c b/libknet/threads_tx.c index 7c1cfa882..e7793ba94 100644 --- a/libknet/threads_tx.c +++ b/libknet/threads_tx.c @@ -141,197 +141,46 @@ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, return err; } -static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync) +static int _dispatch_to_local(knet_handle_t knet_h, unsigned char* data, size_t inlen, int8_t channel) { - size_t outlen, frag_len; - struct knet_host *dst_host; - knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; - size_t dst_host_ids_entries_temp = 0; - knet_node_id_t dst_host_ids[KNET_MAX_HOST]; - size_t dst_host_ids_entries = 0; - int bcast = 1; - struct iovec iov_out[PCKT_FRAG_MAX][2]; - int iovcnt_out = 2; - uint8_t frag_idx; - unsigned int temp_data_mtu; - size_t host_idx; - int send_mcast = 0; - struct knet_header *inbuf; - int savederrno = 0; - int err = 0; - seq_num_t tx_seq_num; - struct knet_mmsghdr msg[PCKT_FRAG_MAX]; - int msgs_to_send, msg_idx; - unsigned int i; - int j; - int send_local = 0; - int data_compressed = 0; - size_t uncrypted_frag_size; - int stats_locked = 0, stats_err = 0; - - inbuf = knet_h->recv_from_sock_buf; - inbuf->kh_type = KNET_HEADER_TYPE_DATA; - inbuf->kh_version = knet_h->onwire_ver; - inbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER; - inbuf->khp_data_frag_seq = 0; - inbuf->kh_node = htons(knet_h->host_id); - - if (knet_h->enabled != 1) { - log_debug(knet_h, KNET_SUB_TX, "Received data packet but 
forwarding is disabled"); - savederrno = ECANCELED; - err = -1; - goto out_unlock; - } - - if (knet_h->dst_host_filter_fn) { - bcast = knet_h->dst_host_filter_fn( - knet_h->dst_host_filter_fn_private_data, - (const unsigned char *)inbuf->khp_data_userdata, - inlen, - KNET_NOTIFY_TX, - knet_h->host_id, - knet_h->host_id, - &channel, - dst_host_ids_temp, - &dst_host_ids_entries_temp); - if (bcast < 0) { - log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast); - savederrno = EFAULT; - err = -1; - goto out_unlock; - } + int err = 0, savederrno = 0; + const unsigned char *buf = data; + ssize_t buflen = inlen; + struct knet_link *local_link = knet_h->host_index[knet_h->host_id]->link; - if ((!bcast) && (!dst_host_ids_entries_temp)) { - log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); - savederrno = EINVAL; - err = -1; - goto out_unlock; - } - - if ((!bcast) && - (dst_host_ids_entries_temp > KNET_MAX_HOST)) { - log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); - savederrno = EINVAL; - err = -1; - goto out_unlock; - } +local_retry: + err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); + savederrno = errno; + if (err < 0) { + log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. 
error=%s\n", strerror(errno)); + local_link->status.stats.tx_data_errors++; + goto out; } - - /* Send to localhost if appropriate and enabled */ - if (knet_h->has_loop_link) { - send_local = 0; - if (bcast) { - send_local = 1; - } else { - for (i=0; i< dst_host_ids_entries_temp; i++) { - if (dst_host_ids_temp[i] == knet_h->host_id) { - send_local = 1; - } - } - } - if (send_local) { - const unsigned char *buf = inbuf->khp_data_userdata; - ssize_t buflen = inlen; - struct knet_link *local_link; - - local_link = knet_h->host_index[knet_h->host_id]->link; - - local_retry: - err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen); - if (err < 0) { - log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno)); - local_link->status.stats.tx_data_errors++; - } - if (err > 0 && err < buflen) { - log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); - local_link->status.stats.tx_data_retries++; - buf += err; - buflen -= err; - goto local_retry; - } - if (err == buflen) { - local_link->status.stats.tx_data_packets++; - local_link->status.stats.tx_data_bytes += inlen; - } - } + if (err > 0 && err < buflen) { + log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen); + local_link->status.stats.tx_data_retries++; + buf += err; + buflen -= err; + goto local_retry; } - - if (is_sync) { - if ((bcast) || - ((!bcast) && (dst_host_ids_entries_temp > 1))) { - log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); - savederrno = E2BIG; - err = -1; - goto out_unlock; - } - } - - /* - * check destinations hosts before spending time - * in fragmenting/encrypting packets to save - * time processing data for unreachable hosts. - * for unicast, also remap the destination data - * to skip unreachable hosts. 
- */ - - if (!bcast) { - dst_host_ids_entries = 0; - for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { - dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; - if (!dst_host) { - continue; - } - if (!(dst_host->host_id == knet_h->host_id && - knet_h->has_loop_link) && - dst_host->status.reachable) { - dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx]; - dst_host_ids_entries++; - } - } - if (!dst_host_ids_entries) { - savederrno = EHOSTDOWN; - err = -1; - goto out_unlock; - } - } else { - send_mcast = 0; - for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { - if (!(dst_host->host_id == knet_h->host_id && - knet_h->has_loop_link) && - dst_host->status.reachable) { - send_mcast = 1; - break; - } - } - if (!send_mcast) { - savederrno = EHOSTDOWN; - err = -1; - goto out_unlock; - } + if (err == buflen) { + local_link->status.stats.tx_data_packets++; + local_link->status.stats.tx_data_bytes += inlen; } +out: + errno = savederrno; + return err; +} - if (!knet_h->data_mtu) { - /* - * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough - */ - log_debug(knet_h, KNET_SUB_TX, - "Received data packet but data MTU is still unknown." - " Packet might not be delivered." 
- " Assuming minimum IPv4 MTU (%d)", - KNET_PMTUD_MIN_MTU_V4); - temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; - } else { - /* - * take a copy of the mtu to avoid value changing under - * our feet while we are sending a fragmented pckt - */ - temp_data_mtu = knet_h->data_mtu; - } +static int _compress_data(knet_handle_t knet_h, unsigned char* data, size_t *inlen, int *data_compressed) +{ + int err = 0, savederrno = 0; + int stats_locked = 0, stats_err = 0; /* * compress data */ - if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) { + if ((knet_h->compress_model > 0) && (*inlen > knet_h->compress_threshold)) { size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS; struct timespec start_time; struct timespec end_time; @@ -339,7 +188,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan clock_gettime(CLOCK_MONOTONIC, &start_time); err = compress(knet_h, - (const unsigned char *)inbuf->khp_data_userdata, inlen, + data, *inlen, knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen); savederrno = errno; @@ -349,7 +198,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; - goto out_unlock; + goto out; } stats_locked = 1; /* Collect stats */ @@ -370,13 +219,13 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno)); } else { knet_h->stats.tx_compressed_packets++; - knet_h->stats.tx_compressed_original_bytes += inlen; + knet_h->stats.tx_compressed_original_bytes += *inlen; knet_h->stats.tx_compressed_size_bytes += cmp_outlen; - if (cmp_outlen < inlen) { - memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen); - inlen = cmp_outlen; - data_compressed = 1; + if (cmp_outlen < *inlen) { + memmove(data, knet_h->send_to_links_buf_compress, 
cmp_outlen); + *inlen = cmp_outlen; + *data_compressed = 1; } else { knet_h->stats.tx_unable_to_compress++; } @@ -388,35 +237,31 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err)); err = -1; savederrno = stats_err; - goto out_unlock; + goto out; } } - if (knet_h->compress_model > 0 && !data_compressed) { + if (knet_h->compress_model > 0 && !*data_compressed) { knet_h->stats.tx_uncompressed_packets++; } pthread_mutex_unlock(&knet_h->handle_stats_mutex); stats_locked = 0; - /* - * prepare the outgoing buffers - */ - - frag_len = inlen; - frag_idx = 0; +out: + errno = savederrno; + return err; +} - inbuf->khp_data_bcast = bcast; - inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu); - inbuf->khp_data_channel = channel; - if (data_compressed) { - inbuf->khp_data_compress = knet_h->compress_model; - } else { - inbuf->khp_data_compress = 0; - } +static int _get_tx_seq_num(knet_handle_t knet_h, seq_num_t *tx_seq_num) +{ + int savederrno = 0; - if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) { + savederrno = pthread_mutex_lock(&knet_h->tx_seq_num_mutex); + if (savederrno) { log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock"); - goto out_unlock; + errno = savederrno; + return -1; } + knet_h->tx_seq_num++; /* * force seq_num 0 to detect a node that has crashed and rejoining @@ -429,8 +274,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan /* * cache the value in locked context */ - tx_seq_num = knet_h->tx_seq_num; - inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num); + *tx_seq_num = knet_h->tx_seq_num; pthread_mutex_unlock(&knet_h->tx_seq_num_mutex); /* @@ -444,18 +288,251 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan * rollover of the circular buffer */ - if (tx_seq_num % (SEQ_MAX / 8) == 0) { + if (*tx_seq_num % (SEQ_MAX / 8) == 0) { _send_pings(knet_h, 0); } + 
return 0; +} + + +static int _get_data_dests(knet_handle_t knet_h, unsigned char* data, size_t inlen, + int8_t *channel, int *bcast, int *send_mcast, int *send_local, + knet_node_id_t *dst_host_ids, size_t *dst_host_ids_entries, + int is_sync) +{ + int err = 0, savederrno = 0; + knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST]; /* store destinations from filter */ + size_t dst_host_ids_entries_temp = 0; + size_t dst_host_ids_entries_temp2 = 0; /* workaround gcc here */ + struct knet_host *dst_host; + size_t host_idx; + + if (knet_h->dst_host_filter_fn) { + *bcast = knet_h->dst_host_filter_fn( + knet_h->dst_host_filter_fn_private_data, + data, + inlen, + KNET_NOTIFY_TX, + knet_h->host_id, + knet_h->host_id, + channel, + dst_host_ids_temp, + &dst_host_ids_entries_temp); + if (*bcast < 0) { + log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", *bcast); + savederrno = EFAULT; + err = -1; + goto out; + } + + if ((!*bcast) && (!dst_host_ids_entries_temp)) { + log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries"); + savederrno = EINVAL; + err = -1; + goto out; + } + + if ((!*bcast) && + (dst_host_ids_entries_temp > KNET_MAX_HOST)) { + log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations"); + savederrno = EINVAL; + err = -1; + goto out; + } + + if (is_sync) { + if ((*bcast) || + ((!*bcast) && (dst_host_ids_entries_temp > 1))) { + log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination"); + savederrno = E2BIG; + err = -1; + goto out; + } + } + } + + /* + * check destinations hosts before spending time + * in fragmenting/encrypting packets to save + * time processing data for unreachable hosts. + * for unicast, also remap the destination data + * to skip unreachable hosts. 
+ */ - if (inbuf->khp_data_frag_num > 1) { - while (frag_idx < inbuf->khp_data_frag_num) { + if (!*bcast) { + *dst_host_ids_entries = dst_host_ids_entries_temp2; + for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) { + dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]]; + if (!dst_host) { + continue; + } + if ((dst_host->host_id == knet_h->host_id) && + (knet_h->has_loop_link)) { + *send_local = 1; + } + if (!((dst_host->host_id == knet_h->host_id) && + (knet_h->has_loop_link)) && + dst_host->status.reachable) { + dst_host_ids[dst_host_ids_entries_temp2] = dst_host_ids_temp[host_idx]; + dst_host_ids_entries_temp2++; + } + } + if ((!dst_host_ids_entries_temp2) && (!*send_local)) { + savederrno = EHOSTDOWN; + err = -1; + goto out; + } + *dst_host_ids_entries = dst_host_ids_entries_temp2; + } else { + *send_mcast = 0; + *send_local = 0; + for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) { + if ((dst_host->host_id == knet_h->host_id) && + (knet_h->has_loop_link)) { + *send_local = 1; + } + if (!(dst_host->host_id == knet_h->host_id && + knet_h->has_loop_link) && + dst_host->status.reachable) { + *send_mcast = 1; + } + } + if ((!*send_mcast) && (!*send_local)) { + savederrno = EHOSTDOWN; + err = -1; + goto out; + } + } + +out: + errno = savederrno; + return err; +} + +static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, uint8_t onwire_ver, int is_sync) +{ + int err = 0, savederrno = 0; + struct knet_header *inbuf = knet_h->recv_from_sock_buf; /* all TX packets are stored here regardless of the onwire */ + unsigned char *data; /* onwire neutrual pointer to data to send */ + int data_compressed = 0; /* track data compression to fill the header */ + seq_num_t tx_seq_num; + + int bcast = 1; /* assume all packets are to be broadcasted unless filter tells us differently */ + knet_node_id_t dst_host_ids[KNET_MAX_HOST]; /* store destinations from filter */ + size_t dst_host_ids_entries = 0; 
+ int send_mcast = 0; /* send packets to all nodes or not */ + int send_local = 0; /* send packets to loopback */ + + size_t outlen, frag_len; + struct knet_host *dst_host; + struct iovec iov_out[PCKT_FRAG_MAX][2]; + int iovcnt_out = 2; + uint8_t frag_idx; + unsigned int temp_data_mtu; + size_t host_idx; + struct knet_mmsghdr msg[PCKT_FRAG_MAX]; + int msgs_to_send, msg_idx; + int j; + size_t uncrypted_frag_size; + int stats_err = 0; + + if (knet_h->enabled != 1) { + log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled"); + savederrno = ECANCELED; + err = -1; + goto out_unlock; + } + + switch (onwire_ver) { + case 1: + data = (unsigned char *)inbuf->khp_data_v1_userdata; + break; + default: /* this should never hit as filters are in place in the calling functions */ + log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); + savederrno = EINVAL; + err = -1; + goto out_unlock; + break; + } + + err = _get_data_dests(knet_h, data, inlen, + &channel, &bcast, &send_mcast, &send_local, + dst_host_ids, &dst_host_ids_entries, + is_sync); + if (err < 0) { + savederrno = errno; + goto out_unlock; + } + + /* Send to localhost if appropriate and enabled */ + if (send_local) { + err = _dispatch_to_local(knet_h, data, inlen, channel); + if (err < 0) { + savederrno = errno; + goto out_unlock; + } + } + + err = _compress_data(knet_h, data, &inlen, &data_compressed); + if (err < 0) { + savederrno = errno; + goto out_unlock; + } + + err = _get_tx_seq_num(knet_h, &tx_seq_num); + if (err < 0) { + savederrno = errno; + goto out_unlock; + } + + if (!knet_h->data_mtu) { + /* + * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough + */ + log_debug(knet_h, KNET_SUB_TX, + "Received data packet but data MTU is still unknown." + " Packet might not be delivered." 
+ " Assuming minimum IPv4 MTU (%d)", + KNET_PMTUD_MIN_MTU_V4); + temp_data_mtu = KNET_PMTUD_MIN_MTU_V4; + } else { + /* + * take a copy of the mtu to avoid value changing under + * our feet while we are sending a fragmented pckt + */ + temp_data_mtu = knet_h->data_mtu; + } + + /* + * prepare the outgoing buffers + */ + + frag_len = inlen; + frag_idx = 0; + + inbuf->kh_type = KNET_HEADER_TYPE_DATA; + inbuf->kh_version = onwire_ver; + inbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER; + inbuf->khp_data_v1_frag_seq = 0; + inbuf->kh_node = htons(knet_h->host_id); + inbuf->khp_data_v1_bcast = bcast; + inbuf->khp_data_v1_frag_num = ceil((float)inlen / temp_data_mtu); + inbuf->khp_data_v1_channel = channel; + if (data_compressed) { + inbuf->khp_data_v1_compress = knet_h->compress_model; + } else { + inbuf->khp_data_v1_compress = 0; + } + inbuf->khp_data_v1_seq_num = htons(tx_seq_num); + + if (inbuf->khp_data_v1_frag_num > 1) { + while (frag_idx < inbuf->khp_data_v1_frag_num) { /* * set the iov_base */ iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx]; - iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE; - iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx); + iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_V1_SIZE; + iov_out[frag_idx][1].iov_base = data + (temp_data_mtu * frag_idx); /* * set the len @@ -473,12 +550,12 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan knet_h->send_to_links_buf[frag_idx]->kh_max_ver = inbuf->kh_max_ver; knet_h->send_to_links_buf[frag_idx]->kh_node = htons(knet_h->host_id); knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type; - knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num; - knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num; - knet_h->send_to_links_buf[frag_idx]->khp_data_frag_seq = frag_idx + 1; - knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast; 
- knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel; - knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress; + knet_h->send_to_links_buf[frag_idx]->khp_data_v1_seq_num = inbuf->khp_data_v1_seq_num; + knet_h->send_to_links_buf[frag_idx]->khp_data_v1_frag_num = inbuf->khp_data_v1_frag_num; + knet_h->send_to_links_buf[frag_idx]->khp_data_v1_frag_seq = frag_idx + 1; + knet_h->send_to_links_buf[frag_idx]->khp_data_v1_bcast = inbuf->khp_data_v1_bcast; + knet_h->send_to_links_buf[frag_idx]->khp_data_v1_channel = inbuf->khp_data_v1_channel; + knet_h->send_to_links_buf[frag_idx]->khp_data_v1_compress = inbuf->khp_data_v1_compress; frag_len = frag_len - temp_data_mtu; frag_idx++; @@ -486,7 +563,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan iovcnt_out = 2; } else { iov_out[frag_idx][0].iov_base = (void *)inbuf; - iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE; + iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_V1_SIZE; iovcnt_out = 1; } @@ -496,7 +573,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan uint64_t crypt_time; frag_idx = 0; - while (frag_idx < inbuf->khp_data_frag_num) { + while (frag_idx < inbuf->khp_data_v1_frag_num) { clock_gettime(CLOCK_MONOTONIC, &start_time); if (crypto_encrypt_and_signv( knet_h, @@ -546,7 +623,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan memset(&msg, 0, sizeof(msg)); - msgs_to_send = inbuf->khp_data_frag_num; + msgs_to_send = inbuf->khp_data_v1_frag_num; msg_idx = 0; @@ -584,7 +661,7 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan return err; } -static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t channel) +static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, uint8_t onwire_ver, int8_t channel) { ssize_t inlen = 0; int savederrno = 0, docallback = 0; @@ -593,8 +670,16 @@ 
static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t chann struct sockaddr_storage address; memset(&iov_in, 0, sizeof(iov_in)); - iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata; - iov_in.iov_len = KNET_MAX_PACKET_SIZE; + + switch (onwire_ver) { + case 1: + iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_v1_userdata; + iov_in.iov_len = KNET_MAX_PACKET_SIZE; + break; + default: + log_warn(knet_h, KNET_SUB_TX, "preparing data onwire version %u not supported", onwire_ver); + break; + } memset(&msg, 0, sizeof(struct msghdr)); msg.msg_name = &address; @@ -632,7 +717,7 @@ static void _handle_send_to_links(knet_handle_t knet_h, int sockfd, int8_t chann knet_h->sockfd[channel].has_error = 1; } } else { - _parse_recv_from_sock(knet_h, inlen, channel, 0); + _parse_recv_from_sock(knet_h, inlen, channel, onwire_ver, 0); } if (docallback) { @@ -652,6 +737,7 @@ void *_handle_send_to_links_thread(void *data) int i, nev; int flush, flush_queue_limit; int8_t channel; + uint8_t onwire_ver; set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED); @@ -705,6 +791,13 @@ void *_handle_send_to_links_thread(void *data) continue; } + if (pthread_mutex_lock(&knet_h->onwire_mutex)) { + log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock"); + goto out_unlock; + } + onwire_ver = knet_h->onwire_ver; + pthread_mutex_unlock(&knet_h->onwire_mutex); + for (i = 0; i < nev; i++) { for (channel = 0; channel < KNET_DATAFD_MAX; channel++) { if ((knet_h->sockfd[channel].in_use) && @@ -720,10 +813,10 @@ void *_handle_send_to_links_thread(void *data) log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock"); continue; } - _handle_send_to_links(knet_h, events[i].data.fd, channel); + _handle_send_to_links(knet_h, events[i].data.fd, onwire_ver, channel); pthread_mutex_unlock(&knet_h->tx_mutex); } - +out_unlock: pthread_rwlock_unlock(&knet_h->global_rwlock); } @@ -735,6 +828,7 @@ void *_handle_send_to_links_thread(void *data) int 
knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
 {
 	int savederrno = 0, err = 0;
+	uint8_t onwire_ver;
 
 	if (!knet_h) {
 		errno = EINVAL;
@@ -780,6 +874,20 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len
 		goto out;
 	}
 
+	/*
+	 * take a snapshot of the onwire version in locked context so that
+	 * the same value is used to place the data in the buffer and to
+	 * build the packet header
+	 */
+	savederrno = pthread_mutex_lock(&knet_h->onwire_mutex);
+	if (savederrno) {
+		log_debug(knet_h, KNET_SUB_TX, "Unable to get onwire mutex lock");
+		err = -1;
+		goto out;
+	}
+	onwire_ver = knet_h->onwire_ver;
+	pthread_mutex_unlock(&knet_h->onwire_mutex);
+
 	savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
 	if (savederrno) {
 		log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
@@ -788,12 +896,24 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len
 		goto out;
 	}
 
-	memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
-	err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
+	switch (onwire_ver) {
+		case 1:
+			memmove(knet_h->recv_from_sock_buf->khp_data_v1_userdata, buff, buff_len);
+			break;
+		default:
+			/* fail the call: nothing gets sent for an unsupported onwire version */
+			log_warn(knet_h, KNET_SUB_TX, "preparing sync data onwire version %u not supported", onwire_ver);
+			savederrno = EINVAL;
+			err = -1;
+			goto out_tx;
+			break;
+	}
+
+	err = _parse_recv_from_sock(knet_h, buff_len, channel, onwire_ver, 1);
 	savederrno = errno;
 
+out_tx:
 	pthread_mutex_unlock(&knet_h->tx_mutex);
-
 out:
 	pthread_rwlock_unlock(&knet_h->global_rwlock);