Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 36 additions & 99 deletions Concurrency.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,16 @@
objectVersion = 46;
objects = {

/* Begin PBXAggregateTarget section */
"Concurrency::ConcurrencyPackageTests::ProductTarget" /* ConcurrencyPackageTests */ = {
isa = PBXAggregateTarget;
buildConfigurationList = OBJ_39 /* Build configuration list for PBXAggregateTarget "ConcurrencyPackageTests" */;
buildPhases = (
);
dependencies = (
OBJ_42 /* PBXTargetDependency */,
);
name = ConcurrencyPackageTests;
productName = ConcurrencyPackageTests;
};
/* End PBXAggregateTarget section */

/* Begin PBXBuildFile section */
41B94843210A4744007E59C8 /* SerialSequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */; };
41B94844210A4744007E59C8 /* SequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94840210A4744007E59C8 /* SequenceExecutor.swift */; };
41B94845210A4744007E59C8 /* ConcurrentSequenceExecutor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */; };
41B94846210A4744007E59C8 /* Task.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94842210A4744007E59C8 /* Task.swift */; };
41B94849210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */; };
OBJ_27 /* AtomicBool.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_9 /* AtomicBool.swift */; };
OBJ_28 /* AtomicInt.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_10 /* AtomicInt.swift */; };
OBJ_29 /* AtomicReference.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_11 /* AtomicReference.swift */; };
OBJ_30 /* CountDownLatch.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_12 /* CountDownLatch.swift */; };
OBJ_37 /* Package.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_6 /* Package.swift */; };
OBJ_48 /* AtomicBoolTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_15 /* AtomicBoolTests.swift */; };
OBJ_49 /* AtomicIntTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_16 /* AtomicIntTests.swift */; };
OBJ_50 /* AtomicReferenceTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = OBJ_17 /* AtomicReferenceTests.swift */; };
Expand All @@ -41,16 +31,14 @@
remoteGlobalIDString = "Concurrency::Concurrency";
remoteInfo = Concurrency;
};
412CDD2E20B88EAB00AF5890 /* PBXContainerItemProxy */ = {
isa = PBXContainerItemProxy;
containerPortal = OBJ_1 /* Project object */;
proxyType = 1;
remoteGlobalIDString = "Concurrency::ConcurrencyTests";
remoteInfo = ConcurrencyTests;
};
/* End PBXContainerItemProxy section */

/* Begin PBXFileReference section */
41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SerialSequenceExecutor.swift; sourceTree = "<group>"; };
41B94840210A4744007E59C8 /* SequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SequenceExecutor.swift; sourceTree = "<group>"; };
41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentSequenceExecutor.swift; sourceTree = "<group>"; };
41B94842210A4744007E59C8 /* Task.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Task.swift; sourceTree = "<group>"; };
41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentSequenceExecutorTests.swift; sourceTree = "<group>"; };
"Concurrency::Concurrency::Product" /* Concurrency.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; path = Concurrency.framework; sourceTree = BUILT_PRODUCTS_DIR; };
"Concurrency::ConcurrencyTests::Product" /* ConcurrencyTests.xctest */ = {isa = PBXFileReference; lastKnownFileType = file; path = ConcurrencyTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
OBJ_10 /* AtomicInt.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AtomicInt.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -83,6 +71,25 @@
/* End PBXFrameworksBuildPhase section */

/* Begin PBXGroup section */
41B9483E210A4744007E59C8 /* Executor */ = {
isa = PBXGroup;
children = (
41B9483F210A4744007E59C8 /* SerialSequenceExecutor.swift */,
41B94840210A4744007E59C8 /* SequenceExecutor.swift */,
41B94841210A4744007E59C8 /* ConcurrentSequenceExecutor.swift */,
41B94842210A4744007E59C8 /* Task.swift */,
);
path = Executor;
sourceTree = "<group>";
};
41B94847210A4756007E59C8 /* Executor */ = {
isa = PBXGroup;
children = (
41B94848210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift */,
);
path = Executor;
sourceTree = "<group>";
};
OBJ_13 /* Tests */ = {
isa = PBXGroup;
children = (
Expand All @@ -94,6 +101,7 @@
OBJ_14 /* ConcurrencyTests */ = {
isa = PBXGroup;
children = (
41B94847210A4756007E59C8 /* Executor */,
OBJ_15 /* AtomicBoolTests.swift */,
OBJ_16 /* AtomicIntTests.swift */,
OBJ_17 /* AtomicReferenceTests.swift */,
Expand Down Expand Up @@ -133,6 +141,7 @@
OBJ_8 /* Concurrency */ = {
isa = PBXGroup;
children = (
41B9483E210A4744007E59C8 /* Executor */,
OBJ_9 /* AtomicBool.swift */,
OBJ_10 /* AtomicInt.swift */,
OBJ_11 /* AtomicReference.swift */,
Expand Down Expand Up @@ -178,20 +187,6 @@
productReference = "Concurrency::ConcurrencyTests::Product" /* ConcurrencyTests.xctest */;
productType = "com.apple.product-type.bundle.unit-test";
};
"Concurrency::SwiftPMPackageDescription" /* ConcurrencyPackageDescription */ = {
isa = PBXNativeTarget;
buildConfigurationList = OBJ_33 /* Build configuration list for PBXNativeTarget "ConcurrencyPackageDescription" */;
buildPhases = (
OBJ_36 /* Sources */,
);
buildRules = (
);
dependencies = (
);
name = ConcurrencyPackageDescription;
productName = ConcurrencyPackageDescription;
productType = "com.apple.product-type.framework";
};
/* End PBXNativeTarget section */

/* Begin PBXProject section */
Expand All @@ -213,8 +208,6 @@
projectRoot = "";
targets = (
"Concurrency::Concurrency" /* Concurrency */,
"Concurrency::SwiftPMPackageDescription" /* ConcurrencyPackageDescription */,
"Concurrency::ConcurrencyPackageTests::ProductTarget" /* ConcurrencyPackageTests */,
"Concurrency::ConcurrencyTests" /* ConcurrencyTests */,
);
};
Expand All @@ -225,27 +218,24 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 0;
files = (
41B94846210A4744007E59C8 /* Task.swift in Sources */,
OBJ_27 /* AtomicBool.swift in Sources */,
41B94844210A4744007E59C8 /* SequenceExecutor.swift in Sources */,
41B94845210A4744007E59C8 /* ConcurrentSequenceExecutor.swift in Sources */,
OBJ_28 /* AtomicInt.swift in Sources */,
OBJ_29 /* AtomicReference.swift in Sources */,
41B94843210A4744007E59C8 /* SerialSequenceExecutor.swift in Sources */,
OBJ_30 /* CountDownLatch.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
OBJ_36 /* Sources */ = {
isa = PBXSourcesBuildPhase;
buildActionMask = 0;
files = (
OBJ_37 /* Package.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
OBJ_47 /* Sources */ = {
isa = PBXSourcesBuildPhase;
buildActionMask = 0;
files = (
OBJ_48 /* AtomicBoolTests.swift in Sources */,
OBJ_49 /* AtomicIntTests.swift in Sources */,
41B94849210A4756007E59C8 /* ConcurrentSequenceExecutorTests.swift in Sources */,
OBJ_50 /* AtomicReferenceTests.swift in Sources */,
OBJ_51 /* CountDownLatchTests.swift in Sources */,
);
Expand All @@ -254,11 +244,6 @@
/* End PBXSourcesBuildPhase section */

/* Begin PBXTargetDependency section */
OBJ_42 /* PBXTargetDependency */ = {
isa = PBXTargetDependency;
target = "Concurrency::ConcurrencyTests" /* ConcurrencyTests */;
targetProxy = 412CDD2E20B88EAB00AF5890 /* PBXContainerItemProxy */;
};
OBJ_54 /* PBXTargetDependency */ = {
isa = PBXTargetDependency;
target = "Concurrency::Concurrency" /* Concurrency */;
Expand Down Expand Up @@ -336,24 +321,6 @@
};
name = Debug;
};
OBJ_34 /* Debug */ = {
isa = XCBuildConfiguration;
buildSettings = {
LD = /usr/bin/true;
OTHER_SWIFT_FLAGS = "-swift-version 4 -I $(TOOLCHAIN_DIR)/usr/lib/swift/pm/4 -target x86_64-apple-macosx10.10 -sdk /Applications/Xcode.9.3.0.9E145.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.13.sdk";
SWIFT_VERSION = 4.0;
};
name = Debug;
};
OBJ_35 /* Release */ = {
isa = XCBuildConfiguration;
buildSettings = {
LD = /usr/bin/true;
OTHER_SWIFT_FLAGS = "-swift-version 4 -I $(TOOLCHAIN_DIR)/usr/lib/swift/pm/4 -target x86_64-apple-macosx10.10 -sdk /Applications/Xcode.9.3.0.9E145.app/Contents/Developer/Platforms/MacOSX.platform/Developer/SDKs/MacOSX10.13.sdk";
SWIFT_VERSION = 4.0;
};
name = Release;
};
OBJ_4 /* Release */ = {
isa = XCBuildConfiguration;
buildSettings = {
Expand All @@ -375,18 +342,6 @@
};
name = Release;
};
OBJ_40 /* Debug */ = {
isa = XCBuildConfiguration;
buildSettings = {
};
name = Debug;
};
OBJ_41 /* Release */ = {
isa = XCBuildConfiguration;
buildSettings = {
};
name = Release;
};
OBJ_45 /* Debug */ = {
isa = XCBuildConfiguration;
buildSettings = {
Expand Down Expand Up @@ -446,24 +401,6 @@
defaultConfigurationIsVisible = 0;
defaultConfigurationName = Release;
};
OBJ_33 /* Build configuration list for PBXNativeTarget "ConcurrencyPackageDescription" */ = {
isa = XCConfigurationList;
buildConfigurations = (
OBJ_34 /* Debug */,
OBJ_35 /* Release */,
);
defaultConfigurationIsVisible = 0;
defaultConfigurationName = Release;
};
OBJ_39 /* Build configuration list for PBXAggregateTarget "ConcurrencyPackageTests" */ = {
isa = XCConfigurationList;
buildConfigurations = (
OBJ_40 /* Debug */,
OBJ_41 /* Release */,
);
defaultConfigurationIsVisible = 0;
defaultConfigurationName = Release;
};
OBJ_44 /* Build configuration list for PBXNativeTarget "ConcurrencyTests" */ = {
isa = XCConfigurationList;
buildConfigurations = (
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ Provides locking-free synchronization of a mutable object reference. It provides
### `CountDownLatch`
A utility class that allows coordination between threads. A count down latch starts with an initial count. Threads can then decrement the count until it reaches zero, at which point, the suspended waiting thread shall proceed. A `CountDownLatch` behaves differently from a `DispatchSemaphore` once the latch is open. Unlike a semaphore where subsequent waits would still block the caller thread, once a `CountDownLatch` is open, all subsequent waits can directly passthrough.

### `ConcurrentSequenceExecutor`
An execution utility that executes sequences of tasks and returns the final result in a highly concurrent environment.

### `SerialSequenceExecutor`
A debugging executor that executes sequences of tasks and returns the final result serially on the caller thread.

## Installation

### Carthage
Expand Down Expand Up @@ -88,4 +94,4 @@ Or you can follow the steps above to generate a Xcode project and run tests with


## License
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency?ref=badge_large)
[![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency.svg?type=large)](https://app.fossa.io/projects/git%2Bgithub.com%2Fuber%2Fswift-concurrency?ref=badge_large)
114 changes: 114 additions & 0 deletions Sources/Concurrency/Executor/ConcurrentSequenceExecutor.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//
// Copyright (c) 2018. Uber Technologies
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

import Foundation

/// An executor that executes sequences of tasks concurrently.
///
/// - seeAlso: `SequenceExecutor`.
/// - seeAlso: `Task`.
public class ConcurrentSequenceExecutor: SequenceExecutor {

/// Initializer.
///
/// - parameter name: The name of the executor.
/// - parameter qos: The quality of service of this executor. This
/// defaults to `userInitiated`.
public init(name: String, qos: DispatchQoS = .userInitiated) {
taskQueue = DispatchQueue(label: "Executor.taskQueue-\(name)", qos: qos, attributes: .concurrent)
}

/// Execute a sequence of tasks concurrently from the given initial task.
///
/// - parameter initialTask: The root task of the sequence of tasks
/// to be executed.
/// - parameter execution: The execution defining the sequence of tasks.
/// When a task completes its execution, this closure is invoked with
/// the task and its produced result. This closure is invoked from
/// multiple threads concurrently, therefore it must be thread-safe.
/// The tasks provided by this closure are executed concurrently.
/// - returns: The execution handle that allows control and monitoring
/// of the sequence of tasks being executed.
public func executeSequence<SequenceResultType>(from initialTask: Task, with execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) -> SequenceExecutionHandle<SequenceResultType> {
let handle: SynchronizedSequenceExecutionHandle<SequenceResultType> = SynchronizedSequenceExecutionHandle()
execute(initialTask, with: handle, execution)
return handle
}

// MARK: - Private

private let taskQueue: DispatchQueue

private func execute<SequenceResultType>(_ task: Task, with sequenceHandle: SynchronizedSequenceExecutionHandle<SequenceResultType>, _ execution: @escaping (Task, Any) -> SequenceExecution<SequenceResultType>) {
taskQueue.async {
guard !sequenceHandle.isCancelled else {
return
}

let result = task.typeErasedExecute()
let nextExecution = execution(task, result)
switch nextExecution {
case .continueSequence(let nextTask):
self.execute(nextTask, with: sequenceHandle, execution)
case .endOfSequence(let result):
sequenceHandle.sequenceDidComplete(with: result)
}
}
}
}

private class SynchronizedSequenceExecutionHandle<SequenceResultType>: SequenceExecutionHandle<SequenceResultType> {

private let latch = CountDownLatch(count: 1)
private let didCancel = AtomicBool(initialValue: false)

// Use a lock to ensure result is properly accessed, since the read
// `await` method may be invoked on a different thread than the write
// `sequenceDidComplete` method.
private let resultLock = NSRecursiveLock()
private var result: SequenceResultType?

fileprivate var isCancelled: Bool {
return didCancel.value
}

fileprivate override func await(withTimeout timeout: TimeInterval?) throws -> SequenceResultType {
let didComplete = latch.await(timeout: timeout)
if !didComplete {
throw SequenceExecutionError.awaitTimeout
}

resultLock.lock()
defer {
resultLock.unlock()
}
// If latch was counted down, the result must have been set. Therefore,
// this forced unwrap is safe.
return result!
}

fileprivate func sequenceDidComplete(with result: SequenceResultType) {
resultLock.lock()
self.result = result
resultLock.unlock()

latch.countDown()
}

fileprivate override func cancel() {
didCancel.compareAndSet(expect: false, newValue: true)
}
}
Loading