From 5768447f0deae172d3e09c90dab17d5b8bdb01b8 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 9 Dec 2025 14:37:03 +0000 Subject: [PATCH 1/8] Ensure ConnectionPool doesnt shutdown until all connections are closed --- .gitignore | 1 + .../ConnectionPoolModule/ConnectionPool.swift | 6 +- .../PoolStateMachine+ConnectionGroup.swift | 66 ++++++++-- .../PoolStateMachine+ConnectionState.swift | 4 +- .../PoolStateMachine.swift | 52 ++++++-- .../ConnectionPoolTests.swift | 43 +++++++ ...oolStateMachine+ConnectionGroupTests.swift | 2 +- .../PoolStateMachineTests.swift | 119 +++++++++++++++++- 8 files changed, 269 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index d0d66c24e..64e4ded3d 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ DerivedData Package.resolved .swiftpm Tests/LinuxMain.swift +.vscode/ \ No newline at end of file diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index 328369677..824f79fc7 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -390,12 +390,16 @@ public final class ConnectionPool< self.closeConnection(connection) self.cancelTimers(timers) - case .shutdown(let cleanup): + case .initiateShutdown(let cleanup): for connection in cleanup.connections { self.closeConnection(connection) } self.cancelTimers(cleanup.timersToCancel) + case .shutdown(let timersToCancel): + self.cancelTimers(timersToCancel) + self.eventContinuation.finish() + case .none: break } diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift index a8e97ffde..90aa6e828 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift @@ -98,6 +98,7 @@ extension PoolStateMachine { self.keepAliveReducesAvailableStreams = keepAliveReducesAvailableStreams } + @usableFromInline var isEmpty: Bool { self.connections.isEmpty } @@ -514,6 +515,48 @@ extension PoolStateMachine { ) } + @usableFromInline + enum CloseConnectionAction { + case close(CloseAction) + case cancelTimers(Max2Sequence) + case doNothing + } + /// Closes the connection at the given index. + @inlinable + mutating func closeConnection(at index: Int) -> CloseConnectionAction { + guard let closeAction = self.connections[index].close() else { + return .doNothing // no action to take + } + + self.stats.runningKeepAlive -= closeAction.runningKeepAlive ? 1 : 0 + self.stats.availableStreams -= closeAction.maxStreams - closeAction.usedStreams + + switch closeAction.previousConnectionState { + case .idle: + self.stats.idle -= 1 + self.stats.closing += 1 + + case .leased: + self.stats.leased -= 1 + self.stats.closing += 1 + + case .closing: + break + + case .backingOff: + break + } + + if let connection = closeAction.connection { + return .close(CloseAction( + connection: connection, + timersToCancel: closeAction.cancelTimers + )) + } else { + return .cancelTimers(closeAction.cancelTimers) + } + } + @inlinable mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> CloseAction? { guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { @@ -567,7 +610,7 @@ extension PoolStateMachine { /// supplied index after this. If nil is returned the connection was closed by the state machine and was /// therefore already removed. @inlinable - mutating func connectionClosed(_ connectionID: Connection.ID) -> ClosedAction { + mutating func connectionClosed(_ connectionID: Connection.ID, shuttingDown: Bool) -> ClosedAction { guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { preconditionFailure("All connections that have been created should say goodbye exactly once!") } @@ -597,7 +640,7 @@ extension PoolStateMachine { } let newConnectionRequest: ConnectionRequest? - if self.connections.count < self.minimumConcurrentConnections { + if !shuttingDown, self.connections.count < self.minimumConcurrentConnections { newConnectionRequest = self.createNewConnection() } else { newConnectionRequest = .none @@ -613,18 +656,19 @@ extension PoolStateMachine { // MARK: Shutdown mutating func triggerForceShutdown(_ cleanup: inout ConnectionAction.Shutdown) { - for var connectionState in self.connections { - guard let closeAction = connectionState.close() else { - continue - } + for index in self.connections.indices { + switch closeConnection(at: index) { + case .close(let closeAction): + cleanup.connections.append(closeAction.connection) + cleanup.timersToCancel.append(contentsOf: closeAction.timersToCancel) - if let connection = closeAction.connection { - cleanup.connections.append(connection) + case .cancelTimers(let timers): + cleanup.timersToCancel.append(contentsOf: timers) + + case .doNothing: + break } - cleanup.timersToCancel.append(contentsOf: closeAction.cancelTimers) } - - self.connections = [] } // MARK: - Private functions - diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift index 9912f13aa..b252ca4cf 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift @@ -587,10 +587,10 @@ extension PoolStateMachine { runningKeepAlive: keepAlive.isRunning ) - case .leased, .closed: + case .leased, .closed, .closing: return nil - case .backingOff, .starting, .closing: + case .backingOff, .starting: preconditionFailure("Invalid state: \(self.state)") } } diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 8d995fa24..da40da515 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -88,8 +88,8 @@ struct PoolStateMachine< case runKeepAlive(Connection, TimerCancellationToken?) case cancelTimers(TinyFastSequence) case closeConnection(Connection, Max2Sequence) - case shutdown(Shutdown) - + case initiateShutdown(Shutdown) + case shutdown(TinyFastSequence) case none } @@ -259,8 +259,27 @@ struct PoolStateMachine< case .running, .shuttingDown(graceful: true): let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) return self.handleAvailableConnection(index: index, availableContext: context) - case .shuttingDown(graceful: false), .shutDown: - return .init(request: .none, connection: .closeConnection(connection, [])) + + case .shuttingDown(graceful: false): + let (index, _) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) + switch self.connections.closeConnection(at: index) { + case .close(let closeAction): + return .init( + request: .none, + connection: .closeConnection(closeAction.connection, closeAction.timersToCancel) + ) + case .cancelTimers(let timers): + return .init( + request: .none, + connection: .cancelTimers(.init(timers)) + ) + case .doNothing: + return .none() + } + + case .shutDown: + fatalError("Connection pool is not running") +// return .init(request: .none, connection: .closeConnection(connection, [])) } } @@ -403,7 +422,7 @@ struct PoolStateMachine< case .running, .shuttingDown(graceful: true): self.cacheNoMoreConnectionsAllowed = false - let closedConnectionAction = self.connections.connectionClosed(connection.id) + let closedConnectionAction = self.connections.connectionClosed(connection.id, shuttingDown: false) let connectionAction: ConnectionAction if let newRequest = closedConnectionAction.newConnectionRequest { @@ -414,7 +433,19 @@ struct PoolStateMachine< return .init(request: .none, connection: connectionAction) - case .shuttingDown(graceful: false), .shutDown: + case .shuttingDown(graceful: false): + let closedConnectionAction = self.connections.connectionClosed(connection.id, shuttingDown: true) + + let connectionAction: ConnectionAction + if self.connections.isEmpty { + self.poolState = .shutDown + connectionAction = .shutdown(closedConnectionAction.timersToCancel) + } else { + connectionAction = .cancelTimers(closedConnectionAction.timersToCancel) + } + + return .init(request: .none, connection: connectionAction) + case .shutDown: return .none() } } @@ -442,13 +473,13 @@ struct PoolStateMachine< var shutdown = ConnectionAction.Shutdown() self.connections.triggerForceShutdown(&shutdown) - if shutdown.connections.isEmpty { + if self.connections.isEmpty { self.poolState = .shutDown } return .init( request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown), - connection: .shutdown(shutdown) + connection: .initiateShutdown(shutdown) ) case .shuttingDown: @@ -569,6 +600,9 @@ extension PoolStateMachine { @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {} +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +extension PoolStateMachine.PoolState: Equatable {} + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable { @usableFromInline @@ -582,6 +616,8 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo return lhsConn === rhsConn && lhsToken == rhsToken case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)): return lhsConn === rhsConn && lhsTimers == rhsTimers + case (.initiateShutdown(let lhs), .initiateShutdown(let rhs)): + return lhs == rhs case (.shutdown(let lhs), .shutdown(let rhs)): return lhs == rhs case (.cancelTimers(let lhs), .cancelTimers(let rhs)): diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index f36642422..8581e9543 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -846,6 +846,49 @@ import Testing } } } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testForceShutdown() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 1 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + await factory.nextConnectAttempt { connectionID in + return 1 + } + let lease = try await pool.leaseConnection() + pool.releaseConnection(lease.connection) + + pool.triggerForceShutdown() + + for connection in factory.runningConnections { + connection.closeIfClosing() + } + } + } } struct ConnectionFuture: ConnectionRequestProtocol { diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift index 6bfe0f394..87d39da9f 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift @@ -119,7 +119,7 @@ import Testing #expect(closeAction.connection === newConnection) #expect(connections.stats == .init(closing: 1, availableStreams: 0)) - let closeContext = connections.connectionClosed(newConnection.id) + let closeContext = connections.connectionClosed(newConnection.id, shuttingDown: false) #expect(closeContext.connectionsStarting == 0) #expect(connections.isEmpty) #expect(connections.stats == .init()) diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift index c748de286..238a995a6 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift @@ -170,7 +170,7 @@ typealias TestPoolStateMachine = PoolStateMachine< let shutdownAction = stateMachine.triggerForceShutdown() #expect(shutdownAction.request == .failRequests(.init(), .poolShutdown)) - #expect(shutdownAction.connection == .shutdown(.init())) + #expect(shutdownAction.connection == .initiateShutdown(.init())) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -390,4 +390,121 @@ typealias TestPoolStateMachine = PoolStateMachine< } } + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testTriggerForceShutdownWithIdleConnections() { + var configuration = PoolConfiguration() + configuration.minimumConnectionCount = 1 + configuration.maximumConnectionSoftLimit = 2 + configuration.maximumConnectionHardLimit = 2 + configuration.keepAliveDuration = .seconds(2) + configuration.idleTimeoutDuration = .seconds(4) + + + var stateMachine = TestPoolStateMachine( + configuration: configuration, + generator: .init(), + timerCancellationTokenType: MockTimerCancellationToken.self + ) + + // refill pool + let requests = stateMachine.refillConnections() + #expect(requests.count == 1) + + // make connection 1 + let connection = MockConnection(id: 0) + let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) + #expect(createdAction.request == .none) + let connection1KeepAliveTimer = TestPoolStateMachine.Timer(.init(timerID: 0, connectionID: 0, usecase: .keepAlive), duration: .seconds(2)) + #expect(createdAction.connection == .scheduleTimers([connection1KeepAliveTimer])) + #expect(stateMachine.timerScheduled(connection1KeepAliveTimer, cancelContinuation: MockTimerCancellationToken(connection1KeepAliveTimer)) == .none) + + let shutdownAction = stateMachine.triggerForceShutdown() + var shutdown = TestPoolStateMachine.ConnectionAction.Shutdown() + shutdown.connections = [connection] + shutdown.timersToCancel = [MockTimerCancellationToken(connection1KeepAliveTimer)] + #expect(shutdownAction.connection == .initiateShutdown(shutdown)) + + let closedAction = stateMachine.connectionClosed(connection) + #expect(closedAction.connection == .shutdown([])) + + #expect(stateMachine.poolState == .shutDown) + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testTriggerForceShutdownWithLeasedConnections() { + var configuration = PoolConfiguration() + configuration.minimumConnectionCount = 1 + configuration.maximumConnectionSoftLimit = 2 + configuration.maximumConnectionHardLimit = 2 + configuration.keepAliveDuration = .seconds(2) + configuration.idleTimeoutDuration = .seconds(4) + + + var stateMachine = TestPoolStateMachine( + configuration: configuration, + generator: .init(), + timerCancellationTokenType: MockTimerCancellationToken.self + ) + + // refill pool + let requests = stateMachine.refillConnections() + #expect(requests.count == 1) + + // make connection 1 + let connection = MockConnection(id: 0) + let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) + #expect(createdAction.request == .none) + let connection1KeepAliveTimer = TestPoolStateMachine.Timer(.init(timerID: 0, connectionID: 0, usecase: .keepAlive), duration: .seconds(2)) + #expect(createdAction.connection == .scheduleTimers([connection1KeepAliveTimer])) + #expect(stateMachine.timerScheduled(connection1KeepAliveTimer, cancelContinuation: MockTimerCancellationToken(connection1KeepAliveTimer)) == .none) + + let request = MockRequest(connectionType: MockConnection.self) + let leaseAction = stateMachine.leaseConnection(request) + #expect(leaseAction.request == .leaseConnection(.init(element: request), connection)) + #expect(leaseAction.connection == .cancelTimers([MockTimerCancellationToken(connection1KeepAliveTimer)])) + + let shutdownAction = stateMachine.triggerForceShutdown() + var shutdown = TestPoolStateMachine.ConnectionAction.Shutdown() + shutdown.connections = [connection] + #expect(shutdownAction.connection == .initiateShutdown(shutdown)) + + let closedAction = stateMachine.connectionClosed(connection) + #expect(closedAction.connection == .shutdown([])) + + #expect(stateMachine.poolState == .shutDown) + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testTriggerForceShutdownWithInProgessRequest() { + var configuration = PoolConfiguration() + configuration.minimumConnectionCount = 1 + configuration.maximumConnectionSoftLimit = 2 + configuration.maximumConnectionHardLimit = 2 + configuration.keepAliveDuration = .seconds(2) + configuration.idleTimeoutDuration = .seconds(4) + + var stateMachine = TestPoolStateMachine( + configuration: configuration, + generator: .init(), + timerCancellationTokenType: MockTimerCancellationToken.self + ) + + // refill pool + let requests = stateMachine.refillConnections() + #expect(requests.count == 1) + + let shutdownAction = stateMachine.triggerForceShutdown() + #expect(shutdownAction.connection == .initiateShutdown(.init())) + + // make connection 1 + let connection = MockConnection(id: 0) + let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) + #expect(createdAction.request == .none) + #expect(createdAction.connection == .closeConnection(connection, [])) + + let closedAction = stateMachine.connectionClosed(connection) + #expect(closedAction.connection == .shutdown([])) + + #expect(stateMachine.poolState == .shutDown) + } } From af973053d8ee27787aa4013077da35195751bf39 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 9 Dec 2025 15:10:59 +0000 Subject: [PATCH 2/8] If no connections return shutdown action --- Sources/ConnectionPoolModule/PoolStateMachine.swift | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index da40da515..2e8dc7657 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -89,7 +89,7 @@ struct PoolStateMachine< case cancelTimers(TinyFastSequence) case closeConnection(Connection, Max2Sequence) case initiateShutdown(Shutdown) - case shutdown(TinyFastSequence) + case shutdown([TimerCancellationToken]) case none } @@ -439,7 +439,7 @@ struct PoolStateMachine< let connectionAction: ConnectionAction if self.connections.isEmpty { self.poolState = .shutDown - connectionAction = .shutdown(closedConnectionAction.timersToCancel) + connectionAction = .shutdown(.init(closedConnectionAction.timersToCancel)) } else { connectionAction = .cancelTimers(closedConnectionAction.timersToCancel) } @@ -473,8 +473,12 @@ struct PoolStateMachine< var shutdown = ConnectionAction.Shutdown() self.connections.triggerForceShutdown(&shutdown) - if self.connections.isEmpty { + if self.connections.isEmpty, shutdown.connections.isEmpty { self.poolState = .shutDown + return .init( + request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown), + connection: .shutdown(shutdown.timersToCancel) + ) } return .init( From 9badd982627ee12d205e1d01f8e6728b39a2adb1 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 9 Dec 2025 18:03:19 +0000 Subject: [PATCH 3/8] Have `handleAvailableConnection` close idle connections on shutdown --- .../PoolStateMachine.swift | 47 ++++++++++--------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 2e8dc7657..d79cfe1ec 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -239,7 +239,7 @@ struct PoolStateMachine< guard let (index, context) = self.connections.releaseConnection(connection.id, streams: streams) else { return .none() } - return self.handleAvailableConnection(index: index, availableContext: context) + return self.handleAvailableConnection(index: index, availableContext: context, shuttingDown: false) } mutating func cancelRequest(id: RequestID) -> Action { @@ -256,30 +256,16 @@ struct PoolStateMachine< @inlinable mutating func connectionEstablished(_ connection: Connection, maxStreams: UInt16) -> Action { switch self.poolState { - case .running, .shuttingDown(graceful: true): + case .running: let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) - return self.handleAvailableConnection(index: index, availableContext: context) + return self.handleAvailableConnection(index: index, availableContext: context, shuttingDown: false) - case .shuttingDown(graceful: false): - let (index, _) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) - switch self.connections.closeConnection(at: index) { - case .close(let closeAction): - return .init( - request: .none, - connection: .closeConnection(closeAction.connection, closeAction.timersToCancel) - ) - case .cancelTimers(let timers): - return .init( - request: .none, - connection: .cancelTimers(.init(timers)) - ) - case .doNothing: - return .none() - } + case .shuttingDown: + let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) + return self.handleAvailableConnection(index: index, availableContext: context, shuttingDown: true) case .shutDown: fatalError("Connection pool is not running") -// return .init(request: .none, connection: .closeConnection(connection, [])) } } @@ -392,7 +378,7 @@ struct PoolStateMachine< guard let (index, context) = self.connections.keepAliveSucceeded(connection.id) else { return .none() } - return self.handleAvailableConnection(index: index, availableContext: context) + return self.handleAvailableConnection(index: index, availableContext: context, shuttingDown: false) } @inlinable @@ -497,7 +483,8 @@ struct PoolStateMachine< @inlinable /*private*/ mutating func handleAvailableConnection( index: Int, - availableContext: ConnectionGroup.AvailableConnectionContext + availableContext: ConnectionGroup.AvailableConnectionContext, + shuttingDown: Bool ) -> Action { // this connection was busy before let requests = self.requestQueue.pop(max: availableContext.info.availableStreams) @@ -516,6 +503,22 @@ struct PoolStateMachine< return .none() case .idle(_, let newIdle): + if shuttingDown { + switch self.connections.closeConnection(at: index) { + case .close(let closeAction): + return .init( + request: .none, + connection: .closeConnection(closeAction.connection, closeAction.timersToCancel) + ) + case .cancelTimers(let timers): + return .init( + request: .none, + connection: .cancelTimers(.init(timers)) + ) + case .doNothing: + return .none() + } + } let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers) return .init( From c0776de1eba5e69c375de0887f31d379831ea117 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 10 Dec 2025 10:38:17 +0000 Subject: [PATCH 4/8] Don't pass around shuttingDown boolean, add extra tests --- .../PoolStateMachine.swift | 15 +-- .../ConnectionPoolTests.swift | 106 ++++++++++++++++++ 2 files changed, 114 insertions(+), 7 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index d79cfe1ec..7a83d78b0 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -88,7 +88,9 @@ struct PoolStateMachine< case runKeepAlive(Connection, TimerCancellationToken?) case cancelTimers(TinyFastSequence) case closeConnection(Connection, Max2Sequence) + /// Start process of shutting down the connection pool. Close connections, cancel timers. case initiateShutdown(Shutdown) + /// All connections have been closed, the pool event stream can be ended. case shutdown([TimerCancellationToken]) case none } @@ -239,7 +241,7 @@ struct PoolStateMachine< guard let (index, context) = self.connections.releaseConnection(connection.id, streams: streams) else { return .none() } - return self.handleAvailableConnection(index: index, availableContext: context, shuttingDown: false) + return self.handleAvailableConnection(index: index, availableContext: context) } mutating func cancelRequest(id: RequestID) -> Action { @@ -258,11 +260,11 @@ struct PoolStateMachine< switch self.poolState { case .running: let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) - return self.handleAvailableConnection(index: index, availableContext: context, shuttingDown: false) + return self.handleAvailableConnection(index: index, availableContext: context) case .shuttingDown: let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) - return self.handleAvailableConnection(index: index, availableContext: context, shuttingDown: true) + return self.handleAvailableConnection(index: index, availableContext: context) case .shutDown: fatalError("Connection pool is not running") @@ -378,7 +380,7 @@ struct PoolStateMachine< guard let (index, context) = self.connections.keepAliveSucceeded(connection.id) else { return .none() } - return self.handleAvailableConnection(index: index, availableContext: context, shuttingDown: false) + return self.handleAvailableConnection(index: index, availableContext: context) } @inlinable @@ -483,8 +485,7 @@ struct PoolStateMachine< @inlinable /*private*/ mutating func handleAvailableConnection( index: Int, - availableContext: ConnectionGroup.AvailableConnectionContext, - shuttingDown: Bool + availableContext: ConnectionGroup.AvailableConnectionContext ) -> Action { // this connection was busy before let requests = self.requestQueue.pop(max: availableContext.info.availableStreams) @@ -503,7 +504,7 @@ struct PoolStateMachine< return .none() case .idle(_, let newIdle): - if shuttingDown { + if case .shuttingDown = self.poolState { switch self.connections.closeConnection(at: index) { case .close(let closeAction): return .init( diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index 8581e9543..7bbba54f0 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -885,6 +885,112 @@ import Testing pool.triggerForceShutdown() for connection in factory.runningConnections { + try await connection.signalToClose + connection.closeIfClosing() + } + } + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testForceShutdownWithLeasedConnection() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 1 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + await factory.nextConnectAttempt { connectionID in + return 1 + } + _ = try await pool.leaseConnection() + + pool.triggerForceShutdown() + + for connection in factory.runningConnections { + try await connection.signalToClose + connection.closeIfClosing() + } + } + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testForceShutdownWithActiveRequest() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 0 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + let connectionAttemptWaiter = Future(of: Void.self) + let triggerShutdownWaiter = Future(of: Void.self) + let leaseFailedWaiter = Future(of: Void.self) + + taskGroup.addTask { + await #expect(throws: ConnectionPoolError.poolShutdown) { + try await pool.leaseConnection() + } + leaseFailedWaiter.yield(value: ()) + } + + taskGroup.addTask { + try await factory.nextConnectAttempt { connectionID in + connectionAttemptWaiter.yield(value: ()) + try await triggerShutdownWaiter.success + return 1 + } + } + try await connectionAttemptWaiter.success + + pool.triggerForceShutdown() + + triggerShutdownWaiter.yield(value: ()) + + try await leaseFailedWaiter.success + + for connection in factory.runningConnections { + try await connection.signalToClose connection.closeIfClosing() } } From fe0d33215ae2368d923ae59e810a34b65383d284 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 10 Dec 2025 11:23:27 +0000 Subject: [PATCH 5/8] shutdown -> cancelEventStreamAndFinalCleanup --- Sources/ConnectionPoolModule/ConnectionPool.swift | 2 +- Sources/ConnectionPoolModule/PoolStateMachine.swift | 8 ++++---- .../ConnectionPoolModuleTests/PoolStateMachineTests.swift | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index 824f79fc7..33969d283 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -396,7 +396,7 @@ public final class ConnectionPool< } self.cancelTimers(cleanup.timersToCancel) - case .shutdown(let timersToCancel): + case .cancelEventStreamAndFinalCleanup(let timersToCancel): self.cancelTimers(timersToCancel) self.eventContinuation.finish() diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 7a83d78b0..885456bac 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -91,7 +91,7 @@ struct PoolStateMachine< /// Start process of shutting down the connection pool. Close connections, cancel timers. case initiateShutdown(Shutdown) /// All connections have been closed, the pool event stream can be ended. - case shutdown([TimerCancellationToken]) + case cancelEventStreamAndFinalCleanup([TimerCancellationToken]) case none } @@ -427,7 +427,7 @@ struct PoolStateMachine< let connectionAction: ConnectionAction if self.connections.isEmpty { self.poolState = .shutDown - connectionAction = .shutdown(.init(closedConnectionAction.timersToCancel)) + connectionAction = .cancelEventStreamAndFinalCleanup(.init(closedConnectionAction.timersToCancel)) } else { connectionAction = .cancelTimers(closedConnectionAction.timersToCancel) } @@ -465,7 +465,7 @@ struct PoolStateMachine< self.poolState = .shutDown return .init( request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown), - connection: .shutdown(shutdown.timersToCancel) + connection: .cancelEventStreamAndFinalCleanup(shutdown.timersToCancel) ) } @@ -626,7 +626,7 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo return lhsConn === rhsConn && lhsTimers == rhsTimers case (.initiateShutdown(let lhs), .initiateShutdown(let rhs)): return lhs == rhs - case (.shutdown(let lhs), .shutdown(let rhs)): + case (.cancelEventStreamAndFinalCleanup(let lhs), .cancelEventStreamAndFinalCleanup(let rhs)): return lhs == rhs case (.cancelTimers(let lhs), .cancelTimers(let rhs)): return lhs == rhs diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift index 238a995a6..a1ae9f8b7 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift @@ -425,7 +425,7 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(shutdownAction.connection == .initiateShutdown(shutdown)) let closedAction = stateMachine.connectionClosed(connection) - #expect(closedAction.connection == .shutdown([])) + #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) #expect(stateMachine.poolState == .shutDown) } @@ -469,7 +469,7 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(shutdownAction.connection == .initiateShutdown(shutdown)) let closedAction = stateMachine.connectionClosed(connection) - #expect(closedAction.connection == .shutdown([])) + #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) #expect(stateMachine.poolState == .shutDown) } @@ -503,7 +503,7 @@ typealias TestPoolStateMachine = PoolStateMachine< #expect(createdAction.connection == .closeConnection(connection, [])) let closedAction = stateMachine.connectionClosed(connection) - #expect(closedAction.connection == .shutdown([])) + #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) #expect(stateMachine.poolState == .shutDown) } From f4790a3c555a91a4d30418502ff24729f7c00239 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 10 Dec 2025 11:32:07 +0000 Subject: [PATCH 6/8] Deal with releasing already closed connections Now that we keep a record of all connections when shutting down it is possible for a connection to be released while in the closing state. --- .../PoolStateMachine+ConnectionGroup.swift | 4 +++- .../PoolStateMachine+ConnectionState.swift | 8 ++++++-- Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift | 4 +++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift index 90aa6e828..bc5da6902 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift @@ -398,7 +398,9 @@ extension PoolStateMachine { return nil } - let connectionInfo = self.connections[index].release(streams: streams) + guard let connectionInfo = self.connections[index].release(streams: streams) else { + return nil + } self.stats.availableStreams += streams self.stats.leasedStreams -= streams switch connectionInfo { diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift index b252ca4cf..988832f82 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift @@ -396,7 +396,7 @@ extension PoolStateMachine { } @inlinable - mutating func release(streams returnedStreams: UInt16) -> ConnectionAvailableInfo { + mutating func release(streams returnedStreams: UInt16) -> ConnectionAvailableInfo? { switch self.state { case .leased(let connection, let usedStreams, let maxStreams, let keepAlive): precondition(usedStreams >= returnedStreams) @@ -409,7 +409,11 @@ extension PoolStateMachine { self.state = .leased(connection, usedStreams: newUsedStreams, maxStreams: maxStreams, keepAlive: keepAlive) return .leased(availableStreams: availableStreams) } - case .backingOff, .starting, .idle, .closing, .closed: + + case .closing: + return nil + + case .backingOff, .starting, .idle, .closed: preconditionFailure("Invalid state: \(self.state)") } } diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index 7bbba54f0..31f7945c5 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -923,10 +923,12 @@ import Testing await factory.nextConnectAttempt { connectionID in return 1 } - _ = try await pool.leaseConnection() + let lease = try await pool.leaseConnection() pool.triggerForceShutdown() + pool.releaseConnection(lease.connection) + for connection in factory.runningConnections { try await connection.signalToClose connection.closeIfClosing() From 6b4a3d58a3573715843e4312c78a32d6a67b0e8e Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 10 Dec 2025 16:06:46 +0000 Subject: [PATCH 7/8] Destroy failed connections on shutdown --- .../PoolStateMachine+ConnectionGroup.swift | 19 +++++++- .../PoolStateMachine+ConnectionState.swift | 11 +++++ .../PoolStateMachine.swift | 13 +++++- .../ConnectionPoolTests.swift | 45 +++++++++++++++++++ 4 files changed, 85 insertions(+), 3 deletions(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift index bc5da6902..97a923130 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift @@ -546,7 +546,7 @@ extension PoolStateMachine { break case .backingOff: - break + self.stats.backingOff -= 1 } if let connection = closeAction.connection { @@ -555,7 +555,12 @@ extension PoolStateMachine { timersToCancel: closeAction.cancelTimers )) } else { - return .cancelTimers(closeAction.cancelTimers) + // if there is no connection we should delete this now + var timersToCancel = closeAction.cancelTimers + if let cancellationTimer = self.swapForDeletion(index: index) { + timersToCancel.append(cancellationTimer) + } + return .cancelTimers(timersToCancel) } } @@ -577,6 +582,16 @@ extension PoolStateMachine { return self.closeConnectionIfIdle(at: index) } + @inlinable + mutating func destroyFailedConnection(_ connectionID: Connection.ID) -> TimerCancellationToken? { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + preconditionFailure("Failing a connection we don't have a record of.") + } + + self.connections[index].destroyFailedConnection() + return self.swapForDeletion(index: index) + } + /// Information around the failed/closed connection. @usableFromInline struct ClosedAction { diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift index 988832f82..93f3b40c5 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift @@ -599,6 +599,17 @@ extension PoolStateMachine { } } + @inlinable + mutating func destroyFailedConnection() { + switch self.state { + case .starting: + self.state = .closed + + case .idle, .leased, .closed, .closing, .backingOff: + preconditionFailure("Invalid state: \(self.state)") + } + } + @inlinable mutating func close() -> CloseAction? { switch self.state { diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 885456bac..cdee6765d 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -333,7 +333,18 @@ struct PoolStateMachine< return .init(request: .none, connection: .scheduleTimers(.init(timer))) case .shuttingDown(graceful: false), .shutDown: - return .none() + let timerToCancel = self.connections.destroyFailedConnection(request.connectionID) + let connectionAction: ConnectionAction + if self.connections.isEmpty { + self.poolState = .shutDown + connectionAction = .cancelEventStreamAndFinalCleanup(timerToCancel.map {[$0]} ?? []) + } else { + connectionAction = .cancelTimers(timerToCancel.map {[$0]} ?? []) + } + return .init( + request: .none, + connection: connectionAction + ) } } diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index 31f7945c5..6ef6f6f47 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -997,6 +997,51 @@ import Testing } } } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testForceShutdownWithConnectionBackingOff() async throws { + struct CreateConnectionFailed: Error {} + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 0 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + + taskGroup.addTask { + await #expect(throws: ConnectionPoolError.poolShutdown) { + try await pool.leaseConnection() + } + } + + _ = try? await factory.nextConnectAttempt { connectionID in + throw CreateConnectionFailed() + } + + pool.triggerForceShutdown() + } + } } struct ConnectionFuture: ConnectionRequestProtocol { From f6016ed6e76251b95de085a746a1537a1c30e6d5 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Wed, 10 Dec 2025 16:15:28 +0000 Subject: [PATCH 8/8] Shutdown pool if creation backoff done destroys the connection state --- Sources/ConnectionPoolModule/PoolStateMachine.swift | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index cdee6765d..1b58722b9 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -366,7 +366,17 @@ struct PoolStateMachine< return .init(request: .none, connection: .makeConnection(request, timers)) case .cancelTimers(let timers): - return .init(request: .none, connection: .cancelTimers(.init(timers))) + let connectionAction: ConnectionAction + if self.connections.isEmpty { + self.poolState = .shutDown + connectionAction = .cancelEventStreamAndFinalCleanup(.init(timers)) + } else { + connectionAction = .cancelTimers(.init(timers)) + } + return .init( + request: .none, + connection: connectionAction + ) } case .shuttingDown(graceful: false), .shutDown: