Skip to content

Commit

Permalink
expose webrtc queue
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie committed Sep 21, 2023
1 parent b2353db commit 1cd132c
Show file tree
Hide file tree
Showing 15 changed files with 52 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ internal class DataChannelPair: NSObject, Loggable {
openCompleter = Promise<Void>.pending()

// execute on .webRTC queue
return Promise(on: .webRTC) {
return Promise(on: .liveKitWebRTC) {
r?.close()
l?.close()
}
Expand Down
30 changes: 15 additions & 15 deletions Sources/LiveKit/Core/Engine+WebRTC.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,59 +110,59 @@ internal extension Engine {

static func createPeerConnection(_ configuration: RTCConfiguration,
constraints: RTCMediaConstraints) -> RTCPeerConnection? {
DispatchQueue.webRTC.sync { peerConnectionFactory.peerConnection(with: configuration,
constraints: constraints,
delegate: nil) }
DispatchQueue.liveKitWebRTC.sync { peerConnectionFactory.peerConnection(with: configuration,
constraints: constraints,
delegate: nil) }
}

static func createVideoSource(forScreenShare: Bool) -> RTCVideoSource {
DispatchQueue.webRTC.sync { peerConnectionFactory.videoSource(forScreenCast: forScreenShare) }
DispatchQueue.liveKitWebRTC.sync { peerConnectionFactory.videoSource(forScreenCast: forScreenShare) }
}

static func createVideoTrack(source: RTCVideoSource) -> RTCVideoTrack {
DispatchQueue.webRTC.sync { peerConnectionFactory.videoTrack(with: source,
trackId: UUID().uuidString) }
DispatchQueue.liveKitWebRTC.sync { peerConnectionFactory.videoTrack(with: source,
trackId: UUID().uuidString) }
}

static func createAudioSource(_ constraints: RTCMediaConstraints?) -> RTCAudioSource {
DispatchQueue.webRTC.sync { peerConnectionFactory.audioSource(with: constraints) }
DispatchQueue.liveKitWebRTC.sync { peerConnectionFactory.audioSource(with: constraints) }
}

static func createAudioTrack(source: RTCAudioSource) -> RTCAudioTrack {
DispatchQueue.webRTC.sync { peerConnectionFactory.audioTrack(with: source,
trackId: UUID().uuidString) }
DispatchQueue.liveKitWebRTC.sync { peerConnectionFactory.audioTrack(with: source,
trackId: UUID().uuidString) }
}

static func createDataChannelConfiguration(ordered: Bool = true,
maxRetransmits: Int32 = -1) -> RTCDataChannelConfiguration {
let result = DispatchQueue.webRTC.sync { RTCDataChannelConfiguration() }
let result = DispatchQueue.liveKitWebRTC.sync { RTCDataChannelConfiguration() }
result.isOrdered = ordered
result.maxRetransmits = maxRetransmits
return result
}

static func createDataBuffer(data: Data) -> RTCDataBuffer {
DispatchQueue.webRTC.sync { RTCDataBuffer(data: data, isBinary: true) }
DispatchQueue.liveKitWebRTC.sync { RTCDataBuffer(data: data, isBinary: true) }
}

static func createIceCandidate(fromJsonString: String) throws -> RTCIceCandidate {
try DispatchQueue.webRTC.sync { try RTCIceCandidate(fromJsonString: fromJsonString) }
try DispatchQueue.liveKitWebRTC.sync { try RTCIceCandidate(fromJsonString: fromJsonString) }
}

static func createSessionDescription(type: RTCSdpType, sdp: String) -> RTCSessionDescription {
DispatchQueue.webRTC.sync { RTCSessionDescription(type: type, sdp: sdp) }
DispatchQueue.liveKitWebRTC.sync { RTCSessionDescription(type: type, sdp: sdp) }
}

static func createVideoCapturer() -> RTCVideoCapturer {
DispatchQueue.webRTC.sync { RTCVideoCapturer() }
DispatchQueue.liveKitWebRTC.sync { RTCVideoCapturer() }
}

static func createRtpEncodingParameters(rid: String? = nil,
encoding: MediaEncoding? = nil,
scaleDownBy: Double? = nil,
active: Bool = true) -> RTCRtpEncodingParameters {

let result = DispatchQueue.webRTC.sync { RTCRtpEncodingParameters() }
let result = DispatchQueue.liveKitWebRTC.sync { RTCRtpEncodingParameters() }

result.isActive = active
result.rid = rid
Expand Down
30 changes: 15 additions & 15 deletions Sources/LiveKit/Core/Transport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ internal class Transport: MulticastDelegate<TransportDelegate> {
public var onOffer: TransportOnOffer?

public var connectionState: RTCPeerConnectionState {
DispatchQueue.webRTC.sync { pc.connectionState }
DispatchQueue.liveKitWebRTC.sync { pc.connectionState }
}

public var localDescription: RTCSessionDescription? {
DispatchQueue.webRTC.sync { pc.localDescription }
DispatchQueue.liveKitWebRTC.sync { pc.localDescription }
}

public var remoteDescription: RTCSessionDescription? {
DispatchQueue.webRTC.sync { pc.remoteDescription }
DispatchQueue.liveKitWebRTC.sync { pc.remoteDescription }
}

public var signalingState: RTCSignalingState {
DispatchQueue.webRTC.sync { pc.signalingState }
DispatchQueue.liveKitWebRTC.sync { pc.signalingState }
}

public var isConnected: Bool {
Expand All @@ -71,7 +71,7 @@ internal class Transport: MulticastDelegate<TransportDelegate> {
private var pendingCandidates: [RTCIceCandidate] = []

// used for stats timer
private let statsTimer = DispatchQueueTimer(timeInterval: 1, queue: .webRTC)
private let statsTimer = DispatchQueueTimer(timeInterval: 1, queue: .liveKitWebRTC)
private var stats = [String: TrackStats]()

// keep reference to cancel later
Expand All @@ -98,7 +98,7 @@ internal class Transport: MulticastDelegate<TransportDelegate> {

log()

DispatchQueue.webRTC.sync { pc.delegate = self }
DispatchQueue.liveKitWebRTC.sync { pc.delegate = self }
add(delegate: delegate)

statsTimer.handler = { [weak self] in
Expand Down Expand Up @@ -198,7 +198,7 @@ internal class Transport: MulticastDelegate<TransportDelegate> {
self.statsTimer.suspend()

// can be async
DispatchQueue.webRTC.async {
DispatchQueue.liveKitWebRTC.async {
// Stop listening to delegate
self.pc.delegate = nil
// Remove all senders (if any)
Expand Down Expand Up @@ -332,7 +332,7 @@ private extension Transport {

func createOffer(for constraints: [String: String]? = nil) -> Promise<RTCSessionDescription> {

Promise<RTCSessionDescription>(on: .webRTC) { complete, fail in
Promise<RTCSessionDescription>(on: .liveKitWebRTC) { complete, fail in

let mediaConstraints = RTCMediaConstraints(mandatoryConstraints: constraints,
optionalConstraints: nil)
Expand All @@ -351,7 +351,7 @@ private extension Transport {

func setRemoteDescriptionPromise(_ sd: RTCSessionDescription) -> Promise<RTCSessionDescription> {

Promise<RTCSessionDescription>(on: .webRTC) { complete, fail in
Promise<RTCSessionDescription>(on: .liveKitWebRTC) { complete, fail in

self.pc.setRemoteDescription(sd) { error in

Expand All @@ -367,7 +367,7 @@ private extension Transport {

func addIceCandidatePromise(_ candidate: RTCIceCandidate) -> Promise<Void> {

Promise<Void>(on: .webRTC) { complete, fail in
Promise<Void>(on: .liveKitWebRTC) { complete, fail in

self.pc.add(candidate) { error in

Expand All @@ -388,7 +388,7 @@ internal extension Transport {

func createAnswer(for constraints: [String: String]? = nil) -> Promise<RTCSessionDescription> {

Promise<RTCSessionDescription>(on: .webRTC) { complete, fail in
Promise<RTCSessionDescription>(on: .liveKitWebRTC) { complete, fail in

let mediaConstraints = RTCMediaConstraints(mandatoryConstraints: constraints,
optionalConstraints: nil)
Expand All @@ -407,7 +407,7 @@ internal extension Transport {

func setLocalDescription(_ sd: RTCSessionDescription) -> Promise<RTCSessionDescription> {

Promise<RTCSessionDescription>(on: .webRTC) { complete, fail in
Promise<RTCSessionDescription>(on: .liveKitWebRTC) { complete, fail in

self.pc.setLocalDescription(sd) { error in

Expand All @@ -424,7 +424,7 @@ internal extension Transport {
func addTransceiver(with track: RTCMediaStreamTrack,
transceiverInit: RTCRtpTransceiverInit) -> Promise<RTCRtpTransceiver> {

Promise<RTCRtpTransceiver>(on: .webRTC) { complete, fail in
Promise<RTCRtpTransceiver>(on: .liveKitWebRTC) { complete, fail in

guard let transceiver = self.pc.addTransceiver(with: track, init: transceiverInit) else {
fail(EngineError.webRTC(message: "failed to add transceiver"))
Expand All @@ -437,7 +437,7 @@ internal extension Transport {

func removeTrack(_ sender: RTCRtpSender) -> Promise<Void> {

Promise<Void>(on: .webRTC) { complete, fail in
Promise<Void>(on: .liveKitWebRTC) { complete, fail in

guard self.pc.removeTrack(sender) else {
fail(EngineError.webRTC(message: "failed to remove track"))
Expand All @@ -452,7 +452,7 @@ internal extension Transport {
configuration: RTCDataChannelConfiguration,
delegate: RTCDataChannelDelegate? = nil) -> RTCDataChannel? {

let result = DispatchQueue.webRTC.sync { pc.dataChannel(forLabel: label, configuration: configuration) }
let result = DispatchQueue.liveKitWebRTC.sync { pc.dataChannel(forLabel: label, configuration: configuration) }
result?.delegate = delegate
return result
}
Expand Down
5 changes: 3 additions & 2 deletions Sources/LiveKit/Extensions/DispatchQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

import Foundation

internal extension DispatchQueue {
extension DispatchQueue {

static let webRTC = DispatchQueue(label: "LiveKitSDK.webRTC", qos: .default)
// The queue which SDK uses to invoke WebRTC methods
public static let liveKitWebRTC = DispatchQueue(label: "LiveKitSDK.webRTC", qos: .default)
}
2 changes: 1 addition & 1 deletion Sources/LiveKit/Extensions/RTCConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ extension RTCConfiguration {

public static func liveKitDefault() -> RTCConfiguration {

let result = DispatchQueue.webRTC.sync { RTCConfiguration() }
let result = DispatchQueue.liveKitWebRTC.sync { RTCConfiguration() }
result.sdpSemantics = .unifiedPlan
result.continualGatheringPolicy = .gatherContinually
result.candidateNetworkPolicy = .all
Expand Down
2 changes: 1 addition & 1 deletion Sources/LiveKit/Extensions/RTCMediaConstraints.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extension RTCMediaConstraints {
// optionalConstraints: nil
// )

static let defaultPCConstraints = DispatchQueue.webRTC.sync { RTCMediaConstraints(
static let defaultPCConstraints = DispatchQueue.liveKitWebRTC.sync { RTCMediaConstraints(
mandatoryConstraints: nil,
optionalConstraints: ["DtlsSrtpKeyAgreement": kRTCMediaConstraintsValueTrue]
) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ extension RTCVideoCapturerDelegate {

onResolveSourceDimensions?(sourceDimensions)

DispatchQueue.webRTC.sync {
DispatchQueue.liveKitWebRTC.sync {

let rtcBuffer = RTCCVPixelBuffer(pixelBuffer: pixelBuffer)
let rtcFrame = RTCVideoFrame(buffer: rtcBuffer,
Expand Down
2 changes: 1 addition & 1 deletion Sources/LiveKit/Participant/LocalParticipant.swift
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class LocalParticipant: Participant {
source: track.source.toPBType(),
encryption: self.room.e2eeManager?.e2eeOptions.encryptionType.toPBType() ?? .none ) { populator in

let transInit = DispatchQueue.webRTC.sync { RTCRtpTransceiverInit() }
let transInit = DispatchQueue.liveKitWebRTC.sync { RTCRtpTransceiverInit() }
transInit.direction = .sendOnly

if let track = track as? LocalVideoTrack {
Expand Down
2 changes: 1 addition & 1 deletion Sources/LiveKit/Track/AudioManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public class AudioManager: Loggable {
/// - setActive: passing true/false will call `AVAudioSession.setActive` internally
public func defaultConfigureAudioSessionFunc(newState: State, oldState: State) {

DispatchQueue.webRTC.async { [weak self] in
DispatchQueue.liveKitWebRTC.async { [weak self] in

guard let self = self else { return }

Expand Down
10 changes: 5 additions & 5 deletions Sources/LiveKit/Track/Capturers/CameraCapturer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class CameraCapturer: VideoCapturer {

@objc
public static func captureDevices() -> [AVCaptureDevice] {
DispatchQueue.webRTC.sync { RTCCameraVideoCapturer.captureDevices() }
DispatchQueue.liveKitWebRTC.sync { RTCCameraVideoCapturer.captureDevices() }
}

/// Checks whether both front and back capturing devices exist, and can be switched.
Expand Down Expand Up @@ -81,7 +81,7 @@ public class CameraCapturer: VideoCapturer {
}

init(delegate: RTCVideoCapturerDelegate, options: CameraCaptureOptions) {
self.capturer = DispatchQueue.webRTC.sync { RTCCameraVideoCapturer(delegate: delegate) }
self.capturer = DispatchQueue.liveKitWebRTC.sync { RTCCameraVideoCapturer(delegate: delegate) }
self.options = options
super.init(delegate: delegate)

Expand Down Expand Up @@ -133,7 +133,7 @@ public class CameraCapturer: VideoCapturer {
}

// list of all formats in order of dimensions size
let formats = DispatchQueue.webRTC.sync { RTCCameraVideoCapturer.supportedFormats(for: device) }
let formats = DispatchQueue.liveKitWebRTC.sync { RTCCameraVideoCapturer.supportedFormats(for: device) }
// create an array of sorted touples by dimensions size
let sortedFormats = formats.map({ (format: $0, dimensions: Dimensions(from: CMVideoFormatDescriptionGetDimensions($0.formatDescription))) })
.sorted { $0.dimensions.area < $1.dimensions.area }
Expand Down Expand Up @@ -194,7 +194,7 @@ public class CameraCapturer: VideoCapturer {
}

// return promise that waits for capturer to start
return Promise<Bool>(on: .webRTC) { resolve, fail in
return Promise<Bool>(on: .liveKitWebRTC) { resolve, fail in
// start the RTCCameraVideoCapturer
self.capturer.startCapture(with: device, format: selectedFormat.format, fps: selectedFps) { error in
if let error = error {
Expand Down Expand Up @@ -224,7 +224,7 @@ public class CameraCapturer: VideoCapturer {
return Promise(false)
}

return Promise<Bool>(on: .webRTC) { resolve, _ in
return Promise<Bool>(on: .liveKitWebRTC) { resolve, _ in
// stop the RTCCameraVideoCapturer
self.capturer.stopCapture {
// update internal vars
Expand Down
2 changes: 1 addition & 1 deletion Sources/LiveKit/Track/Capturers/VideoCapturer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class VideoCapturer: NSObject, Loggable, VideoCapturerProtocol {
/// `kCVPixelFormatType_420YpCbCr8BiPlanarFullRange`,
/// `kCVPixelFormatType_32BGRA`,
/// `kCVPixelFormatType_32ARGB`.
public static let supportedPixelFormats = DispatchQueue.webRTC.sync { RTCCVPixelBuffer.supportedPixelFormats() }
public static let supportedPixelFormats = DispatchQueue.liveKitWebRTC.sync { RTCCVPixelBuffer.supportedPixelFormats() }

public static func createTimeStampNs() -> Int64 {
let systemTime = ProcessInfo.processInfo.systemUptime
Expand Down
4 changes: 2 additions & 2 deletions Sources/LiveKit/Track/Local/LocalAudioTrack.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public class LocalAudioTrack: Track, LocalTrack, AudioTrack {
"googAutoGainControl2": options.experimentalAutoGainControl.toString()
]

let audioConstraints = DispatchQueue.webRTC.sync { RTCMediaConstraints(mandatoryConstraints: nil,
optionalConstraints: constraints) }
let audioConstraints = DispatchQueue.liveKitWebRTC.sync { RTCMediaConstraints(mandatoryConstraints: nil,
optionalConstraints: constraints) }

let audioSource = Engine.createAudioSource(audioConstraints)
let rtcTrack = Engine.createAudioTrack(source: audioSource)
Expand Down
2 changes: 1 addition & 1 deletion Sources/LiveKit/Track/Track.swift
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public class Track: NSObject, Loggable {

private weak var transport: Transport?
// private var transceiver: RTCRtpTransceiver?
private let statsTimer = DispatchQueueTimer(timeInterval: 1, queue: .webRTC)
private let statsTimer = DispatchQueueTimer(timeInterval: 1, queue: .liveKitWebRTC)
// Weak reference to the corresponding transport

internal init(name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ extension RemoteTrackPublication {

if let videoTrack = track?.mediaTrack as? RTCVideoTrack {
log("VideoTrack.shouldReceive: \(enabled)")
DispatchQueue.webRTC.sync { videoTrack.shouldReceive = enabled }
DispatchQueue.liveKitWebRTC.sync { videoTrack.shouldReceive = enabled }
}

send(trackSettings: newSettings).catch(on: queue) { [weak self] error in
Expand Down
6 changes: 3 additions & 3 deletions Sources/LiveKit/Types/IceServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ extension Livekit_ICEServer {
func toRTCType() -> RTCIceServer {
let rtcUsername = !username.isEmpty ? username : nil
let rtcCredential = !credential.isEmpty ? credential : nil
return DispatchQueue.webRTC.sync { RTCIceServer(urlStrings: urls,
username: rtcUsername,
credential: rtcCredential) }
return DispatchQueue.liveKitWebRTC.sync { RTCIceServer(urlStrings: urls,
username: rtcUsername,
credential: rtcCredential) }
}
}

0 comments on commit 1cd132c

Please sign in to comment.