Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ListenInfo: Add transport socket flags #1291

Merged
merged 16 commits into from
Jan 3, 2024
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:

env:
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-worker-prebuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
CC: ${{ matrix.build.cc }}
CXX: ${{ matrix.build.cxx }}
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/mediasoup-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
CC: ${{ matrix.build.cc }}
CXX: ${{ matrix.build.cxx }}
MEDIASOUP_SKIP_WORKER_PREBUILT_DOWNLOAD: 'true'
MEDIASOUP_LOCAL_DEV: 'true'

steps:
- name: Checkout
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
### NEXT

* Avoid modification of user input data ([PR #1285](https://github.com/versatica/mediasoup/pull/1285)).
* `ListenInfo`: Add transport socket flags ([PR #1291](https://github.com/versatica/mediasoup/pull/1291)).


### 3.13.13
Expand Down
17 changes: 16 additions & 1 deletion node/src/Router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import {
Transport,
TransportListenInfo,
TransportListenIp,
TransportProtocol
TransportProtocol,
TransportSocketFlags
} from './Transport';
import { WebRtcTransport, WebRtcTransportOptions, parseWebRtcTransportDumpResponse } from './WebRtcTransport';
import { PlainTransport, PlainTransportOptions, parsePlainTransportDumpResponse } from './PlainTransport';
Expand Down Expand Up @@ -570,6 +571,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo.ip,
listenInfo.announcedIp,
listenInfo.port,
socketFlagsToFbs(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize
));
Expand Down Expand Up @@ -749,6 +751,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToFbs(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand All @@ -759,6 +762,7 @@ export class Router<RouterAppData extends AppData = AppData>
rtcpListenInfo.ip,
rtcpListenInfo.announcedIp,
rtcpListenInfo.port,
socketFlagsToFbs(rtcpListenInfo.flags),
rtcpListenInfo.sendBufferSize,
rtcpListenInfo.recvBufferSize
) : undefined,
Expand Down Expand Up @@ -897,6 +901,7 @@ export class Router<RouterAppData extends AppData = AppData>
listenInfo!.ip,
listenInfo!.announcedIp,
listenInfo!.port,
socketFlagsToFbs(listenInfo!.flags),
listenInfo!.sendBufferSize,
listenInfo!.recvBufferSize
),
Expand Down Expand Up @@ -1619,3 +1624,13 @@ export function parseRouterDumpResponse(
mapDataConsumerIdDataProducerId : parseStringStringVector(binary, 'mapDataConsumerIdDataProducerId')
};
}

export function socketFlagsToFbs(
flags: TransportSocketFlags = {}
): FbsTransport.SocketFlagsT
{
return new FbsTransport.SocketFlagsT(
Boolean(flags.ipv6Only),
Boolean(flags.udpReusePort)
);
}
21 changes: 21 additions & 0 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ export type TransportListenInfo =
*/
port?: number;

/**
* Socket flags.
*/
flags?: TransportSocketFlags;

/**
* Send buffer size (bytes).
*/
Expand Down Expand Up @@ -107,6 +112,22 @@ export type TransportListenIp =
*/
export type TransportProtocol = 'udp' | 'tcp';

/**
* UDP/TCP socket flags.
*/
export type TransportSocketFlags =
{
/**
* Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
*/
ipv6Only?: boolean;
/**
* Make different transports bind to the same ip and port (only for UDP).
* Useful for multicast scenarios with plain transport. Use with caution.
*/
udpReusePort?: boolean;
};

export type TransportTuple =
{
localIp: string;
Expand Down
3 changes: 2 additions & 1 deletion node/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Logger } from './Logger';
import { EnhancedEventEmitter } from './EnhancedEventEmitter';
import * as ortc from './ortc';
import { Channel } from './Channel';
import { Router, RouterOptions } from './Router';
import { Router, RouterOptions, socketFlagsToFbs } from './Router';
import { WebRtcServer, WebRtcServerOptions } from './WebRtcServer';
import { RtpCodecCapability } from './RtpParameters';
import { AppData } from './types';
Expand Down Expand Up @@ -699,6 +699,7 @@ export class Worker<WorkerAppData extends AppData = AppData>
listenInfo.ip,
listenInfo.announcedIp,
listenInfo.port,
socketFlagsToFbs(listenInfo.flags),
listenInfo.sendBufferSize,
listenInfo.recvBufferSize)
);
Expand Down
80 changes: 80 additions & 0 deletions node/src/tests/test-PlainTransport.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import * as os from 'node:os';
// @ts-ignore
import * as pickPort from 'pick-port';
import * as mediasoup from '../';

const IS_WINDOWS = os.platform() === 'win32';

let worker: mediasoup.types.Worker;
let router: mediasoup.types.Router;
let transport: mediasoup.types.PlainTransport;
Expand Down Expand Up @@ -316,6 +319,83 @@ test('router.createPlainTransport() with non bindable IP rejects with Error', as
.toThrow(Error);
}, 2000);

if (!IS_WINDOWS)
{
test('two transports binding to the same IP:port with udpReusePort flag succeed', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;

await expect(async () =>
{
const multicastIp = '224.0.0.1';
const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 });

transport1 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: true }
}
});

transport2 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: true }
}
});
}).not.toThrow();

transport1?.close();
transport2?.close();
}, 2000);

test('two transports binding to the same IP:port without udpReusePort flag fails', async () =>
{
let transport1: mediasoup.types.PlainTransport | undefined;
let transport2: mediasoup.types.PlainTransport | undefined;

await expect(async () =>
{
const multicastIp = '224.0.0.1';
const port = await pickPort({ ip: multicastIp, reserveTimeout: 0 });

transport1 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: false }
}
});

transport2 = await router.createPlainTransport(
{
listenInfo :
{
protocol : 'udp',
ip : multicastIp,
port : port,
flags : { udpReusePort: false }
}
});
}).rejects.toThrow();

transport1?.close();
transport2?.close();
}, 2000);
}

test('plainTransport.getStats() succeeds', async () =>
{
const data = await transport.getStats();
Expand Down
8 changes: 4 additions & 4 deletions npm-scripts.mjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import process from 'node:process';
import os from 'node:os';
import fs from 'node:fs';
import path from 'node:path';
import * as process from 'node:process';
import * as os from 'node:os';
import * as fs from 'node:fs';
import * as path from 'node:path';
import { execSync } from 'node:child_process';
import fetch from 'node-fetch';
import tar from 'tar';
Expand Down
1 change: 1 addition & 0 deletions rust/benches/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ async fn init() -> (Worker, Router, WebRtcTransport, WebRtcTransport) {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}));
Expand Down
1 change: 1 addition & 0 deletions rust/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl EchoConnection {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}));
Expand Down
2 changes: 2 additions & 0 deletions rust/examples/multiopus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl EchoConnection {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
});
Expand Down Expand Up @@ -235,6 +236,7 @@ impl EchoConnection {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}),
Expand Down
1 change: 1 addition & 0 deletions rust/examples/svc-simulcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl SvcSimulcastConnection {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}));
Expand Down
1 change: 1 addition & 0 deletions rust/examples/videoroom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ mod participant {
ip: IpAddr::V4(Ipv4Addr::LOCALHOST),
announced_ip: None,
port: None,
flags: None,
send_buffer_size: None,
recv_buffer_size: None,
}));
Expand Down
28 changes: 28 additions & 0 deletions rust/src/data_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ pub struct ListenInfo {
/// Listening port.
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<u16>,
/// Socket flags.
#[serde(skip_serializing_if = "Option::is_none")]
pub flags: Option<SocketFlags>,
/// Send buffer size (bytes).
#[serde(skip_serializing_if = "Option::is_none")]
pub send_buffer_size: Option<u32>,
Expand All @@ -80,12 +83,37 @@ impl ListenInfo {
ip: self.ip.to_string(),
announced_ip: self.announced_ip.map(|ip| ip.to_string()),
port: self.port.unwrap_or(0),
flags: Box::new(self.flags.unwrap_or_default().to_fbs()),
send_buffer_size: self.send_buffer_size.unwrap_or(0),
recv_buffer_size: self.recv_buffer_size.unwrap_or(0),
}
}
}

/// UDP/TCP socket flags.
#[derive(
Default, Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize,
)]
#[serde(rename_all = "camelCase")]
pub struct SocketFlags {
/// Disable dual-stack support so only IPv6 is used (only if ip is IPv6).
/// Defaults to false.
pub ipv6_only: bool,
/// Make different transports bind to the same ip and port (only for UDP).
/// Useful for multicast scenarios with plain transport. Use with caution.
/// Defaults to false.
pub udp_reuse_port: bool,
}

impl SocketFlags {
pub(crate) fn to_fbs(self) -> transport::SocketFlags {
transport::SocketFlags {
ipv6_only: self.ipv6_only,
udp_reuse_port: self.udp_reuse_port,
}
}
}

/// ICE role.
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down