Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/twelve-toes-battle.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Use stricter union types for oneof messages
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"build": "rollup --config",
"build:watch": "rollup --watch --config rollup.config.js",
"build-docs": "typedoc",
"proto": "protoc --plugin=node_modules/ts-proto/protoc-gen-ts_proto --ts_proto_opt=esModuleInterop=true --ts_proto_out=./src/proto --ts_proto_opt=outputClientImpl=false,useOptionals=true -I./protocol ./protocol/livekit_rtc.proto ./protocol/livekit_models.proto",
"proto": "protoc --plugin=node_modules/ts-proto/protoc-gen-ts_proto --ts_proto_opt=esModuleInterop=true --ts_proto_out=./src/proto --ts_proto_opt=outputClientImpl=false,useOptionals=messages,oneof=unions -I./protocol ./protocol/livekit_rtc.proto ./protocol/livekit_models.proto",
"sample": "rollup --watch --config rollup.config.dev.js",
"lint": "eslint src",
"test": "jest",
Expand Down Expand Up @@ -72,7 +72,7 @@
"rollup-plugin-terser": "7.0.2",
"rollup-plugin-typescript2": "0.32.1",
"ts-jest": "28.0.7",
"ts-proto": "1.121.0",
"ts-proto": "1.121.1",
"typedoc": "0.23.9",
"typedoc-plugin-no-inherit": "1.3.1",
"typescript": "4.7.4"
Expand Down
104 changes: 67 additions & 37 deletions src/api/SignalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'webrtc-adapter';
import log from '../logger';
import {
ClientInfo,
DisconnectReason,
ParticipantInfo,
Room,
SpeakerInfo,
Expand Down Expand Up @@ -51,7 +52,11 @@ export interface SignalOptions {
adaptiveStream?: boolean;
}

const passThroughQueueSignals: Array<keyof SignalRequest> = [
type SignalMessage = SignalRequest['message'];

type SignalKind = NonNullable<SignalMessage>['$case'];

const passThroughQueueSignals: Array<SignalKind> = [
'syncState',
'trickle',
'offer',
Expand All @@ -60,10 +65,8 @@ const passThroughQueueSignals: Array<keyof SignalRequest> = [
'leave',
];

function canPassThroughQueue(req: SignalRequest): boolean {
const canPass =
Object.keys(req).find((key) => passThroughQueueSignals.includes(key as keyof SignalRequest)) !==
undefined;
function canPassThroughQueue(req: SignalMessage): boolean {
const canPass = passThroughQueueSignals.includes(req!.$case);
log.trace('request allowed to bypass queue:', { canPass, req });
return canPass;
}
Expand Down Expand Up @@ -231,10 +234,10 @@ export class SignalClient {

if (!this.isConnected) {
// handle join message only
if (msg.join) {
if (msg.message?.$case === 'join') {
this.isConnected = true;
abortSignal?.removeEventListener('abort', abortHandler);
resolve(msg.join);
resolve(msg.message.join);
} else {
reject(new ConnectionError('did not receive join response'));
}
Expand Down Expand Up @@ -271,6 +274,7 @@ export class SignalClient {
sendOffer(offer: RTCSessionDescriptionInit) {
log.debug('sending offer', offer);
this.sendRequest({
$case: 'offer',
offer: toProtoSessionDescription(offer),
});
}
Expand All @@ -279,13 +283,15 @@ export class SignalClient {
sendAnswer(answer: RTCSessionDescriptionInit) {
log.debug('sending answer');
this.sendRequest({
$case: 'answer',
answer: toProtoSessionDescription(answer),
});
}

sendIceCandidate(candidate: RTCIceCandidateInit, target: SignalTarget) {
log.trace('sending ice candidate', candidate);
this.sendRequest({
$case: 'trickle',
trickle: {
candidateInit: JSON.stringify(candidate),
target,
Expand All @@ -295,6 +301,7 @@ export class SignalClient {

sendMuteTrack(trackSid: string, muted: boolean) {
this.sendRequest({
$case: 'mute',
mute: {
sid: trackSid,
muted,
Expand All @@ -304,24 +311,35 @@ export class SignalClient {

sendAddTrack(req: AddTrackRequest): void {
this.sendRequest({
$case: 'addTrack',
addTrack: AddTrackRequest.fromPartial(req),
});
}

sendUpdateTrackSettings(settings: UpdateTrackSettings) {
this.sendRequest({ trackSetting: settings });
this.sendRequest({
$case: 'trackSetting',
trackSetting: settings,
});
}

sendUpdateSubscription(sub: UpdateSubscription) {
this.sendRequest({ subscription: sub });
this.sendRequest({
$case: 'subscription',
subscription: sub,
});
}

sendSyncState(sync: SyncState) {
this.sendRequest({ syncState: sync });
this.sendRequest({
$case: 'syncState',
syncState: sync,
});
}

sendUpdateVideoLayers(trackSid: string, layers: VideoLayer[]) {
this.sendRequest({
$case: 'updateLayers',
updateLayers: {
trackSid,
layers,
Expand All @@ -331,6 +349,7 @@ export class SignalClient {

sendUpdateSubscriptionPermissions(allParticipants: boolean, trackPermissions: TrackPermission[]) {
this.sendRequest({
$case: 'subscriptionPermission',
subscriptionPermission: {
allParticipants,
trackPermissions,
Expand All @@ -340,21 +359,28 @@ export class SignalClient {

sendSimulateScenario(scenario: SimulateScenario) {
this.sendRequest({
$case: 'simulate',
simulate: scenario,
});
}

async sendLeave() {
await this.sendRequest(SignalRequest.fromPartial({ leave: {} }));
await this.sendRequest({
$case: 'leave',
leave: {
canReconnect: false,
reason: DisconnectReason.CLIENT_INITIATED,
},
});
}

async sendRequest(req: SignalRequest, fromQueue: boolean = false) {
async sendRequest(message: SignalMessage, fromQueue: boolean = false) {
// capture all requests while reconnecting and put them in a queue
// unless the request originates from the queue, then don't enqueue again
const canQueue = !fromQueue && !canPassThroughQueue(req);
const canQueue = !fromQueue && !canPassThroughQueue(message);
if (canQueue && this.isReconnecting) {
this.queuedRequests.push(async () => {
await this.sendRequest(req, true);
await this.sendRequest(message, true);
});
return;
}
Expand All @@ -370,6 +396,9 @@ export class SignalClient {
return;
}

const req = {
message,
};
try {
if (this.useJSON) {
this.ws.send(JSON.stringify(SignalRequest.toJSON(req)));
Expand All @@ -381,67 +410,68 @@ export class SignalClient {
}
}

private handleSignalResponse(msg: SignalResponse) {
if (msg.answer) {
private handleSignalResponse(res: SignalResponse) {
const msg = res.message!;
if (msg.$case === 'answer') {
const sd = fromProtoSessionDescription(msg.answer);
if (this.onAnswer) {
this.onAnswer(sd);
}
} else if (msg.offer) {
} else if (msg.$case === 'offer') {
const sd = fromProtoSessionDescription(msg.offer);
if (this.onOffer) {
this.onOffer(sd);
}
} else if (msg.trickle) {
const candidate: RTCIceCandidateInit = JSON.parse(msg.trickle.candidateInit);
} else if (msg.$case === 'trickle') {
const candidate: RTCIceCandidateInit = JSON.parse(msg.trickle.candidateInit!);
if (this.onTrickle) {
this.onTrickle(candidate, msg.trickle.target);
}
} else if (msg.update) {
} else if (msg.$case === 'update') {
if (this.onParticipantUpdate) {
this.onParticipantUpdate(msg.update.participants);
this.onParticipantUpdate(msg.update.participants ?? []);
}
} else if (msg.trackPublished) {
} else if (msg.$case === 'trackPublished') {
if (this.onLocalTrackPublished) {
this.onLocalTrackPublished(msg.trackPublished);
}
} else if (msg.speakersChanged) {
} else if (msg.$case === 'speakersChanged') {
if (this.onSpeakersChanged) {
this.onSpeakersChanged(msg.speakersChanged.speakers);
this.onSpeakersChanged(msg.speakersChanged.speakers ?? []);
}
} else if (msg.leave) {
} else if (msg.$case === 'leave') {
if (this.onLeave) {
this.onLeave(msg.leave);
}
} else if (msg.mute) {
} else if (msg.$case === 'mute') {
if (this.onRemoteMuteChanged) {
this.onRemoteMuteChanged(msg.mute.sid, msg.mute.muted);
}
} else if (msg.roomUpdate) {
if (this.onRoomUpdate) {
this.onRoomUpdate(msg.roomUpdate.room!);
} else if (msg.$case === 'roomUpdate') {
if (this.onRoomUpdate && msg.roomUpdate.room) {
this.onRoomUpdate(msg.roomUpdate.room);
}
} else if (msg.connectionQuality) {
} else if (msg.$case === 'connectionQuality') {
if (this.onConnectionQuality) {
this.onConnectionQuality(msg.connectionQuality);
}
} else if (msg.streamStateUpdate) {
} else if (msg.$case === 'streamStateUpdate') {
if (this.onStreamStateUpdate) {
this.onStreamStateUpdate(msg.streamStateUpdate);
}
} else if (msg.subscribedQualityUpdate) {
} else if (msg.$case === 'subscribedQualityUpdate') {
if (this.onSubscribedQualityUpdate) {
this.onSubscribedQualityUpdate(msg.subscribedQualityUpdate);
}
} else if (msg.subscriptionPermissionUpdate) {
} else if (msg.$case === 'subscriptionPermissionUpdate') {
if (this.onSubscriptionPermissionUpdate) {
this.onSubscriptionPermissionUpdate(msg.subscriptionPermissionUpdate);
}
} else if (msg.refreshToken) {
} else if (msg.$case === 'refreshToken') {
if (this.onTokenRefresh) {
this.onTokenRefresh(msg.refreshToken);
}
} else if (msg.trackUnpublished) {
} else if (msg.$case === 'trackUnpublished') {
if (this.onLocalTrackUnpublished) {
this.onLocalTrackUnpublished(msg.trackUnpublished);
}
Expand Down Expand Up @@ -507,8 +537,8 @@ function createConnectionParams(token: string, info: ClientInfo, opts?: ConnectO

// ClientInfo
params.set('sdk', 'js');
params.set('version', info.version);
params.set('protocol', info.protocol.toString());
params.set('version', info.version!);
params.set('protocol', info.protocol!.toString());
if (info.deviceModel) {
params.set('device_model', info.deviceModel);
}
Expand Down
4 changes: 3 additions & 1 deletion src/proto/google/protobuf/timestamp.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable */
import Long from 'long';
import * as _m0 from 'protobufjs/minimal';
import _m0 from 'protobufjs/minimal';

export const protobufPackage = 'google.protobuf';

Expand Down Expand Up @@ -190,6 +190,8 @@ export type DeepPartial<T> = T extends Builtin
? Array<DeepPartial<U>>
: T extends ReadonlyArray<infer U>
? ReadonlyArray<DeepPartial<U>>
: T extends { $case: string }
? { [K in keyof Omit<T, '$case'>]?: DeepPartial<T[K]> } & { $case: T['$case'] }
: T extends {}
? { [K in keyof T]?: DeepPartial<T[K]> }
: Partial<T>;
Expand Down
Loading