Skip to content

Commit

Permalink
rsockets: Use service thread to accept connections
Browse files Browse the repository at this point in the history
Rsockets currently uses the application thread to drive the
progress state of new connections.  However, some applications
expect that new connections can be established without the
application taking specific actions.  For example, some
apps do this sequence:

s = socket();
c = socket();
fcntl(s, O_NONBLOCK);
listen(s);
connect(c);
a = accept(s);

In rsockets, this hangs at connect because nothing processes the
incoming connection request.  This problem was reported when
integrating rsockets in the Java Development Kit.

To better support socket semantics, move the processing of
connection requests to a service thread.  We set up a
socketpair on any listening socket.  After a connection has been
established, the listening socket is notified via the socketpair.
With this change, a single-threaded app can connect to itself and
transfer data.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
  • Loading branch information
shefty committed May 13, 2019
1 parent 0a6f2da commit b60c79d
Showing 1 changed file with 148 additions and 42 deletions.
190 changes: 148 additions & 42 deletions librdmacm/rsocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ enum {
RS_SVC_REM_DGRAM,
RS_SVC_ADD_KEEPALIVE,
RS_SVC_REM_KEEPALIVE,
RS_SVC_MOD_KEEPALIVE
RS_SVC_MOD_KEEPALIVE,
RS_SVC_ADD_CM,
RS_SVC_REM_CM,
};

struct rs_svc_msg {
Expand Down Expand Up @@ -109,6 +111,11 @@ static struct rs_svc tcp_svc = {
.context_size = sizeof(*tcp_svc_timeouts),
.run = tcp_svc_run
};
/* Thread entry point for the connection-management service (defined later in this file). */
static void *cm_svc_run(void *arg);
/*
 * Service descriptor for listening rsockets.  Each entry tracked by the
 * service carries one struct pollfd as its per-entry context, and the
 * service thread runs cm_svc_run().
 */
static struct rs_svc listen_svc = {
.context_size = sizeof(struct pollfd),
.run = cm_svc_run
};

static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
Expand Down Expand Up @@ -252,6 +259,7 @@ enum rs_state {
#define RS_OPT_MSG_SEND (1 << 1)
#define RS_OPT_UDP_SVC (1 << 2)
#define RS_OPT_KEEPALIVE (1 << 3)
/*
 * Set in rs->opts while the rsocket is registered with listen_svc
 * (RS_SVC_ADD_CM) and cleared again on RS_SVC_REM_CM; rclose() checks it to
 * decide whether the service must be told to drop the rsocket.
 */
#define RS_OPT_CM_SVC (1 << 4)

union socket_addr {
struct sockaddr sa;
Expand Down Expand Up @@ -311,6 +319,7 @@ struct rsocket {
struct rdma_cm_id *cm_id;
uint64_t tcp_opts;
unsigned int keepalive_time;
int accept_queue[2];

unsigned int ctrl_seqno;
unsigned int ctrl_max_seqno;
Expand Down Expand Up @@ -645,7 +654,9 @@ static int rs_set_nonblocking(struct rsocket *rs, int arg)
if (rs->cm_id->recv_cq_channel)
ret = fcntl(rs->cm_id->recv_cq_channel->fd, F_SETFL, arg);

if (!ret && rs->state < rs_connected)
if (rs->state == rs_listening)
ret = fcntl(rs->accept_queue[0], F_SETFL, arg);
else if (!ret && rs->state < rs_connected)
ret = fcntl(rs->cm_id->channel->fd, F_SETFL, arg);
} else {
ret = fcntl(rs->epfd, F_SETFL, arg);
Expand Down Expand Up @@ -1027,6 +1038,11 @@ static void rs_free(struct rsocket *rs)
rdma_destroy_id(rs->cm_id);
}

if (rs->accept_queue[0] > 0 || rs->accept_queue[1] > 0) {
close(rs->accept_queue[0]);
close(rs->accept_queue[1]);
}

fastlock_destroy(&rs->map_lock);
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
Expand Down Expand Up @@ -1204,59 +1220,61 @@ int rlisten(int socket, int backlog)
if (!rs)
return ERR(EBADF);

if (rs->state != rs_listening) {
ret = rdma_listen(rs->cm_id, backlog);
if (!ret)
rs->state = rs_listening;
} else {
ret = 0;
if (rs->state == rs_listening)
return 0;

ret = rdma_listen(rs->cm_id, backlog);
if (ret)
return ret;

ret = socketpair(AF_UNIX, SOCK_STREAM, 0, rs->accept_queue);
if (ret)
return ret;

if (rs->fd_flags & O_NONBLOCK) {
ret = set_fd_nonblock(rs->accept_queue[0], true);
if (ret)
return ret;
}
return ret;

ret = set_fd_nonblock(rs->cm_id->channel->fd, true);
if (ret)
return ret;

ret = rs_notify_svc(&listen_svc, rs, RS_SVC_ADD_CM);
if (ret)
return ret;

rs->state = rs_listening;
return 0;
}

/*
* Nonblocking is usually not inherited between sockets, but we need to
* inherit it here to establish the connection only. This is needed to
* prevent rdma_accept from blocking until the remote side finishes
* establishing the connection. If we were to allow rdma_accept to block,
* then a single thread cannot establish a connection with itself, or
* two threads which try to connect to each other can deadlock trying to
* form a connection.
*
* Data transfers on the new socket remain blocking unless the user
* specifies otherwise through rfcntl.
*/
int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
/* Accepting new connection requests is currently a blocking operation */
static void rs_accept(struct rsocket *rs)
{
struct rsocket *rs, *new_rs;
struct rsocket *new_rs;
struct rdma_conn_param param;
struct rs_conn_data *creq, cresp;
struct rdma_cm_id *cm_id;
int ret;

rs = idm_lookup(&idm, socket);
if (!rs)
return ERR(EBADF);
ret = rdma_get_request(rs->cm_id, &cm_id);
if (ret)
return;

new_rs = rs_alloc(rs, rs->type);
if (!new_rs)
return ERR(ENOMEM);

ret = rdma_get_request(rs->cm_id, &new_rs->cm_id);
if (ret)
goto err;
new_rs->cm_id = cm_id;

ret = rs_insert(new_rs, new_rs->cm_id->channel->fd);
if (ret < 0)
goto err;

creq = (struct rs_conn_data *)
(new_rs->cm_id->event->param.conn.private_data + rs_conn_data_offset(rs));
if (creq->version != 1) {
ret = ERR(ENOTSUP);
if (creq->version != 1)
goto err;
}

if (rs->fd_flags & O_NONBLOCK)
set_fd_nonblock(new_rs->cm_id->channel->fd, true);

ret = rs_create_ep(new_rs);
if (ret)
Expand All @@ -1275,13 +1293,34 @@ int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
else
goto err;

write_all(rs->accept_queue[1], &new_rs, sizeof(new_rs));
return;

err:
rdma_reject(cm_id, NULL, 0);
if (new_rs)
rs_free(new_rs);
}

/*
 * Return the index of the next connection queued on a listening rsocket.
 *
 * New connections are handed over as a 'struct rsocket *' written to
 * accept_queue[1]; this function reads one such pointer from
 * accept_queue[0].
 *
 * socket:  index of the listening rsocket.
 * addr:    optional; when both addr and addrlen are non-NULL they are
 *          passed to rgetpeername() for the new rsocket.
 * addrlen: optional; see addr.
 *
 * Returns the new rsocket index on success.  Returns -1 with errno set to
 * EBADF when the index is unknown or the rsocket is not listening, -1 with
 * errno as left by read() when the read fails, and -1 with errno EIO when
 * the read returns fewer bytes than a full pointer (including end-of-file).
 */
int raccept(int socket, struct sockaddr *addr, socklen_t *addrlen)
{
	struct rsocket *rs, *new_rs;
	ssize_t len;

	rs = idm_lookup(&idm, socket);
	if (!rs)
		return ERR(EBADF);

	if (rs->state != rs_listening)
		return ERR(EBADF);

	len = read(rs->accept_queue[0], &new_rs, sizeof(new_rs));
	if (len < 0)
		return -1;

	/*
	 * A short or zero-length read must not be returned as-is: the byte
	 * count would be indistinguishable from a valid socket index.
	 */
	if ((size_t) len != sizeof(new_rs))
		return ERR(EIO);

	if (addr && addrlen)
		rgetpeername(new_rs->index, addr, addrlen);
	return new_rs->index;
}

static int rs_do_connect(struct rsocket *rs)
Expand Down Expand Up @@ -2991,7 +3030,7 @@ static int rs_poll_rs(struct rsocket *rs, int events,
}

if (rs->state == rs_listening) {
fds.fd = rs->cm_id->channel->fd;
fds.fd = rs->accept_queue[0];
fds.events = events;
fds.revents = 0;
poll(&fds, 1, 0);
Expand Down Expand Up @@ -3315,8 +3354,10 @@ int rclose(int socket)
if (rs->type == SOCK_STREAM) {
if (rs->state & rs_connected)
rshutdown(socket, SHUT_RDWR);
else if (rs->opts & RS_OPT_KEEPALIVE)
if (rs->opts & RS_OPT_KEEPALIVE)
rs_notify_svc(&tcp_svc, rs, RS_SVC_REM_KEEPALIVE);
if (rs->opts & RS_OPT_CM_SVC)
rs_notify_svc(&listen_svc, rs, RS_SVC_REM_CM);
} else {
ds_shutdown(rs);
}
Expand Down Expand Up @@ -4360,3 +4401,68 @@ static void *tcp_svc_run(void *arg)

return NULL;
}

static void cm_svc_process_sock(struct rs_svc *svc)
{
struct rs_svc_msg msg;
struct pollfd *fds;

read_all(svc->sock[1], &msg, sizeof(msg));
switch (msg.cmd) {
case RS_SVC_ADD_CM:
msg.status = rs_svc_add_rs(svc, msg.rs);
if (!msg.status) {
msg.rs->opts |= RS_OPT_CM_SVC;
fds = svc->contexts;
fds[svc->cnt].fd = msg.rs->cm_id->channel->fd;
fds[svc->cnt].events = POLLIN;
fds[svc->cnt].revents = 0;
}
break;
case RS_SVC_REM_CM:
msg.status = rs_svc_rm_rs(svc, msg.rs);
if (!msg.status)
msg.rs->opts &= ~RS_OPT_CM_SVC;
break;
case RS_SVC_NOOP:
msg.status = 0;
break;
default:
break;
}
write_all(svc->sock[1], &msg, sizeof(msg));
}

static void *cm_svc_run(void *arg)
{
struct rs_svc *svc = arg;
struct pollfd *fds;
struct rs_svc_msg msg;
int i, ret;

ret = rs_svc_grow_sets(svc, 4);
if (ret) {
msg.status = ret;
write_all(svc->sock[1], &msg, sizeof(msg));
return (void *) (uintptr_t) ret;
}

fds = svc->contexts;
fds[0].fd = svc->sock[1];
fds[0].events = POLLIN;
do {
for (i = 0; i <= svc->cnt; i++)
fds[i].revents = 0;

poll(fds, svc->cnt + 1, -1);
if (fds[0].revents)
cm_svc_process_sock(svc);

for (i = 1; i <= svc->cnt; i++) {
if (fds[i].revents)
rs_accept(svc->rss[i]);
}
} while (svc->cnt >= 1);

return NULL;
}

0 comments on commit b60c79d

Please sign in to comment.