Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

liburing zero copy #1273

Merged
merged 11 commits into from
Dec 22, 2023
10 changes: 9 additions & 1 deletion worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class DepLibUring
{
return this->active;
}
bool IsZeroCopyEnabled() const
{
return this->zeroCopyEnabled;
}
io_uring* GetRing()
{
return std::addressof(this->ring);
Expand All @@ -103,18 +107,22 @@ class DepLibUring
private:
// io_uring instance.
io_uring ring;
// Event file descriptor to watch for completions.
// Event file descriptor to watch for io_uring completions.
int efd;
// libuv handle used to poll io_uring completions.
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Whether Zero Copy feature is enabled.
bool zeroCopyEnabled{ true };
// Pre-allocated UserData's.
UserData userDatas[QueueDepth]{};
// Indexes of available UserData entries.
std::queue<size_t> availableUserDataEntries;
// Pre-allocated SendBuffer's.
SendBuffer sendBuffers[QueueDepth];
// iovec structs to be registered for Zero Copy.
struct iovec iovecs[QueueDepth];
// Submission queue entry process count.
uint64_t sqeProcessCount{ 0u };
// Submission queue entry miss count.
Expand Down
2 changes: 2 additions & 0 deletions worker/include/Utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ namespace Utils

static void GetAddressInfo(const struct sockaddr* addr, int& family, std::string& ip, uint16_t& port);

static size_t GetAddressLen(const struct sockaddr* addr);

static bool CompareAddresses(const struct sockaddr* addr1, const struct sockaddr* addr2)
{
// Compare family.
Expand Down
104 changes: 90 additions & 14 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "DepLibUring.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include <sys/eventfd.h>
#include <sys/utsname.h>

Expand Down Expand Up @@ -40,27 +41,69 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events)
struct io_uring_cqe* cqe = cqes[i];
auto* userData = static_cast<DepLibUring::UserData*>(io_uring_cqe_get_data(cqe));

if (cqe->res < 0)
if (liburing->IsZeroCopyEnabled())
{
MS_ERROR("sending failed: %s", std::strerror(-cqe->res));
// CQE notification for a zero-copy submission.
if (cqe->flags & IORING_CQE_F_NOTIF)
{
// The send buffer is now in the network card, run the send callback.
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}

liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// CQE for a zero-copy submission, a CQE notification will follow.
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

// NOTE: Do not release the user data as it will be done upon reception
// of CQE notification.
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}
}

// Successfull SQE.
if (cqe->res >= 0)
{
if (userData->cb)
{
(*userData->cb)(false);
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}
}
// Failed SQE.
else
{
if (userData->cb)
{
(*userData->cb)(true);
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

io_uring_cqe_seen(liburing->GetRing(), cqe);
liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);
}
}

Expand Down Expand Up @@ -258,6 +301,32 @@ DepLibUring::LibUring::LibUring()
this->userDatas[i].store = this->sendBuffers[i];
this->availableUserDataEntries.push(i);
}

// Initialize iovecs.
for (size_t i{ 0 }; i < DepLibUring::QueueDepth; ++i)
{
this->iovecs[i].iov_base = this->sendBuffers[i];
this->iovecs[i].iov_len = DepLibUring::SendBufferSize;
}

err = io_uring_register_buffers(std::addressof(this->ring), this->iovecs, DepLibUring::QueueDepth);

if (err < 0)
{
if (err == ENOMEM)
jmillan marked this conversation as resolved.
Show resolved Hide resolved
{
this->zeroCopyEnabled = false;

MS_WARN_TAG(
info,
"io_uring_register_buffers() failed due to low memlock limit (ulimit -l), disabling zero copy",
std::strerror(-err));
jmillan marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(-err));
}
}
}

DepLibUring::LibUring::~LibUring()
Expand Down Expand Up @@ -372,8 +441,6 @@ bool DepLibUring::LibUring::PrepareSend(
return false;
}

userData->cb = cb;

// The send data buffer belongs to us, no need to memcpy.
if (this->IsDataInSendBuffers(data))
{
Expand All @@ -384,21 +451,30 @@ bool DepLibUring::LibUring::PrepareSend(
std::memcpy(userData->store, data, len);
}

userData->cb = cb;

io_uring_sqe_set_data(sqe, userData);

socklen_t addrlen = 0;
socklen_t addrlen = Utils::IP::GetAddressLen(addr);

if (addr->sa_family == AF_INET)
if (this->zeroCopyEnabled)
{
addrlen = sizeof(struct sockaddr_in);
auto iovec = this->iovecs[userData->idx];
iovec.iov_len = len;

io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
io_uring_prep_send_set_addr(sqe, addr, addrlen);

// Tell io_uring that we are providing the already registered send buffer
// for zero copy.
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = userData->idx;
}
else if (addr->sa_family == AF_INET6)
else
{
addrlen = sizeof(struct sockaddr_in6);
io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);
}

io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);

this->sqeProcessCount++;

return true;
Expand Down
23 changes: 23 additions & 0 deletions worker/src/Utils/IP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ namespace Utils
ip.assign(ipBuffer);
}

size_t IP::GetAddressLen(const struct sockaddr* addr)
{
MS_TRACE();

switch (addr->sa_family)
{
case AF_INET:
{
return sizeof(struct sockaddr_in);
}

case AF_INET6:
{
return sizeof(struct sockaddr_in6);
}

default:
{
MS_ABORT("unknown network family: %d", static_cast<int>(addr->sa_family));
}
}
}

void IP::NormalizeIp(std::string& ip)
{
MS_TRACE();
Expand Down