Skip to content

Commit

Permalink
liburing zero copy (#1273)
Browse files Browse the repository at this point in the history
* liburing zero copy

By pre-registering the send buffers we can make use of zero copy in
order to avoid the kernel memcpy-ing the buffers.

Send buffers are allocated by us and will be set as available once
liburing notifies us about it.

* handle feedback

* handle ENOMEM error when registering buffers

* fix error comparison

* Cosmetic

* Try to make compiled happy

* Do not use errno variable name

* Use error = -1 * err everywhere

* remove uneeded '-1 *' to negate an int

---------

Co-authored-by: Iñaki Baz Castillo <ibc@aliax.net>
  • Loading branch information
jmillan and ibc committed Dec 22, 2023
1 parent 82ed122 commit 3cefa11
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
### NEXT

* worker: Do not use references for async callbacks ([PR #1274](https://github.com/versatica/mediasoup/pull/1274)).
* liburing: Enable zero copy ([PR #1273](https://github.com/versatica/mediasoup/pull/1273)).


### 3.13.12
Expand Down
10 changes: 9 additions & 1 deletion worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class DepLibUring
{
return this->active;
}
bool IsZeroCopyEnabled() const
{
return this->zeroCopyEnabled;
}
io_uring* GetRing()
{
return std::addressof(this->ring);
Expand All @@ -103,18 +107,22 @@ class DepLibUring
private:
// io_uring instance.
io_uring ring;
// Event file descriptor to watch for completions.
// Event file descriptor to watch for io_uring completions.
int efd;
// libuv handle used to poll io_uring completions.
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Whether Zero Copy feature is enabled.
bool zeroCopyEnabled{ true };
// Pre-allocated UserData's.
UserData userDatas[QueueDepth]{};
// Indexes of available UserData entries.
std::queue<size_t> availableUserDataEntries;
// Pre-allocated SendBuffer's.
SendBuffer sendBuffers[QueueDepth];
// iovec structs to be registered for Zero Copy.
struct iovec iovecs[QueueDepth];
// Submission queue entry process count.
uint64_t sqeProcessCount{ 0u };
// Submission queue entry miss count.
Expand Down
2 changes: 2 additions & 0 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace Utils

static void GetAddressInfo(const struct sockaddr* addr, int& family, std::string& ip, uint16_t& port);

static size_t GetAddressLen(const struct sockaddr* addr);

static bool CompareAddresses(const struct sockaddr* addr1, const struct sockaddr* addr2)
{
// Compare family.
Expand Down
133 changes: 114 additions & 19 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "DepLibUring.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include <sys/eventfd.h>
#include <sys/utsname.h>

Expand All @@ -30,37 +31,83 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events)
// the counter in order to avoid libuv calling this callback indefinitely.
eventfd_t v;
int err = eventfd_read(liburing->GetEventFd(), std::addressof(v));

if (err < 0)
{
MS_ABORT("eventfd_read() failed: %s", std::strerror(-err));
// Get positive errno.
int error = -err;

MS_ABORT("eventfd_read() failed: %s", std::strerror(error));
};

for (unsigned int i{ 0 }; i < count; ++i)
{
struct io_uring_cqe* cqe = cqes[i];
auto* userData = static_cast<DepLibUring::UserData*>(io_uring_cqe_get_data(cqe));

if (cqe->res < 0)
if (liburing->IsZeroCopyEnabled())
{
MS_ERROR("sending failed: %s", std::strerror(-cqe->res));
// CQE notification for a zero-copy submission.
if (cqe->flags & IORING_CQE_F_NOTIF)
{
// The send buffer is now in the network card, run the send callback.
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}

liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// CQE for a zero-copy submission, a CQE notification will follow.
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

// NOTE: Do not release the user data as it will be done upon reception
// of CQE notification.
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}
}

// Successfull SQE.
if (cqe->res >= 0)
{
if (userData->cb)
{
(*userData->cb)(false);
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}
}
// Failed SQE.
else
{
if (userData->cb)
{
(*userData->cb)(true);
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

io_uring_cqe_seen(liburing->GetRing(), cqe);
liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);
}
}

Expand Down Expand Up @@ -234,7 +281,10 @@ DepLibUring::LibUring::LibUring()

if (err < 0)
{
MS_THROW_ERROR("io_uring_queue_init() failed: %s", std::strerror(-err));
// Get positive errno.
int error = -err;

MS_THROW_ERROR("io_uring_queue_init() failed: %s", std::strerror(error));
}

// Create an eventfd instance.
Expand All @@ -249,7 +299,10 @@ DepLibUring::LibUring::LibUring()

if (err < 0)
{
MS_THROW_ERROR("io_uring_register_eventfd() failed: %s", std::strerror(-err));
// Get positive errno.
int error = -err;

MS_THROW_ERROR("io_uring_register_eventfd() failed: %s", std::strerror(error));
}

// Initialize available UserData entries.
Expand All @@ -258,6 +311,35 @@ DepLibUring::LibUring::LibUring()
this->userDatas[i].store = this->sendBuffers[i];
this->availableUserDataEntries.push(i);
}

// Initialize iovecs.
for (size_t i{ 0 }; i < DepLibUring::QueueDepth; ++i)
{
this->iovecs[i].iov_base = this->sendBuffers[i];
this->iovecs[i].iov_len = DepLibUring::SendBufferSize;
}

err = io_uring_register_buffers(std::addressof(this->ring), this->iovecs, DepLibUring::QueueDepth);

if (err < 0)
{
// Get positive errno.
int error = -err;

if (error == ENOMEM)
{
this->zeroCopyEnabled = false;

MS_WARN_TAG(
info,
"io_uring_register_buffers() failed due to low memlock limit (ulimit -l), disabling zero copy: %s",
std::strerror(error));
}
else
{
MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(error));
}
}
}

DepLibUring::LibUring::~LibUring()
Expand All @@ -269,7 +351,10 @@ DepLibUring::LibUring::~LibUring()

if (err != 0)
{
MS_ABORT("close() failed: %s", std::strerror(-err));
// Get positive errno.
int error = -err;

MS_ABORT("close() failed: %s", std::strerror(error));
}

// Close the ring.
Expand Down Expand Up @@ -372,8 +457,6 @@ bool DepLibUring::LibUring::PrepareSend(
return false;
}

userData->cb = cb;

// The send data buffer belongs to us, no need to memcpy.
if (this->IsDataInSendBuffers(data))
{
Expand All @@ -384,21 +467,30 @@ bool DepLibUring::LibUring::PrepareSend(
std::memcpy(userData->store, data, len);
}

userData->cb = cb;

io_uring_sqe_set_data(sqe, userData);

socklen_t addrlen = 0;
socklen_t addrlen = Utils::IP::GetAddressLen(addr);

if (addr->sa_family == AF_INET)
if (this->zeroCopyEnabled)
{
addrlen = sizeof(struct sockaddr_in);
auto iovec = this->iovecs[userData->idx];
iovec.iov_len = len;

io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
io_uring_prep_send_set_addr(sqe, addr, addrlen);

// Tell io_uring that we are providing the already registered send buffer
// for zero copy.
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = userData->idx;
}
else if (addr->sa_family == AF_INET6)
else
{
addrlen = sizeof(struct sockaddr_in6);
io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);
}

io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);

this->sqeProcessCount++;

return true;
Expand Down Expand Up @@ -482,7 +574,10 @@ void DepLibUring::LibUring::Submit()
}
else
{
MS_ERROR("io_uring_submit() failed: %s", std::strerror(-err));
// Get positive errno.
int error = -err;

MS_ERROR("io_uring_submit() failed: %s", std::strerror(error));
}
}

Expand Down
23 changes: 23 additions & 0 deletions worker/src/Utils/IP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ namespace Utils
ip.assign(ipBuffer);
}

size_t IP::GetAddressLen(const struct sockaddr* addr)
{
MS_TRACE();

switch (addr->sa_family)
{
case AF_INET:
{
return sizeof(struct sockaddr_in);
}

case AF_INET6:
{
return sizeof(struct sockaddr_in6);
}

default:
{
MS_ABORT("unknown network family: %d", static_cast<int>(addr->sa_family));
}
}
}

void IP::NormalizeIp(std::string& ip)
{
MS_TRACE();
Expand Down

0 comments on commit 3cefa11

Please sign in to comment.