Skip to content

Commit

Permalink
fix QueueActor
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie committed Jan 2, 2024
1 parent ac357a9 commit 0a3e95f
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 57 deletions.
6 changes: 3 additions & 3 deletions Sources/LiveKit/Core/Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ extension Engine {
// Check cancellation after configuring transports
try Task.checkCancellation()

try await signalClient.resumeResponseQueue()
await signalClient.resumeResponseQueue()
try await primaryTransportConnectedCompleter.wait()
_state.mutate { $0.connectStopwatch.split(label: "engine") }
log("\(_state.connectStopwatch)")
Expand Down Expand Up @@ -426,7 +426,7 @@ extension Engine {

// Update configuration
try await configureTransports(connectResponse: connectResponse)
try await signalClient.resumeResponseQueue()
await signalClient.resumeResponseQueue()
log("[Connect] Waiting for socket to connect...")
// Wait for primary transport to connect (if not already)
try await primaryTransportConnectedCompleter.wait()
Expand All @@ -444,7 +444,7 @@ extension Engine {
}

// always check if there are queued requests
try await signalClient.sendQueuedRequests()
try await signalClient.resumeRequestQueue()
}

// "full" re-connection sequence
Expand Down
60 changes: 31 additions & 29 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,29 @@ class SignalClient: MulticastDelegate<SignalClientDelegate> {
private let _queue = DispatchQueue(label: "LiveKitSDK.signalClient", qos: .default)

// Queue to store requests while reconnecting
private let _requestQueue = AsyncQueueActor<Livekit_SignalRequest>()
private let _responseQueue = AsyncQueueActor<Livekit_SignalResponse>()
private lazy var _requestQueue = QueueActor<Livekit_SignalRequest>(onProcess: { [weak self] request in
guard let self else { return }

do {
// Prepare request data...
guard let data = try? request.serializedData() else {
log("Could not serialize request data", .error)
throw LiveKitError(.failedToConvertData, message: "Failed to convert data")
}

let webSocket = try requireWebSocket()
try await webSocket.send(data: data)

} catch {
log("Failed to send queued request \(request) with error: \(error)", .error)
}
})

private lazy var _responseQueue = QueueActor<Livekit_SignalResponse>(onProcess: { [weak self] response in
guard let self else { return }

await self._process(signalResponse: response)
})

private var _webSocket: WebSocket?
private var _messageLoopTask: Task<Void, Never>?
Expand Down Expand Up @@ -213,25 +234,14 @@ class SignalClient: MulticastDelegate<SignalClientDelegate> {

private extension SignalClient {
// Send request or enqueue while reconnecting
func _sendRequest(_ request: Livekit_SignalRequest, enqueueIfReconnecting: Bool = true) async throws {
guard !(_state.connectionState == .reconnecting && request.canEnqueue() && enqueueIfReconnecting) else {
log("Queuing request while reconnecting, request: \(request)")
await _requestQueue.enqueue(request)
return
}

func _sendRequest(_ request: Livekit_SignalRequest) async throws {
guard _state.connectionState != .disconnected else {
log("connectionState is .disconnected", .error)
throw LiveKitError(.invalidState, message: "connectionState is .disconnected")
}

guard let data = try? request.serializedData() else {
log("Could not serialize request data", .error)
throw LiveKitError(.failedToConvertData, message: "Failed to convert data")
}

let webSocket = try requireWebSocket()
try await webSocket.send(data: data)
let processImmediately = !(_state.connectionState == .reconnecting && request.canEnqueue())
await _requestQueue.process(request, if: processImmediately)
}

func _onWebSocketMessage(message: URLSessionWebSocketTask.Message) {
Expand All @@ -249,7 +259,7 @@ private extension SignalClient {
}

Task {
await _responseQueue.enqueue(response) { await _process(signalResponse: $0) }
await _responseQueue.processIfResumed(response)
}
}

Expand Down Expand Up @@ -345,27 +355,19 @@ private extension SignalClient {
// MARK: - Internal

extension SignalClient {
func resumeResponseQueue() async throws {
try await _responseQueue.resume { response in
await _process(signalResponse: response)
}
func resumeResponseQueue() async {
await _responseQueue.resume()
}
}

// MARK: - Send methods

extension SignalClient {
func sendQueuedRequests() async throws {
func resumeRequestQueue() async throws {
let queueCount = await _requestQueue.count
log("[Connect] Sending queued requests (\(queueCount))...")

try await _requestQueue.resume { element in
do {
try await _sendRequest(element, enqueueIfReconnecting: false)
} catch {
log("Failed to send queued request \(element) with error: \(error)", .error)
}
}
await _requestQueue.resume()
}

func send(offer: LKRTCSessionDescription) async throws {
Expand Down
25 changes: 12 additions & 13 deletions Sources/LiveKit/Core/Transport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,16 @@ class Transport: MulticastDelegate<TransportDelegate> {

// forbid direct access to PeerConnection
private let _pc: LKRTCPeerConnection
private var _pendingCandidatesQueue = AsyncQueueActor<LKRTCIceCandidate>()

private lazy var _iceCandidatesQueue = QueueActor<LKRTCIceCandidate>(onProcess: { [weak self] iceCandidate in
guard let self else { return }

do {
try await self._pc.add(iceCandidate)
} catch {
log("Failed to add(iceCandidate:) with error: \(error)", .error)
}
})

// keep reference to cancel later
private var _debounceWorkItem: DispatchWorkItem?
Expand Down Expand Up @@ -103,23 +112,13 @@ class Transport: MulticastDelegate<TransportDelegate> {
}

func add(iceCandidate candidate: LKRTCIceCandidate) async throws {
if remoteDescription != nil, !isRestartingIce {
return try await _pc.add(candidate)
}

await _pendingCandidatesQueue.enqueue(candidate)
await _iceCandidatesQueue.process(candidate, if: remoteDescription != nil && !isRestartingIce)
}

func set(remoteDescription sd: LKRTCSessionDescription) async throws {
try await _pc.setRemoteDescription(sd)

try await _pendingCandidatesQueue.resume { candidate in
do {
try await add(iceCandidate: candidate)
} catch {
log("Failed to add(iceCandidate:) with error: \(error)", .error)
}
}
await _iceCandidatesQueue.resume()

isRestartingIce = false

Expand Down
6 changes: 6 additions & 0 deletions Sources/LiveKit/Extensions/CustomStringConvertible.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,9 @@ extension ReconnectMode: CustomStringConvertible {
}
}
}

extension Livekit_SignalResponse: CustomStringConvertible {
var description: String {
"Livekit_SignalResponse(\(String(describing: message)))"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import Foundation

actor AsyncQueueActor<T>: Loggable {
actor QueueActor<T>: Loggable {
typealias OnProcess = (T) async -> Void

// MARK: - Public

public enum State {
Expand All @@ -31,39 +33,52 @@ actor AsyncQueueActor<T>: Loggable {
// MARK: - Private

private var queue = [T]()
private let onProcess: OnProcess

init(onProcess: @escaping OnProcess) {
self.onProcess = onProcess
}

/// Mark as `.suspended`.
func suspend() {
state = .suspended
}

func enqueue(_ value: T) {
log("Queued value: \(value)")
queue.append(value)
/// Only process if `.resumed` state, otherwise enqueue.
func processIfResumed(_ value: T) async {
await process(value, if: state == .resumed)
}

/// Only enqueue if `.suspended` state, otherwise process immediately.
func enqueue(_ value: T, ifResumed process: (T) async -> Void) async {
if case .suspended = state {
queue.append(value)
/// Only process if `condition` is true, otherwise enqueue.
func process(_ value: T, if condition: Bool) async {
if condition {
await onProcess(value)
} else {
await process(value)
queue.append(value)
}
log("process if: \(condition ? "true" : "false"), count: \(queue.count)")
}

func clear() {
if !queue.isEmpty {
log("Clearing queue which is not empty", .warning)
}

queue.removeAll()
state = .resumed
}

/// Mark as `.resumed` and process each element with an async `block`.
func resume(_ block: (T) async throws -> Void) async throws {
func resume() async {
log("resuming...")

state = .resumed
if queue.isEmpty { return }
for element in queue {
// Check cancellation before processing next block...
try Task.checkCancellation()
try await block(element)
// try Task.checkCancellation()
log("resume: processing element...")
await onProcess(element)
}
queue.removeAll()
}
Expand Down
37 changes: 37 additions & 0 deletions Tests/LiveKitTests/QueueActorTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 LiveKit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

@testable import LiveKit
import XCTest

class QueueActorTests: XCTestCase {
private lazy var queue = QueueActor<String> { print($0) }

override func setUpWithError() throws {}

override func tearDown() async throws {}

func testQueueActor01() async throws {
await queue.processIfResumed("Value 0")
await queue.suspend()
await queue.processIfResumed("Value 1")
await queue.processIfResumed("Value 2")
await queue.processIfResumed("Value 3")
await print("Count: \(queue.count)")
await queue.resume()
await print("Count: \(queue.count)")
}
}

0 comments on commit 0a3e95f

Please sign in to comment.