Skip to content

Commit

Permalink
Migrate SignalClient to async/await (livekit#268)
Browse files Browse the repository at this point in the history
* improve http request helper

* impl

* impl
  • Loading branch information
hiroshihorie committed Oct 31, 2023
1 parent 0e06a60 commit 33b32f8
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 329 deletions.
2 changes: 1 addition & 1 deletion Sources/LiveKit/Core/Engine+SignalClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ extension Engine: SignalClientDelegate {
}.then(on: queue) { answer in
subscriber.setLocalDescription(answer)
}.then(on: queue) { answer in
self.signalClient.sendAnswer(answer: answer)
promise(from: signalClient.sendAnswer, param1: answer)
}.then(on: queue) {
self.log("answer sent to signal")
}.catch(on: queue) { error in
Expand Down
7 changes: 3 additions & 4 deletions Sources/LiveKit/Core/Engine+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ extension Engine: TransportDelegate {

func transport(_ transport: Transport, didGenerate iceCandidate: LKRTCIceCandidate) {
log("didGenerate iceCandidate")
signalClient.sendCandidate(candidate: iceCandidate,
target: transport.target).catch(on: queue) { error in
self.log("Failed to send candidate, error: \(error)", .error)
}
Task {
try await signalClient.sendCandidate(candidate: iceCandidate, target: transport.target)
}
}

func transport(_ transport: Transport, didAddTrack track: LKRTCMediaStreamTrack, rtpReceiver: LKRTCRtpReceiver, streams: [LKRTCMediaStream]) {
Expand Down
99 changes: 51 additions & 48 deletions Sources/LiveKit/Core/Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ internal class Engine: MulticastDelegate<EngineDelegate> {
publisherShouldNegotiate()
}

return publisherTransportConnectedCompleter.waitPromise().then(on: queue) { _ in
self.publisherDC.openCompleter.waitPromise()
return promise(from: publisherTransportConnectedCompleter.wait).then(on: queue) { _ in
promise(from: self.publisherDC.openCompleter.wait)
}
}

Expand Down Expand Up @@ -308,7 +308,7 @@ internal extension Engine {
publisher.onOffer = { [weak self] offer in
guard let self = self else { return Promise(EngineError.state(message: "self is nil")) }
log("publisher onOffer \(offer.sdp)")
return signalClient.sendOffer(offer: offer)
return promise(from: signalClient.sendOffer, param1: offer)
}

// data over pub channel for backwards compatibility
Expand Down Expand Up @@ -383,14 +383,15 @@ internal extension Engine {
// this should never happen since Engine is owned by Room
guard let room = self.room else { return Promise(EngineError.state(message: "Room is nil")) }

return self.signalClient.connect(url,
token,
connectOptions: _state.connectOptions,
reconnectMode: _state.reconnectMode,
adaptiveStream: room._state.options.adaptiveStream)
return promise(from: self.signalClient.connect,
param1: url,
param2: token,
param3: _state.connectOptions,
param4: _state.reconnectMode,
param5: room._state.options.adaptiveStream)
.then(on: queue) {
// wait for joinResponse
self.signalClient.joinResponseCompleter.waitPromise()
promise(from: self.signalClient.joinResponseCompleter.wait)
}.then(on: queue) { _ in
self._state.mutate { $0.connectStopwatch.split(label: "signal") }
}.then(on: queue) { jr in
Expand All @@ -405,9 +406,9 @@ internal extension Engine {
}
}
}.then(on: queue) {
self.signalClient.resumeResponseQueue()
promise(from: self.signalClient.resumeResponseQueue)
}.then(on: queue) {
self.primaryTransportConnectedCompleter.waitPromise()
promise(from: self.primaryTransportConnectedCompleter.wait)
}.then(on: queue) { _ -> Void in
self._state.mutate { $0.connectStopwatch.split(label: "engine") }
self.log("\(self._state.connectStopwatch)")
Expand Down Expand Up @@ -440,38 +441,39 @@ internal extension Engine {
// this should never happen since Engine is owned by Room
guard let room = self.room else { return Promise(EngineError.state(message: "Room is nil")) }

return self.signalClient.connect(url,
token,
connectOptions: _state.connectOptions,
reconnectMode: _state.reconnectMode,
adaptiveStream: room._state.options.adaptiveStream).then(on: queue) {
self.log("[reconnect] waiting for socket to connect...")
// Wait for primary transport to connect (if not already)
return self.primaryTransportConnectedCompleter.waitPromise()
}.then(on: queue) { _ in
// send SyncState before offer
self.sendSyncState()
}.then(on: queue) { () -> Promise<Void> in

self.subscriber?.restartingIce = true

// only if published, continue...
guard let publisher = self.publisher, self._state.hasPublished else {
return Promise(())
}

self.log("[reconnect] waiting for publisher to connect...")

return publisher.createAndSendOffer(iceRestart: true).then(on: self.queue) {
self.publisherTransportConnectedCompleter.waitPromise()
}

}.then(on: queue) { () -> Promise<Void> in

self.log("[reconnect] send queued requests")
// always check if there are queued requests
return self.signalClient.sendQueuedRequests()
}
return promise(from: self.signalClient.connect,
param1: url,
param2: token,
param3: _state.connectOptions,
param4: _state.reconnectMode,
param5: room._state.options.adaptiveStream).then(on: queue) {
self.log("[reconnect] waiting for socket to connect...")
// Wait for primary transport to connect (if not already)
return promise(from: self.primaryTransportConnectedCompleter.wait)
}.then(on: queue) { _ in
// send SyncState before offer
self.sendSyncState()
}.then(on: queue) { () -> Promise<Void> in

self.subscriber?.restartingIce = true

// only if published, continue...
guard let publisher = self.publisher, self._state.hasPublished else {
return Promise(())
}

self.log("[reconnect] waiting for publisher to connect...")

return publisher.createAndSendOffer(iceRestart: true).then(on: self.queue) {
promise(from: self.publisherTransportConnectedCompleter.wait)
}

}.then(on: queue) { () -> Promise<Void> in

self.log("[reconnect] send queued requests")
// always check if there are queued requests
return promise(from: self.signalClient.sendQueuedRequests)
}
}

// "full" re-connection sequence
Expand Down Expand Up @@ -581,11 +583,12 @@ internal extension Engine {
$0.subscribe = !autoSubscribe
}

return signalClient.sendSyncState(answer: previousAnswer.toPBType(),
offer: previousOffer?.toPBType(),
subscription: subscription,
publishTracks: room._state.localParticipant?.publishedTracksInfo(),
dataChannels: publisherDC.infos())
return promise(from: signalClient.sendSyncState,
param1: previousAnswer.toPBType(),
param2: previousOffer?.toPBType(),
param3: subscription,
param4: room._state.localParticipant?.publishedTracksInfo(),
param5: publisherDC.infos())
}
}

Expand Down
7 changes: 3 additions & 4 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public class Room: NSObject, ObservableObject, Loggable {
// return if already disconnected state
if case .disconnected = connectionState { return Promise(()) }

return engine.signalClient.sendLeave()
return promise(from: engine.signalClient.sendLeave)
.recover(on: queue) { self.log("Failed to send leave, error: \($0)") }
.then(on: queue) { [weak self] in
guard let self = self else { return }
Expand Down Expand Up @@ -349,9 +349,8 @@ internal extension Room {

extension Room {

@discardableResult
public func sendSimulate(scenario: SimulateScenario) -> Promise<Void> {
engine.signalClient.sendSimulate(scenario: scenario)
public func sendSimulate(scenario: SimulateScenario) async throws {
try await engine.signalClient.sendSimulate(scenario: scenario)
}
}

Expand Down
Loading

0 comments on commit 33b32f8

Please sign in to comment.