Skip to content

Commit fd03882

Browse files
authored
Add message buffer (#73)
* Add message buffer * Some improvement * Improve again
1 parent aa26c3f commit fd03882

File tree

8 files changed

+383
-12
lines changed

8 files changed

+383
-12
lines changed

Sources/SignalRClient/HubConnection.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import Foundation
66
public actor HubConnection {
77
private static let defaultTimeout: TimeInterval = 30
88
private static let defaultPingInterval: TimeInterval = 15
9+
private static let defaultStatefulReconnectBufferSize: Int = 100_000_000 // bytes of messages
10+
911
private var invocationBinder: DefaultInvocationBinder
1012
private var invocationHandler: InvocationHandler
1113

@@ -17,6 +19,7 @@ public actor HubConnection {
1719
private let retryPolicy: RetryPolicy
1820
private let keepAliveScheduler: TimeScheduler
1921
private let serverTimeoutScheduler: TimeScheduler
22+
private let statefulReconnectBufferSize: Int
2023

2124
private var connectionStarted: Bool = false
2225
private var receivedHandshakeResponse: Bool = false
@@ -39,9 +42,12 @@ public actor HubConnection {
3942
hubProtocol: HubProtocol,
4043
retryPolicy: RetryPolicy,
4144
serverTimeout: TimeInterval?,
42-
keepAliveInterval: TimeInterval?) {
45+
keepAliveInterval: TimeInterval?,
46+
statefulReconnectBufferSize: Int?) {
4347
self.serverTimeout = serverTimeout ?? HubConnection.defaultTimeout
4448
self.keepAliveInterval = keepAliveInterval ?? HubConnection.defaultPingInterval
49+
self.statefulReconnectBufferSize = statefulReconnectBufferSize ?? HubConnection.defaultStatefulReconnectBufferSize
50+
4551
self.logger = logger
4652
self.retryPolicy = retryPolicy
4753

Sources/SignalRClient/HubConnectionBuilder.swift

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public class HubConnectionBuilder {
1212
private var keepAliveInterval: TimeInterval?
1313
private var url: String?
1414
private var retryPolicy: RetryPolicy?
15+
private var statefulReconnectBufferSize: Int?
1516
private var httpConnectionOptions: HttpConnectionOptions = HttpConnectionOptions()
1617

1718
public init() {}
@@ -79,6 +80,16 @@ public class HubConnectionBuilder {
7980
return self
8081
}
8182

83+
public func withStatefulReconnect() -> HubConnectionBuilder {
84+
return withStatefulReconnect(options: StatefulReconnectOptions())
85+
}
86+
87+
public func withStatefulReconnect(options: StatefulReconnectOptions) -> HubConnectionBuilder {
88+
self.statefulReconnectBufferSize = options.bufferSize
89+
self.httpConnectionOptions.useStatefulReconnect = true
90+
return self
91+
}
92+
8293
public func build() -> HubConnection {
8394
guard let url = url else {
8495
fatalError("url must be set with .withUrl(String:)")
@@ -94,7 +105,8 @@ public class HubConnectionBuilder {
94105
hubProtocol: hubProtocol,
95106
retryPolicy: retryPolicy,
96107
serverTimeout: serverTimeout,
97-
keepAliveInterval: keepAliveInterval)
108+
keepAliveInterval: keepAliveInterval,
109+
statefulReconnectBufferSize: statefulReconnectBufferSize)
98110
}
99111
}
100112

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
import Foundation
5+
6+
actor MessageBuffer {
7+
private var maxBufferSize: Int
8+
private var messages: [BufferedItem] = []
9+
private var bufferedByteCount: Int = 0
10+
private var totalMessageCount: Int = 0
11+
private var lastSendSequenceId: Int = 0
12+
private var nextSendIdx = 0
13+
private var dequeueContinuations: [CheckedContinuation<Bool, Never>] = []
14+
private var closed: Bool = false
15+
16+
init(bufferSize: Int) {
17+
self.maxBufferSize = bufferSize
18+
}
19+
20+
public func enqueue(content: StringOrData) async throws -> Void {
21+
if closed {
22+
throw SignalRError.invalidOperation("Message buffer has closed")
23+
}
24+
25+
var size: Int
26+
switch content {
27+
case .string(let str):
28+
size = str.lengthOfBytes(using: .utf8)
29+
case .data(let data):
30+
size = data.count
31+
}
32+
33+
bufferedByteCount = bufferedByteCount + size
34+
totalMessageCount = totalMessageCount + 1
35+
36+
return await withCheckedContinuation{ continuation in
37+
if (bufferedByteCount > maxBufferSize) {
38+
// If buffer is full, we're tring to backpressure the sending
39+
// id start from 1
40+
messages.append(BufferedItem(content: content, size: size, id: totalMessageCount, continuation: continuation))
41+
} else {
42+
messages.append(BufferedItem(content: content, size: size, id: totalMessageCount, continuation: nil))
43+
continuation.resume()
44+
}
45+
46+
while !dequeueContinuations.isEmpty {
47+
let continuation = dequeueContinuations.removeFirst()
48+
continuation.resume(returning: true)
49+
}
50+
}
51+
}
52+
53+
public func ack(sequenceId: Int) throws -> Bool {
54+
// It might be wrong ack or the ack of previous connection
55+
if (sequenceId <= 0 || sequenceId > lastSendSequenceId) {
56+
return false
57+
}
58+
59+
var ackedCount: Int = 0
60+
for item in messages {
61+
if (item.id <= sequenceId) {
62+
ackedCount = ackedCount + 1
63+
bufferedByteCount = bufferedByteCount - item.size
64+
if let ctu = item.continuation {
65+
ctu.resume()
66+
}
67+
} else if (bufferedByteCount <= maxBufferSize) {
68+
if let ctu = item.continuation {
69+
ctu.resume()
70+
}
71+
} else {
72+
break
73+
}
74+
}
75+
76+
messages = Array(messages.dropFirst(ackedCount))
77+
// sending idx will change because we changes the array
78+
nextSendIdx = nextSendIdx - ackedCount
79+
return true
80+
}
81+
82+
public func WaitToDequeue() async throws -> Bool {
83+
if (nextSendIdx < messages.count) {
84+
return true
85+
}
86+
87+
return await withCheckedContinuation { continuation in
88+
dequeueContinuations.append(continuation)
89+
}
90+
}
91+
92+
public func TryDequeue() throws -> StringOrData? {
93+
if (nextSendIdx < messages.count) {
94+
let item = messages[nextSendIdx]
95+
nextSendIdx = nextSendIdx + 1
96+
lastSendSequenceId = item.id
97+
return item.content
98+
}
99+
return nil
100+
}
101+
102+
public func ResetDequeue() async throws -> Void {
103+
nextSendIdx = 0
104+
lastSendSequenceId = messages.count > 0 ? messages[0].id : 0
105+
while !dequeueContinuations.isEmpty {
106+
let continuation = dequeueContinuations.removeFirst()
107+
continuation.resume(returning: true)
108+
}
109+
}
110+
111+
public func close() {
112+
closed = true
113+
while !dequeueContinuations.isEmpty {
114+
let continuation = dequeueContinuations.removeFirst()
115+
continuation.resume(returning: false)
116+
}
117+
}
118+
119+
private func isInvocationMessage(message: HubMessage) -> Bool {
120+
switch (message.type) {
121+
case .invocation, .streamItem, .completion, .streamInvocation, .cancelInvocation:
122+
return true
123+
case .close, .sequence, .ping, .ack:
124+
return false
125+
}
126+
}
127+
}
128+
129+
private class BufferedItem {
130+
let content: StringOrData
131+
let size: Int
132+
let id: Int
133+
let continuation: CheckedContinuation<Void, Never>?
134+
135+
init(content: StringOrData,
136+
size: Int,
137+
id: Int,
138+
continuation: CheckedContinuation<Void, Never>?) {
139+
self.content = content
140+
self.size = size
141+
self.id = id
142+
self.continuation = continuation
143+
}
144+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
public struct StatefulReconnectOptions {
2+
public var bufferSize: Int?
3+
}

Tests/SignalRClientTests/HubConnection+OnResultTests.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ final class HubConnectionOnResultTests: XCTestCase {
3131
hubProtocol: hubProtocol,
3232
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
3333
serverTimeout: nil,
34-
keepAliveInterval: nil
34+
keepAliveInterval: nil,
35+
statefulReconnectBufferSize: nil
3536
)
3637

3738
mockConnection.onSend = { data in

Tests/SignalRClientTests/HubConnection+OnTests.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ final class HubConnectionOnTests: XCTestCase {
2727
hubProtocol: hubProtocol,
2828
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
2929
serverTimeout: nil,
30-
keepAliveInterval: nil
30+
keepAliveInterval: nil,
31+
statefulReconnectBufferSize: nil
3132
)
3233

3334
mockConnection.onSend = { data in

Tests/SignalRClientTests/HubConnectionTests.swift

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ final class HubConnectionTests: XCTestCase {
6767
hubProtocol: hubProtocol,
6868
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
6969
serverTimeout: nil,
70-
keepAliveInterval: nil
70+
keepAliveInterval: nil,
71+
statefulReconnectBufferSize: nil
7172
)
7273
}
7374

@@ -178,7 +179,8 @@ final class HubConnectionTests: XCTestCase {
178179
hubProtocol: hubProtocol,
179180
retryPolicy: DefaultRetryPolicy(retryDelays: [1, 2, 3]), // Add some retry, but in this case, it shouldn't have effect
180181
serverTimeout: nil,
181-
keepAliveInterval: nil
182+
keepAliveInterval: nil,
183+
statefulReconnectBufferSize: nil
182184
)
183185

184186
let expectation = XCTestExpectation(description: "send() should be called")
@@ -206,7 +208,8 @@ final class HubConnectionTests: XCTestCase {
206208
hubProtocol: hubProtocol,
207209
retryPolicy: DefaultRetryPolicy(retryDelays: []),
208210
serverTimeout: nil,
209-
keepAliveInterval: nil
211+
keepAliveInterval: nil,
212+
statefulReconnectBufferSize: nil
210213
)
211214

212215
let sendExpectation = XCTestExpectation(description: "send() should be called")
@@ -247,7 +250,8 @@ final class HubConnectionTests: XCTestCase {
247250
hubProtocol: hubProtocol,
248251
retryPolicy: DefaultRetryPolicy(retryDelays: [0.1, 0.2, 0.3]), // Add some retry
249252
serverTimeout: nil,
250-
keepAliveInterval: nil
253+
keepAliveInterval: nil,
254+
statefulReconnectBufferSize: nil
251255
)
252256

253257
let sendExpectation = XCTestExpectation(description: "send() should be called")
@@ -314,7 +318,8 @@ final class HubConnectionTests: XCTestCase {
314318
hubProtocol: hubProtocol,
315319
retryPolicy: DefaultRetryPolicy(retryDelays: [0.1, 0.2]), // Limited retries
316320
serverTimeout: nil,
317-
keepAliveInterval: nil
321+
keepAliveInterval: nil,
322+
statefulReconnectBufferSize: nil
318323
)
319324

320325
let sendExpectation = XCTestExpectation(description: "send() should be called")
@@ -391,7 +396,8 @@ final class HubConnectionTests: XCTestCase {
391396
hubProtocol: hubProtocol,
392397
retryPolicy: retryPolicy, // Limited retries
393398
serverTimeout: nil,
394-
keepAliveInterval: nil
399+
keepAliveInterval: nil,
400+
statefulReconnectBufferSize: nil
395401
)
396402

397403
let sendExpectation = XCTestExpectation(description: "send() should be called")
@@ -480,7 +486,8 @@ final class HubConnectionTests: XCTestCase {
480486
hubProtocol: hubProtocol,
481487
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
482488
serverTimeout: nil,
483-
keepAliveInterval: keepAliveInterval
489+
keepAliveInterval: keepAliveInterval,
490+
statefulReconnectBufferSize: nil
484491
)
485492

486493
let handshakeExpectation = XCTestExpectation(description: "handshake should be called")
@@ -530,7 +537,8 @@ final class HubConnectionTests: XCTestCase {
530537
hubProtocol: hubProtocol,
531538
retryPolicy: DefaultRetryPolicy(retryDelays: []), // No retry
532539
serverTimeout: 0.1,
533-
keepAliveInterval: 99
540+
keepAliveInterval: 99,
541+
statefulReconnectBufferSize: nil
534542
)
535543

536544
let handshakeExpectation = XCTestExpectation(description: "handshake should be called")

0 commit comments

Comments
 (0)