@@ -6,6 +6,8 @@ import Foundation
6
6
public actor HubConnection {
7
7
private static let defaultTimeout : TimeInterval = 30
8
8
private static let defaultPingInterval : TimeInterval = 15
9
+ private static let defaultStatefulReconnectBufferSize : Int = 100_000_000 // bytes of messages
10
+
9
11
private var invocationBinder : DefaultInvocationBinder
10
12
private var invocationHandler : InvocationHandler
11
13
@@ -17,6 +19,7 @@ public actor HubConnection {
17
19
private let retryPolicy : RetryPolicy
18
20
private let keepAliveScheduler : TimeScheduler
19
21
private let serverTimeoutScheduler : TimeScheduler
22
+ private let statefulReconnectBufferSize : Int
20
23
21
24
private var connectionStarted : Bool = false
22
25
private var receivedHandshakeResponse : Bool = false
@@ -39,9 +42,12 @@ public actor HubConnection {
39
42
hubProtocol: HubProtocol ,
40
43
retryPolicy: RetryPolicy ,
41
44
serverTimeout: TimeInterval ? ,
42
- keepAliveInterval: TimeInterval ? ) {
45
+ keepAliveInterval: TimeInterval ? ,
46
+ statefulReconnectBufferSize: Int ? ) {
43
47
self . serverTimeout = serverTimeout ?? HubConnection . defaultTimeout
44
48
self . keepAliveInterval = keepAliveInterval ?? HubConnection . defaultPingInterval
49
+ self . statefulReconnectBufferSize = statefulReconnectBufferSize ?? HubConnection . defaultStatefulReconnectBufferSize
50
+
45
51
self . logger = logger
46
52
self . retryPolicy = retryPolicy
47
53
@@ -107,29 +113,77 @@ public actor HubConnection {
107
113
}
108
114
109
115
public func send( method: String , arguments: Any ... ) async throws {
110
- let invocationMessage = InvocationMessage ( target: method, arguments: AnyEncodableArray ( arguments) , streamIds: nil , headers: nil , invocationId: nil )
116
+ let ( nonstreamArguments, streamArguments) = splitStreamArguments ( arguments: arguments)
117
+ let streamIds = await invocationHandler. createClientStreamIds ( count: streamArguments. count)
118
+ let invocationMessage = InvocationMessage ( target: method, arguments: AnyEncodableArray ( nonstreamArguments) , streamIds: streamIds, headers: nil , invocationId: nil )
111
119
let data = try hubProtocol. writeMessage ( message: invocationMessage)
112
120
logger. log ( level: . debug, message: " Sending message to target: \( method) " )
113
121
try await sendMessageInternal ( data)
122
+ launchStreams ( streamIds: streamIds, clientStreams: streamArguments)
123
+ }
124
+
125
+ private func splitStreamArguments( arguments: Any ... ) -> ( [ Any ] , [ any AsyncSequence ] ) {
126
+ var nonstreamArguments : [ Any ] = [ ]
127
+ var streamArguments : [ any AsyncSequence ] = [ ]
128
+ for argument in arguments {
129
+ if let stream = argument as? ( any AsyncSequence ) {
130
+ streamArguments. append ( stream)
131
+ } else {
132
+ nonstreamArguments. append ( argument)
133
+ }
134
+ }
135
+ return ( nonstreamArguments, streamArguments)
114
136
}
115
137
138
+ private func launchStreams( streamIds: [ String ] , clientStreams: [ any AsyncSequence ] ) {
139
+ for i in 0 ..< streamIds. count {
140
+ Task {
141
+ let stream = clientStreams [ i]
142
+ var err : String ? = nil
143
+ do {
144
+ for try await item in stream {
145
+ let streamItem = StreamItemMessage ( invocationId: streamIds [ i] , item: AnyEncodable ( item) , headers: nil )
146
+ let data = try hubProtocol. writeMessage ( message: streamItem)
147
+ try await sendMessageInternal ( data)
148
+ }
149
+ } catch {
150
+ err = " \( error) "
151
+ logger. log ( level: . error, message: " Fail to send client stream message : \( error) " )
152
+ }
153
+ do {
154
+ let completionMessage = CompletionMessage ( invocationId: streamIds [ i] , error: err, result: AnyEncodable ( nil ) , headers: nil )
155
+ let data = try hubProtocol. writeMessage ( message: completionMessage)
156
+ try await sendMessageInternal ( data)
157
+ } catch {
158
+ logger. log ( level: . error, message: " Fail to send client stream complete message : \( error) " )
159
+ }
160
+ }
161
+ }
162
+ }
163
+
116
164
public func invoke( method: String , arguments: Any ... ) async throws -> Void {
165
+ let ( nonstreamArguments, streamArguments) = splitStreamArguments ( arguments: arguments)
166
+ let streamIds = await invocationHandler. createClientStreamIds ( count: streamArguments. count)
117
167
let ( invocationId, tcs) = await invocationHandler. create ( )
118
- let invocationMessage = InvocationMessage ( target: method, arguments: AnyEncodableArray ( arguments ) , streamIds: nil , headers: nil , invocationId: invocationId)
168
+ let invocationMessage = InvocationMessage ( target: method, arguments: AnyEncodableArray ( nonstreamArguments ) , streamIds: streamIds , headers: nil , invocationId: invocationId)
119
169
let data = try hubProtocol. writeMessage ( message: invocationMessage)
120
170
logger. log ( level: . debug, message: " Invoke message to target: \( method) , invocationId: \( invocationId) " )
121
171
try await sendMessageInternal ( data)
172
+ launchStreams ( streamIds: streamIds, clientStreams: streamArguments)
122
173
_ = try await tcs. task ( )
123
174
}
124
175
125
176
public func invoke< TReturn> ( method: String , arguments: Any ... ) async throws -> TReturn {
177
+ let ( nonstreamArguments, streamArguments) = splitStreamArguments ( arguments: arguments)
178
+ let streamIds = await invocationHandler. createClientStreamIds ( count: streamArguments. count)
126
179
let ( invocationId, tcs) = await invocationHandler. create ( )
127
180
invocationBinder. registerReturnValueType ( invocationId: invocationId, types: TReturn . self)
128
- let invocationMessage = InvocationMessage ( target: method, arguments: AnyEncodableArray ( arguments ) , streamIds: nil , headers: nil , invocationId: invocationId)
181
+ let invocationMessage = InvocationMessage ( target: method, arguments: AnyEncodableArray ( nonstreamArguments ) , streamIds: streamIds , headers: nil , invocationId: invocationId)
129
182
do {
130
183
let data = try hubProtocol. writeMessage ( message: invocationMessage)
131
184
logger. log ( level: . debug, message: " Invoke message to target: \( method) , invocationId: \( invocationId) " )
132
185
try await sendMessageInternal ( data)
186
+ launchStreams ( streamIds: streamIds, clientStreams: streamArguments)
133
187
} catch {
134
188
await invocationHandler. cancel ( invocationId: invocationId, error: error)
135
189
invocationBinder. removeReturnValueType ( invocationId: invocationId)
@@ -144,13 +198,16 @@ public actor HubConnection {
144
198
}
145
199
146
200
public func stream< Element> ( method: String , arguments: Any ... ) async throws -> any StreamResult < Element > {
201
+ let ( nonstreamArguments, streamArguments) = splitStreamArguments ( arguments: arguments)
202
+ let streamIds = await invocationHandler. createClientStreamIds ( count: streamArguments. count)
147
203
let ( invocationId, stream) = await invocationHandler. createStream ( )
148
204
invocationBinder. registerReturnValueType ( invocationId: invocationId, types: Element . self)
149
- let StreamInvocationMessage = StreamInvocationMessage ( invocationId: invocationId, target: method, arguments: AnyEncodableArray ( arguments ) , streamIds: nil , headers: nil )
205
+ let StreamInvocationMessage = StreamInvocationMessage ( invocationId: invocationId, target: method, arguments: AnyEncodableArray ( nonstreamArguments ) , streamIds: streamIds , headers: nil )
150
206
do {
151
207
let data = try hubProtocol. writeMessage ( message: StreamInvocationMessage)
152
208
logger. log ( level: . debug, message: " Stream message to target: \( method) , invocationId: \( invocationId) " )
153
209
try await sendMessageInternal ( data)
210
+ launchStreams ( streamIds: streamIds, clientStreams: streamArguments)
154
211
} catch {
155
212
await invocationHandler. cancel ( invocationId: invocationId, error: error)
156
213
invocationBinder. removeReturnValueType ( invocationId: invocationId)
@@ -672,6 +729,14 @@ public actor HubConnection {
672
729
}
673
730
return ( id, stream)
674
731
}
732
+
733
+ func createClientStreamIds( count: Int ) -> [ String ] {
734
+ var streamIds : [ String ] = [ ]
735
+ for _ in 0 ..< count {
736
+ streamIds. append ( nextId ( ) )
737
+ }
738
+ return streamIds
739
+ }
675
740
676
741
func setResult( message: CompletionMessage ) async {
677
742
if let invocation = invocations [ message. invocationId!] {
0 commit comments