-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathQLParallelSegment+OperationRunner.swift
84 lines (74 loc) · 2.69 KB
/
QLParallelSegment+OperationRunner.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import Dispatch
internal extension QLParallelSegment {
class OperationBox {
var id: AnyHashable
var operation: ParallelOperation
var completed: Bool
var value: Any?
var queue: DispatchQueue?
init(_ id: AnyHashable,
_ operation: @escaping ParallelOperation,
_ queue: DispatchQueue?) {
self.id = id
self.operation = operation
self.completed = false
self.value = nil
self.queue = queue
}
}
class OperationRunner: Hashable {
let completionQueue = DispatchQueue.init(label: "QLParallelSegment.CompletionQueue")
init(_ id: AnyHashable,
_ operations: [(AnyHashable, ParallelOperation)],
_ queues: [AnyHashable:DispatchQueue]) {
self.id = id
self.operations = operations.map {
return OperationBox($0.0, $0.1, queues[$0.0])
}
}
var id: AnyHashable
var operations: [OperationBox] = []
var completion: ( ([(AnyHashable, Any?)])->() )!
var errorCompletion: ( (Error)->() )!
func run(_ input: Input?) {
for opBox in self.operations {
let queue = opBox.queue ?? DispatchQueue.main
queue.async { self.runOperation(input, opBox) }
}
}
func runOperation(_ input: Input?, _ opBox: OperationBox) {
do { try opBox.operation(input, { output in
opBox.value = output
opBox.completed = true
self.allCompleted = true })
} catch {
self.errorCompletion(error)
}
}
var allCompleted: Bool {
get {
var isCompleted: Bool = false
completionQueue.sync { isCompleted = self.__allCompleted }
return isCompleted
}
set {
completionQueue.sync {
if (self.__allCompleted == true) { return }
self.__allCompleted = self.operations
.reduce(true, { ($1.completed) ? $0 : false } )
if (self.__allCompleted == true) {
self.completion(operations.map { ($0.id, $0.value) })
}
}
}
}
private var __allCompleted: Bool = false
static func == (lhs: QLParallelSegment<Input, Output>.OperationRunner,
rhs: QLParallelSegment<Input, Output>.OperationRunner) -> Bool {
return lhs.id == rhs.id
}
func hash(into hasher: inout Hasher) {
hasher.combine(id)
}
}
}