Skip to content

Commit

Permalink
liburing (#1218)
Browse files Browse the repository at this point in the history
Enable liburing usage for linux (kernel versions >= 6).

Use liburing to relay incoming RTP to Consumers. This greately reduces the
number of system calls as just a single system call is needed to relay the
incoming RTP packet to "every" Consumer, opposite to libuv which requires a
system call to relay the RTP packet to "each" Consumer.

For efficiency, memory buffers are preallocated holding the data payload
being sent.

---------

Co-authored-by: Iñaki Baz Castillo <ibc@aliax.net>
  • Loading branch information
jmillan and ibc committed Nov 27, 2023
1 parent f296e9f commit 6821b55
Show file tree
Hide file tree
Showing 23 changed files with 795 additions and 9 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog


### NEXT

* Enable `liburing` usage for Linux (kernel versions >= 6) ([PR #1218](https://github.com/versatica/mediasoup/pull/1218)).


### 3.13.6

* Replace make + Makefile with Python Invoke library + tasks.py (also fix installation under path with whitespaces) ([PR #1239](https://github.com/versatica/mediasoup/pull/1239)).
Expand Down
20 changes: 19 additions & 1 deletion node/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ export type WorkerDump =
channelRequestHandlers : string[];
channelNotificationHandlers : string[];
};
liburing? :
{
sqeProcessCount: number;
sqeMissCount: number;
userDataMissCount: number;
};
};

export type WorkerEvents =
Expand Down Expand Up @@ -814,7 +820,7 @@ export function parseWorkerDumpResponse(
binary: FbsWorker.DumpResponse
): WorkerDump
{
return {
const dump: WorkerDump = {
pid : binary.pid()!,
webRtcServerIds : parseVector(binary, 'webRtcServerIds'),
routerIds : parseVector(binary, 'routerIds'),
Expand All @@ -824,4 +830,16 @@ export function parseWorkerDumpResponse(
channelNotificationHandlers : parseVector(binary.channelMessageHandlers()!, 'channelNotificationHandlers')
}
};

if (binary.liburing())
{
dump.liburing =
{
sqeProcessCount : Number(binary.liburing()!.sqeProcessCount()),
sqeMissCount : Number(binary.liburing()!.sqeMissCount()),
userDataMissCount : Number(binary.liburing()!.userDataMissCount())
};
}

return dump;
}
2 changes: 1 addition & 1 deletion node/src/tests/test-Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ test('worker.createRouter() succeeds', async () =>

await expect(worker.dump())
.resolves
.toEqual(
.toMatchObject(
{
pid : worker.pid,
webRtcServerIds : [],
Expand Down
6 changes: 3 additions & 3 deletions node/src/tests/test-WebRtcServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ test('worker.createWebRtcServer() succeeds', async () =>

await expect(worker.dump())
.resolves
.toEqual(
.toMatchObject(
{
pid : worker.pid,
webRtcServerIds : [ webRtcServer.id ],
Expand Down Expand Up @@ -120,7 +120,7 @@ test('worker.createWebRtcServer() without specifying port succeeds', async () =>

await expect(worker.dump())
.resolves
.toEqual(
.toMatchObject(
{
pid : worker.pid,
webRtcServerIds : [ webRtcServer.id ],
Expand Down Expand Up @@ -565,7 +565,7 @@ test('router.createWebRtcTransport() with webRtcServer succeeds and webRtcServer

await expect(worker.dump())
.resolves
.toEqual(
.toMatchObject(
{
pid : worker.pid,
webRtcServerIds : [],
Expand Down
2 changes: 1 addition & 1 deletion node/src/tests/test-Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ test('worker.dump() succeeds', async () =>

await expect(worker.dump())
.resolves
.toEqual(
.toMatchObject(
{
pid : worker.pid,
webRtcServerIds : [],
Expand Down
7 changes: 6 additions & 1 deletion rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::webrtc_server::{
use crate::webrtc_transport::{
WebRtcTransportListen, WebRtcTransportListenInfos, WebRtcTransportOptions,
};
use crate::worker::{ChannelMessageHandlers, WorkerDump, WorkerUpdateSettings};
use crate::worker::{ChannelMessageHandlers, LibUringDump, WorkerDump, WorkerUpdateSettings};
use mediasoup_sys::fbs::{
active_speaker_observer, audio_level_observer, consumer, data_consumer, data_producer,
direct_transport, message, notification, pipe_transport, plain_transport, producer, request,
Expand Down Expand Up @@ -165,6 +165,11 @@ impl Request for WorkerDumpRequest {
.map(|id| id.parse())
.collect::<Result<_, _>>()?,
},
liburing: data.liburing.map(|liburing| LibUringDump {
sqe_process_count: liburing.sqe_process_count,
sqe_miss_count: liburing.sqe_miss_count,
user_data_miss_count: liburing.user_data_miss_count,
}),
})
}
}
Expand Down
9 changes: 9 additions & 0 deletions rust/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,14 @@ pub struct ChannelMessageHandlers {
pub channel_notification_handlers: Vec<Uuid>,
}

#[derive(Debug, Clone, Deserialize, Serialize, Eq, PartialEq)]
#[doc(hidden)]
pub struct LibUringDump {
pub sqe_process_count: u64,
pub sqe_miss_count: u64,
pub user_data_miss_count: u64,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
Expand All @@ -288,6 +296,7 @@ pub struct WorkerDump {
#[serde(rename = "webRtcServerIds")]
pub webrtc_server_ids: Vec<WebRtcServerId>,
pub channel_message_handlers: ChannelMessageHandlers,
pub liburing: Option<LibUringDump>,
}

/// Error that caused [`Worker::create_webrtc_server`] to fail.
Expand Down
8 changes: 8 additions & 0 deletions worker/fbs/liburing.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace FBS.LibUring;

table Dump {
sqe_process_count: uint64;
sqe_miss_count: uint64;
user_data_miss_count: uint64;
}

1 change: 1 addition & 0 deletions worker/fbs/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ flatbuffers_schemas = [
'dataConsumer.fbs',
'dataProducer.fbs',
'directTransport.fbs',
'liburing.fbs',
'log.fbs',
'message.fbs',
'notification.fbs',
Expand Down
2 changes: 2 additions & 0 deletions worker/fbs/worker.fbs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
include "liburing.fbs";
include "transport.fbs";

namespace FBS.Worker;
Expand All @@ -12,6 +13,7 @@ table DumpResponse {
web_rtc_server_ids: [string] (required);
router_ids: [string] (required);
channel_message_handlers: ChannelMessageHandlers (required);
liburing: FBS.LibUring.Dump;
}

table ResourceUsageResponse {
Expand Down
104 changes: 104 additions & 0 deletions worker/include/DepLibUring.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#ifndef MS_DEP_LIBURING_HPP
#define MS_DEP_LIBURING_HPP

#include "DepLibUV.hpp"
#include "FBS/liburing.h"
#include <functional>
#include <liburing.h>
#include <queue>

class DepLibUring
{
public:
using onSendCallback = const std::function<void(bool sent)>;

/* Struct for the user data field of SQE and CQE. */
struct UserData
{
uint8_t store[1500]{};
onSendCallback* cb{ nullptr };
size_t idx{ 0 };
};

/* Number of submission queue entries (SQE). */
static constexpr size_t QueueDepth{ 1024 * 4 };

static bool IsRuntimeSupported();
static void ClassInit();
static void ClassDestroy();
static flatbuffers::Offset<FBS::LibUring::Dump> FillBuffer(flatbuffers::FlatBufferBuilder& builder);
static void StartPollingCQEs();
static void StopPollingCQEs();
static bool PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
static bool PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb);
static void Submit();
static void SetActive();
static bool IsActive();

class LibUring;

thread_local static LibUring* liburing;

public:
class LibUring
{
public:
LibUring();
~LibUring();
flatbuffers::Offset<FBS::LibUring::Dump> FillBuffer(flatbuffers::FlatBufferBuilder& builder) const;
void StartPollingCQEs();
void StopPollingCQEs();
bool PrepareSend(
int sockfd, const void* data, size_t len, const struct sockaddr* addr, onSendCallback* cb);
bool PrepareWrite(
int sockfd, const void* data1, size_t len1, const void* data2, size_t len2, onSendCallback* cb);
void Submit();
void SetActive()
{
this->active = true;
}
bool IsActive() const
{
return this->active;
}
io_uring* GetRing()
{
return std::addressof(this->ring);
}
int GetEventFd() const
{
return this->efd;
}
void ReleaseUserDataEntry(size_t idx)
{
this->availableUserDataEntries.push(idx);
}

private:
UserData* GetUserData();

private:
// io_uring instance.
io_uring ring;
// Event file descriptor to watch for completions.
int efd;
// libuv handle used to poll io_uring completions.
uv_poll_t* uvHandle{ nullptr };
// Whether we are currently sending RTP over io_uring.
bool active{ false };
// Pre-allocated UserData entries.
UserData userDataBuffer[QueueDepth]{};
// Indexes of available UserData entries.
std::queue<size_t> availableUserDataEntries;
// Submission queue entry process count.
uint64_t sqeProcessCount{ 0u };
// Submission queue entry miss count.
uint64_t sqeMissCount{ 0u };
// User data miss count.
uint64_t userDataMissCount{ 0u };
};
};

#endif
4 changes: 4 additions & 0 deletions worker/include/handles/TcpConnectionHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class TcpConnectionHandle
uv_tcp_t* uvHandle{ nullptr };
// Others.
struct sockaddr_storage* localAddr{ nullptr };
#ifdef MS_LIBURING_SUPPORTED
// Local file descriptor for io_uring.
uv_os_fd_t fd{ 0u };
#endif
bool closed{ false };
size_t recvBytes{ 0u };
size_t sentBytes{ 0u };
Expand Down
4 changes: 4 additions & 0 deletions worker/include/handles/UdpSocketHandle.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class UdpSocketHandle
// Allocated by this (may be passed by argument).
uv_udp_t* uvHandle{ nullptr };
// Others.
#ifdef MS_LIBURING_SUPPORTED
// Local file descriptor for io_uring.
uv_os_fd_t fd{ 0u };
#endif
bool closed{ false };
size_t recvBytes{ 0u };
size_t sentBytes{ 0u };
Expand Down
22 changes: 22 additions & 0 deletions worker/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,28 @@ if host_machine.system() == 'windows'
]
endif

if host_machine.system() == 'linux'
kernel_version = run_command('uname', '-r').stdout().strip()

# Enable liburing for kernel versions greather than or equal to 6.
if kernel_version[0].to_int() >= 6
liburing_proj = subproject('liburing', default_options: ['default_library=static'], required: true)

dependencies += [
liburing_proj.get_variable('uring'),
]
link_whole += [
liburing_proj.get_variable('liburing')
]
common_sources += [
'src/DepLibUring.cpp',
]
cpp_args += [
'-DMS_LIBURING_SUPPORTED',
]
endif
endif

libmediasoup_worker = library(
'libmediasoup-worker',
name_prefix: '',
Expand Down

0 comments on commit 6821b55

Please sign in to comment.