Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/net: TX operations now support io_uring #1

Closed
wants to merge 11 commits into from
3 changes: 3 additions & 0 deletions prov/net/src/xnet.h
Expand Up @@ -155,6 +155,7 @@ struct xnet_cur_rx {

struct xnet_cur_tx {
size_t data_left;
bool io_uring_busy;
struct xnet_xfer_entry *entry;
};

Expand Down Expand Up @@ -253,6 +254,7 @@ struct xnet_io_uring
{
struct fid fid;
ofi_io_uring_t ring;
size_t credits;
};

/* Serialization is handled at the progress instance level, using the
Expand Down Expand Up @@ -300,6 +302,7 @@ struct xnet_progress {

struct xnet_io_uring tx_io_uring;
struct xnet_io_uring rx_io_uring;
struct ofi_sockapi sockapi;

struct ofi_dynpoll epoll_fd;

Expand Down
12 changes: 2 additions & 10 deletions prov/net/src/xnet_ep.c
Expand Up @@ -43,14 +43,6 @@ extern struct fi_ops_rma xnet_rma_ops;
extern struct fi_ops_msg xnet_msg_ops;
extern struct fi_ops_tagged xnet_tagged_ops;

static struct ofi_sockapi xnet_sockapi =
{
.send = ofi_sockapi_send_socket,
.sendv = ofi_sockapi_sendv_socket,
.recv = ofi_sockapi_recv_socket,
.recvv = ofi_sockapi_recvv_socket,
};

void xnet_hdr_none(struct xnet_base_hdr *hdr)
{
/* no-op */
Expand Down Expand Up @@ -629,8 +621,8 @@ int xnet_endpoint(struct fid_domain *domain, struct fi_info *info,
if (ret)
goto err1;

ofi_bsock_init(&ep->bsock, &xnet_sockapi, xnet_staging_sbuf_size,
xnet_prefetch_rbuf_size);
ofi_bsock_init(&ep->bsock, &xnet_ep2_progress(ep)->sockapi,
xnet_staging_sbuf_size, xnet_prefetch_rbuf_size);
if (info->handle) {
if (((fid_t) info->handle)->fclass == FI_CLASS_PEP) {
pep = container_of(info->handle, struct xnet_pep,
Expand Down
152 changes: 141 additions & 11 deletions prov/net/src/xnet_progress.c
Expand Up @@ -45,8 +45,27 @@

static ssize_t (*xnet_start_op[ofi_op_write + 1])(struct xnet_ep *ep);

static struct ofi_sockapi xnet_sockapi_iouring =
{
.send = ofi_sockapi_send_iouring,
.sendv = ofi_sockapi_sendv_iouring,
.recv = ofi_sockapi_recv_socket,
.recvv = ofi_sockapi_recvv_socket,
};

static struct ofi_sockapi xnet_sockapi_socket =
{
.send = ofi_sockapi_send_socket,
.sendv = ofi_sockapi_sendv_socket,
.recv = ofi_sockapi_recv_socket,
.recvv = ofi_sockapi_recvv_socket,
};

#ifdef HAVE_LIBURING
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we'll need these ifdef's in the net provider. They already occur in the io_uring code, so all of the calls should be abstracted. Except I guess the calls below are calling io_uring directly? I thought you defined abstractions for these?

Copy link
Owner Author

@sydidelot sydidelot Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought you wanted the net provider to manage the io_uring directly (no abstraction).
I think it makes sense to initialize the io_uring in the net provider as it may request specific flags during initialization. I'm not sure we should defined abstractions for that in the common code.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was comparing this to the ofi_epoll calls. We make those directly from the providers, but the abstraction hides the implementation, or if it's available. I thought I saw that you had abstractions -- something like ofi_uring_init() -- that the provider could just call.

Copy link
Owner Author

@sydidelot sydidelot Oct 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I think I initially misunderstood your statement:

We obviously need an OS abstraction for an io_uring. I would actually introduce a typedef for this, similar to epoll. E.g. struct io_uring ofi_io_uring_t. This abstraction would be mostly a wrapper, with more control given to the provider

By OS abstraction you mean an abstraction in src/iouring.c. My bad, will fix it.

static int xnet_init_io_uring(struct xnet_io_uring *io_uring, size_t nents)
static void xnet_progress_tx(struct xnet_ep *ep);
static void xnet_complete_tx(struct xnet_ep *ep, ssize_t ret);

static int xnet_init_io_uring(struct xnet_io_uring *io_uring, size_t entries)
{
struct io_uring_params params;
int ret;
Expand All @@ -67,12 +86,10 @@ static int xnet_init_io_uring(struct xnet_io_uring *io_uring, size_t nents)
assert(!io_uring_cq_ready(&io_uring->ring));
assert(io_uring_sq_space_left(&io_uring->ring) >= entries);

io_uring->fid.fclass = XNET_CLASS_IO_URING;
/* io_uring rounds up the number of entries to the next power of 2,
* so we could get more entries than initially
* so we could get more entries than initially requested.
*/
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd drop this comment. It's sufficient to note that the return value may be higher then requested. I dno't see where 'entries' is defined or used.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

entries = io_uring_sq_space_left(&io_uring->ring);

io_uring->fid.fclass = XNET_CLASS_IO_URING;
io_uring->credits = io_uring_sq_space_left(&io_uring->ring);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Last patch is modifying a previous patch to fix-up errors. Some of the changes should move into the earlier patch.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix. Thanks.

return 0;
}
Expand All @@ -91,10 +108,94 @@ static int xnet_io_uring_fd(struct xnet_io_uring *io_uring)
assert(xnet_io_uring);
return io_uring->ring.ring_fd;
}

static bool xnet_io_uring_needs_submit(struct xnet_io_uring *io_uring)
{
return io_uring_sq_ready(&io_uring->ring);
}

static void xnet_submit_io_uring(struct xnet_io_uring *io_uring)
{
if (xnet_io_uring && io_uring_sq_ready(&io_uring->ring))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check that we're using io_uring is unexpected.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you suggest to help the branch predictor? Something like:

if (!xnet_io_uring) /* likely */
    return;

if (io_uring_sq_ready(&io_uring->ring))
    io_uring_submit(&io_uring->ring);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was suggesting that we should be able to assert(xnet_io_uring). It looks odd to have the code call xnet_submit_io_uring() when io_uring isn't even enabled. That is, how did we even get here?

io_uring_submit(&io_uring->ring);
}

static bool xnet_get_io_uring_credit(struct xnet_io_uring *io_uring)
{
if (!xnet_io_uring)
return true;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks odd. I'd either expect to fail here or assert that we're using io_uring.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If xnet_io_uring is false, io_uring support is disabled and the number of credits is initially set to 0.
The function always returns true to bypass the credit system when io_uring is disabled.


if (io_uring->credits > 0) {
io_uring->credits--;
return true;
}
return false;
}

static void xnet_progress_cqe(struct xnet_io_uring *io_uring,
struct io_uring_cqe *cqe)
{
struct xnet_xfer_entry *tx_entry;
struct ofi_bsock *bsock;
struct xnet_ep *ep;

assert(xnet_io_uring);
bsock = (struct ofi_bsock *) cqe->user_data;
assert(bsock);

if (&io_uring->ring == bsock->sockapi->tx_io_uring)
{
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe reverse the check and return to outdent the reset of the function.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. This function will need a revisit when recv operations will go into io_uring.

ep = container_of(bsock, struct xnet_ep, bsock);
tx_entry = ep->cur_tx.entry;
assert(tx_entry);
assert(ep->cur_tx.io_uring_busy);

ep->cur_tx.io_uring_busy = false;
if (cqe->res < 0) {
if (!OFI_SOCK_TRY_SND_RCV_AGAIN(-cqe->res))
xnet_complete_tx(ep, cqe->res);
} else {
ep->cur_tx.data_left -= cqe->res;
if (ep->cur_tx.data_left)
ofi_consume_iov(tx_entry->iov, &tx_entry->iov_cnt,
cqe->res);
else
xnet_complete_tx(ep, FI_SUCCESS);
}
xnet_progress_tx(ep);
}
}

static void xnet_progress_io_uring(struct xnet_io_uring *io_uring)
{
struct io_uring_cqe *cqes[XNET_MAX_EVENTS];
int nready;
int i;

if (!xnet_io_uring)
return;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, looks odd to have this check


nready = io_uring_peek_batch_cqe(&io_uring->ring, cqes, XNET_MAX_EVENTS);
if (!nready)
return;

assert(nready <= XNET_MAX_EVENTS);
for (i = 0; i < nready; i++) {
xnet_progress_cqe(io_uring, cqes[i]);
io_uring->credits++;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a potential problem here with this ordering? We have a completion for a send, but the send may not be done. That falls through to xnet_progress_tx(), which may need to post another transfer, which in turns need another credit. Could we temporarily block the send because we haven't updated the credits?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching that! You are right and the credit must be released before executing xnet_progress_cqe(). I will update the code.

}

io_uring_cq_advance(&io_uring->ring, nready);
}
#else
#define xnet_init_io_uring(io_uring, entries) -FI_ENOSYS
#define xnet_destroy_io_uring(io_uring) do {} while(0)
#define xnet_io_uring_fd(io_uring) INVALID_SOCKET
#define xnet_submit_io_uring(io_uring)
#define xnet_io_uring_needs_submit(io_uring) false
#define xnet_get_io_uring_credit(io_uring) true
#define xnet_put_io_uring_credit(io_uring) do {} while(0)
#define xnet_progress_io_uring(io_uring) do {} while(0)
#endif
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you move the io_uring abstraction that you had here?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only abstraction I had was for send/recv syscalls (in iouring.c).
I just realized that you wanted to abstract all the calls of io_uring in src/iouring.c. My bad, I will fix that.


static void xnet_update_pollflag(struct xnet_ep *ep, short pollflag, bool set)
Expand Down Expand Up @@ -128,21 +229,28 @@ static ssize_t xnet_send_msg(struct xnet_ep *ep)

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
assert(ep->cur_tx.entry);
if (!xnet_get_io_uring_credit(&xnet_ep2_progress(ep)->tx_io_uring))
return -FI_EAGAIN;

tx_entry = ep->cur_tx.entry;
ret = ofi_bsock_sendv(&ep->bsock, tx_entry->iov, tx_entry->iov_cnt,
&len);
if (ret < 0 && ret != -FI_EINPROGRESS)
if (ret >= 0)
len = ret;
else if (ret == -FI_EIOURING_PREP) {
assert(!ep->cur_tx.io_uring_busy);
ep->cur_tx.io_uring_busy = true;
return ret;

if (ret == -FI_EINPROGRESS) {
}
else if (ret == -FI_EINPROGRESS) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so you need to distinguish between 2 async 'error' codes based on whether using io_uring or async sockets.

/* If a transfer generated multiple async sends, we only
* need to track the last async index to know when the entire
* transfer has completed.
*/
tx_entry->async_index = ep->bsock.async_index;
tx_entry->ctrl_flags |= XNET_ASYNC;
} else {
len = ret;
return ret;
}

ep->cur_tx.data_left -= len;
Expand Down Expand Up @@ -237,12 +345,17 @@ static void xnet_progress_tx(struct xnet_ep *ep)
ssize_t ret;

assert(xnet_progress_locked(xnet_ep2_progress(ep)));
while (ep->cur_tx.entry) {
while (ep->cur_tx.entry && !ep->cur_tx.io_uring_busy) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to look at how io_uring_busy is used. I mostly follow what you're doing here. But I want to understand where the flow differs between using io_uring and async sends and why we have those differences.

ret = xnet_send_msg(ep);
if (OFI_SOCK_TRY_SND_RCV_AGAIN(-ret)) {
xnet_update_pollflag(ep, POLLOUT, true);
return;
}
if (ret == -FI_EIOURING_PREP) {
assert(ep->cur_tx.io_uring_busy);
xnet_update_pollflag(ep, POLLOUT, false);
return;
}

xnet_complete_tx(ep, ret);
}
Expand All @@ -251,7 +364,8 @@ static void xnet_progress_tx(struct xnet_ep *ep)
* have other data to send, we need to try flushing any buffered data.
*/
(void) ofi_bsock_flush(&ep->bsock);
xnet_update_pollflag(ep, POLLOUT, ofi_bsock_tosend(&ep->bsock));
xnet_update_pollflag(ep, POLLOUT, ofi_bsock_tosend(&ep->bsock) ||
!ep->cur_tx.io_uring_busy);
}

static int xnet_queue_ack(struct xnet_xfer_entry *rx_entry)
Expand Down Expand Up @@ -801,6 +915,7 @@ void xnet_tx_queue_insert(struct xnet_ep *ep,
OFI_DBG_SET(tx_entry->hdr.base_hdr.id, ep->tx_id++);
ep->hdr_bswap(&tx_entry->hdr.base_hdr);
xnet_progress_tx(ep);
xnet_progress_io_uring(&xnet_ep2_progress(ep)->tx_io_uring);
} else if (tx_entry->ctrl_flags & XNET_INTERNAL_XFER) {
slist_insert_tail(&tx_entry->entry, &ep->priority_queue);
} else {
Expand Down Expand Up @@ -867,6 +982,9 @@ xnet_handle_events(struct xnet_progress *progress,
case FI_CLASS_CONNREQ:
xnet_run_conn(events[i].data.ptr, pin, pout, perr);
break;
case XNET_CLASS_IO_URING:
xnet_progress_io_uring(events[i].data.ptr);
break;
default:
assert(fid->fclass == XNET_CLASS_PROGRESS);
if (clear_signal)
Expand All @@ -876,6 +994,8 @@ xnet_handle_events(struct xnet_progress *progress,
}

xnet_handle_event_list(progress);
xnet_submit_io_uring(&progress->tx_io_uring);
xnet_submit_io_uring(&progress->rx_io_uring);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these are acting somewhat like a deferred doorbell notification. I would move check for using io_uring here, and wrap both calls.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

}

void xnet_progress_unexp(struct xnet_progress *progress,
Expand Down Expand Up @@ -958,6 +1078,10 @@ int xnet_progress_wait(struct xnet_progress *progress, int timeout)
{
struct ofi_epollfds_event event;

/* We cannot enter blocking if io_uring has entries
* that need submission. */
assert(xnet_io_uring_needs_submit(&progress->tx_io_uring));
assert(xnet_io_uring_needs_submit(&progress->rx_io_uring));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assert and comment look like opposites

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aargh, the assert is wrong. Thanks for catching that.

return ofi_dynpoll_wait(&progress->epoll_fd, &event, 1, timeout);
}

Expand Down Expand Up @@ -1162,6 +1286,12 @@ int xnet_init_progress(struct xnet_progress *progress, struct fi_info *info)
POLLIN, &progress->rx_io_uring.fid);
if (ret)
goto err8;

progress->sockapi = xnet_sockapi_iouring;
progress->sockapi.tx_io_uring = &progress->tx_io_uring.ring;
progress->sockapi.rx_io_uring = &progress->rx_io_uring.ring;
} else {
progress->sockapi = xnet_sockapi_socket;
}

return 0;
Expand Down