Skip to content

Commit

Permalink
async: configureTransports
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie committed Oct 30, 2023
1 parent 19af0f8 commit 4b8468c
Showing 1 changed file with 59 additions and 55 deletions.
114 changes: 59 additions & 55 deletions Sources/LiveKit/Core/Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -281,78 +281,73 @@ internal class Engine: MulticastDelegate<EngineDelegate> {

internal extension Engine {

func configureTransports(joinResponse: Livekit_JoinResponse) -> Promise<Void> {
func configureTransports(joinResponse: Livekit_JoinResponse) async throws {

Promise<Void>(on: queue) { () -> Void in
log("Configuring transports...")

self.log("configuring transports...")

// this should never happen since Engine is owned by Room
guard let room = self.room else { throw EngineError.state(message: "Room is nil") }

guard self.subscriber == nil, self.publisher == nil else {
self.log("transports already configured")
return
}

// protocol v3
self.subscriberPrimary = joinResponse.subscriberPrimary
self.log("subscriberPrimary: \(joinResponse.subscriberPrimary)")
guard subscriber == nil, publisher == nil else {
log("Transports are already configured")
return
}

let connectOptions = self._state.connectOptions
// protocol v3
subscriberPrimary = joinResponse.subscriberPrimary
log("subscriberPrimary: \(joinResponse.subscriberPrimary)")

// Make a copy, instead of modifying the user-supplied RTCConfiguration object.
let rtcConfiguration = LKRTCConfiguration.liveKitDefault()
let connectOptions = self._state.connectOptions

// Set iceServers provided by the server
rtcConfiguration.iceServers = joinResponse.iceServers.map { $0.toRTCType() }
// Make a copy, instead of modifying the user-supplied RTCConfiguration object.
let rtcConfiguration = LKRTCConfiguration.liveKitDefault()

if !connectOptions.iceServers.isEmpty {
// Override with user provided iceServers
rtcConfiguration.iceServers = connectOptions.iceServers.map { $0.toRTCType() }
}
// Set iceServers provided by the server
rtcConfiguration.iceServers = joinResponse.iceServers.map { $0.toRTCType() }

if joinResponse.clientConfiguration.forceRelay == .enabled {
rtcConfiguration.iceTransportPolicy = .relay
}
if !connectOptions.iceServers.isEmpty {
// Override with user provided iceServers
rtcConfiguration.iceServers = connectOptions.iceServers.map { $0.toRTCType() }
}

let subscriber = try Transport(config: rtcConfiguration,
target: .subscriber,
primary: self.subscriberPrimary,
delegate: self)
if joinResponse.clientConfiguration.forceRelay == .enabled {
rtcConfiguration.iceTransportPolicy = .relay
}

let publisher = try Transport(config: rtcConfiguration,
target: .publisher,
primary: !self.subscriberPrimary,
delegate: self)
let subscriber = try Transport(config: rtcConfiguration,
target: .subscriber,
primary: subscriberPrimary,
delegate: self)

publisher.onOffer = { offer in
self.log("publisher onOffer \(offer.sdp)")
return self.signalClient.sendOffer(offer: offer)
}
let publisher = try Transport(config: rtcConfiguration,
target: .publisher,
primary: !subscriberPrimary,
delegate: self)

// data over pub channel for backwards compatibility
publisher.onOffer = { [weak self] offer in
guard let self = self else { return Promise(EngineError.state(message: "self is nil")) }
log("publisher onOffer \(offer.sdp)")
return signalClient.sendOffer(offer: offer)
}

let publisherReliableDC = publisher.dataChannel(for: LKRTCDataChannel.labels.reliable,
configuration: Engine.createDataChannelConfiguration())
// data over pub channel for backwards compatibility

let publisherLossyDC = publisher.dataChannel(for: LKRTCDataChannel.labels.lossy,
configuration: Engine.createDataChannelConfiguration(maxRetransmits: 0))
let publisherReliableDC = publisher.dataChannel(for: LKRTCDataChannel.labels.reliable,
configuration: Engine.createDataChannelConfiguration())

self.publisherDC.set(reliable: publisherReliableDC)
self.publisherDC.set(lossy: publisherLossyDC)
let publisherLossyDC = publisher.dataChannel(for: LKRTCDataChannel.labels.lossy,
configuration: Engine.createDataChannelConfiguration(maxRetransmits: 0))

self.log("dataChannel.\(String(describing: publisherReliableDC?.label)) : \(String(describing: publisherReliableDC?.channelId))")
self.log("dataChannel.\(String(describing: publisherLossyDC?.label)) : \(String(describing: publisherLossyDC?.channelId))")
publisherDC.set(reliable: publisherReliableDC)
publisherDC.set(lossy: publisherLossyDC)

if !self.subscriberPrimary {
// lazy negotiation for protocol v3+
self.publisherShouldNegotiate()
}
log("dataChannel.\(String(describing: publisherReliableDC?.label)) : \(String(describing: publisherReliableDC?.channelId))")
log("dataChannel.\(String(describing: publisherLossyDC?.label)) : \(String(describing: publisherLossyDC?.channelId))")

self.subscriber = subscriber
self.publisher = publisher
if !subscriberPrimary {
// lazy negotiation for protocol v3+
publisherShouldNegotiate()
}

self.subscriber = subscriber
self.publisher = publisher
}
}

Expand Down Expand Up @@ -417,7 +412,16 @@ internal extension Engine {
}.then(on: queue) { _ in
self._state.mutate { $0.connectStopwatch.split(label: "signal") }
}.then(on: queue) { jr in
self.configureTransports(joinResponse: jr)
Promise(on: self.queue) { resolve, reject in
Task {
do {
try await self.configureTransports(joinResponse: jr)
resolve(())
} catch let error {
reject(error)
}
}
}
}.then(on: queue) {
self.signalClient.resumeResponseQueue()
}.then(on: queue) {
Expand Down

0 comments on commit 4b8468c

Please sign in to comment.