Skip to content

Commit

Permalink
Fixed incorrect state with resume then reconnect (#595)
Browse files Browse the repository at this point in the history
When a resume sequence fails, a full reconnect is attempted. There were
a few issues there with that sequence
- We did not always fire EngineEvent.Restarting, so Room missed tearing down existing participants
- With selective subscriptions, when existing `isDesired` isn't cleared, it will not send a subscribe request when requested
- New tracks were not republished successfully when reconnected (due to sender not being reset early enough)
  • Loading branch information
davidzhao committed Mar 3, 2023
1 parent a9aa74f commit 75776b8
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 48 deletions.
5 changes: 5 additions & 0 deletions .changeset/selfish-bobcats-ring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Fixed incorrect state with resume then reconnect
77 changes: 42 additions & 35 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

private closingLock: Mutex;

private shouldFailNext: boolean = false;

constructor(private options: InternalRoomOptions) {
super();
this.client = new SignalClient();
Expand Down Expand Up @@ -247,7 +249,13 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
});
}

removeTrack(sender: RTCRtpSender) {
/**
* Removes sender from PeerConnection, returning true if it was removed successfully
* and a negotiation is necessary
* @param sender
* @returns
*/
removeTrack(sender: RTCRtpSender): boolean {
if (sender.track && this.pendingTrackResolvers[sender.track.id]) {
const { reject } = this.pendingTrackResolvers[sender.track.id];
if (reject) {
Expand All @@ -257,9 +265,11 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
try {
this.publisher?.pc.removeTrack(sender);
return true;
} catch (e: unknown) {
log.warn('failed to remove track', { error: e, method: 'removeTrack' });
}
return false;
}

updateMuteStatus(trackSid: string, muted: boolean) {
Expand Down Expand Up @@ -334,7 +344,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

this.handleDisconnect(
'primary peerconnection',
false,
subscriberPrimary
? ReconnectReason.REASON_SUBSCRIBER_FAILED
: ReconnectReason.REASON_PUBLISHER_FAILED,
Expand All @@ -348,7 +357,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
if (secondaryPC.connectionState === 'failed') {
this.handleDisconnect(
'secondary peerconnection',
false,
subscriberPrimary
? ReconnectReason.REASON_PUBLISHER_FAILED
: ReconnectReason.REASON_SUBSCRIBER_FAILED,
Expand Down Expand Up @@ -419,7 +427,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
};

this.client.onClose = () => {
this.handleDisconnect('signal', false, ReconnectReason.REASON_SIGNAL_DISCONNECTED);
this.handleDisconnect('signal', ReconnectReason.REASON_SIGNAL_DISCONNECTED);
};

this.client.onLeave = (leave?: LeaveRequest) => {
Expand Down Expand Up @@ -690,11 +698,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
// websocket reconnect behavior. if websocket is interrupted, and the PeerConnection
// continues to work, we can reconnect to websocket to continue the session
// after a number of retries, we'll close and give up permanently
private handleDisconnect = (
connection: string,
signalEvents: boolean = false,
disconnectReason?: ReconnectReason,
) => {
private handleDisconnect = (connection: string, disconnectReason?: ReconnectReason) => {
if (this._isClosed) {
return;
}
Expand Down Expand Up @@ -731,12 +735,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

this.clearReconnectTimeout();
this.reconnectTimeout = CriticalTimers.setTimeout(
() => this.attemptReconnect(signalEvents, disconnectReason),
() => this.attemptReconnect(disconnectReason),
delay,
);
};

private async attemptReconnect(signalEvents: boolean = false, reason?: ReconnectReason) {
private async attemptReconnect(reason?: ReconnectReason) {
if (this._isClosed) {
return;
}
Expand All @@ -756,35 +760,26 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
try {
this.attemptingReconnect = true;
if (this.fullReconnectOnNext) {
await this.restartConnection(signalEvents);
await this.restartConnection();
} else {
await this.resumeConnection(signalEvents, reason);
await this.resumeConnection(reason);
}
this.clearPendingReconnect();
this.fullReconnectOnNext = false;
} catch (e) {
this.reconnectAttempts += 1;
let reconnectRequired = false;
let recoverable = true;
let requireSignalEvents = false;
if (e instanceof UnexpectedConnectionState) {
log.debug('received unrecoverable error', { error: e });
// unrecoverable
recoverable = false;
} else if (!(e instanceof SignalReconnectError)) {
// cannot resume
reconnectRequired = true;
}

// when we flip from resume to reconnect
// we need to fire the right reconnecting events
if (reconnectRequired && !this.fullReconnectOnNext) {
this.fullReconnectOnNext = true;
requireSignalEvents = true;
}

if (recoverable) {
this.handleDisconnect('reconnect', requireSignalEvents, ReconnectReason.REASON_UNKOWN);
this.handleDisconnect('reconnect', ReconnectReason.REASON_UNKOWN);
} else {
log.info(
`could not recover connection after ${this.reconnectAttempts} attempts, ${
Expand All @@ -810,16 +805,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
return null;
}

private async restartConnection(emitRestarting: boolean = false) {
private async restartConnection() {
if (!this.url || !this.token) {
// permanent failure, don't attempt reconnection
throw new UnexpectedConnectionState('could not reconnect, url or token not saved');
}

log.info(`reconnecting, attempt: ${this.reconnectAttempts}`);
if (emitRestarting || this.reconnectAttempts === 0) {
this.emit(EngineEvent.Restarting);
}
this.emit(EngineEvent.Restarting);

if (this.client.isConnected) {
await this.client.sendLeave();
Expand All @@ -842,17 +835,19 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
throw new SignalReconnectError();
}

if (this.shouldFailNext) {
this.shouldFailNext = false;
throw new Error('simulated failure');
}

await this.waitForPCConnected();
this.client.setReconnected();

// reconnect success
this.emit(EngineEvent.Restarted, joinResponse);
}

private async resumeConnection(
emitResuming: boolean = false,
reason?: ReconnectReason,
): Promise<void> {
private async resumeConnection(reason?: ReconnectReason): Promise<void> {
if (!this.url || !this.token) {
// permanent failure, don't attempt reconnection
throw new UnexpectedConnectionState('could not reconnect, url or token not saved');
Expand All @@ -863,7 +858,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}

log.info(`resuming signal connection, attempt ${this.reconnectAttempts}`);
if (emitResuming || this.reconnectAttempts === 0) {
// do not emit for the first attempt, since ICE restart could happen frequently
if (this.reconnectAttempts !== 0) {
this.emit(EngineEvent.Resuming);
}

Expand All @@ -883,6 +879,11 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
this.emit(EngineEvent.SignalResumed);

if (this.shouldFailNext) {
this.shouldFailNext = false;
throw new Error('simulated failure');
}

this.subscriber.restartingIce = true;

// only restart publisher if it's needed
Expand Down Expand Up @@ -1022,7 +1023,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit

const negotiationTimeout = setTimeout(() => {
reject('negotiation timed out');
this.handleDisconnect('negotiation', false, ReconnectReason.REASON_SIGNAL_DISCONNECTED);
this.handleDisconnect('negotiation', ReconnectReason.REASON_SIGNAL_DISCONNECTED);
}, this.peerConnectionTimeout);

const cleanup = () => {
Expand All @@ -1043,7 +1044,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
if (e instanceof NegotiationError) {
this.fullReconnectOnNext = true;
}
this.handleDisconnect('negotiation', false, ReconnectReason.REASON_UNKOWN);
this.handleDisconnect('negotiation', ReconnectReason.REASON_UNKOWN);
});
});
}
Expand All @@ -1066,6 +1067,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}
}

/* @internal */
failNext() {
// debugging method to fail the next reconnect/resume attempt
this.shouldFailNext = true;
}

private clearReconnectTimeout() {
if (this.reconnectTimeout) {
CriticalTimers.clearTimeout(this.reconnectTimeout);
Expand All @@ -1081,7 +1088,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
// in case the engine is currently reconnecting, attempt a reconnect immediately after the browser state has changed to 'onLine'
if (this.client.isReconnecting) {
this.clearReconnectTimeout();
this.attemptReconnect(true, ReconnectReason.REASON_SIGNAL_DISCONNECTED);
this.attemptReconnect(ReconnectReason.REASON_SIGNAL_DISCONNECTED);
}
};

Expand Down
12 changes: 11 additions & 1 deletion src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ import type { AdaptiveStreamSettings } from './track/types';
import { getNewAudioContext } from './track/utils';
import type { SimulationOptions } from './types';
import {
Future,
createDummyVideoStreamTrack,
Future,
getEmptyAudioStreamTrack,
isWeb,
Mutex,
Expand Down Expand Up @@ -516,6 +516,13 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
},
});
break;
case 'resume-reconnect':
this.engine.failNext();
await this.engine.client.close();
if (this.engine.client.onClose) {
this.engine.client.onClose('simulate resume-reconnect');
}
break;
case 'force-tcp':
case 'force-tls':
req = SimulateScenario.fromPartial({
Expand Down Expand Up @@ -786,6 +793,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
await track.restartTrack();
}
log.debug('publishing new track', {
track: pub.trackSid,
});
await this.localParticipant.publishTrack(track, pub.options);
}
}),
Expand Down
20 changes: 13 additions & 7 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -727,31 +727,34 @@ export default class LocalParticipant extends Participant {
track.stop();
}

let negotiationNeeded = false;
const trackSender = track.sender;
track.sender = undefined;
if (
this.engine.publisher &&
this.engine.publisher.pc.connectionState !== 'closed' &&
track.sender
trackSender
) {
try {
this.engine.removeTrack(track.sender);
if (this.engine.removeTrack(trackSender)) {
negotiationNeeded = true;
}
if (track instanceof LocalVideoTrack) {
for (const [, trackInfo] of track.simulcastCodecs) {
if (trackInfo.sender) {
this.engine.removeTrack(trackInfo.sender);
if (this.engine.removeTrack(trackInfo.sender)) {
negotiationNeeded = true;
}
trackInfo.sender = undefined;
}
}
track.simulcastCodecs.clear();
}
} catch (e) {
log.warn('failed to unpublish track', { error: e, method: 'unpublishTrack' });
} finally {
await this.engine.negotiate();
}
}

track.sender = undefined;

// remove from our maps
this.tracks.delete(publication.trackSid);
switch (publication.kind) {
Expand All @@ -768,6 +771,9 @@ export default class LocalParticipant extends Participant {
this.emit(ParticipantEvent.LocalTrackUnpublished, publication);
publication.setTrack(undefined);

if (negotiationNeeded) {
await this.engine.negotiate();
}
return publication;
}

Expand Down
5 changes: 2 additions & 3 deletions src/room/participant/RemoteParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ export default class RemoteParticipant extends Participant {
}
publication = new RemoteTrackPublication(
kind,
ti.sid,
ti.name,
ti,
this.signalClient.connectOptions?.autoSubscribe,
);
publication.updateInfo(ti);
Expand All @@ -246,7 +245,7 @@ export default class RemoteParticipant extends Participant {
(publishedTrack) => publishedTrack.source === publication?.source,
);
if (existingTrackOfSource && publication.source !== Track.Source.Unknown) {
log.warn(
log.debug(
`received a second track publication for ${this.identity} with the same source: ${publication.source}`,
{
oldTrack: existingTrackOfSource,
Expand Down
5 changes: 3 additions & 2 deletions src/room/track/RemoteTrackPublication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ export default class RemoteTrackPublication extends TrackPublication {

protected fps?: number;

constructor(kind: Track.Kind, id: string, name: string, autoSubscribe: boolean | undefined) {
super(kind, id, name);
constructor(kind: Track.Kind, ti: TrackInfo, autoSubscribe: boolean | undefined) {
super(kind, ti.sid, ti.name);
this.subscribed = autoSubscribe;
this.updateInfo(ti);
}

/**
Expand Down

0 comments on commit 75776b8

Please sign in to comment.