/
CombineExtensions.swift
121 lines (104 loc) · 5.15 KB
/
CombineExtensions.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#if canImport(Combine)
import Combine
import Dispatch
extension Future {
    /// Builds a `Future` whose promise is fulfilled with `output` from inside its own attempt closure.
    ///
    /// - Parameter output: The value the returned `Future` succeeds with.
    /// - Returns: A `Future` resolved with `.success(output)`.
    public static func success(_ output: Output) -> Future<Output, Failure> {
        Future { $0(.success(output)) }
    }

    /// Builds a `Future` whose promise is fulfilled with `error` from inside its own attempt closure.
    ///
    /// - Parameter error: The error the returned `Future` fails with.
    /// - Returns: A `Future` resolved with `.failure(error)`.
    public static func failure(_ error: Failure) -> Future<Output, Failure> {
        Future { $0(.failure(error)) }
    }
}
extension Publisher {
    /**
     Creates a `Future` from only the first output of the receiver.

     Useful for operating on a `Future` when you still want the output of that stream to be a `Future`.

     The receiver is subscribed to while the `Future` is being created. As soon as the first value arrives the
     upstream subscription is cancelled, so a receiver that never completes (for example a subject) is not kept
     subscribed after the `Future` has resolved.

     - Warning: If the receiver completes without outputting a value, then the returned `Future` will never be completed.
     - Returns: A `Future` that will be completed either when the first value from this publisher is published, or when it fails with an error before publishing a value.
     */
    public func firstOutputAsFuture() -> Future<Output, Failure> {
        return Future<Output, Failure> { resolver in
            // Serial queue guarding `isFinished` and `subscription` only. The resolver is never
            // invoked while it is held, so re-entrant emissions cannot deadlock on it.
            let stateQueue = DispatchQueue(label: "firstOutputAsFuture")
            var isFinished = false
            var subscription: AnyCancellable?

            // Marks the pipeline as done and releases the retained subscription (if it was stored yet).
            let finish: () -> Void = {
                let toRelease: AnyCancellable? = stateQueue.sync { () -> AnyCancellable? in
                    isFinished = true
                    let current = subscription
                    subscription = nil
                    return current
                }
                toRelease?.cancel()
            }

            // `first()` forwards at most one value, cancels the upstream after it, and then finishes.
            // A failure is only forwarded when it arrives before any value.
            // The completion closure captures `subscription` (through `finish`), which keeps the
            // pipeline alive until it terminates; `finish` then breaks that cycle.
            let newSubscription = self.first().sink(
                receiveCompletion: { completion in
                    if case let .failure(error) = completion {
                        resolver(.failure(error))
                    }
                    // On `.finished` without a value the future is intentionally left unresolved.
                    finish()
                },
                receiveValue: { value in
                    resolver(.success(value))
                }
            )

            // The receiver may have terminated synchronously inside `sink`, before the
            // subscription could be stored; in that case there is nothing left to keep alive.
            let alreadyFinished: Bool = stateQueue.sync { () -> Bool in
                if isFinished {
                    return true
                }
                subscription = newSubscription
                return false
            }
            if alreadyFinished {
                newSubscription.cancel()
            }
        }
    }

    /**
     Forwards events from the receiver to the given `Subject`. Optionally will also finish the subject when the receiver finishes.

     This is mostly used for setting up a type that emits events with a `Subject` as an intermediary publisher, where you want the event publisher to be subscribable from an earlier timepoint than the receiver will be available to send events. For example, something like the following:

         class MyCoordinator: BaseCoordinator {
             private let eventSubject = PassthroughSubject<Event, Never>()
             var events: AnyPublisher<Event, Never> { eventSubject.eraseToAnyPublisher() }
             private var cancellables = Set<AnyCancellable>()
             let subcomponent: Subcomponent

             func start() {
                 super.start()
                 subcomponent.publisher.map { value -> Event in
                     // ...
                 }
                 .send(to: eventSubject)
                 .store(in: &cancellables)
             }
         }

     A failure of the receiver is always forwarded to the subject, regardless of `finishSubject`.

     - Parameter subject: The `Subject` to send events to.
     - Parameter finishSubject: If `true`, when the receiver receives a completion event, then this will also forward that. Defaults to false, which is likely what you want in the earlier example if you have multiple subcomponents publishing events that are sent to the same `Subject`.
     - Returns: The `AnyCancellable` for the forwarding subscription. Keep it alive for as long as events should be forwarded; forwarding stops once it is cancelled or released.
     */
    public func send<S: Subject>(to subject: S, finishSubject: Bool = false) -> AnyCancellable where S.Output == Self.Output, S.Failure == Self.Failure {
        return sink(receiveCompletion: { completion in
            switch completion {
            case .finished:
                // Only propagate a normal finish when the caller opted in.
                if finishSubject {
                    subject.send(completion: .finished)
                }
            case .failure(let error):
                subject.send(completion: .failure(error))
            }
        }, receiveValue: { value in
            subject.send(value)
        })
    }
}
/**
 Merges the array of `Future`s into a single `Future` of an array of `Output`, keeping the original order they were in (as opposed to the order they finish in).

 - Note: If any one future fails, then the entire future will fail. An empty `futures` array resolves immediately with an empty array.
 - Parameter futures: The array of futures to wait for.
 - Returns: A `Future` of an array of `Output`, in the order the original futures array was in.
 */
public func mergeManyAndKeepOrder<Output, Failure: Error>(_ futures: [Future<Output, Failure>]) -> Future<[Output], Failure> {
    // With nothing to merge, do not rely on `Publishers.MergeMany` over zero publishers
    // ever completing (which `collect()` needs in order to emit); resolve up front instead.
    guard !futures.isEmpty else {
        return Future<[Output], Failure> { promise in
            promise(.success([]))
        }
    }

    // Tag every result with its position in the input so the original order can be restored later.
    let publishers: [AnyPublisher<(item: Output, index: Int), Failure>] = futures
        .enumerated()
        .map { (index: Int, publisher: Future<Output, Failure>) -> AnyPublisher<(item: Output, index: Int), Failure> in
            return publisher.map { item in (item, index) }.eraseToAnyPublisher()
        }

    return Publishers.MergeMany(publishers)
        .collect()
        .map { (values: [(item: Output, index: Int)]) -> [Output] in
            // Results arrive in completion order; sort them back into input order.
            return values
                .sorted { lhs, rhs in lhs.index < rhs.index }
                .map { $0.item }
        }
        .firstOutputAsFuture()
}
#endif