Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/olive-clouds-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

feat: add support for SimulateScenario
5 changes: 3 additions & 2 deletions packages/livekit-rtc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"@datastructures-js/deque": "1.0.8",
"@livekit/mutex": "^1.0.0",
"@livekit/typed-emitter": "^3.0.0",
"@livekit/rtc-ffi-bindings": "0.12.53",
"@livekit/rtc-ffi-bindings": "0.12.54",
"pino": "^9.0.0",
"pino-pretty": "^13.0.0"
},
Expand All @@ -53,6 +53,7 @@
"prebuild": "node -p \"'export const SDK_VERSION = ' + JSON.stringify(require('./package.json').version) + ';'\" > src/version.ts",
"build": "pnpm prebuild && tsup --onSuccess \"tsc --declaration --emitDeclarationOnly\"",
"lint": "eslint -f unix \"src/**/*.ts\" --ignore-pattern \"src/proto/*\"",
"test": "vitest run src"
"test": "vitest run src",
"test:e2e": "node scripts/run-e2e.mjs"
}
}
97 changes: 97 additions & 0 deletions packages/livekit-rtc/scripts/run-e2e.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0

// Spins up `livekit-server --dev` with a known dev key, runs the e2e
// vitest suite against it, then tears the server down on exit.
//
// Requires `livekit-server` on PATH.
import { spawn } from 'node:child_process';
import net from 'node:net';
import { setTimeout as delay } from 'node:timers/promises';

const KEYS = 'devkey: secret';
const HOST = '127.0.0.1';
const PORT = 7880;
const URL = `ws://${HOST}:${PORT}`;
const API_KEY = 'devkey';
const API_SECRET = 'secret';

async function tcpReady(host, port, timeoutMs) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const ok = await new Promise((resolve) => {
const s = net.createConnection({ host, port });
s.once('connect', () => {
s.end();
resolve(true);
});
s.once('error', () => resolve(false));
});
if (ok) return;
await delay(200);
}
throw new Error(`livekit-server not reachable at ${host}:${port} within ${timeoutMs}ms`);
}

async function isPortOpen(host, port) {
return new Promise((resolve) => {
const s = net.createConnection({ host, port });
s.once('connect', () => {
s.end();
resolve(true);
});
s.once('error', () => resolve(false));
});
}

const reuseExisting = await isPortOpen(HOST, PORT);
let server;
let serverExited = !reuseExisting ? false : true;
if (reuseExisting) {
console.log(`[run-e2e] reusing existing livekit-server on ${HOST}:${PORT}`);
} else {
server = spawn('livekit-server', ['--dev'], {
env: { ...process.env, LIVEKIT_KEYS: KEYS },
stdio: ['ignore', 'inherit', 'inherit'],
});
server.on('exit', (code, signal) => {
serverExited = true;
if (code && code !== 0 && signal !== 'SIGTERM') {
console.error(`livekit-server exited unexpectedly: code=${code} signal=${signal}`);
}
});
}

let testProc;
const stopServer = () => {
if (server && !serverExited) server.kill('SIGTERM');
};
const onSignal = (sig) => {
if (testProc && !testProc.killed) testProc.kill(sig);
stopServer();
};
process.on('SIGINT', () => onSignal('SIGINT'));
process.on('SIGTERM', () => onSignal('SIGTERM'));

try {
await tcpReady(HOST, PORT, 15_000);

const args = ['exec', 'vitest', 'run', 'src/tests/e2e.test.ts', ...process.argv.slice(2)];
testProc = spawn('pnpm', args, {
env: {
...process.env,
LIVEKIT_URL: URL,
LIVEKIT_API_KEY: API_KEY,
LIVEKIT_API_SECRET: API_SECRET,
},
stdio: 'inherit',
});
const code = await new Promise((resolve) => testProc.on('exit', resolve));
process.exitCode = code ?? 0;
} catch (err) {
console.error(err);
process.exitCode = 1;
} finally {
stopServer();
}
7 changes: 6 additions & 1 deletion packages/livekit-rtc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ export {
IceTransportType,
TrackPublishOptions,
} from '@livekit/rtc-ffi-bindings';
export { StreamState, TrackKind, TrackSource } from '@livekit/rtc-ffi-bindings';
export {
SimulateScenarioKind,
StreamState,
TrackKind,
TrackSource,
} from '@livekit/rtc-ffi-bindings';
export { VideoBufferType, VideoCodec, VideoRotation } from '@livekit/rtc-ffi-bindings';
export { ConnectError, Room, RoomEvent, type RoomOptions, type RtcConfiguration } from './room.js';
export { RpcError, type PerformRpcParams, type RpcInvocationData } from './rpc.js';
Expand Down
33 changes: 33 additions & 0 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import {
type IceServer,
IceTransportType,
type RoomInfo,
type SimulateScenarioCallback,
type SimulateScenarioKind,
type SimulateScenarioResponse,
} from '@livekit/rtc-ffi-bindings';
import { TrackKind } from '@livekit/rtc-ffi-bindings';
import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter';
Expand Down Expand Up @@ -325,6 +328,36 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this.removeAllListeners();
}

/**
* Trigger a reconnection / chaos scenario for testing. Most useful in
* tests to deterministically force a Resume (signal-only reconnect that
* preserves the PeerConnection and existing publications) or a full
* reconnect (the SDK rebuilds the RtcSession and re-publishes existing
* local tracks; `RoomEvent.Reconnected` fires).
*/
async simulateScenario(scenario: SimulateScenarioKind): Promise<void> {
if (!this.isConnected || !this.ffiHandle) {
throw new Error('simulateScenario requires a connected room');
}
const res = FfiClient.instance.request<SimulateScenarioResponse>({
message: {
case: 'simulateScenario',
value: {
roomHandle: this.ffiHandle.handle,
scenario,
},
},
});
const cb = await FfiClient.instance.waitFor<SimulateScenarioCallback>(
(ev: FfiEvent) =>
ev.message.case === 'simulateScenario' && ev.message.value.asyncId === res.asyncId,
{ signal: this.disconnectController.signal },
);
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
if (cb.error) {
throw new Error(`simulateScenario failed: ${cb.error}`);
}
}

private updateConnectionState(newState: ConnectionState) {
if (this._connectionState === newState) {
return;
Expand Down
153 changes: 153 additions & 0 deletions packages/livekit-rtc/src/tests/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import {
Room,
RoomEvent,
RpcError,
SimulateScenarioKind,
TrackKind,
TrackPublishOptions,
TrackSource,
dispose,
Expand Down Expand Up @@ -682,4 +684,155 @@ describeE2E('livekit-rtc e2e', () => {
},
testTimeoutMs,
);

// -- Reconnect scenarios --
//
// Both tests verify the user-visible behavior: after the scenario fires,
// the subscriber continues to receive the publisher's tone. The full
// reconnect test additionally asserts there is exactly one audio
// publication on each side (regression: duplicate-publish bug).

const runReconnectScenario = async (scenario: SimulateScenarioKind) => {
const { rooms } = await connectTestRooms(2);
const [subRoom, pubRoom] = rooms;

const pubRateHz = 48_000;
const source = new AudioSource(pubRateHz, 1);
const track = LocalAudioTrack.createAudioTrack('reconnect_tone', source);
const opts = new TrackPublishOptions();
opts.source = TrackSource.SOURCE_MICROPHONE;
await pubRoom!.localParticipant!.publishTrack(track, opts);

let tonePhase = 0;
const samplesPer10ms = Math.floor(pubRateHz / 100);
const amplitude = 0.8 * 32767;
const sineHz = 60;
let toneRunning = true;
const toneTask = (async () => {
while (toneRunning) {
const frame = AudioFrame.create(pubRateHz, 1, samplesPer10ms);
for (let s = 0; s < samplesPer10ms; s++) {
frame.data[s] = Math.round(
amplitude * Math.sin((2 * Math.PI * sineHz * tonePhase) / pubRateHz),
);
tonePhase++;
}
await source.captureFrame(frame);
}
})();

// Subscriber-side: re-attach an AudioStream every time TrackSubscribed
// fires (a full reconnect may issue TrackUnsubscribed → TrackSubscribed
// with a fresh remote track).
const sub = {
lastFrameAt: 0,
collectFromMs: Number.POSITIVE_INFINITY,
collected: [] as Int16Array[],
readers: [] as ReturnType<AudioStream['getReader']>[],
};
const attach = (remoteTrack: unknown) => {
const stream = new AudioStream(remoteTrack as any, {
sampleRate: pubRateHz,
numChannels: 1,
});
const reader = stream.getReader();
sub.readers.push(reader);
(async () => {
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
sub.lastFrameAt = Date.now();
if (sub.lastFrameAt >= sub.collectFromMs) {
sub.collected.push(channelSamples(value, 0));
}
}
} catch {
// reader released
}
})();
};
subRoom!.on(RoomEvent.TrackSubscribed, (t) => attach(t));

try {
await waitFor(() => sub.lastFrameAt > 0 && Date.now() - sub.lastFrameAt < 500, {
timeoutMs: 10_000,
debugName: 'initial audio flow',
});

const simulateAt = Date.now();
await pubRoom!.simulateScenario(scenario);

// Wait for audio to actually flow again post-simulate: a frame
// received well after the simulate AND a fresh latest-frame timestamp.
await waitFor(
() => sub.lastFrameAt >= simulateAt + 500 && Date.now() - sub.lastFrameAt < 300,
{ timeoutMs: 30_000, debugName: 'audio re-established after simulate' },
);
// Drain post-recovery buffer/jitter, then collect a 2s window of
// steady-state samples for tone detection.
await delay(1_500);
sub.collected.length = 0;
sub.collectFromMs = Date.now();
await waitFor(() => sub.collected.reduce((a, s) => a + s.length, 0) >= pubRateHz * 2, {
timeoutMs: 15_000,
debugName: 'post-simulate audio sampling',
});

const totalLen = sub.collected.reduce((a, s) => a + s.length, 0);
const concat = new Int16Array(totalLen);
let off = 0;
for (const s of sub.collected) {
concat.set(s, off);
off += s.length;
}
const detected = estimateFreqHz(concat, pubRateHz);
expect(Math.abs(detected - sineHz)).toBeLessThan(20);

return { rooms, subRoom: subRoom!, pubRoom: pubRoom! };
} finally {
toneRunning = false;
await toneTask;
for (const r of sub.readers) {
try {
r.releaseLock();
} catch {
// ignore
}
}
await track.close();
}
};

itRaw(
'resume keeps audio flowing on the subscriber side',
async () => {
const { rooms } = await runReconnectScenario(SimulateScenarioKind.SIMULATE_SIGNAL_RECONNECT);
await Promise.all(rooms.map((r) => r.disconnect()));
},
testTimeoutMs * 4,
);

itRaw(
'full reconnect keeps audio flowing and ends with one publication on the subscriber',
async () => {
const { rooms, subRoom, pubRoom } = await runReconnectScenario(
SimulateScenarioKind.SIMULATE_FULL_RECONNECT,
);

try {
// Regression: subscriber must see exactly ONE audio publication after
// recovery — not duplicates from the auto-republish path.
const subscriberAudioPubs = Array.from(
subRoom.remoteParticipants
.get(pubRoom.localParticipant!.identity)!
.trackPublications.values(),
).filter((p) => p.kind === TrackKind.KIND_AUDIO);
expect(subscriberAudioPubs.length).toBe(1);
} finally {
await Promise.all(rooms.map((r) => r.disconnect()));
}
},
testTimeoutMs * 4,
);
});
4 changes: 2 additions & 2 deletions packages/livekit-server-sdk/src/SipClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
Pagination,
RoomConfiguration,
SIPHeaderOptions,
SIPMediaEncryption,
SIPOutboundConfig,
} from '@livekit/protocol';
import {
CreateSIPDispatchRuleRequest,
Expand All @@ -29,8 +31,6 @@ import {
SIPDispatchRuleIndividual,
SIPDispatchRuleInfo,
SIPInboundTrunkInfo,
SIPMediaEncryption,
SIPOutboundConfig,
SIPOutboundTrunkInfo,
SIPParticipantInfo,
SIPTransport,
Expand Down
Loading
Loading