Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListenInfo: Add transport socket flags #1291

Merged
merged 16 commits into from
Jan 3, 2024
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:

env:
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
CC: ${{ matrix.build.cc }}
CXX: ${{ matrix.build.cxx }}
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
19 changes: 18 additions & 1 deletion node/src/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
Transport,
TransportListenInfo,
TransportListenIp,
TransportProtocol
TransportProtocol,
TransportSocketFlag
} from './Transport';
import { WebRtcTransport, WebRtcTransportOptions, parseWebRtcTransportDumpResponse } from './WebRtcTransport';
import { PlainTransport, PlainTransportOptions, parsePlainTransportDumpResponse } from './PlainTransport';
Expand Down Expand Up @@ -570,6 +571,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo.ip,
listenInfo.announcedIp,
listenInfo.port,
socketFlagsToInteger(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize
));
Expand Down Expand Up @@ -749,6 +751,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToInteger(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand All @@ -759,6 +762,7 @@ export class Router<RouterAppData extends AppData = AppData>
rtcpListenInfo.ip,
rtcpListenInfo.announcedIp,
rtcpListenInfo.port,
socketFlagsToInteger(rtcpListenInfo.flags),
rtcpListenInfo.sendBufferSize,
rtcpListenInfo.recvBufferSize
) : undefined,
Expand Down Expand Up @@ -897,6 +901,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToInteger(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand Down Expand Up @@ -1619,3 +1624,15 @@ export function parseRouterDumpResponse(
mapDataConsumerIdDataProducerId : parseStringStringVector(binary, 'mapDataConsumerIdDataProducerId')
};
}

function socketFlagsToInteger(flags: TransportSocketFlag[] = []): number
{
let flagsInteger = 0;

for (const flag of flags)
{
flagsInteger |= flag;
}

return flagsInteger;
}
24 changes: 24 additions & 0 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ export type TransportListenInfo =
*/
port?: number;

/**
* Socket flags.
*/
flags?: TransportSocketFlag[];

/**
* Send buffer size (bytes).
*/
Expand Down Expand Up @@ -107,6 +112,25 @@ export type TransportListenIp =
*/
export type TransportProtocol = 'udp' | 'tcp';

/**
* UDP/TCP socket flag.
*/
// NOTE: ESLint absurdly complains about "'TransportSocketFlag' is already
// declared in the upper scope".
// eslint-disable-next-line no-shadow
export enum TransportSocketFlag
{
/**
* Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
*/
IPV6ONLY = FbsTransport.SocketFlag.IPV6ONLY,
/**
* Make different transports bind to the same ip and port (only for UDP).
* Useful for multicast scenarios with plain transport. Use with caution.
*/
UDP_REUSEPORT = FbsTransport.SocketFlag.UDP_REUSEPORT
}

export type TransportTuple =
{
localIp: string;
Expand Down
43 changes: 43 additions & 0 deletions node/src/tests/test-PlainTransport.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import * as os from 'node:os';
// @ts-ignore
import * as pickPort from 'pick-port';
import * as mediasoup from '../';

const IS_WINDOWS = os.platform() === 'win32';

let worker: mediasoup.types.Worker;
let router: mediasoup.types.Router;
let transport: mediasoup.types.PlainTransport;
Expand Down Expand Up @@ -316,6 +319,46 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as
.toThrow(Error);
}, 2000);

if (!IS_WINDOWS)
{
test('two transports listening in same IP:port succeed if UDP_REUSEPORT flag is set', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;

await expect(async () =>
{
const multicastIp = '224.0.0.1';
const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 });

transport1 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ]
}
});

transport2 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : [ mediasoup.types.TransportSocketFlag.UDP_REUSEPORT ]
}
});
}).not.toThrow();

transport1?.close();
transport2?.close();
}, 2000);
}

test('plainTransport.getStats() succeeds', async () =>
{
const data = await transport.getStats();
Expand Down
8 changes: 4 additions & 4 deletions npm-scripts.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import process from 'node:process';
import os from 'node:os';
import fs from 'node:fs';
import path from 'node:path';
import * as process from 'node:process';
import * as os from 'node:os';
import * as fs from 'node:fs';
import * as path from 'node:path';
import { execSync } from 'node:child_process';
import fetch from 'node-fetch';
import tar from 'tar';
Expand Down
23 changes: 23 additions & 0 deletions rust/src/data_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub struct ListenInfo {
/// Listening port.
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<u16>,
/// Socket flags.
#[serde(skip_serializing_if = "Option::is_none")]
pub flags: Option<Vec<SocketFlag>>,
/// Send buffer size (bytes).
#[serde(skip_serializing_if = "Option::is_none")]
pub send_buffer_size: Option<u32>,
Expand Down Expand Up @@ -190,6 +193,26 @@ impl Protocol {
}
}

/// Transport socket flag.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum SocketFlag {
/// Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
IpV6Only,
/// Make different transports bind to the same ip and port (only for UDP).
/// Useful for multicast scenarios with plain transport. Use with caution.
UdpReusePort,
}

impl SocketFlag {
pub(crate) fn to_fbs(self) -> transport::SocketFlag {
match self {
SocketFlag::IpV6Only => transport::SocketFlag::IPV6ONLY,
SocketFlag::UdpReusePort => transport::SocketFlag::UDP_REUSEPORT,
}
}
}

/// ICE candidate
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down
10 changes: 8 additions & 2 deletions worker/fbs/transport.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ enum Protocol: uint8 {
TCP
}

enum SocketFlag: uint8 {
IPV6ONLY = 1,
UDP_REUSEPORT = 2
}

table ListenInfo {
protocol: FBS.Transport.Protocol = UDP;
protocol: Protocol = UDP;
ip: string (required);
announced_ip: string;
port: uint16 = 0;
flags: uint8 = 0;
jmillan marked this conversation as resolved.
Show resolved Hide resolved
send_buffer_size: uint32 = 0;
recv_buffer_size: uint32 = 0;
}
Expand Down Expand Up @@ -85,7 +91,7 @@ table Tuple {
local_port: uint16;
remote_ip: string;
remote_port: uint16;
protocol: FBS.Transport.Protocol = UDP;
protocol: Protocol = UDP;
}

table RtpListener {
Expand Down
13 changes: 13 additions & 0 deletions worker/include/Logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@
((value & 0x02) ? '1' : '0'), \
((value & 0x01) ? '1' : '0')

// Usage:
// MS_DEBUG_DEV("Leading text "MS_UINT8_TO_BINARY_PATTERN, MS_UINT8_TO_BINARY(value));
#define MS_UINT8_TO_BINARY_PATTERN "%c%c%c%c%c%c%c%c"
#define MS_UINT8_TO_BINARY(value) \
((value & 0x80) ? '1' : '0'), \
((value & 0x40) ? '1' : '0'), \
((value & 0x20) ? '1' : '0'), \
((value & 0x10) ? '1' : '0'), \
((value & 0x08) ? '1' : '0'), \
((value & 0x04) ? '1' : '0'), \
((value & 0x02) ? '1' : '0'), \
((value & 0x01) ? '1' : '0')

class Logger
{
public:
Expand Down
21 changes: 11 additions & 10 deletions worker/include/RTC/PortManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ namespace RTC
};

public:
static uv_udp_t* BindUdp(std::string& ip)
static uv_udp_t* BindUdp(std::string& ip, uint8_t flags)
{
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip));
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, flags));
}
static uv_udp_t* BindUdp(std::string& ip, uint16_t port)
static uv_udp_t* BindUdp(std::string& ip, uint16_t port, uint8_t flags)
{
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, port));
return reinterpret_cast<uv_udp_t*>(Bind(Transport::UDP, ip, port, flags));
}
static uv_tcp_t* BindTcp(std::string& ip)
static uv_tcp_t* BindTcp(std::string& ip, uint8_t flags)
{
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip));
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, flags));
}
static uv_tcp_t* BindTcp(std::string& ip, uint16_t port)
static uv_tcp_t* BindTcp(std::string& ip, uint16_t port, uint8_t flags)
{
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, port));
return reinterpret_cast<uv_tcp_t*>(Bind(Transport::TCP, ip, port, flags));
}
static void UnbindUdp(std::string& ip, uint16_t port)
{
Expand All @@ -46,10 +46,11 @@ namespace RTC
}

private:
static uv_handle_t* Bind(Transport transport, std::string& ip);
static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port);
static uv_handle_t* Bind(Transport transport, std::string& ip, uint8_t flags);
static uv_handle_t* Bind(Transport transport, std::string& ip, uint16_t port, uint8_t flags);
static void Unbind(Transport transport, std::string& ip, uint16_t port);
static std::vector<bool>& GetPorts(Transport transport, const std::string& ip);
static uint8_t ConvertSocketFlags(uint8_t flags);

private:
thread_local static absl::flat_hash_map<std::string, std::vector<bool>> mapUdpIpPorts;
Expand Down
9 changes: 7 additions & 2 deletions worker/include/RTC/TcpServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ namespace RTC
};

public:
TcpServer(Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip);
TcpServer(
Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint16_t port);
Listener* listener, RTC::TcpConnection::Listener* connListener, std::string& ip, uint8_t flags);
TcpServer(
Listener* listener,
RTC::TcpConnection::Listener* connListener,
std::string& ip,
uint16_t port,
uint8_t flags);
~TcpServer() override;

/* Pure virtual methods inherited from ::TcpServerHandle. */
Expand Down
1 change: 1 addition & 0 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ namespace RTC
uint16_t port{ 0u };
uint32_t sendBufferSize{ 0u };
uint32_t recvBufferSize{ 0u };
uint8_t flags{ 0u };
};

private:
Expand Down
4 changes: 2 additions & 2 deletions worker/include/RTC/UdpSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ namespace RTC
};

public:
UdpSocket(Listener* listener, std::string& ip);
UdpSocket(Listener* listener, std::string& ip, uint16_t port);
UdpSocket(Listener* listener, std::string& ip, uint8_t flags);
UdpSocket(Listener* listener, std::string& ip, uint16_t port, uint8_t flags);
~UdpSocket() override;

/* Pure virtual methods inherited from ::UdpSocketHandle. */
Expand Down
22 changes: 7 additions & 15 deletions worker/src/RTC/PipeTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,10 @@ namespace RTC
this->listenInfo.announcedIp.assign(options->listenInfo()->announcedIp()->str());
}

this->listenInfo.port = options->listenInfo()->port();

if (flatbuffers::IsFieldPresent(
options->listenInfo(), FBS::Transport::ListenInfo::VT_SENDBUFFERSIZE))
{
this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize();
}

if (flatbuffers::IsFieldPresent(
options->listenInfo(), FBS::Transport::ListenInfo::VT_RECVBUFFERSIZE))
{
this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize();
}
this->listenInfo.port = options->listenInfo()->port();
this->listenInfo.sendBufferSize = options->listenInfo()->sendBufferSize();
this->listenInfo.recvBufferSize = options->listenInfo()->recvBufferSize();
this->listenInfo.flags = options->listenInfo()->flags();

this->rtx = options->enableRtx();

Expand All @@ -77,11 +68,12 @@ namespace RTC
// This may throw.
if (this->listenInfo.port != 0)
{
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.port);
this->udpSocket = new RTC::UdpSocket(
this, this->listenInfo.ip, this->listenInfo.port, this->listenInfo.flags);
}
else
{
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip);
this->udpSocket = new RTC::UdpSocket(this, this->listenInfo.ip, this->listenInfo.flags);
}

if (this->listenInfo.sendBufferSize != 0)
Expand Down