Skip to content

Commit

Permalink
Remove Promises dependency for internal completers (livekit#267)
Browse files Browse the repository at this point in the history
* AsyncCompleter

* remove Completer

* clean up default timeouts

* fix logic

* refactor error
  • Loading branch information
hiroshihorie committed Oct 30, 2023
1 parent e94fa5b commit 35d4932
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 155 deletions.
12 changes: 5 additions & 7 deletions Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal class DataChannelPair: NSObject, Loggable {
public let target: Livekit_SignalTarget
public var onDataPacket: OnDataPacket?

public private(set) var openCompleter = Promise<Void>.pending()
public private(set) var openCompleter = AsyncCompleter<Void>(label: "Data channel open", timeOut: .defaultPublisherDataChannelOpen)

// MARK: - Private

Expand Down Expand Up @@ -59,7 +59,7 @@ internal class DataChannelPair: NSObject, Loggable {
channel?.delegate = self

if isOpen {
openCompleter.fulfill(())
openCompleter.resume(returning: ())
}
}

Expand All @@ -68,7 +68,7 @@ internal class DataChannelPair: NSObject, Loggable {
channel?.delegate = self

if isOpen {
openCompleter.fulfill(())
openCompleter.resume(returning: ())
}
}

Expand All @@ -80,9 +80,7 @@ internal class DataChannelPair: NSObject, Loggable {
_reliableChannel = nil
_lossyChannel = nil

// reset completer
openCompleter.reject(InternalError.state(message: "openCompleter did not complete"))
openCompleter = Promise<Void>.pending()
openCompleter.cancel()

// execute on .webRTC queue
DispatchQueue.liveKitWebRTC.sync {
Expand Down Expand Up @@ -136,7 +134,7 @@ extension DataChannelPair: LKRTCDataChannelDelegate {
func dataChannelDidChangeState(_ dataChannel: LKRTCDataChannel) {

if isOpen {
openCompleter.fulfill(())
openCompleter.resume(returning: ())
}
}

Expand Down
8 changes: 4 additions & 4 deletions Sources/LiveKit/Core/Engine+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ extension Engine: TransportDelegate {
log("target: \(transport.target), state: \(pcState)")

// primary connected
if transport.primary {
_state.mutate { $0.primaryTransportConnectedCompleter.set(value: .connected == pcState ? true : nil) }
if transport.primary, case .connected = pcState {
primaryTransportConnectedCompleter.resume(returning: ())
}

// publisher connected
if case .publisher = transport.target {
_state.mutate { $0.publisherTransportConnectedCompleter.set(value: .connected == pcState ? true : nil) }
if case .publisher = transport.target, case .connected = pcState {
publisherTransportConnectedCompleter.resume(returning: ())
}

if _state.connectionState.isConnected {
Expand Down
37 changes: 9 additions & 28 deletions Sources/LiveKit/Core/Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ internal class Engine: MulticastDelegate<EngineDelegate> {
var connectionState: ConnectionState = .disconnected()
var connectStopwatch = Stopwatch(label: "connect")
var hasPublished: Bool = false
var primaryTransportConnectedCompleter = Completer<Bool>()
var publisherTransportConnectedCompleter = Completer<Bool>()
}

internal let primaryTransportConnectedCompleter = AsyncCompleter<Void>(label: "Primary transport connect", timeOut: .defaultTransportState)
internal let publisherTransportConnectedCompleter = AsyncCompleter<Void>(label: "Publisher transport connect", timeOut: .defaultTransportState)

public var _state: StateSync<State>

public let signalClient = SignalClient()
Expand Down Expand Up @@ -243,19 +244,8 @@ internal class Engine: MulticastDelegate<EngineDelegate> {
publisherShouldNegotiate()
}

let publisherConnectCompleter = _state.mutate {
$0.publisherTransportConnectedCompleter.wait(on: queue,
.defaultTransportState,
throw: { TransportError.timedOut(message: "publisher didn't connect") })
}

return publisherConnectCompleter.then(on: queue) { _ -> Promise<Void> in
self.log("send data: publisher connected...")
// wait for publisherDC to open
return self.publisherDC.openCompleter
}.timeout(.defaultPublisherDataChannelOpen) {
// this should not happen since .wait has its own timeouts
InternalError.state(message: "ensurePublisherConnected() did not complete")
return publisherTransportConnectedCompleter.waitPromise().then(on: queue) { _ in
self.publisherDC.openCompleter.waitPromise()
}
}

Expand Down Expand Up @@ -400,9 +390,7 @@ internal extension Engine {
adaptiveStream: room._state.options.adaptiveStream)
.then(on: queue) {
// wait for joinResponse
self.signalClient._state.mutate { $0.joinResponseCompleter.wait(on: self.queue,
.defaultJoinResponse,
throw: { SignalClientError.timedOut(message: "failed to receive join response") }) }
self.signalClient.joinResponseCompleter.waitPromise()
}.then(on: queue) { _ in
self._state.mutate { $0.connectStopwatch.split(label: "signal") }
}.then(on: queue) { jr in
Expand All @@ -419,9 +407,7 @@ internal extension Engine {
}.then(on: queue) {
self.signalClient.resumeResponseQueue()
}.then(on: queue) {
self._state.mutate { $0.primaryTransportConnectedCompleter.wait(on: self.queue,
.defaultTransportState,
throw: { TransportError.timedOut(message: "primary transport didn't connect") }) }
self.primaryTransportConnectedCompleter.waitPromise()
}.then(on: queue) { _ -> Void in
self._state.mutate { $0.connectStopwatch.split(label: "engine") }
self.log("\(self._state.connectStopwatch)")
Expand Down Expand Up @@ -459,12 +445,9 @@ internal extension Engine {
connectOptions: _state.connectOptions,
reconnectMode: _state.reconnectMode,
adaptiveStream: room._state.options.adaptiveStream).then(on: queue) {

self.log("[reconnect] waiting for socket to connect...")
// Wait for primary transport to connect (if not already)
self._state.mutate { $0.primaryTransportConnectedCompleter.wait(on: self.queue,
.defaultTransportState,
throw: { TransportError.timedOut(message: "primary transport didn't connect") }) }
return self.primaryTransportConnectedCompleter.waitPromise()
}.then(on: queue) { _ in
// send SyncState before offer
self.sendSyncState()
Expand All @@ -480,9 +463,7 @@ internal extension Engine {
self.log("[reconnect] waiting for publisher to connect...")

return publisher.createAndSendOffer(iceRestart: true).then(on: self.queue) {
self._state.mutate { $0.publisherTransportConnectedCompleter.wait(on: self.queue,
.defaultTransportState,
throw: { TransportError.timedOut(message: "publisher transport didn't connect") }) }.then { _ in }
self.publisherTransportConnectedCompleter.waitPromise()
}

}.then(on: queue) { () -> Promise<Void> in
Expand Down
18 changes: 3 additions & 15 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,10 @@ internal extension Room {

// start Engine cleanUp sequence

engine._state.mutate {
$0.primaryTransportConnectedCompleter.reset()
$0.publisherTransportConnectedCompleter.reset()
engine.primaryTransportConnectedCompleter.cancel()
engine.publisherTransportConnectedCompleter.cancel()

engine._state.mutate {
// if isFullReconnect, keep connection related states
$0 = isFullReconnect ? Engine.State(
connectOptions: $0.connectOptions,
Expand Down Expand Up @@ -353,18 +353,6 @@ extension Room {
public func sendSimulate(scenario: SimulateScenario) -> Promise<Void> {
engine.signalClient.sendSimulate(scenario: scenario)
}

public func waitForPrimaryTransportConnect() -> Promise<Bool> {
engine._state.mutate {
$0.primaryTransportConnectedCompleter.wait(on: queue, .defaultTransportState, throw: { TransportError.timedOut(message: "primary transport didn't connect") })
}
}

public func waitForPublisherTransportConnect() -> Promise<Bool> {
engine._state.mutate {
$0.publisherTransportConnectedCompleter.wait(on: queue, .defaultTransportState, throw: { TransportError.timedOut(message: "publisher transport didn't connect") })
}
}
}

// MARK: - Session Migration
Expand Down
32 changes: 16 additions & 16 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

// MARK: - Internal

internal let joinResponseCompleter = AsyncCompleter<Livekit_JoinResponse>(label: "Join response", timeOut: .defaultJoinResponse)
internal var completersForAddTrack = [String: AsyncCompleter<Livekit_TrackInfo>]()

internal struct State: ReconnectableState, Equatable {
var reconnectMode: ReconnectMode?
var connectionState: ConnectionState = .disconnected()
var joinResponseCompleter = Completer<Livekit_JoinResponse>()
var completersForAddTrack = [String: Completer<Livekit_TrackInfo>]()
}

internal var _state = StateSync(State())
Expand Down Expand Up @@ -185,11 +186,12 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
latestJoinResponse = nil

_state.mutate {
for var completer in $0.completersForAddTrack.values {
completer.reset()

for completer in completersForAddTrack.values {
completer.cancel()
}

$0.joinResponseCompleter.reset()
joinResponseCompleter.cancel()

// reset state
$0 = State()
Expand All @@ -209,28 +211,26 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

func completeCompleter(forAddTrackRequest trackCid: String, trackInfo: Livekit_TrackInfo) {

_state.mutate {
if var completer = $0.completersForAddTrack[trackCid] {
_state.mutate { _ in
if let completer = completersForAddTrack[trackCid] {
log("[publish] found the completer resolving...")
completer.set(value: trackInfo)
completer.resume(returning: trackInfo)
}
}
}

func prepareCompleter(forAddTrackRequest trackCid: String) -> Promise<Livekit_TrackInfo> {

_state.mutate { state -> Promise<Livekit_TrackInfo> in
_state.mutate { _ -> Promise<Livekit_TrackInfo> in

if state.completersForAddTrack.keys.contains(trackCid) {
if completersForAddTrack.keys.contains(trackCid) {
// reset if already exists
state.completersForAddTrack[trackCid]!.reset()
completersForAddTrack[trackCid]!.cancel()
} else {
state.completersForAddTrack[trackCid] = Completer<Livekit_TrackInfo>()
completersForAddTrack[trackCid] = AsyncCompleter<Livekit_TrackInfo>(label: "Add track: \(trackCid)", timeOut: .defaultPublish)
}

return state.completersForAddTrack[trackCid]!.wait(on: queue,
.defaultPublish,
throw: { EngineError.timedOut(message: "server didn't respond to addTrack request") })
return completersForAddTrack[trackCid]!.waitPromise()
}
}
}
Expand Down Expand Up @@ -316,7 +316,7 @@ private extension SignalClient {
latestJoinResponse = joinResponse
restartPingTimer()
notify { $0.signalClient(self, didReceive: joinResponse) }
_state.mutate { $0.joinResponseCompleter.set(value: joinResponse) }
joinResponseCompleter.resume(returning: joinResponse)

case .answer(let sd):
notify { $0.signalClient(self, didReceiveAnswer: sd.toRTCType()) }
Expand Down
17 changes: 11 additions & 6 deletions Sources/LiveKit/Extensions/TimeInterval.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ import Foundation

/// Default timeout `TimeInterval`s used throughout the SDK.
public extension TimeInterval {
static let defaultCaptureStart: Self = 5
static let defaultConnectivity: Self = 10
static let defaultPublish: Self = 10
static let defaultReconnectAttemptDelay: Self = 2
// the following 3 timeouts are used for a typical connect sequence
static let defaultSocketConnect: Self = 10
static let defaultJoinResponse: Self = 7
static let defaultTransportState: Self = 10
// used for validation mode
static let defaultHTTPConnect: Self = 5
static let defaultPublisherDataChannelOpen: Self = 7
}

public extension DispatchTimeInterval {
static let defaultCaptureStart: Self = .seconds(5)
static let defaultConnectivity: Self = .seconds(10)
static let defaultPublish: Self = .seconds(10)
// the following 3 timeouts are used for a typical connect sequence
static let defaultJoinResponse: Self = .seconds(7)
static let defaultTransportState: Self = .seconds(10)
// used for validation mode
static let defaultPublisherDataChannelOpen: Self = .seconds(7)
}
4 changes: 1 addition & 3 deletions Sources/LiveKit/Participant/LocalParticipant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ public class LocalParticipant: Participant {
self.log("[publish] waiting for dimensions to resolve...")

// wait for dimensions
return track.capturer._state.mutate { $0.dimensionsCompleter.wait(on: self.queue,
.defaultCaptureStart,
throw: { TrackError.timedOut(message: "unable to resolve dimensions") }) }.then(on: self.queue) { $0 }
return track.capturer.dimensionsCompleter.waitPromise()

}.then(on: queue) { dimensions -> Promise<(result: LKRTCRtpTransceiverInit, trackInfo: Livekit_TrackInfo)> in
// request a new track to the server
Expand Down
Loading

0 comments on commit 35d4932

Please sign in to comment.