Skip to content

Commit

Permalink
[onwire] fill in packets with dynamic information
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Sep 9, 2020
1 parent 028eefc commit 54458ed
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 9 deletions.
28 changes: 28 additions & 0 deletions libknet/handle.c
Expand Up @@ -142,6 +142,13 @@ static int _init_locks(knet_handle_t knet_h)
goto exit_fail;
}

savederrno = pthread_mutex_init(&knet_h->onwire_mutex, NULL);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to initialize onwire_mutex mutex: %s",
strerror(savederrno));
goto exit_fail;
}

return 0;

exit_fail:
Expand All @@ -161,6 +168,7 @@ static void _destroy_locks(knet_handle_t knet_h)
pthread_mutex_destroy(&knet_h->tx_seq_num_mutex);
pthread_mutex_destroy(&knet_h->threads_status_mutex);
pthread_mutex_destroy(&knet_h->handle_stats_mutex);
pthread_mutex_destroy(&knet_h->onwire_mutex);
}

static int _init_socks(knet_handle_t knet_h)
Expand Down Expand Up @@ -637,6 +645,26 @@ knet_handle_t knet_handle_new(knet_node_id_t host_id,
knet_h->stats.tx_crypt_time_min = UINT64_MAX;
knet_h->stats.rx_crypt_time_min = UINT64_MAX;

/*
* set onwire version
*/

#ifdef DEBUG
/*
* need this small hack to test the code without new packets and
* override internal defaults
*/
if (getenv("KNET_PROTO_VER")) {
knet_h->onwire_ver = atoi(getenv("KNET_PROTO_VER"));
} else {
knet_h->onwire_ver = KNET_HEADER_ONWIRE_MAX_VER;
}
#else
knet_h->onwire_ver = KNET_HEADER_ONWIRE_MAX_VER;
#endif

log_info(knet_h, KNET_SUB_HANDLE, "Default onwire version: %u", knet_h->onwire_ver);

/*
* init global shlib tracker
*/
Expand Down
2 changes: 2 additions & 0 deletions libknet/internals.h
Expand Up @@ -209,6 +209,8 @@ struct knet_handle {
pthread_mutex_t hb_mutex; /* used to protect heartbeat thread and seq_num broadcasting */
pthread_mutex_t backoff_mutex; /* used to protect dst_link->pong_timeout_adj */
pthread_mutex_t kmtu_mutex; /* used to protect kernel_mtu */
pthread_mutex_t onwire_mutex; /* used to protect onwire version */
uint8_t onwire_ver;
uint32_t kernel_mtu; /* contains the MTU detected by the kernel on a given link */
int pmtud_waiting;
int pmtud_running;
Expand Down
17 changes: 14 additions & 3 deletions libknet/onwire.h
Expand Up @@ -77,7 +77,14 @@ union knet_header_payload {
* starting point
*/

#define KNET_HEADER_VERSION 0x01 /* we currently support only one version */
/*
* Plan is to support MAX_VER with MIN_VER = MAX_VER - 1
* but for the sake of not rewriting the world later on,
* let´s make sure we can support a random range of protocol
* versions
*/
#define KNET_HEADER_ONWIRE_MAX_VER 0x01
#define KNET_HEADER_ONWIRE_MIN_VER 0x01

#define KNET_HEADER_TYPE_DATA 0x00 /* pure data packet */

Expand All @@ -87,12 +94,16 @@ union knet_header_payload {
#define KNET_HEADER_TYPE_PMTUD 0x83 /* Used to determine Path MTU */
#define KNET_HEADER_TYPE_PMTUD_REPLY 0x84 /* reply from remote host */

/*
* this header CANNOT change or onwire compat will break!
*/

struct knet_header {
uint8_t kh_version; /* pckt format/version */
uint8_t kh_version; /* this pckt format/version */
uint8_t kh_type; /* from above defines. Tells what kind of pckt it is */
knet_node_id_t kh_node; /* host id of the source host for this pckt */
uint8_t kh_max_ver; /* max version of the protocol supported by this node */
uint8_t kh_pad1; /* make sure to have space in the header to grow features */
uint8_t kh_pad2;
union knet_header_payload kh_payload; /* union of potential data struct based on kh_type */
} __attribute__((packed));

Expand Down
3 changes: 2 additions & 1 deletion libknet/threads_heartbeat.c
Expand Up @@ -62,7 +62,8 @@ static void send_ping(knet_handle_t knet_h, struct knet_host *dst_host, struct k

if ((diff_ping >= (dst_link->ping_interval * 1000llu)) || (!timed)) {
/* preparing ping buffer */
knet_h->pingbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pingbuf->kh_version = knet_h->onwire_ver;
knet_h->pingbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER;
knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
knet_h->pingbuf->kh_node = htons(knet_h->host_id);

Expand Down
3 changes: 2 additions & 1 deletion libknet/threads_pmtud.c
Expand Up @@ -551,7 +551,8 @@ void *_handle_pmtud_link_thread(void *data)
knet_h->data_mtu = calc_min_mtu(knet_h);

/* preparing pmtu buffer */
knet_h->pmtudbuf->kh_version = KNET_HEADER_VERSION;
knet_h->pmtudbuf->kh_version = knet_h->onwire_ver;
knet_h->pmtudbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER;
knet_h->pmtudbuf->kh_type = KNET_HEADER_TYPE_PMTUD;
knet_h->pmtudbuf->kh_node = htons(knet_h->host_id);

Expand Down
13 changes: 11 additions & 2 deletions libknet/threads_rx.c
Expand Up @@ -285,8 +285,17 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
return;
}

if (inbuf->kh_version != KNET_HEADER_VERSION) {
log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
if ((inbuf->kh_version > KNET_HEADER_ONWIRE_MAX_VER) &&
(inbuf->kh_version < KNET_HEADER_ONWIRE_MIN_VER)) {
if (KNET_HEADER_ONWIRE_MAX_VER > 1 ) {
log_debug(knet_h, KNET_SUB_RX,
"Received packet version %u. current node only supports onwire version from %u to %u",
inbuf->kh_version, KNET_HEADER_ONWIRE_MIN_VER, KNET_HEADER_ONWIRE_MAX_VER);
} else {
log_debug(knet_h, KNET_SUB_RX,
"Received packet version %u. current node only supports %u",
inbuf->kh_version, KNET_HEADER_ONWIRE_MAX_VER);
}
return;
}

Expand Down
6 changes: 4 additions & 2 deletions libknet/threads_tx.c
Expand Up @@ -171,7 +171,8 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan

inbuf = knet_h->recv_from_sock_buf;
inbuf->kh_type = KNET_HEADER_TYPE_DATA;
inbuf->kh_version = KNET_HEADER_VERSION;
inbuf->kh_version = knet_h->onwire_ver;
inbuf->kh_max_ver = KNET_HEADER_ONWIRE_MAX_VER;
inbuf->khp_data_frag_seq = 0;
inbuf->kh_node = htons(knet_h->host_id);

Expand Down Expand Up @@ -468,7 +469,8 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan
/*
* copy the frag info on all buffers
*/
knet_h->send_to_links_buf[frag_idx]->kh_version = KNET_HEADER_VERSION;
knet_h->send_to_links_buf[frag_idx]->kh_version = inbuf->kh_version;
knet_h->send_to_links_buf[frag_idx]->kh_max_ver = inbuf->kh_max_ver;
knet_h->send_to_links_buf[frag_idx]->kh_node = htons(knet_h->host_id);
knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
Expand Down

0 comments on commit 54458ed

Please sign in to comment.