Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

liburing zero copy #1273

Merged
merged 11 commits into from
Dec 22, 2023
6 changes: 6 additions & 0 deletions worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class DepLibUring
{
return this->active;
}
bool IsZeroCopyEnabled() const
{
return this->zeroCopyEnabled;
}
io_uring* GetRing()
{
return std::addressof(this->ring);
Expand Down Expand Up @@ -109,6 +113,8 @@ class DepLibUring
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Whether Zero Copy feature is enabled.
bool zeroCopyEnabled{ true };
// Pre-allocated UserData's.
UserData userDatas[QueueDepth]{};
// Indexes of available UserData entries.
Expand Down
90 changes: 56 additions & 34 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,41 +41,44 @@ inline static void onFdEvent(uv_poll_t* handle, int status, int events)
struct io_uring_cqe* cqe = cqes[i];
auto* userData = static_cast<DepLibUring::UserData*>(io_uring_cqe_get_data(cqe));

// CQE notification for a zero-copy submission.
if (cqe->flags & IORING_CQE_F_NOTIF)
if (liburing->IsZeroCopyEnabled())
{
// The send buffer is now in the network card, run the send callback.
if (userData->cb)
{
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}

liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// CQE for a zero-copy submission, a CQE notification will follow.
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
// CQE notification for a zero-copy submission.
if (cqe->flags & IORING_CQE_F_NOTIF)
{
// The send buffer is now in the network card, run the send callback.
if (userData->cb)
{
(*userData->cb)(false);
(*userData->cb)(true);
delete userData->cb;
userData->cb = nullptr;
}

liburing->ReleaseUserDataEntry(userData->idx);
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
}

// NOTE: Do not release the user data as it will be done upon reception
// of CQE notification.
io_uring_cqe_seen(liburing->GetRing(), cqe);
// CQE for a zero-copy submission, a CQE notification will follow.
if (cqe->flags & IORING_CQE_F_MORE)
{
if (cqe->res < 0)
{
if (userData->cb)
{
(*userData->cb)(false);
delete userData->cb;
userData->cb = nullptr;
}
}

// NOTE: Do not release the user data as it will be done upon reception
// of CQE notification.
io_uring_cqe_seen(liburing->GetRing(), cqe);

continue;
continue;
}
}

// Successfull SQE.
Expand Down Expand Up @@ -310,7 +313,19 @@ DepLibUring::LibUring::LibUring()

if (err < 0)
{
MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(-err));
if (err == ENOMEM)
jmillan marked this conversation as resolved.
Show resolved Hide resolved
{
this->zeroCopyEnabled = false;

MS_WARN_TAG(
info,
"io_uring_register_buffers() failed due to low memlock limit (ulimit -l), disabling zero copy",
std::strerror(-err));
jmillan marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
MS_THROW_ERROR("io_uring_register_buffers() failed: %s", std::strerror(-err));
}
}
}

Expand Down Expand Up @@ -442,16 +457,23 @@ bool DepLibUring::LibUring::PrepareSend(

socklen_t addrlen = Utils::IP::GetAddressLen(addr);

auto iovec = this->iovecs[userData->idx];
iovec.iov_len = len;
if (this->zeroCopyEnabled)
{
auto iovec = this->iovecs[userData->idx];
iovec.iov_len = len;

io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
io_uring_prep_send_set_addr(sqe, addr, addrlen);
io_uring_prep_send_zc(sqe, sockfd, iovec.iov_base, iovec.iov_len, 0, 0);
io_uring_prep_send_set_addr(sqe, addr, addrlen);

// Tell io_uring that we are providing the already registered send buffer
// for zero copy.
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = userData->idx;
// Tell io_uring that we are providing the already registered send buffer
// for zero copy.
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = userData->idx;
}
else
{
io_uring_prep_sendto(sqe, sockfd, userData->store, len, 0, addr, addrlen);
}

this->sqeProcessCount++;

Expand Down