Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

liburing zero copy #1273

Merged
merged 11 commits into from
Dec 22, 2023
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
### NEXT

* worker: Do not use references for async callbacks ([PR #1274](https://github.com/versatica/mediasoup/pull/1274)).
* liburing: Enable zero copy ([PR #1273](https://github.com/versatica/mediasoup/pull/1273)).


### 3.13.12
Expand Down
10 changes: 9 additions & 1 deletion worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class DepLibUring
{
return this->active;
}
bool IsZeroCopyEnabled() const
{
return this->zeroCopyEnabled;
}
io_uring* GetRing()
{
return std::addressof(this->ring);
Expand All @@ -103,18 +107,22 @@ class DepLibUring
private:
// io_uring instance.
io_uring ring;
// Event file descriptor to watch for completions.
// Event file descriptor to watch for io_uring completions.
int efd;
// libuv handle used to poll io_uring completions.
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Whether Zero Copy feature is enabled.
bool zeroCopyEnabled{ true };
// Pre-allocated UserData's.
UserData userDatas[QueueDepth]{};
// Indexes of available UserData entries.
std::queue<size_t> availableUserDataEntries;
// Pre-allocated SendBuffer's.
SendBuffer sendBuffers[QueueDepth];
// iovec structs to be registered for Zero Copy.
struct iovec iovecs[QueueDepth];
// Submission queue entry process count.
uint64_t sqeProcessCount{ 0u };
// Submission queue entry miss count.
Expand Down
2 changes: 2 additions & 0 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace Utils

static void GetAddressInfo(const struct sockaddr* addr, int& family, std::string& ip, uint16_t& port);

static size_t GetAddressLen(const struct sockaddr* addr);

static bool CompareAddresses(const struct sockaddr* addr1, const struct sockaddr* addr2)
{
// Compare family.
Expand Down
106 changes: 92 additions & 14 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "DepLibUring.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include <sys/eventfd.h>
#include <sys/utsname.h>

Expand Down Expand Up @@ -40,27 +41,69 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events)
struct io_uring_cqe* cqe = cqes[i];
auto* userData = static_cast<DepLibUring::UserData*>(io_uring_cqe_get_data(cqe));

if (cqe->res < 0)
if (liburing->IsZeroCopyEnabled())
{
MS_ERROR("sending failed: %s", std::strerror(-cqe->res));
// CQE notification for a zero-copy submission.
if (cqe->flags & IORING_CQE_F_NOTIF)
{
// The send buffer is now in the network card, run the send callback.
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}

liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// CQE for a zero-copy submission, a CQE notification will follow.
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

// NOTE: Do not release the user data as it will be done upon reception
// of CQE notification.
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}
}

// Successfull SQE.
if (cqe->res >= 0)
{
if (userData->cb)
{
(*userData->cb)(false);
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}
}
// Failed SQE.
else
{
if (userData->cb)
{
(*userData->cb)(true);
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

io_uring_cqe_seen(liburing->GetRing(), cqe);
liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);
}
}

Expand Down Expand Up @@ -258,6 +301,34 @@ DepLibUring::LibUring::LibUring()
this->userDatas[i].store = this->sendBuffers[i];
this->availableUserDataEntries.push(i);
}

// Initialize iovecs.
for (size_t i{ 0 }; i < DepLibUring::QueueDepth; ++i)
{
this->iovecs[i].iov_base = this->sendBuffers[i];
this->iovecs[i].iov_len = DepLibUring::SendBufferSize;
}

err = io_uring_register_buffers(std::addressof(this->ring), this->iovecs, DepLibUring::QueueDepth);

if (err < 0)
{
int errno = -1 * err;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here seems the variable name. It's not a good idea using errno as a local variable as the compiler threats it specially (see the compiling errors.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this file is using -err in other places for logging purposes, so this change is generating an inconsistency across the file.

Copy link
Member

@ibc ibc Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes done to make it consistent.


if (errno == ENOMEM)
{
this->zeroCopyEnabled = false;

MS_WARN_TAG(
info,
"io_uring_register_buffers() failed due to low memlock limit (ulimit -l), disabling zero copy: %s",
std::strerror(errno));
}
else
{
MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(errno));
}
}
}

DepLibUring::LibUring::~LibUring()
Expand Down Expand Up @@ -372,8 +443,6 @@ bool DepLibUring::LibUring::PrepareSend(
return false;
}

userData->cb = cb;

// The send data buffer belongs to us, no need to memcpy.
if (this->IsDataInSendBuffers(data))
{
Expand All @@ -384,21 +453,30 @@ bool DepLibUring::LibUring::PrepareSend(
std::memcpy(userData->store, data, len);
}

userData->cb = cb;

io_uring_sqe_set_data(sqe, userData);

socklen_t addrlen = 0;
socklen_t addrlen = Utils::IP::GetAddressLen(addr);

if (addr->sa_family == AF_INET)
if (this->zeroCopyEnabled)
{
addrlen = sizeof(struct sockaddr_in);
auto iovec = this->iovecs[userData->idx];
iovec.iov_len = len;

io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
io_uring_prep_send_set_addr(sqe, addr, addrlen);

// Tell io_uring that we are providing the already registered send buffer
// for zero copy.
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = userData->idx;
}
else if (addr->sa_family == AF_INET6)
else
{
addrlen = sizeof(struct sockaddr_in6);
io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);
}

io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);

this->sqeProcessCount++;

return true;
Expand Down
23 changes: 23 additions & 0 deletions worker/src/Utils/IP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ namespace Utils
ip.assign(ipBuffer);
}

size_t IP::GetAddressLen(const struct sockaddr* addr)
{
MS_TRACE();

switch (addr->sa_family)
{
case AF_INET:
{
return sizeof(struct sockaddr_in);
}

case AF_INET6:
{
return sizeof(struct sockaddr_in6);
}

default:
{
MS_ABORT("unknown network family: %d", static_cast<int>(addr->sa_family));
}
}
}

void IP::NormalizeIp(std::string& ip)
{
MS_TRACE();
Expand Down