Skip to content

Commit

Permalink
thread safe async completer
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroshihorie committed Dec 6, 2023
1 parent c25ce2d commit 196a439
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Sources/LiveKit/Core/DataChannelPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class DataChannelPair: NSObject, Loggable {
_reliableChannel = nil
_lossyChannel = nil

openCompleter.cancel()
openCompleter.reset()

// execute on .webRTC queue
DispatchQueue.liveKitWebRTC.sync {
Expand Down
4 changes: 2 additions & 2 deletions Sources/LiveKit/Core/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,8 @@ extension Room {

// Start Engine cleanUp sequence

engine.primaryTransportConnectedCompleter.cancel()
engine.publisherTransportConnectedCompleter.cancel()
engine.primaryTransportConnectedCompleter.reset()
engine.publisherTransportConnectedCompleter.reset()

engine._state.mutate {
// if isFullReconnect, keep connection related states
Expand Down
2 changes: 1 addition & 1 deletion Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class SignalClient: MulticastDelegate<SignalClientDelegate> {
_webSocket?.close()
_webSocket = nil

_joinResponseCompleter.cancel()
_joinResponseCompleter.reset()
latestJoinResponse = nil

// Reset state
Expand Down
80 changes: 43 additions & 37 deletions Sources/LiveKit/Support/AsyncCompleter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ actor CompleterMapActor<T> {
public func reset() {
// Reset call completers...
for (_, value) in _completerMap {
value.cancel()
value.reset()
}
// Clear all completers...
_completerMap.removeAll()
Expand All @@ -79,13 +79,15 @@ class AsyncCompleter<T>: Loggable {
private var _returningValue: T?
private var _throwingError: Error?

private let _lock = UnfairLock()

public init(label: String, timeOut: DispatchTimeInterval) {
self.label = label
_timeOut = timeOut
}

deinit {
cancel()
reset()
}

private func _cancelTimer() {
Expand All @@ -94,78 +96,82 @@ class AsyncCompleter<T>: Loggable {
_timeOutBlock = nil
}

public func cancel() {
_cancelTimer()
if _continuation != nil {
log("\(label) cancelled")
public func reset() {
_lock.sync {
_cancelTimer()
if let continuation = _continuation {
log("\(label) cancelled")
continuation.resume(throwing: AsyncCompleterError.cancelled)
}
_continuation = nil
_returningValue = nil
_throwingError = nil
}
_continuation?.resume(throwing: AsyncCompleterError.cancelled)
_continuation = nil
_returningValue = nil
_throwingError = nil
}

public func resume(returning value: T) {
log("\(label)")

_cancelTimer()

_returningValue = value
_continuation?.resume(returning: value)
_continuation = nil
_lock.sync {
_cancelTimer()
_returningValue = value
_continuation?.resume(returning: value)
_continuation = nil
}
}

public func resume(throwing error: Error) {
log("\(label)")

_cancelTimer()

_throwingError = error
_continuation?.resume(throwing: error)
_continuation = nil
_lock.sync {
_cancelTimer()
_throwingError = error
_continuation?.resume(throwing: error)
_continuation = nil
}
}

public func wait() async throws -> T {
// resume(returning:) already called
if let returningValue = _returningValue {
if let returningValue = _lock.sync({ _returningValue }) {
log("\(label) returning value...")
return returningValue
}

// resume(throwing:) already called
if let throwingError = _throwingError {
if let throwingError = _lock.sync({ _throwingError }) {
log("\(label) throwing error...")
throw throwingError
}

log("\(label) waiting...")

// Cancel any previous waits
cancel()
reset()

// Create a cancel-aware timed continuation
return try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { continuation in
// Store reference to continuation
_continuation = continuation

// Create time-out block
let timeOutBlock = DispatchWorkItem { [weak self] in
guard let self else { return }
self.log("\(self.label) timedOut")
self._continuation?.resume(throwing: AsyncCompleterError.timedOut)
self._continuation = nil
self.cancel()
self._lock.sync {
self._continuation?.resume(throwing: AsyncCompleterError.timedOut)
self._continuation = nil
}
self.reset()
}
_lock.sync {
// Schedule time-out block
_queue.asyncAfter(deadline: .now() + _timeOut, execute: timeOutBlock)
// Store reference to continuation
_continuation = continuation
// Store reference to time-out block
_timeOutBlock = timeOutBlock
}

// Schedule time-out block
_queue.asyncAfter(deadline: .now() + _timeOut, execute: timeOutBlock)
// Store reference to time-out block
_timeOutBlock = timeOutBlock
}
} onCancel: {
// Cancel completer when Task gets cancelled
cancel()
reset()
}
}
}
4 changes: 2 additions & 2 deletions Sources/LiveKit/Track/Capturers/VideoCapturer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public class VideoCapturer: NSObject, Loggable, VideoCapturerProtocol {
log("[publish] dimensions: \(String(describing: dimensions))")
dimensionsCompleter.resume(returning: dimensions)
} else {
dimensionsCompleter.cancel()
dimensionsCompleter.reset()
}
}
}
Expand Down Expand Up @@ -160,7 +160,7 @@ public class VideoCapturer: NSObject, Loggable, VideoCapturerProtocol {
$0.capturer?(self, didUpdate: .stopped)
}

dimensionsCompleter.cancel()
dimensionsCompleter.reset()

return true
}
Expand Down

0 comments on commit 196a439

Please sign in to comment.