Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prov/net: TX operations now support io_uring #1

Closed
wants to merge 11 commits into from
Closed
4 changes: 4 additions & 0 deletions Makefile.am
Expand Up @@ -112,6 +112,10 @@ common_srcs += include/linux/osd.h
common_srcs += include/unix/osd.h
endif

if HAVE_LIBURING
common_srcs += src/iouring.c
endif

common_hook_srcs = \
prov/hook/src/hook.c \
prov/hook/src/hook_av.c \
Expand Down
32 changes: 32 additions & 0 deletions configure.ac
Expand Up @@ -3,6 +3,7 @@ dnl Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
dnl Copyright (c) 2019-2021 Intel, Inc. All rights reserved.
dnl Copyright (c) 2019-2020 Amazon.com, Inc. or its affiliates. All rights reserved.
dnl (C) Copyright 2020 Hewlett Packard Enterprise Development LP
dnl Copyright (c) 2022 DataDirect Networks, Inc. All rights reserved.
dnl
dnl Process this file with autoconf to produce a configure script.

Expand Down Expand Up @@ -522,6 +523,37 @@ AC_DEFINE_UNQUOTED(HAVE_CLOCK_GETTIME, [$have_clock_gettime],
[Define to 1 if clock_gettime is available.])
AM_CONDITIONAL(HAVE_CLOCK_GETTIME, [test $have_clock_gettime -eq 1])

dnl Check for io_uring runtime libraries
AC_ARG_WITH([uring],
[AS_HELP_STRING([--with-uring@<:@=DIR@:>@],
[Enable uring support @<:@default=yes@:>@.
Optional=<Path to where liburing is installed.>])])

have_liburing=0
AS_IF([test x"$with_uring" != x"no"],
[FI_CHECK_PACKAGE([uring],
[liburing.h],
[uring],
[io_uring_queue_init],
[-luring],
[$with_uring],
[],
[have_liburing=1
AC_DEFINE_UNQUOTED([HAVE_LIBURING], [1], [io_uring support])]
[],
[])],
[])

AS_IF([test x"$with_uring" != x"no" && test -n "$with_uring" && test $have_liburing -eq 0],
[AC_MSG_ERROR([io_uring support requested but liburing not available.])],
[])
AM_CONDITIONAL([HAVE_LIBURING], test "$have_liburing" -eq 1)

AS_IF([test x"$with_uring" != x"yes" && test x"$with_uring" != x"no"],
[CPPFLAGS="$CPPFLAGS $uring_CPPFLAGS"
LDFLAGS="$LDFLAGS $uring_LDFLAGS"])
LIBS="$LIBS $uring_LIBS"

dnl Check for CUDA runtime libraries
AC_ARG_WITH([cuda],
[AS_HELP_STRING([--with-cuda=DIR],
Expand Down
142 changes: 141 additions & 1 deletion include/ofi_net.h
@@ -1,6 +1,7 @@
/*
* Copyright (c) 2013-2018 Intel Corporation. All rights reserved.
* Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2022 DataDirect Networks, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -44,6 +45,10 @@
#include <netinet/in.h>
#include <ifaddrs.h>

#ifdef HAVE_LIBURING
#include <liburing.h>
#endif

#include <ofi_osd.h>
#include <ofi_list.h>

Expand Down Expand Up @@ -123,8 +128,140 @@ static inline int ofi_sendall_socket(SOCKET sock, const void *buf, size_t len)
return (size_t) sent != len;
}

static inline ssize_t
ofi_sendv_socket(SOCKET sock, const struct iovec *iov, size_t cnt, int flags)
{
struct msghdr msg;

msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = (struct iovec *) iov;
msg.msg_iovlen = cnt;

return ofi_sendmsg_tcp(sock, &msg, flags);
}

static inline ssize_t
ofi_recvv_socket(SOCKET sock, struct iovec *iov, size_t cnt, int flags)
{
struct msghdr msg;

msg.msg_control = NULL;
msg.msg_controllen = 0;
msg.msg_flags = 0;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = cnt;

return ofi_recvmsg_tcp(sock, &msg, flags);
}

ssize_t ofi_discard_socket(SOCKET sock, size_t len);

/*
* Socket API
*/
#ifdef HAVE_LIBURING
typedef struct io_uring ofi_io_uring_t;
#else
typedef int ofi_io_uring_t;
#endif

struct ofi_sockapi {
ofi_io_uring_t *tx_io_uring;
ofi_io_uring_t *rx_io_uring;

ssize_t (*send)(struct ofi_sockapi *sockapi, SOCKET sock, const void *buf,
size_t len, int flags, void *ctx);
ssize_t (*sendv)(struct ofi_sockapi *sockapi, SOCKET sock,
const struct iovec *iov, size_t cnt, int flags, void *ctx);
ssize_t (*recv)(struct ofi_sockapi *sockapi, SOCKET sock, void *buf,
size_t len, int flags, void *ctx);
ssize_t (*recvv)(struct ofi_sockapi *sockapi, SOCKET sock,
struct iovec *iov, size_t cnt, int flags, void *ctx);
};

static inline ssize_t
ofi_sockapi_send_socket(struct ofi_sockapi *sockapi, SOCKET sock, const void *buf,
size_t len, int flags, void *ctx)
{
OFI_UNUSED(sockapi);
OFI_UNUSED(ctx);
return ofi_send_socket(sock, buf, len, flags);
}

static inline ssize_t
ofi_sockapi_sendv_socket(struct ofi_sockapi *sockapi, SOCKET sock,
const struct iovec *iov, size_t cnt, int flags, void *ctx)
{
OFI_UNUSED(sockapi);
OFI_UNUSED(ctx);
return ofi_sendv_socket(sock, iov, cnt, flags);
}

static inline ssize_t
ofi_sockapi_recv_socket(struct ofi_sockapi *sockapi, SOCKET sock, void *buf,
size_t len, int flags, void *ctx)
{
OFI_UNUSED(sockapi);
OFI_UNUSED(ctx);
return ofi_recv_socket(sock, buf, len, flags);
}

static inline ssize_t
ofi_sockapi_recvv_socket(struct ofi_sockapi *sockapi, SOCKET sock,
struct iovec *iov, size_t cnt, int flags, void *ctx)
{
OFI_UNUSED(sockapi);
OFI_UNUSED(ctx);
return ofi_recvv_socket(sock, iov, cnt, flags);
}

#ifdef HAVE_LIBURING
ssize_t ofi_sockapi_send_iouring(struct ofi_sockapi *sockapi, SOCKET sock,
const void *buf, size_t len, int flags, void *ctx);
ssize_t ofi_sockapi_sendv_iouring(struct ofi_sockapi *sockapi, SOCKET sock,
const struct iovec *iov, size_t cnt, int flags,
void *ctx);
ssize_t ofi_sockapi_recv_iouring(struct ofi_sockapi *sockapi, SOCKET sock,
void *buf, size_t len, int flags, void *ctx);
ssize_t ofi_sockapi_recvv_iouring(struct ofi_sockapi *sockapi, SOCKET sock,
struct iovec *iov, size_t cnt, int flags,
void *ctx);
#else
static inline ssize_t
ofi_sockapi_send_iouring(struct ofi_sockapi *sockapi, SOCKET sock, const void *buf,
size_t len, int flags, void *ctx)
{
return -FI_ENOSYS;
}

static inline ssize_t
ofi_sockapi_sendv_iouring(struct ofi_sockapi *sockapi, SOCKET sock,
const struct iovec *iov, size_t cnt, int flags, void *ctx)
{
return -FI_ENOSYS;
}

static inline ssize_t
ofi_sockapi_recv_iouring(struct ofi_sockapi *sockapi, SOCKET sock, void *buf,
size_t len, int flags, void *ctx)
{
return -FI_ENOSYS;
}

static inline ssize_t
ofi_sockapi_recvv_iouring(struct ofi_sockapi *sockapi, SOCKET sock,
struct iovec *iov, size_t cnt, int flags, void *ctx)
{
return -FI_ENOSYS;
}
#endif

/*
* Byte queue - streaming socket staging buffer
*/
Expand Down Expand Up @@ -219,6 +356,7 @@ size_t ofi_byteq_readv(struct ofi_byteq *byteq, struct iovec *iov,
*/
struct ofi_bsock {
SOCKET sock;
struct ofi_sockapi *sockapi;
struct ofi_byteq sq;
struct ofi_byteq rq;
size_t zerocopy_size;
Expand All @@ -227,9 +365,11 @@ struct ofi_bsock {
};

static inline void
ofi_bsock_init(struct ofi_bsock *bsock, ssize_t sbuf_size, ssize_t rbuf_size)
ofi_bsock_init(struct ofi_bsock *bsock, struct ofi_sockapi *sockapi,
ssize_t sbuf_size, ssize_t rbuf_size)
{
bsock->sock = INVALID_SOCKET;
bsock->sockapi = sockapi;
ofi_byteq_init(&bsock->sq, sbuf_size);
ofi_byteq_init(&bsock->rq, rbuf_size);
bsock->zerocopy_size = SIZE_MAX;
Expand Down
2 changes: 2 additions & 0 deletions include/rdma/fi_errno.h
Expand Up @@ -194,6 +194,8 @@ enum {
FI_ENOAV = 267, /* Missing or unavailable address vector */
FI_EOVERRUN = 268, /* Queue has been overrun */
FI_ENORX = 269, /* Receiver not ready, no receive buffers available */
FI_EIOURING_PREP = 270, /* io_uring operation prepared, waiting for submission */
FI_EIOURING_FULL = 271, /* io_uring full */
FI_ERRNO_MAX
};

Expand Down
15 changes: 15 additions & 0 deletions prov/net/src/xnet.h
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2017-2022 Intel Corporation, Inc. All rights reserved.
* Copyright (c) 2022 DataDirect Networks, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -85,6 +86,7 @@ extern size_t xnet_default_tx_size;
extern size_t xnet_default_rx_size;
extern size_t xnet_zerocopy_size;
extern int xnet_disable_autoprog;
extern int xnet_io_uring;

struct xnet_xfer_entry;
struct xnet_ep;
Expand All @@ -106,6 +108,7 @@ enum xnet_state {
enum {
XNET_CLASS_CM = OFI_PROV_SPECIFIC_TCP,
XNET_CLASS_PROGRESS,
XNET_CLASS_IO_URING,
};

struct xnet_port_range {
Expand Down Expand Up @@ -152,6 +155,7 @@ struct xnet_cur_rx {

struct xnet_cur_tx {
size_t data_left;
bool io_uring_busy;
struct xnet_xfer_entry *entry;
};

Expand Down Expand Up @@ -246,6 +250,13 @@ ssize_t xnet_get_conn(struct xnet_rdm *rdm, fi_addr_t dest_addr,
struct xnet_ep *xnet_get_ep(struct xnet_rdm *rdm, fi_addr_t addr);
void xnet_freeall_conns(struct xnet_rdm *rdm);

struct xnet_io_uring
{
struct fid fid;
ofi_io_uring_t ring;
size_t credits;
};

/* Serialization is handled at the progress instance level, using the
* progress locks. A progress instance has 2 locks, only one of which is
* enabled. The other lock will be set to NONE, meaning it is fully disabled.
Expand Down Expand Up @@ -289,6 +300,10 @@ struct xnet_progress {
struct slist event_list;
struct ofi_bufpool *xfer_pool;

struct xnet_io_uring tx_io_uring;
struct xnet_io_uring rx_io_uring;
struct ofi_sockapi sockapi;

struct ofi_dynpoll epoll_fd;

bool auto_progress;
Expand Down
5 changes: 2 additions & 3 deletions prov/net/src/xnet_ep.c
Expand Up @@ -43,7 +43,6 @@ extern struct fi_ops_rma xnet_rma_ops;
extern struct fi_ops_msg xnet_msg_ops;
extern struct fi_ops_tagged xnet_tagged_ops;


void xnet_hdr_none(struct xnet_base_hdr *hdr)
{
/* no-op */
Expand Down Expand Up @@ -622,8 +621,8 @@ int xnet_endpoint(struct fid_domain *domain, struct fi_info *info,
if (ret)
goto err1;

ofi_bsock_init(&ep->bsock, xnet_staging_sbuf_size,
xnet_prefetch_rbuf_size);
ofi_bsock_init(&ep->bsock, &xnet_ep2_progress(ep)->sockapi,
xnet_staging_sbuf_size, xnet_prefetch_rbuf_size);
if (info->handle) {
if (((fid_t) info->handle)->fclass == FI_CLASS_PEP) {
pep = container_of(info->handle, struct xnet_pep,
Expand Down
8 changes: 8 additions & 0 deletions prov/net/src/xnet_init.c
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2017-2022 Intel Corporation. All rights reserved.
* Copyright (c) 2022 DataDirect Networks, Inc. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -64,6 +65,7 @@ size_t xnet_default_tx_size = 256;
size_t xnet_default_rx_size = 256;
size_t xnet_zerocopy_size = SIZE_MAX;
int xnet_disable_autoprog;
int xnet_io_uring;


static void xnet_init_env(void)
Expand Down Expand Up @@ -137,6 +139,12 @@ static void xnet_init_env(void)
"prevent auto-progress thread from starting");
fi_param_get_bool(&xnet_prov, "disable_auto_progress",
&xnet_disable_autoprog);
#ifdef HAVE_LIBURING
fi_param_define(&xnet_prov, "io_uring", FI_PARAM_BOOL,
"Enable io_uring support (default: %d)", xnet_io_uring);
fi_param_get_bool(&xnet_prov, "io_uring",
&xnet_io_uring);
#endif
}

static void xnet_fini(void)
Expand Down