From 7431a87eec77fc1d85d25cea4c7175666d4092ab Mon Sep 17 00:00:00 2001 From: Richard Howell Date: Fri, 30 Nov 2018 12:50:42 -0800 Subject: [PATCH 1/2] add an option to limit max concurrency --- .../Executor/ConcurrentSequenceExecutor.swift | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift b/Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift index 5a7be21..5d9a1fc 100644 --- a/Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift +++ b/Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift @@ -33,8 +33,13 @@ public class ConcurrentSequenceExecutor: SequenceExecutor { /// reported error contains the ID of the task that was being executed /// when the timeout occurred. The tracking does incur a minor /// performance cost. This value defaults to `false`. - public init(name: String, qos: DispatchQoS = .userInitiated, shouldTrackTaskId: Bool = false) { + /// - parameter maxConcurrentTasks: limits the maximum number of tasks + /// run concurrently. Defaults to 0, which will set the max to the number + /// of processors. + public init(name: String, qos: DispatchQoS = .userInitiated, shouldTrackTaskId: Bool = false, maxConcurrentTasks: Int = 0) { taskQueue = DispatchQueue(label: "Executor.taskQueue-\(name)", qos: qos, attributes: .concurrent) + let semaphoreCount = maxConcurrentTasks == 0 ? ProcessInfo().processorCount : maxConcurrentTasks + taskSemaphore = DispatchSemaphore(value: semaphoreCount) self.shouldTrackTaskId = shouldTrackTaskId } @@ -58,10 +63,16 @@ public class ConcurrentSequenceExecutor: SequenceExecutor { // MARK: - Private private let taskQueue: DispatchQueue + private let taskSemaphore: DispatchSemaphore private let shouldTrackTaskId: Bool private func execute(_ task: Task, with sequenceHandle: SynchronizedSequenceExecutionHandle, _ execution: @escaping (Task, Any) -> SequenceExecution) { + taskSemaphore.wait() taskQueue.async { + defer { + self.taskSemaphore.signal() + } + guard !sequenceHandle.isCancelled else { return } From 997b2ad5956230623ea56547cfb630fd3dc7c3b1 Mon Sep 17 00:00:00 2001 From: Richard Howell Date: Fri, 30 Nov 2018 13:07:55 -0800 Subject: [PATCH 2/2] default to Int.max concurrent tasks --- .../Concurrency/Executor/ConcurrentSequenceExecutor.swift | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift b/Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift index 5d9a1fc..2409976 100644 --- a/Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift +++ b/Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift @@ -34,12 +34,10 @@ public class ConcurrentSequenceExecutor: SequenceExecutor { /// when the timeout occurred. The tracking does incur a minor /// performance cost. This value defaults to `false`. /// - parameter maxConcurrentTasks: limits the maximum number of tasks - /// run concurrently. Defaults to 0, which will set the max to the number - /// of processors. - public init(name: String, qos: DispatchQoS = .userInitiated, shouldTrackTaskId: Bool = false, maxConcurrentTasks: Int = 0) { + /// run concurrently. Defaults to Int.max. + public init(name: String, qos: DispatchQoS = .userInitiated, shouldTrackTaskId: Bool = false, maxConcurrentTasks: Int = Int.max) { taskQueue = DispatchQueue(label: "Executor.taskQueue-\(name)", qos: qos, attributes: .concurrent) - let semaphoreCount = maxConcurrentTasks == 0 ? ProcessInfo().processorCount : maxConcurrentTasks - taskSemaphore = DispatchSemaphore(value: semaphoreCount) + taskSemaphore = DispatchSemaphore(value: maxConcurrentTasks) self.shouldTrackTaskId = shouldTrackTaskId }