-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathstream_support.swift
137 lines (113 loc) · 4.82 KB
/
stream_support.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
* stream_support.swift
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import Flow
/// An async iterator that can be constructed directly from the Flow stream it iterates.
///
/// The iterator's `Element` is constrained to be the element type of its
/// `AssociatedFutureStream`.
public protocol FlowStreamOpsAsyncIterator: AsyncIteratorProtocol where Element == AssociatedFutureStream.Element {
/// The stream type this iterator is created from.
associatedtype AssociatedFutureStream: FlowStreamOps
/// Creates an iterator over the given stream.
init(_: AssociatedFutureStream)
}
/// Swift-side view of the C++ callback type used to connect a stream to a Swift continuation.
///
/// Corresponds to `FlowSingleCallbackForSwiftContinuation: Flow.SingleCallback<T>`.
public protocol FlowSingleCallbackForSwiftContinuationProtocol<AssociatedFutureStream> {
/// The stream type this callback is used with.
associatedtype AssociatedFutureStream: FlowStreamOps
/// The element type of the associated stream.
typealias Element = AssociatedFutureStream.Element
/// Creates a callback instance; no stream or continuation is passed at construction.
init()
/// Hands the callback a continuation, the stream, and a pointer to the callback itself.
///
/// Mirrors the C++ signature:
/// ```
/// void set(
/// const void * _Nonnull pointerToContinuationInstance,
/// FutureStream<T> fs,
/// const void * _Nonnull thisPointer)
/// ```
///
/// - Parameters:
///   - continuationPointer: Raw pointer to the continuation instance.
///   - stream: The stream passed as the C++ `fs` argument.
///   - thisPointer: Raw pointer to this callback instance.
///
/// NOTE(review): presumably this registers the callback on `stream` so that the
/// continuation is resumed when the stream produces a value or an error; the
/// required lifetime of both pointers is not visible here — confirm against the
/// C++ implementation.
mutating func set(_ continuationPointer: UnsafeRawPointer,
_ stream: Self.AssociatedFutureStream,
_ thisPointer: UnsafeRawPointer)
}
/// Operations shared by Flow stream types, exposing them to Swift as an `AsyncSequence`.
///
/// The `where` clause ties the associated types together: the sequence's
/// `AsyncIterator` must be a `FlowStreamOpsAsyncIterator` over this same stream
/// type and element type, and `SingleCB` must be a callback for this same stream type.
public protocol FlowStreamOps: AsyncSequence
where AsyncIterator: FlowStreamOpsAsyncIterator,
AsyncIterator.Element == Self.Element,
AsyncIterator.AssociatedFutureStream == Self,
SingleCB.AssociatedFutureStream == Self {
/// Element type of the stream.
associatedtype Element
/// Callback type for this stream; the Swift counterpart of the C++
/// `SingleCallback<T>` subclass described by `FlowSingleCallbackForSwiftContinuationProtocol`.
associatedtype SingleCB: FlowSingleCallbackForSwiftContinuationProtocol
where SingleCB.AssociatedFutureStream == Self
/// FIXME: can't typealias like that, we need to repeat it everywhere: rdar://103021742 ([Sema] Crash during self referential generic requirement)
// typealias AsyncIterator = FlowStreamOpsAsyncIteratorAsyncIterator<Self>
// === ---------------------------------------------------------------------
// Exposed Swift capabilities
/// Suspends and waits for the next element.
///
/// If the stream completes while we're waiting on it, this will return `nil`.
/// Other errors thrown by the stream are re-thrown by this computed property.
var waitNext: Element? {
mutating get async throws
}
/// Implements the `AsyncSequence` protocol requirement, and enables `for try await` looping over this type.
func makeAsyncIterator() -> AsyncIterator
// === ---------------------------------------------------------------------
// C++ API
/// Whether the stream can be inspected without suspending; the default `waitNext`
/// only calls `isError()`, `pop()` and `getError()` after this returns `true`.
func isReady() -> Bool
/// Whether the stream currently reports an error rather than an element.
func isError() -> Bool
/// Returns the next element of the stream.
/// NOTE(review): presumably only valid when `isReady()` is true and `isError()` is false — confirm against the C++ API.
mutating func pop() -> Element
/// Returns the error the stream currently reports.
/// NOTE(review): presumably only valid when `isError()` is true — confirm against the C++ API.
mutating func getError() -> Flow.Error
}
/// Default implementations shared by every `FlowStreamOps` adopter; conforming
/// types normally have no reason to provide their own.
extension FlowStreamOps {
    /// Returns the next element of the stream, suspending when nothing is ready yet.
    ///
    /// - Returns: The next element, or `nil` when the ready stream reports an
    ///   end-of-stream error.
    /// - Throws: `GeneralFlowError` wrapping any other error reported by the ready
    ///   stream, or whatever error the continuation is resumed with on the
    ///   suspending path.
    public var waitNext: Element? {
        mutating get async throws {
            guard self.isReady() else {
                // Nothing available yet: hand a continuation to a fresh callback
                // and suspend until that continuation is resumed.
                // NOTE(review): this path can only yield an `Element` or throw, so an
                // end-of-stream that arrives while suspended would surface as a thrown
                // error rather than `nil` unless the callback side handles it — confirm.
                var callback = SingleCB()
                return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Element, Swift.Error>) in
                    withUnsafeMutablePointer(to: &callback) { callbackPtr in
                        let wrapped = FlowCheckedContinuation<Element>(continuation)
                        withUnsafePointer(to: wrapped) { continuationPtr in
                            // NOTE(review): both raw pointers are only formally valid inside
                            // these closures; presumably `set` copies what it needs — confirm.
                            callbackPtr.pointee.set(
                                UnsafeRawPointer(continuationPtr),
                                self,
                                UnsafeRawPointer(callbackPtr))
                        }
                    }
                }
            }

            // The stream is ready and holds an element: consume it.
            guard self.isError() else {
                return self.pop()
            }

            // The stream is ready and holds an error: end-of-stream maps to `nil`,
            // anything else is rethrown to the caller.
            let failure = self.getError()
            if failure.isEndOfStream {
                return nil
            }
            throw GeneralFlowError(failure)
        }
    }

    /// Creates the async iterator for this stream by passing `self` to the
    /// iterator's initializer.
    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(self)
    }
}
/// Async iterator over a `FlowStreamOps` stream; each call to `next()` awaits the
/// stream's `waitNext`.
public struct FlowStreamOpsAsyncIteratorAsyncIterator<AssociatedFutureStream: FlowStreamOps>: AsyncIteratorProtocol, FlowStreamOpsAsyncIterator {
    public typealias Element = AssociatedFutureStream.Element

    /// The stream elements are pulled from. Declared `var` because `waitNext`
    /// has a mutating getter.
    var stream: AssociatedFutureStream

    /// Creates an iterator that reads from the given stream.
    public init(_ futureStream: AssociatedFutureStream) {
        stream = futureStream
    }

    /// Awaits and returns the stream's next element.
    ///
    /// - Returns: Whatever `waitNext` returns, including `nil`.
    /// - Throws: Any error thrown by `waitNext`.
    public mutating func next() async throws -> Element? {
        let upcoming = try await stream.waitNext
        return upcoming
    }
}