Skip to content

Commit

Permalink
[POC] DO NOT MERGE
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Jun 27, 2019
1 parent 2be8f2f commit 69ad818
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 44 deletions.
22 changes: 16 additions & 6 deletions libknet/handle.c
Expand Up @@ -1173,24 +1173,34 @@ int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
return -1;
}

knet_h->enabled = enabled;

if (enabled) {
knet_h->enabled = enabled;
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
} else {
if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) {
log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
}
if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) {
log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for RX thread");
}
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
}

pthread_rwlock_unlock(&knet_h->global_rwlock);

usleep(knet_h->threads_timer_res * 2);
wait_all_threads_flush_queue(knet_h);
if (!enabled) {
usleep(knet_h->threads_timer_res * 4);
wait_all_threads_flush_queue(knet_h);
savederrno = get_global_wrlock(knet_h);
if (savederrno) {
log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}
knet_h->enabled = enabled;
log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
pthread_rwlock_unlock(&knet_h->global_rwlock);
}

errno = 0;
return 0;
Expand Down
24 changes: 9 additions & 15 deletions libknet/threads_rx.c
Expand Up @@ -227,7 +227,7 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t
return 1;
}

static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg, int flush)
static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
int err = 0, savederrno = 0;
ssize_t outlen;
Expand Down Expand Up @@ -418,13 +418,9 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
}

if (inbuf->kh_type == KNET_HEADER_TYPE_DATA) {
if ((knet_h->enabled != 1) && (flush == KNET_THREAD_QUEUE_FLUSHED)) /* data forward is disabled */
if (knet_h->enabled != 1) /* data forward is disabled */
break;

if (flush) {
log_debug(knet_h, KNET_SUB_RX, "flushing the queue");
}

/* Only update the crypto overhead for data packets. Mainly to be
consistent with TX */
knet_h->stats.rx_crypt_time_ave =
Expand Down Expand Up @@ -725,7 +721,7 @@ static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struc
}
}

static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg, int flush)
static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
int err, savederrno;
int i, msg_recv, transport;
Expand Down Expand Up @@ -829,7 +825,7 @@ static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct kne
continue;
}
}
_parse_recv_from_links(knet_h, sockfd, &msg[i], flush);
_parse_recv_from_links(knet_h, sockfd, &msg[i]);
break;
}
}
Expand All @@ -840,7 +836,7 @@ static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct kne

void *_handle_recv_from_links_thread(void *data)
{
int i, nev, flush;
int i, nev;
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
struct sockaddr_storage address[PCKT_RX_BUFS];
Expand All @@ -866,7 +862,9 @@ void *_handle_recv_from_links_thread(void *data)
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, knet_h->threads_timer_res / 1000);

flush = get_thread_flush_queue(knet_h, KNET_THREAD_RX);
if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
}

/*
* we use timeout to detect if thread is shutting down
Expand All @@ -876,11 +874,7 @@ void *_handle_recv_from_links_thread(void *data)
}

for (i = 0; i < nev; i++) {
_handle_recv_from_links(knet_h, events[i].data.fd, msg, flush);
}

if (flush == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
_handle_recv_from_links(knet_h, events[i].data.fd, msg);
}
}

Expand Down
33 changes: 10 additions & 23 deletions libknet/threads_tx.c
Expand Up @@ -127,7 +127,7 @@ static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host,
return err;
}

static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync, int flush)
static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
{
size_t outlen, frag_len;
struct knet_host *dst_host;
Expand Down Expand Up @@ -158,18 +158,13 @@ static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t chan
inbuf = knet_h->recv_from_sock_buf;

if ((knet_h->enabled != 1) &&
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO) &&
(flush != KNET_THREAD_QUEUE_FLUSH)) { /* data forward is disabled */
(inbuf->kh_type != KNET_HEADER_TYPE_HOST_INFO)) { /* data forward is disabled */
log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
savederrno = ECANCELED;
err = -1;
goto out_unlock;
}

if (flush) {
log_debug(knet_h, KNET_SUB_TX, "flushing the queue");
}

/*
* move this into a separate function to expand on
* extra switching rules
Expand Down Expand Up @@ -603,12 +598,6 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len
return -1;
}

if (!knet_h->enabled) {
savederrno = ECANCELED;
err = 1;
goto out;
}

if (!knet_h->sockfd[channel].in_use) {
savederrno = EINVAL;
err = -1;
Expand All @@ -625,7 +614,7 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len

knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
err = _parse_recv_from_sock(knet_h, buff_len, channel, 1, KNET_THREAD_QUEUE_FLUSHED);
err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
savederrno = errno;

pthread_mutex_unlock(&knet_h->tx_mutex);
Expand All @@ -637,7 +626,7 @@ int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len
return err;
}

static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type, int flush)
static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
{
ssize_t inlen = 0;
int savederrno = 0, docallback = 0;
Expand Down Expand Up @@ -669,7 +658,7 @@ static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int
}
} else {
knet_h->recv_from_sock_buf->kh_type = type;
_parse_recv_from_sock(knet_h, inlen, channel, 0, flush);
_parse_recv_from_sock(knet_h, inlen, channel, 0);
}

if (docallback) {
Expand All @@ -686,7 +675,7 @@ void *_handle_send_to_links_thread(void *data)
{
knet_handle_t knet_h = (knet_handle_t) data;
struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
int i, nev, type, flush;
int i, nev, type;
int8_t channel;
struct iovec iov_in;
struct msghdr msg;
Expand Down Expand Up @@ -717,7 +706,9 @@ void *_handle_send_to_links_thread(void *data)
while (!shutdown_in_progress(knet_h)) {
nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, knet_h->threads_timer_res / 1000);

flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);
if (get_thread_flush_queue(knet_h, KNET_THREAD_TX) == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
}

/*
* we use timeout to detect if thread is shutting down
Expand Down Expand Up @@ -752,15 +743,11 @@ void *_handle_send_to_links_thread(void *data)
log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
continue;
}
_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type, flush);
_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
pthread_mutex_unlock(&knet_h->tx_mutex);
}

pthread_rwlock_unlock(&knet_h->global_rwlock);

if (flush == KNET_THREAD_QUEUE_FLUSH) {
set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
}
}

set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);
Expand Down

0 comments on commit 69ad818

Please sign in to comment.