rsockets: Wake up all waiting threads on poll events
In order to have rpoll() block the calling thread until the desired
event occurs, we map rpoll() to poll() on the CQ fd (or on the RDMA CM
fd if the rsocket is not yet connected).  However, poll() waits to read
an event from the CQ fd; it does not wait directly on the state of the
rsocket.  This can result in rpoll() behaving differently from poll()
in multithreaded situations.

For example, suppose two threads call rpoll(), one waiting for POLLIN,
the other for POLLOUT.  When a new completion is written to the CQ, an
event is written to the fd.  It is possible for one thread (say the
POLLOUT one) to wake up, read the CQ event, and process the completion.
The completion could report that data is now available to be read from
the rsocket, and that POLLIN should be signaled.  But when the POLLIN
thread checks the CQ fd, it finds it empty, so it continues blocking on
poll() in the kernel.  That thread never exits the kernel.

In the above situation, the application will hang.
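
This lost wake-up is easy to reproduce outside of rsockets.  The
following sketch is illustrative only: a plain pipe stands in for the
CQ event fd, and the program deliberately hangs, because whichever
thread loses the race to drain the single event re-enters poll() and
blocks forever, just like the POLLIN thread above.

```c
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int evfd[2];	/* pipe read end stands in for the CQ event fd */

static void *waiter(void *name)
{
	struct pollfd pfd = { .fd = evfd[0], .events = POLLIN };
	char c;

	for (;;) {
		poll(&pfd, 1, -1);	/* block until a "CQ event" arrives */
		if (read(evfd[0], &c, 1) == 1)
			break;		/* this thread drained the event */
		/* The other thread drained the fd first.  Re-entering
		 * poll() blocks forever: no further event is coming,
		 * even though the rsocket state has already changed. */
	}
	printf("%s consumed the event\n", (char *) name);
	return NULL;
}

int main(void)
{
	pthread_t t1, t2;
	char c = 1;

	pipe(evfd);
	fcntl(evfd[0], F_SETFL, O_NONBLOCK);
	pthread_create(&t1, NULL, waiter, "POLLIN thread");
	pthread_create(&t2, NULL, waiter, "POLLOUT thread");
	write(evfd[1], &c, 1);	/* a single completion is signaled */
	pthread_join(t1, NULL);
	pthread_join(t2, NULL);	/* one join never returns: the hang */
	return 0;
}
```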

Threads blocking in rpoll() should return based on changes to the state
of the rsocket(s) that they are monitoring, and that state may be
modified by another thread.  To handle this situation, whenever the
state of an rsocket _may_ have changed, we wake up all threads blocked
in poll(), so that they can re-check the state of their rsocket(s).

Note that it is easier to have all threads re-check the rsocket states
than to perform more complex state tracking, given the difficulty of
tracking which rsockets have been passed into multiple concurrent
rpoll() calls.

rpoll() is modified as follows.  When a thread has processed any event,
it halts further calls into poll().  It then writes to an eventfd to
signal all threads blocked in poll() to wake up.  Only after all threads
have returned from the kernel are they allowed to resume calling poll().
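
In outline, the gate works as sketched below.  This is condensed from
the diff that follows (the names match the patch); error handling and
the enclosing rpoll() retry loop are omitted.

```c
/* Condensed sketch of the poll() "gate" added by this patch. */
#define _GNU_SOURCE		/* for pthread_yield() */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/eventfd.h>

static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static uint32_t pollcnt;	/* threads currently inside poll() */
static bool suspendpoll;	/* gate closed: a state change is pending */
static int pollsignal = -1;	/* eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE) */

/* Called before blocking in poll(): refuse entry while the gate is
 * closed, so the caller re-checks its rsockets and retries. */
static int rs_poll_enter(void)
{
	pthread_mutex_lock(&mut);
	if (suspendpoll) {
		pthread_mutex_unlock(&mut);
		pthread_yield();
		return -EBUSY;
	}
	pollcnt++;
	pthread_mutex_unlock(&mut);
	return 0;
}

/* Called after a thread processes an event: close the gate and write
 * the eventfd so every blocked poller returns to user space.  The last
 * thread out clears one signal and reopens the gate. */
static void rs_poll_stop(void)
{
	uint64_t c = 1;

	pthread_mutex_lock(&mut);
	if (!--pollcnt) {
		read(pollsignal, &c, sizeof(c));
		suspendpoll = false;
	} else if (!suspendpoll) {
		suspendpoll = true;
		write(pollsignal, &c, sizeof(c));
	}
	pthread_mutex_unlock(&mut);
}
```

rpoll() pairs these calls with rs_poll_exit(), used when poll() itself
fails, and the read end of pollsignal is appended to every fd set
passed to poll(), so the write above wakes all blocked threads.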

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
shefty committed May 14, 2019
1 parent 5706e45 commit 23985e2
Showing 1 changed file with 143 additions and 17 deletions.
160 changes: 143 additions & 17 deletions librdmacm/rsocket.c
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2008-2014 Intel Corporation. All rights reserved.
+* Copyright (c) 2008-2019 Intel Corporation. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
@@ -46,6 +46,7 @@
#include <string.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <search.h>
#include <time.h>
#include <byteswap.h>
@@ -118,6 +119,10 @@ static struct rs_svc listen_svc = {
.run = cm_svc_run
};

static uint32_t pollcnt;
static bool suspendpoll;
static int pollsignal = -1;

static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
@@ -479,8 +484,8 @@ static int rs_notify_svc(struct rs_svc *svc, struct rsocket *rs, int cmd)
msg.cmd = cmd;
msg.status = EINVAL;
msg.rs = rs;
-write_all(svc->sock[0], &msg, sizeof msg);
-read_all(svc->sock[0], &msg, sizeof msg);
+write_all(svc->sock[0], &msg, sizeof(msg));
+read_all(svc->sock[0], &msg, sizeof(msg));
ret = rdma_seterrno(msg.status);
if (svc->cnt)
goto unlock;
@@ -2983,19 +2988,135 @@ ssize_t rwritev(int socket, const struct iovec *iov, int iovcnt)
return rsendv(socket, iov, iovcnt, 0);
}

/* When mapping rpoll to poll, the events reported on the RDMA
* fd are independent from the events rpoll may be looking for.
* To avoid threads hanging in poll, whenever any event occurs,
* we need to wake up all threads in poll, so that they can check
* if there has been a change on the rsockets they are monitoring.
* To support this, we 'gate' threads entering and leaving rpoll.
*/
static int rs_pollinit(void)
{
int ret = 0;

pthread_mutex_lock(&mut);
if (pollsignal >= 0)
goto unlock;

pollsignal = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
if (pollsignal < 0)
ret = -errno;

unlock:
pthread_mutex_unlock(&mut);
return ret;
}

/* When an event occurs, we must wait until the state of all rsockets
* has settled. Then we need to re-check the rsocket state prior to
* blocking on poll().
*/
static int rs_poll_enter(void)
{
pthread_mutex_lock(&mut);
if (suspendpoll) {
pthread_mutex_unlock(&mut);
pthread_yield();
return -EBUSY;
}

pollcnt++;
pthread_mutex_unlock(&mut);
return 0;
}

static void rs_poll_exit(void)
{
uint64_t c;
int save_errno;
ssize_t ret;

pthread_mutex_lock(&mut);
if (!--pollcnt) {
/* Keep errno value from poll() call. We try to clear
* a single signal. But there's no guarantee that we'll
* find one. Additional signals indicate that a change
* occurred on an rsocket, which requires all threads to
* re-check before blocking on poll.
*/
save_errno = errno;
ret = read(pollsignal, &c, sizeof(c));
if (ret != sizeof(c))
errno = save_errno;
suspendpoll = 0;
}
pthread_mutex_unlock(&mut);
}

/* When an event occurs, it's possible for a single thread blocked in
* poll to return from the kernel, read the event, and update the state
* of an rsocket. However, that can leave threads blocked in the kernel
* on poll (trying to read the CQ fd), which have had their rsocket
* state set. To avoid those threads remaining blocked in the kernel,
* we must wake them up and ensure that they all return to user space,
* in order to re-check the state of their rsockets.
*
* Because poll is racy wrt updating the rsocket states, we need to
* signal state checks whenever a thread updates the state of a
* monitored rsocket, independent of whether that thread actually
* reads an event from an fd. In other words, we must wake up all
* polling threads whenever poll() indicates that there is a new
* completion to process, and when rpoll() will return a successful
* value after having blocked.
*/
static void rs_poll_stop(void)
{
uint64_t c;
int save_errno;
ssize_t ret;

/* See comment in rs_poll_exit */
save_errno = errno;

pthread_mutex_lock(&mut);
if (!--pollcnt) {
ret = read(pollsignal, &c, sizeof(c));
suspendpoll = 0;
} else if (!suspendpoll) {
suspendpoll = 1;
c = 1;
ret = write(pollsignal, &c, sizeof(c));
} else {
ret = sizeof(c);
}
pthread_mutex_unlock(&mut);

if (ret != sizeof(c))
errno = save_errno;
}

/* We always add the pollsignal read fd to the poll fd set, so
* that we can signal any blocked threads.
*/
static struct pollfd *rs_fds_alloc(nfds_t nfds)
{
static __thread struct pollfd *rfds;
static __thread nfds_t rnfds;

-if (nfds > rnfds) {
+if (nfds + 1 > rnfds) {
if (rfds)
free(rfds);
else if (rs_pollinit())
return NULL;

-rfds = malloc(sizeof(*rfds) * nfds);
-rnfds = rfds ? nfds : 0;
+rfds = malloc(sizeof(*rfds) * (nfds + 1));
+rnfds = rfds ? nfds + 1 : 0;
}

if (rfds) {
rfds[nfds].fd = pollsignal;
rfds[nfds].events = POLLIN;
}
return rfds;
}

@@ -3120,17 +3241,16 @@ static int rs_poll_events(struct pollfd *rfds, struct pollfd *fds, nfds_t nfds)
int i, cnt = 0;

for (i = 0; i < nfds; i++) {
-if (!rfds[i].revents)
-continue;

rs = idm_lookup(&idm, fds[i].fd);
if (rs) {
-fastlock_acquire(&rs->cq_wait_lock);
-if (rs->type == SOCK_STREAM)
-rs_get_cq_event(rs);
-else
-ds_get_cq_event(rs);
-fastlock_release(&rs->cq_wait_lock);
+if (rfds[i].revents) {
+fastlock_acquire(&rs->cq_wait_lock);
+if (rs->type == SOCK_STREAM)
+rs_get_cq_event(rs);
+else
+ds_get_cq_event(rs);
+fastlock_release(&rs->cq_wait_lock);
+}
fds[i].revents = rs_poll_rs(rs, fds[i].events, 1, rs_poll_all);
} else {
fds[i].revents = rfds[i].revents;
@@ -3174,17 +3294,23 @@ int rpoll(struct pollfd *fds, nfds_t nfds, int timeout)
if (ret)
break;

if (rs_poll_enter())
continue;

if (timeout >= 0) {
timeout -= (int) ((rs_time_us() - start_time) / 1000);
if (timeout <= 0)
return 0;
}

-ret = poll(rfds, nfds, timeout);
-if (ret <= 0)
+ret = poll(rfds, nfds + 1, timeout);
+if (ret <= 0) {
+rs_poll_exit();
break;
+}

ret = rs_poll_events(rfds, fds, nfds);
rs_poll_stop();
} while (!ret);

return ret;
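One detail worth noting in the patch above: pollsignal is created with
EFD_SEMAPHORE, so each successful read() returns the value 1 and
decrements the eventfd counter by exactly one.  That is what lets
rs_poll_exit() and rs_poll_stop() clear a single signal at a time.  A
minimal standalone illustration of the semaphore semantics:

```c
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/eventfd.h>

int main(void)
{
	uint64_t v = 3;
	int fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);

	write(fd, &v, sizeof(v));	/* counter = 3 */

	/* With EFD_SEMAPHORE, each read returns 1 and decrements the
	 * counter by one; the fourth read fails with EAGAIN. */
	while (read(fd, &v, sizeof(v)) == sizeof(v))
		printf("read %llu\n", (unsigned long long) v);

	close(fd);
	return 0;
}
```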
