Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple streams #442

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,10 @@ public final class ConnectionPool<

}

public func connection(_ connection: Connection, didReceiveNewMaxStreamSetting: UInt16) {

public func connectionReceivedNewMaxStreamSetting(_ connection: Connection, newMaxStreamSetting maxStreams: UInt16) {
self.modifyStateAndRunActions { state in
state.stateMachine.connectionReceivedNewMaxStreamSetting(connection.id, newMaxStreamSetting: maxStreams)
}
}

public func run() async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,50 @@ extension PoolStateMachine {
return self.connections[index].timerScheduled(timer, cancelContinuation: cancelContinuation)
}

// MARK: Changes at runtime

@usableFromInline
struct NewMaxStreamInfo {

@usableFromInline
var index: Int

@usableFromInline
var newMaxStreams: UInt16

@usableFromInline
var oldMaxStreams: UInt16

@usableFromInline
var usedStreams: UInt16

@inlinable
init(index: Int, info: ConnectionState.NewMaxStreamInfo) {
self.index = index
self.newMaxStreams = info.newMaxStreams
self.oldMaxStreams = info.oldMaxStreams
self.usedStreams = info.usedStreams
}
}

@inlinable
mutating func connectionReceivedNewMaxStreamSetting(
_ connectionID: ConnectionID,
newMaxStreamSetting maxStreams: UInt16
) -> NewMaxStreamInfo? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
return nil
}

guard let info = self.connections[index].newMaxStreamSetting(maxStreams) else {
return nil
}

self.stats.availableStreams += maxStreams - info.oldMaxStreams

return NewMaxStreamInfo(index: index, info: info)
}

// MARK: Leasing and releasing

/// Lease a connection, if an idle connection is available.
Expand Down Expand Up @@ -424,9 +468,9 @@ extension PoolStateMachine {

/// Closes the connection at the given index.
@inlinable
mutating func closeConnectionIfIdle(at index: Int) -> CloseAction {
mutating func closeConnectionIfIdle(at index: Int) -> CloseAction? {
guard let closeAction = self.connections[index].closeIfIdle() else {
preconditionFailure("Invalid state: \(self)")
return nil // apparently the connection isn't idle
}

self.stats.idle -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,53 @@ extension PoolStateMachine {
}
}

@usableFromInline
struct NewMaxStreamInfo {
@usableFromInline
var newMaxStreams: UInt16

@usableFromInline
var oldMaxStreams: UInt16

@usableFromInline
var usedStreams: UInt16

@inlinable
init(newMaxStreams: UInt16, oldMaxStreams: UInt16, usedStreams: UInt16) {
self.newMaxStreams = newMaxStreams
self.oldMaxStreams = oldMaxStreams
self.usedStreams = usedStreams
}
}

@inlinable
mutating func newMaxStreamSetting(_ newMaxStreams: UInt16) -> NewMaxStreamInfo? {
switch self.state {
case .starting, .backingOff:
preconditionFailure("Invalid state: \(self.state)")

case .idle(let connection, let oldMaxStreams, let keepAlive, idleTimer: let idleTimer):
self.state = .idle(connection, maxStreams: newMaxStreams, keepAlive: keepAlive, idleTimer: idleTimer)
return NewMaxStreamInfo(
newMaxStreams: newMaxStreams,
oldMaxStreams: oldMaxStreams,
usedStreams: keepAlive.usedStreams
)

case .leased(let connection, let usedStreams, let oldMaxStreams, let keepAlive):
self.state = .leased(connection, usedStreams: usedStreams, maxStreams: newMaxStreams, keepAlive: keepAlive)
return NewMaxStreamInfo(
newMaxStreams: newMaxStreams,
oldMaxStreams: oldMaxStreams,
usedStreams: usedStreams + keepAlive.usedStreams
)

case .closing, .closed:
return nil
}
}


@inlinable
mutating func parkConnection(scheduleKeepAliveTimer: Bool, scheduleIdleTimeoutTimer: Bool) -> Max2Sequence<ConnectionTimer> {
var keepAliveTimer: ConnectionTimer?
Expand Down
46 changes: 41 additions & 5 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,39 @@ struct PoolStateMachine<
}
}

@inlinable
mutating func connectionReceivedNewMaxStreamSetting(
_ connection: ConnectionID,
newMaxStreamSetting maxStreams: UInt16
) -> Action {
guard let info = self.connections.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: maxStreams) else {
return .none()
}

let waitingRequests = self.requestQueue.count

guard waitingRequests > 0 else {
return .none()
}

// the only thing we can do if we receive a new max stream setting is check if the new stream
// setting is higher and then dequeue some waiting requests

guard info.newMaxStreams > info.oldMaxStreams && info.newMaxStreams > info.usedStreams else {
return .none()
}

let leaseStreams = min(info.newMaxStreams - info.oldMaxStreams, info.newMaxStreams - info.usedStreams, UInt16(clamping: waitingRequests))
let requests = self.requestQueue.pop(max: leaseStreams)
precondition(Int(leaseStreams) == requests.count)
let leaseResult = self.connections.leaseConnection(at: info.index, streams: leaseStreams)

return .init(
request: .leaseConnection(requests, leaseResult.connection),
connection: .cancelTimers(.init(leaseResult.timersToCancel))
)
}

@inlinable
mutating func timerScheduled(_ timer: Timer, cancelContinuation: TimerCancellationToken) -> TimerCancellationToken? {
self.connections.timerScheduled(timer.underlying, cancelContinuation: cancelContinuation)
Expand Down Expand Up @@ -445,11 +478,14 @@ struct PoolStateMachine<
}

case .overflow:
let closeAction = self.connections.closeConnectionIfIdle(at: index)
return .init(
request: .none,
connection: .closeConnection(closeAction.connection, closeAction.timersToCancel)
)
if let closeAction = self.connections.closeConnectionIfIdle(at: index) {
return .init(
request: .none,
connection: .closeConnection(closeAction.connection, closeAction.timersToCancel)
)
} else {
return .none()
}
}

}
Expand Down
142 changes: 142 additions & 0 deletions Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,148 @@ final class ConnectionPoolTests: XCTestCase {
}
}
}

func testLeasingMultipleStreamsFromOneConnectionWorks() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 10
mutableConfig.idleTimeout = .seconds(10)
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionFuture.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

// create 4 connection requests
let requests = (0..<10).map { ConnectionFuture(id: $0) }
pool.leaseConnections(requests)
var connections = [MockConnection]()

await factory.nextConnectAttempt { connectionID in
return 10
}

for request in requests {
let connection = try await request.future.success
connections.append(connection)
}

// Ensure that all requests got the same connection
XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1)

// release all 10 leased streams
for connection in connections {
pool.releaseConnection(connection)
}

for _ in 0..<9 {
_ = try? await factory.nextConnectAttempt { connectionID in
throw CancellationError()
}
}

// shutdown
taskGroup.cancelAll()
for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}

func testIncreasingAvailableStreamsWorks() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 1
mutableConfig.idleTimeout = .seconds(10)
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionFuture.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

// create 4 connection requests
var requests = (0..<21).map { ConnectionFuture(id: $0) }
pool.leaseConnections(requests)
var connections = [MockConnection]()

await factory.nextConnectAttempt { connectionID in
return 1
}

let connection = try await requests.first!.future.success
connections.append(connection)
requests.removeFirst()

pool.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: 21)

for (index, request) in requests.enumerated() {
let connection = try await request.future.success
connections.append(connection)
}

// Ensure that all requests got the same connection
XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1)

requests = (22..<42).map { ConnectionFuture(id: $0) }
pool.leaseConnections(requests)

// release all 21 leased streams in a single call
pool.releaseConnection(connection, streams: 21)

// ensure all 20 new requests got fulfilled
for request in requests {
let connection = try await request.future.success
connections.append(connection)
}

// release all 20 leased streams one by one
for _ in requests {
pool.releaseConnection(connection, streams: 1)
}

// shutdown
taskGroup.cancelAll()
for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}
}

struct ConnectionFuture: ConnectionRequestProtocol {
Expand Down
Loading