Skip to content

Commit

Permalink
Migrate Transport to async/await (livekit#270)
Browse files Browse the repository at this point in the history
* impl

* use AsyncQueueActor for pending candidates

* minor optimization
  • Loading branch information
hiroshihorie committed Nov 1, 2023
1 parent 00e4947 commit 75dc6cf
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 201 deletions.
25 changes: 12 additions & 13 deletions Sources/LiveKit/Core/Engine+SignalClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ extension Engine: SignalClientDelegate {
return
}

transport.addIceCandidate(iceCandidate).catch(on: queue) { error in
promise(from: transport.add(iceCandidate:), param1: iceCandidate).catch(on: queue) { error in
self.log("failed to add ice candidate for transport: \(transport), error: \(error)", .error)
}
}
Expand All @@ -55,7 +55,7 @@ extension Engine: SignalClientDelegate {
return
}

publisher.setRemoteDescription(answer).catch(on: queue) { error in
promise(from: publisher.set(remoteDescription:), param1: answer).catch(on: queue) { error in
self.log("failed to set remote description, error: \(error)", .error)
}

Expand All @@ -64,23 +64,22 @@ extension Engine: SignalClientDelegate {

func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) {

log("received offer, creating & sending answer...")
log("Received offer, creating & sending answer...")

guard let subscriber = self.subscriber else {
log("failed to send answer, subscriber is nil", .error)
return
}

subscriber.setRemoteDescription(offer).then(on: queue) {
subscriber.createAnswer()
}.then(on: queue) { answer in
subscriber.setLocalDescription(answer)
}.then(on: queue) { answer in
promise(from: signalClient.sendAnswer, param1: answer)
}.then(on: queue) {
self.log("answer sent to signal")
}.catch(on: queue) { error in
self.log("failed to send answer, error: \(error)", .error)
Task {
do {
try await subscriber.set(remoteDescription: offer)
let answer = try await subscriber.createAnswer()
try await subscriber.set(localDescription: answer)
try await signalClient.send(answer: answer)
} catch let error {
log("Failed to send answer with error: \(error)", .error)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/LiveKit/Core/Engine+TransportDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ extension Engine: TransportDelegate {
log("target: \(transport.target), state: \(pcState)")

// primary connected
if transport.primary, case .connected = pcState {
if transport.isPrimary, case .connected = pcState {
primaryTransportConnectedCompleter.resume(returning: ())
}

Expand All @@ -35,7 +35,7 @@ extension Engine: TransportDelegate {

if _state.connectionState.isConnected {
// Attempt re-connect if primary or publisher transport failed
if (transport.primary || (_state.hasPublished && transport.target == .publisher)) && [.disconnected, .failed].contains(pcState) {
if (transport.isPrimary || (_state.hasPublished && transport.target == .publisher)) && [.disconnected, .failed].contains(pcState) {
log("[reconnect] starting, reason: transport disconnected or failed")
startReconnect()
}
Expand Down
12 changes: 6 additions & 6 deletions Sources/LiveKit/Core/Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ internal class Engine: MulticastDelegate<EngineDelegate> {
let closeTransportPromises = [self.publisher,
self.subscriber]
.compactMap { $0 }
.map { $0.close() }
.map { promise(from: $0.close) }

return closeTransportPromises.all(on: self.queue)

Expand Down Expand Up @@ -306,9 +306,9 @@ internal extension Engine {
delegate: self)

publisher.onOffer = { [weak self] offer in
guard let self = self else { return Promise(EngineError.state(message: "self is nil")) }
log("publisher onOffer \(offer.sdp)")
return promise(from: signalClient.sendOffer, param1: offer)
guard let self = self else { return }
log("Publisher onOffer \(offer.sdp)")
try await signalClient.send(offer: offer)
}

// data over pub channel for backwards compatibility
Expand Down Expand Up @@ -455,7 +455,7 @@ internal extension Engine {
promise(from: self.sendSyncState)
}.then(on: queue) { () -> Promise<Void> in

self.subscriber?.restartingIce = true
self.subscriber?.isRestartingIce = true

// only if published, continue...
guard let publisher = self.publisher, self._state.hasPublished else {
Expand All @@ -464,7 +464,7 @@ internal extension Engine {

self.log("[reconnect] waiting for publisher to connect...")

return publisher.createAndSendOffer(iceRestart: true).then(on: self.queue) {
return promise(from: publisher.createAndSendOffer, param1: true).then(on: self.queue) {
promise(from: self.publisherTransportConnectedCompleter.wait)
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ internal extension SignalClient {
}
}

func sendOffer(offer: LKRTCSessionDescription) async throws {
func send(offer: LKRTCSessionDescription) async throws {

let r = Livekit_SignalRequest.with {
$0.offer = offer.toPBType()
Expand All @@ -355,7 +355,7 @@ internal extension SignalClient {
try await sendRequest(r)
}

func sendAnswer(answer: LKRTCSessionDescription) async throws {
func send(answer: LKRTCSessionDescription) async throws {

let r = Livekit_SignalRequest.with {
$0.answer = answer.toPBType()
Expand Down
Loading

0 comments on commit 75dc6cf

Please sign in to comment.