Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 46 additions & 61 deletions Sources/Concurrency/CountDownLatch.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import Foundation

/// A concurrency utility class that allows coordination between threads. A count down latch
/// starts with an initial count. Threads can then decrement the count until it reaches zero,
/// at which point, the suspended waiting thread shall proceed.
/// at which point, the suspended waiting thread shall proceed. A `CountDownLatch` behaves
/// differently from a `DispatchSemaphore` once the latch is open. Unlike a semaphore where
/// subsequent waits would still block the caller thread, once a `CountDownLatch` is open, all
/// subsequent waits can directly passthrough.
public class CountDownLatch {

/// The initial count of the latch.
Expand All @@ -29,86 +32,68 @@ public class CountDownLatch {
/// - parameter count: The initial count for the latch.
public init(count: Int) {
assert(count > 0, "CountDownLatch must have an initial count that is greater than 0.")

initialCount = count
countDownValue = count
waitingCount = 0
conditionCount = AtomicInt(initialValue: count)
}

/// Decrements the latch's count, resuming all awaiting threads if the count reaches zero.
///
/// - note: If the latch is already open, invoking this method has no effects.
public func countDown() {
// Use the serial queue to read and write to both the count variables. This allows us to
// ensure thread-safe access, while allowing this method to be invoked without blocking
// or any contension. We cannot use atomic integers to replace the queue, since both of
// count variables need to be updated atomically together, to avoid waiting on the latch
// after the semaphore is signaled.
queue.async {
guard self.countDownValue > 0 else {
return
}

self.countDownValue -= 1
// Use `AtomicInt` to avoid contention during counting down and waiting. This allows the
// lock to be only acquired at the time when the latch switches from closed to open.
guard conditionCount.value > 0 else {
return
}

if self.countDownValue == 0 {
// Wake up all waiting invocations, not just threads. We cannot rely on the returned
// value from dispatch_semaphore_signal since it returns true as long as any thread
// is woken, momentarily. When the same thread invokes await multiple times, semaphore
// signal method returns true even if it only unblocks the first await.
while self.waitingCount > 0 {
self.semaphore.signal()
self.waitingCount -= 1
}
}
let newValue = conditionCount.decrementAndGet()
// Check for <= since multiple threads can perform decrements concurrently.
if newValue <= 0 {
condition.lock()
condition.broadcast()
condition.unlock()
}
}

/// Causes the current thread to suspend until the latch counts down to zero.
///
/// - note: If the current count is already zero, this method returns immediately without suspending the current
/// thread.
/// - note: If the current count is already zero, this method returns immediately without
/// suspending the current thread.
///
/// - parameter timeout: The optional timeout value in seconds. If the latch is not counted down to zero before the
/// timeout, this method returns false. If not defined, the current thread will wait forever until the latch is
/// counted down to zero.
/// - returns: true if the latch is counted down to zero. false if the timeout occurred before the latch reaches
/// zero.
/// - parameter timeout: The optional timeout value in seconds. If the latch is not counted
/// down to zero before the timeout, this method returns false. If not defined, the current
/// thread will wait forever until the latch is counted down to zero.
/// - returns: true if the latch is counted down to zero. false if the timeout occurred before
/// the latch reaches zero.
@discardableResult
public func await(timeout: TimeInterval? = nil) -> Bool {
// Only use the queue to access counts but not the semaphore wait, since we need to ensure
// counts are always accessed from the queue's thread. Do not wait on the semaphore inside
// the queue, since the queue is serial, blocking the queue results in deadlock, since the
// semaphore signal will also be blocked.
let alreadyOpen: Bool = queue.sync {
let alreadyOpen = self.countDownValue <= 0
if !alreadyOpen {
self.waitingCount += 1
}
return alreadyOpen
// Use `AtomicInt` to avoid contention during counting down and waiting. This allows the
// lock to be only acquired at the time when the latch switches from closed to open.
guard conditionCount.value > 0 else {
return true
}

if alreadyOpen {
return true
let deadline: Date
if let timeout = timeout {
deadline = Date().addingTimeInterval(timeout)
} else {
let deadline: DispatchTime
if let timeout = timeout {
deadline = DispatchTime.now() + DispatchTimeInterval.milliseconds(Int(timeout * 1000))
} else {
deadline = .distantFuture
}
return self.semaphore.wait(timeout: deadline) == .success
deadline = Date.distantFuture
}

condition.lock()
// Check count again after acquiring the lock, before entering waiting. This ensures the caller
// does not enter waiting after the last counting down occurs.
if conditionCount.value > 0 {
return condition.wait(until: deadline)
}
condition.unlock()
return true
}

// MARK: - Private

private let semaphore = DispatchSemaphore(value: 0)
// Use the serial queue to read and write to both the count variables. This allows us to
// ensure thread-safe access, while allowing this method to be invoked without blocking
// or any contension. We cannot use atomic integers to replace the queue, since both of
// count variables need to be updated atomically together, to avoid waiting on the latch
// after the semaphore is signaled.
private let queue = DispatchQueue(label: "CountDownLatch.executeQueue", qos: .userInteractive)

private var countDownValue: Int
private var waitingCount: Int
private let condition = NSCondition()
// Use `AtomicInt` to avoid contention during counting down and waiting. This allows the
// lock to be only acquired at the time when the latch switches from closed to open.
private let conditionCount: AtomicInt
}