Skip to content

Commit

Permalink
[POC] DO NOT MERGE!!!
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Jun 27, 2019
1 parent 9c19879 commit b775899
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 13 deletions.
9 changes: 9 additions & 0 deletions libknet/handle.c
Expand Up @@ -1178,11 +1178,20 @@ int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
if (enabled) {
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
} else {
if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) {
log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
}
if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) {
log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
}
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
}

pthread_rwlock_unlock(&knet_h->global_rwlock);

usleep(knet_h->threads_timer_res * 2);
wait_all_threads_flush_queue(knet_h);

errno = 0;
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions libknet/internals.h
Expand Up @@ -178,6 +178,7 @@ struct knet_handle {
struct knet_header *pingbuf;
struct knet_header *pmtudbuf;
uint8_t threads_status[KNET_THREAD_MAX];
uint8_t threads_flush_queue[KNET_THREAD_MAX];
useconds_t threads_timer_res;
pthread_mutex_t threads_status_mutex;
pthread_t send_to_links_thread;
Expand Down
11 changes: 11 additions & 0 deletions libknet/tests/api_knet_send.c
Expand Up @@ -246,6 +246,17 @@ static void test(uint8_t transport)

flush_logs(logfds[0], stdout);

if (knet_handle_setfwd(knet_h, 0) < 0) {
printf("knet_handle_setfwd failed: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
knet_link_clear_config(knet_h, 1, 0);
knet_host_remove(knet_h, 1);
knet_handle_free(knet_h);
flush_logs(logfds[0], stdout);
close_logpipes(logfds);
exit(FAIL);
}

if (wait_for_packet(knet_h, 10, datafd, logfds[0], stdout)) {
printf("Error waiting for packet: %s\n", strerror(errno));
knet_link_set_enable(knet_h, 1, 0, 0);
Expand Down
5 changes: 5 additions & 0 deletions libknet/tests/test-common.c
Expand Up @@ -404,6 +404,11 @@ int knet_handle_stop(knet_handle_t knet_h)
return -1;
}

if (knet_handle_setfwd(knet_h, 0) < 0) {
printf("knet_handle_setfwd failed: %s\n", strerror(errno));
return -1;
}

if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
printf("knet_host_get_host_list failed: %s\n", strerror(errno));
return -1;
Expand Down
62 changes: 62 additions & 0 deletions libknet/threads_common.c
Expand Up @@ -109,6 +109,68 @@ static const char *get_thread_name(uint8_t thread_id)
return "unknown";
}

/*
 * Read the current flush-queue request state for one knet thread.
 *
 * Returns the stored value (KNET_THREAD_QUEUE_FLUSH or
 * KNET_THREAD_QUEUE_FLUSHED) on success, or -1 if the status mutex
 * cannot be taken. The 0/1 state values never collide with -1.
 */
int get_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id)
{
	int state;

	/* the flush-queue array is shared with the TX/RX threads and is
	 * protected by the same mutex as threads_status */
	if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
		log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock");
		return -1;
	}

	state = knet_h->threads_flush_queue[thread_id];
	pthread_mutex_unlock(&knet_h->threads_status_mutex);

	return state;
}

/*
 * Record a flush-queue request (KNET_THREAD_QUEUE_FLUSH) or completion
 * (KNET_THREAD_QUEUE_FLUSHED) for one knet thread.
 *
 * Returns 0 on success, -1 if the status mutex cannot be taken
 * (in which case the state is left unchanged).
 */
int set_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id, uint8_t status)
{
	int err = pthread_mutex_lock(&knet_h->threads_status_mutex);

	if (err != 0) {
		log_debug(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock");
		return -1;
	}

	knet_h->threads_flush_queue[thread_id] = status;

	/* log while still holding the lock so messages stay ordered with
	 * respect to the state changes they describe */
	log_debug(knet_h, KNET_SUB_HANDLE, "Updated flush queue request for thread %s to %u",
		  get_thread_name(thread_id), status);

	pthread_mutex_unlock(&knet_h->threads_status_mutex);

	return 0;
}

/*
 * Block until every thread slot has acknowledged a pending flush request,
 * i.e. all entries in threads_flush_queue are KNET_THREAD_QUEUE_FLUSHED.
 *
 * Polls every threads_timer_res microseconds; a failed mutex lock simply
 * retries on the next iteration. Always returns 0.
 *
 * NOTE(review): the wait is unbounded — if a thread never flips its slot
 * back to FLUSHED this spins forever. Consider a timeout before merging
 * (commit is marked POC / DO NOT MERGE).
 */
int wait_all_threads_flush_queue(knet_handle_t knet_h)
{
	uint8_t i = 0, found = 0;

	while (!found) {
		usleep(knet_h->threads_timer_res);

		if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
			continue;
		}

		found = 1;

		for (i = 0; i < KNET_THREAD_MAX; i++) {
			/*
			 * Original code re-tested != FLUSHED after a continue
			 * on == FLUSHED; the second test was always true.
			 * Collapsed to a single condition, same behavior.
			 */
			if (knet_h->threads_flush_queue[i] != KNET_THREAD_QUEUE_FLUSHED) {
				log_debug(knet_h, KNET_SUB_HANDLE, "Checking thread: %s queue: %u",
					  get_thread_name(i),
					  knet_h->threads_flush_queue[i]);
				found = 0;
			}
		}

		pthread_mutex_unlock(&knet_h->threads_status_mutex);
	}

	return 0;
}

int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status)
{
if (pthread_mutex_lock(&knet_h->threads_status_mutex) != 0) {
Expand Down
6 changes: 6 additions & 0 deletions libknet/threads_common.h
Expand Up @@ -30,6 +30,9 @@
#endif
#define KNET_THREAD_MAX 32

#define KNET_THREAD_QUEUE_FLUSHED 0
#define KNET_THREAD_QUEUE_FLUSH 1

#define timespec_diff(start, end, diff) \
do { \
if (end.tv_sec > start.tv_sec) \
Expand All @@ -41,6 +44,9 @@ do { \

int shutdown_in_progress(knet_handle_t knet_h);
int get_global_wrlock(knet_handle_t knet_h);
int get_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id);
int set_thread_flush_queue(knet_handle_t knet_h, uint8_t thread_id, uint8_t status);
int wait_all_threads_flush_queue(knet_handle_t knet_h);
int set_thread_status(knet_handle_t knet_h, uint8_t thread_id, uint8_t status);
int wait_all_threads_status(knet_handle_t knet_h, uint8_t status);
void force_pmtud_run(knet_handle_t knet_h, uint8_t subsystem, uint8_t reset_mtu);
Expand Down
22 changes: 16 additions & 6 deletions libknet/threads_rx.c
Expand Up @@ -227,7 +227,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t
return 1;
}

static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg, int flush)
{
int err = 0, savederrno = 0;
ssize_t outlen;
Expand Down Expand Up @@ -418,9 +418,13 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
}

if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if (knet_h->enabled != 1) /* data forward is disabled */
if ((knet_h->enabled != 1) && (flush == KNET_THREAD_QUEUE_FLUSHED)) /* data forward is disabled */
break;

if (flush) {
log_debug(knet_h, KNET_SUB_RX, "flushing the queue");
}

/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
knet_h->stats.rx_crypt_time_ave =
Expand Down Expand Up @@ -721,7 +725,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
}
}

static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg, int flush)
{
int err, savederrno;
int i, msg_recv, transport;
Expand Down Expand Up @@ -825,7 +829,7 @@ static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct kne
continue;
}
}
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
_parse_recv_from_links(knet_h, sockfd, &msg[i], flush);
break;
}
}
Expand All @@ -836,7 +840,7 @@ static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct kne

void *_handle_recv_from_links_thread(void *data)
{
int i, nev;
int i, nev, flush;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_RX_BUFS];
Expand All @@ -862,6 +866,8 @@ void *_handle_recv_from_links_thread(void *data)
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);

flush = get_thread_flush_queue(knet_h, KNET_THREAD_RX);

/*
* we use timeout to detect if thread is shutting down
*/
Expand All @@ -870,7 +876,11 @@ void *_handle_recv_from_links_thread(void *data)
}

for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
_handle_recv_from_links(knet_h, events[i].data.fd, msg, flush);
}

if (flush == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
}
}

Expand Down
32 changes: 25 additions & 7 deletions libknet/threads_tx.c
Expand Up @@ -127,7 +127,7 @@ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host,
return err;
}

static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync, int flush)
{
size_t outlen, frag_len;
struct knet_host *dst_host;
Expand Down Expand Up @@ -158,13 +158,18 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan
inbuf = knet_h->recv_from_sock_buf;

if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO) &&
(flush != KNET_THREAD_QUEUE_FLUSH)) { /* data forward is disabled */
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out_unlock;
}

if (flush) {
log_debug(knet_h, KNET_SUB_TX, "flushing the queue");
}

/*
* move this into a separate function to expand on
* extra switching rules
Expand Down Expand Up @@ -598,6 +603,12 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len
return -1;
}

if (!knet_h->enabled) {
savederrno = ECANCELED;
err = 1;
goto out;
}

if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
Expand All @@ -614,7 +625,7 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len

knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
err = _parse_recv_from_sock(knet_h, buff_len, channel, 1, KNET_THREAD_QUEUE_FLUSHED);
savederrno = errno;

pthread_mutex_unlock(&knet_h->tx_mutex);
Expand All @@ -626,7 +637,7 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len
return err;
}

static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type, int flush)
{
ssize_t inlen = 0;
int savederrno = 0, docallback = 0;
Expand Down Expand Up @@ -658,7 +669,7 @@ static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int
}
} else {
knet_h->recv_from_sock_buf->kh_type = type;
_parse_recv_from_sock(knet_h, inlen, channel, 0);
_parse_recv_from_sock(knet_h, inlen, channel, 0, flush);
}

if (docallback) {
Expand All @@ -675,7 +686,7 @@ void *_handle_send_to_links_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
int i, nev, type;
int i, nev, type, flush;
int8_t channel;
struct iovec iov_in;
struct msghdr msg;
Expand Down Expand Up @@ -706,6 +717,8 @@ void *_handle_send_to_links_thread(void *data)
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000);

flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);

/*
* we use timeout to detect if thread is shutting down
*/
Expand Down Expand Up @@ -739,10 +752,15 @@ void *_handle_send_to_links_thread(void *data)
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;
}
_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type, flush);
pthread_mutex_unlock(&knet_h->tx_mutex);
}

pthread_rwlock_unlock(&knet_h->global_rwlock);

if (flush == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
}
}

set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);
Expand Down

0 comments on commit b775899

Please sign in to comment.