Skip to content

Commit

Permalink
CompleterMapActor pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie committed Oct 31, 2023
1 parent ec42d30 commit 96ac437
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 36 deletions.
49 changes: 13 additions & 36 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import Foundation

internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

// MARK: - Types

typealias AddTrackRequestPopulator<R> = (inout Livekit_AddTrackRequest) throws -> R
typealias AddTrackResult<R> = (result: R, trackInfo: Livekit_TrackInfo)

private let queue = DispatchQueue(label: "LiveKitSDK.signalClient", qos: .default)

// MARK: - Public
Expand All @@ -29,7 +34,7 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {
// MARK: - Internal

internal let joinResponseCompleter = AsyncCompleter<Livekit_JoinResponse>(label: "Join response", timeOut: .defaultJoinResponse)
internal var completersForAddTrack = [String: AsyncCompleter<Livekit_TrackInfo>]()
internal let _addTrackCompleters = CompleterMapActor<Livekit_TrackInfo>(label: "Completers for add track", timeOut: .defaultPublish)

internal struct State: ReconnectableState, Equatable {
var reconnectMode: ReconnectMode?
Expand All @@ -42,7 +47,7 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

// Queue to store requests while reconnecting
private let _requestQueue = AsyncQueueActor<Livekit_SignalRequest>()
private var _responseQueue = AsyncQueueActor<Livekit_SignalResponse>()
private let _responseQueue = AsyncQueueActor<Livekit_SignalResponse>()

private var _webSocket: WebSocket?
private var latestJoinResponse: Livekit_JoinResponse?
Expand Down Expand Up @@ -163,43 +168,16 @@ internal class SignalClient: MulticastDelegate<SignalClientDelegate> {

_state.mutate {

for completer in completersForAddTrack.values {
completer.cancel()
}

joinResponseCompleter.cancel()

// reset state
$0 = State()
}

await _addTrackCompleters.reset()
await _requestQueue.clear()
await _responseQueue.clear()
}

func resumeCompleter(forAddTrackRequest trackCid: String, trackInfo: Livekit_TrackInfo) {

_state.mutate { _ in
if let completer = completersForAddTrack[trackCid] {
completer.resume(returning: trackInfo)
}
}
}

func asyncCompleter(forAddTrackRequest trackCid: String) -> AsyncCompleter<Livekit_TrackInfo> {

_state.mutate { _ in

if completersForAddTrack.keys.contains(trackCid) {
// reset if already exists
completersForAddTrack[trackCid]!.cancel()
} else {
completersForAddTrack[trackCid] = AsyncCompleter<Livekit_TrackInfo>(label: "Add track: \(trackCid)", timeOut: .defaultPublish)
}

return completersForAddTrack[trackCid]!
}
}
}

// MARK: - Private
Expand Down Expand Up @@ -299,7 +277,7 @@ private extension SignalClient {

log("[publish] resolving completer for cid: \(trackPublished.cid)")
// Complete
resumeCompleter(forAddTrackRequest: trackPublished.cid, trackInfo: trackPublished.track)
await _addTrackCompleters.resume(returning: trackPublished.track, for: trackPublished.cid)

case .trackUnpublished(let trackUnpublished):
notify { $0.signalClient(self, didUnpublish: trackUnpublished) }
Expand Down Expand Up @@ -410,9 +388,6 @@ internal extension SignalClient {
try await sendRequest(r)
}

typealias AddTrackRequestPopulator<R> = (inout Livekit_AddTrackRequest) throws -> R
typealias AddTrackResult<R> = (result: R, trackInfo: Livekit_TrackInfo)

func sendAddTrack<R>(cid: String,
name: String,
type: Livekit_TrackType,
Expand All @@ -434,11 +409,13 @@ internal extension SignalClient {
$0.addTrack = addTrackRequest
}

let completer = asyncCompleter(forAddTrackRequest: cid)
// Get completer for this add track request...
let completer = await _addTrackCompleters.completer(for: cid)

// Send the request to server...
try await sendRequest(request)

// Wait for the trackInfo
// Wait for the trackInfo...
let trackInfo = try await completer.wait()

return AddTrackResult(result: populateResult, trackInfo: trackInfo)
Expand Down
40 changes: 40 additions & 0 deletions Sources/LiveKit/Support/AsyncCompleter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,46 @@ internal enum AsyncCompleterError: LiveKitError {
}
}

/// Manages a map of AsyncCompleters
internal actor CompleterMapActor<T> {

public let label: String

private let _timeOut: DispatchTimeInterval
private var _completerMap = [String: AsyncCompleter<T>]()

public init(label: String, timeOut: DispatchTimeInterval) {
self.label = label
self._timeOut = timeOut
}

public func completer(for key: String) -> AsyncCompleter<T> {
// Return completer if already exists...
if let element = _completerMap[key] {
return element
}

let newCompleter = AsyncCompleter<T>(label: label, timeOut: _timeOut)
_completerMap[key] = newCompleter
return newCompleter
}

public func resume(returning value: T, for key: String) {
if let element = _completerMap[key] {
element.resume(returning: value)
}
}

public func reset() {
// Reset call completers...
for (_, value) in _completerMap {
value.cancel()
}
// Clear all completers...
_completerMap.removeAll()
}
}

internal class AsyncCompleter<T>: Loggable {

public let label: String
Expand Down

0 comments on commit 96ac437

Please sign in to comment.