Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tiny-rivers-exercise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/rtc-node": patch
---

fix: ensure connection state of room is flipped after disconnecting
41 changes: 34 additions & 7 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { Mutex } from '@livekit/mutex';
import { EncryptionState, type EncryptionType } from '@livekit/rtc-ffi-bindings';
import type { FfiEvent } from '@livekit/rtc-ffi-bindings';
import type { DisconnectReason, OwnedParticipant } from '@livekit/rtc-ffi-bindings';
import { DisconnectReason, type OwnedParticipant } from '@livekit/rtc-ffi-bindings';
import type { DataStream_Trailer, DisconnectCallback } from '@livekit/rtc-ffi-bindings';
import {
type ConnectCallback,
Expand Down Expand Up @@ -100,6 +100,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
// preventing them from leaking when the room goes away.
private disconnectController = new AbortController();

// Guards cleanupOnDisconnect so the ConnectionStateChanged/Disconnected
// events fire exactly once, no matter which path (explicit disconnect()
// vs. FFI 'disconnected' event) wins the race.
private hasCleanedUp = false;

private _token?: string;
private _serverUrl?: string;

Expand Down Expand Up @@ -261,6 +266,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
// Reset the abort controller for this connection session so that
// a previous disconnect doesn't immediately cancel new operations.
this.disconnectController = new AbortController();
this.hasCleanedUp = false;
this.localParticipant = new LocalParticipant(
cb.message.value.localParticipant!,
this.ffiEventLock,
Expand Down Expand Up @@ -309,13 +315,19 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
});

this.cleanupOnDisconnect();

this.cleanupOnDisconnect(DisconnectReason.CLIENT_INITIATED);
FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);

this.removeAllListeners();
}

private cleanupOnDisconnect() {
// Runs at most once per connection session. The FFI layer and explicit
// disconnect() both race to get here — whichever wins emits the events,
// the other is a no-op. A reconnect via connect() clears hasCleanedUp.
private cleanupOnDisconnect(reason: DisconnectReason = DisconnectReason.CLIENT_INITIATED) {
if (this.hasCleanedUp) return;
this.hasCleanedUp = true;

// Error all in-progress stream controllers to prevent FD leaks.
// Streams that were receiving data but never got a trailer (e.g. the sender
// disconnected mid-transfer) would otherwise keep their ReadableStream open
Expand Down Expand Up @@ -346,6 +358,14 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
// This causes any in-flight operations (publishData, publishTrack, etc.)
// to reject and clean up their event listeners.
this.disconnectController.abort();

// Only emit ConnectionStateChanged if the FFI 'connectionStateChanged'
// path didn't already flip us to DISCONNECTED.
if (this.connectionState !== ConnectionState.CONN_DISCONNECTED) {
this.connectionState = ConnectionState.CONN_DISCONNECTED;
this.emit(RoomEvent.ConnectionStateChanged, this.connectionState);
}
this.emit(RoomEvent.Disconnected, reason);
}

/**
Expand Down Expand Up @@ -658,13 +678,20 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this.emit(RoomEvent.EncryptionError, new Error('internal server error'));
}
} else if (ev.case == 'connectionStateChanged') {
this.connectionState = ev.value.state!;
const newState = ev.value.state!;
// Skip redundant transitions — cleanupOnDisconnect may have already
// flipped us to DISCONNECTED, and we don't want to emit the event twice.
if (this.connectionState === newState) {
return;
}
this.connectionState = newState;
this.emit(RoomEvent.ConnectionStateChanged, this.connectionState);
/*} else if (ev.case == 'connected') {
this.emit(RoomEvent.Connected);*/
} else if (ev.case == 'disconnected') {
this.cleanupOnDisconnect();
this.emit(RoomEvent.Disconnected, ev.value.reason!);
// cleanupOnDisconnect emits RoomEvent.Disconnected itself (guarded by
// hasCleanedUp so it fires exactly once across both disconnect paths).
this.cleanupOnDisconnect(ev.value.reason!);
} else if (ev.case == 'reconnecting') {
this.emit(RoomEvent.Reconnecting);
} else if (ev.case == 'reconnected') {
Expand Down
16 changes: 16 additions & 0 deletions packages/livekit-rtc/src/tests/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,13 +636,29 @@ describeE2E('livekit-rtc e2e', () => {
{ timeoutMs: 5000, debugName: 'all tracks visible' },
);

// Register listeners before disconnecting so we can verify both
// RoomEvent.Disconnected and RoomEvent.ConnectionStateChanged fire
// for every room, even when disconnects race.
const disconnectedEvents = rooms.map((r) =>
waitForRoomEvent(r, RoomEvent.Disconnected, 3_000, (reason) => reason),
);
const connectionStateEvents = rooms.map((r) =>
waitForRoomEvent(r, RoomEvent.ConnectionStateChanged, 3_000, (state) => state),
);

// Disconnect all participants simultaneously
await Promise.all([...rooms.map((r) => r.disconnect()), ...sources.map((s) => s.close())]);

await Promise.all(disconnectedEvents);
const observedStates = await Promise.all(connectionStateEvents);

// Verify all rooms are disconnected and remote participant maps are empty
for (const room of rooms) {
expect(room.isConnected).toBe(false);
}
for (const state of observedStates) {
expect(state).toBe(ConnectionState.CONN_DISCONNECTED);
}
},
testTimeoutMs * 2,
);
Expand Down
Loading