Skip to content

Commit

Permalink
Add support for multiple streams
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianfett committed Nov 14, 2023
1 parent dc94503 commit 3c4173a
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 9 deletions.
6 changes: 4 additions & 2 deletions Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,10 @@ public final class ConnectionPool<

}

public func connection(_ connection: Connection, didReceiveNewMaxStreamSetting: UInt16) {

public func connectionReceivedNewMaxStreamSetting(_ connection: Connection, newMaxStreamSetting maxStreams: UInt16) {
self.modifyStateAndRunActions { state in
state.stateMachine.connectionReceivedNewMaxStreamSetting(connection.id, newMaxStreamSetting: maxStreams)
}
}

public func run() async {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,50 @@ extension PoolStateMachine {
return self.connections[index].timerScheduled(timer, cancelContinuation: cancelContinuation)
}

// MARK: Changes at runtime

@usableFromInline
struct NewMaxStreamInfo {

@usableFromInline
var index: Int

@usableFromInline
var newMaxStreams: UInt16

@usableFromInline
var oldMaxStreams: UInt16

@usableFromInline
var usedStreams: UInt16

@inlinable
init(index: Int, info: ConnectionState.NewMaxStreamInfo) {
self.index = index
self.newMaxStreams = info.newMaxStreams
self.oldMaxStreams = info.oldMaxStreams
self.usedStreams = info.usedStreams
}
}

@inlinable
mutating func connectionReceivedNewMaxStreamSetting(
_ connectionID: ConnectionID,
newMaxStreamSetting maxStreams: UInt16
) -> NewMaxStreamInfo? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
return nil
}

guard let info = self.connections[index].newMaxStreamSetting(maxStreams) else {
return nil
}

self.stats.availableStreams += maxStreams - info.oldMaxStreams

return NewMaxStreamInfo(index: index, info: info)
}

// MARK: Leasing and releasing

/// Lease a connection, if an idle connection is available.
Expand Down Expand Up @@ -424,9 +468,9 @@ extension PoolStateMachine {

/// Closes the connection at the given index.
@inlinable
mutating func closeConnectionIfIdle(at index: Int) -> CloseAction {
mutating func closeConnectionIfIdle(at index: Int) -> CloseAction? {
guard let closeAction = self.connections[index].closeIfIdle() else {
preconditionFailure("Invalid state: \(self)")
return nil // apparently the connection isn't idle
}

self.stats.idle -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,53 @@ extension PoolStateMachine {
}
}

@usableFromInline
struct NewMaxStreamInfo {
@usableFromInline
var newMaxStreams: UInt16

@usableFromInline
var oldMaxStreams: UInt16

@usableFromInline
var usedStreams: UInt16

@inlinable
init(newMaxStreams: UInt16, oldMaxStreams: UInt16, usedStreams: UInt16) {
self.newMaxStreams = newMaxStreams
self.oldMaxStreams = oldMaxStreams
self.usedStreams = usedStreams
}
}

@inlinable
mutating func newMaxStreamSetting(_ newMaxStreams: UInt16) -> NewMaxStreamInfo? {
switch self.state {
case .starting, .backingOff:
preconditionFailure("Invalid state: \(self.state)")

case .idle(let connection, let oldMaxStreams, let keepAlive, idleTimer: let idleTimer):
self.state = .idle(connection, maxStreams: newMaxStreams, keepAlive: keepAlive, idleTimer: idleTimer)
return NewMaxStreamInfo(
newMaxStreams: newMaxStreams,
oldMaxStreams: oldMaxStreams,
usedStreams: keepAlive.usedStreams
)

case .leased(let connection, let usedStreams, let oldMaxStreams, let keepAlive):
self.state = .leased(connection, usedStreams: usedStreams, maxStreams: newMaxStreams, keepAlive: keepAlive)
return NewMaxStreamInfo(
newMaxStreams: newMaxStreams,
oldMaxStreams: oldMaxStreams,
usedStreams: usedStreams + keepAlive.usedStreams
)

case .closing, .closed:
return nil
}
}


@inlinable
mutating func parkConnection(scheduleKeepAliveTimer: Bool, scheduleIdleTimeoutTimer: Bool) -> Max2Sequence<ConnectionTimer> {
var keepAliveTimer: ConnectionTimer?
Expand Down
46 changes: 41 additions & 5 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,39 @@ struct PoolStateMachine<
}
}

@inlinable
mutating func connectionReceivedNewMaxStreamSetting(
_ connection: ConnectionID,
newMaxStreamSetting maxStreams: UInt16
) -> Action {
guard let info = self.connections.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: maxStreams) else {
return .none()
}

let waitingRequests = self.requestQueue.count

guard waitingRequests > 0 else {
return .none()
}

// the only thing we can do if we receive a new max stream setting is check if the new stream
// setting is higher and then dequeue some waiting requests

guard info.newMaxStreams > info.oldMaxStreams && info.newMaxStreams > info.usedStreams else {
return .none()
}

let leaseStreams = min(info.newMaxStreams - info.oldMaxStreams, info.newMaxStreams - info.usedStreams, UInt16(clamping: waitingRequests))
let requests = self.requestQueue.pop(max: leaseStreams)
precondition(Int(leaseStreams) == requests.count)
let leaseResult = self.connections.leaseConnection(at: info.index, streams: leaseStreams)

return .init(
request: .leaseConnection(requests, leaseResult.connection),
connection: .cancelTimers(.init(leaseResult.timersToCancel))
)
}

@inlinable
mutating func timerScheduled(_ timer: Timer, cancelContinuation: TimerCancellationToken) -> TimerCancellationToken? {
self.connections.timerScheduled(timer.underlying, cancelContinuation: cancelContinuation)
Expand Down Expand Up @@ -445,11 +478,14 @@ struct PoolStateMachine<
}

case .overflow:
let closeAction = self.connections.closeConnectionIfIdle(at: index)
return .init(
request: .none,
connection: .closeConnection(closeAction.connection, closeAction.timersToCancel)
)
if let closeAction = self.connections.closeConnectionIfIdle(at: index) {
return .init(
request: .none,
connection: .closeConnection(closeAction.connection, closeAction.timersToCancel)
)
} else {
return .none()
}
}

}
Expand Down
142 changes: 142 additions & 0 deletions Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,148 @@ final class ConnectionPoolTests: XCTestCase {
}
}
}

func testLeasingMultipleStreamsFromOneConnectionWorks() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 10
mutableConfig.idleTimeout = .seconds(10)
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionFuture.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

// create 4 connection requests
let requests = (0..<10).map { ConnectionFuture(id: $0) }
pool.leaseConnections(requests)
var connections = [MockConnection]()

await factory.nextConnectAttempt { connectionID in
return 10
}

for request in requests {
let connection = try await request.future.success
connections.append(connection)
}

// Ensure that all requests got the same connection
XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1)

// release all 10 leased streams
for connection in connections {
pool.releaseConnection(connection)
}

for _ in 0..<9 {
_ = try? await factory.nextConnectAttempt { connectionID in
throw CancellationError()
}
}

// shutdown
taskGroup.cancelAll()
for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}

func testIncreasingAvailableStreamsWorks() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 0
mutableConfig.maximumConnectionSoftLimit = 1
mutableConfig.maximumConnectionHardLimit = 1
mutableConfig.idleTimeout = .seconds(10)
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionFuture.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

// create 4 connection requests
var requests = (0..<21).map { ConnectionFuture(id: $0) }
pool.leaseConnections(requests)
var connections = [MockConnection]()

await factory.nextConnectAttempt { connectionID in
return 1
}

let connection = try await requests.first!.future.success
connections.append(connection)
requests.removeFirst()

pool.connectionReceivedNewMaxStreamSetting(connection, newMaxStreamSetting: 21)

for (index, request) in requests.enumerated() {
let connection = try await request.future.success
connections.append(connection)
}

// Ensure that all requests got the same connection
XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 1)

requests = (22..<42).map { ConnectionFuture(id: $0) }
pool.leaseConnections(requests)

// release all 21 leased streams in a single call
pool.releaseConnection(connection, streams: 21)

// ensure all 20 new requests got fulfilled
for request in requests {
let connection = try await request.future.success
connections.append(connection)
}

// release all 20 leased streams one by one
for _ in requests {
pool.releaseConnection(connection, streams: 1)
}

// shutdown
taskGroup.cancelAll()
for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}
}

struct ConnectionFuture: ConnectionRequestProtocol {
Expand Down

0 comments on commit 3c4173a

Please sign in to comment.