diff --git a/.gitignore b/.gitignore index d0d66c24..64e4ded3 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ DerivedData Package.resolved .swiftpm Tests/LinuxMain.swift +.vscode/ \ No newline at end of file diff --git a/Sources/ConnectionPoolModule/ConnectionPool.swift b/Sources/ConnectionPoolModule/ConnectionPool.swift index 32836967..33969d28 100644 --- a/Sources/ConnectionPoolModule/ConnectionPool.swift +++ b/Sources/ConnectionPoolModule/ConnectionPool.swift @@ -390,12 +390,16 @@ public final class ConnectionPool< self.closeConnection(connection) self.cancelTimers(timers) - case .shutdown(let cleanup): + case .initiateShutdown(let cleanup): for connection in cleanup.connections { self.closeConnection(connection) } self.cancelTimers(cleanup.timersToCancel) + case .cancelEventStreamAndFinalCleanup(let timersToCancel): + self.cancelTimers(timersToCancel) + self.eventContinuation.finish() + case .none: break } diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift index a8e97ffd..97a92313 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionGroup.swift @@ -98,6 +98,7 @@ extension PoolStateMachine { self.keepAliveReducesAvailableStreams = keepAliveReducesAvailableStreams } + @usableFromInline var isEmpty: Bool { self.connections.isEmpty } @@ -397,7 +398,9 @@ extension PoolStateMachine { return nil } - let connectionInfo = self.connections[index].release(streams: streams) + guard let connectionInfo = self.connections[index].release(streams: streams) else { + return nil + } self.stats.availableStreams += streams self.stats.leasedStreams -= streams switch connectionInfo { @@ -514,6 +517,53 @@ extension PoolStateMachine { ) } + @usableFromInline + enum CloseConnectionAction { + case close(CloseAction) + case cancelTimers(Max2Sequence) + case doNothing + } + /// Closes the connection at the given index. + @inlinable + mutating func closeConnection(at index: Int) -> CloseConnectionAction { + guard let closeAction = self.connections[index].close() else { + return .doNothing // no action to take + } + + self.stats.runningKeepAlive -= closeAction.runningKeepAlive ? 1 : 0 + self.stats.availableStreams -= closeAction.maxStreams - closeAction.usedStreams + + switch closeAction.previousConnectionState { + case .idle: + self.stats.idle -= 1 + self.stats.closing += 1 + + case .leased: + self.stats.leased -= 1 + self.stats.closing += 1 + + case .closing: + break + + case .backingOff: + self.stats.backingOff -= 1 + } + + if let connection = closeAction.connection { + return .close(CloseAction( + connection: connection, + timersToCancel: closeAction.cancelTimers + )) + } else { + // if there is no connection we should delete this now + var timersToCancel = closeAction.cancelTimers + if let cancellationTimer = self.swapForDeletion(index: index) { + timersToCancel.append(cancellationTimer) + } + return .cancelTimers(timersToCancel) + } + } + @inlinable mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> CloseAction? { guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { @@ -532,6 +582,16 @@ extension PoolStateMachine { return self.closeConnectionIfIdle(at: index) } + @inlinable + mutating func destroyFailedConnection(_ connectionID: Connection.ID) -> TimerCancellationToken? { + guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { + preconditionFailure("Failing a connection we don't have a record of.") + } + + self.connections[index].destroyFailedConnection() + return self.swapForDeletion(index: index) + } + /// Information around the failed/closed connection. @usableFromInline struct ClosedAction { @@ -567,7 +627,7 @@ extension PoolStateMachine { /// supplied index after this. If nil is returned the connection was closed by the state machine and was /// therefore already removed. @inlinable - mutating func connectionClosed(_ connectionID: Connection.ID) -> ClosedAction { + mutating func connectionClosed(_ connectionID: Connection.ID, shuttingDown: Bool) -> ClosedAction { guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else { preconditionFailure("All connections that have been created should say goodbye exactly once!") } @@ -597,7 +657,7 @@ extension PoolStateMachine { } let newConnectionRequest: ConnectionRequest? - if self.connections.count < self.minimumConcurrentConnections { + if !shuttingDown, self.connections.count < self.minimumConcurrentConnections { newConnectionRequest = self.createNewConnection() } else { newConnectionRequest = .none @@ -613,18 +673,19 @@ extension PoolStateMachine { // MARK: Shutdown mutating func triggerForceShutdown(_ cleanup: inout ConnectionAction.Shutdown) { - for var connectionState in self.connections { - guard let closeAction = connectionState.close() else { - continue - } + for index in self.connections.indices { + switch closeConnection(at: index) { + case .close(let closeAction): + cleanup.connections.append(closeAction.connection) + cleanup.timersToCancel.append(contentsOf: closeAction.timersToCancel) + + case .cancelTimers(let timers): + cleanup.timersToCancel.append(contentsOf: timers) - if let connection = closeAction.connection { - cleanup.connections.append(connection) + case .doNothing: + break } - cleanup.timersToCancel.append(contentsOf: closeAction.cancelTimers) } - - self.connections = [] } // MARK: - Private functions - diff --git a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift index 9912f13a..93f3b40c 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine+ConnectionState.swift @@ -396,7 +396,7 @@ extension PoolStateMachine { } @inlinable - mutating func release(streams returnedStreams: UInt16) -> ConnectionAvailableInfo { + mutating func release(streams returnedStreams: UInt16) -> ConnectionAvailableInfo? { switch self.state { case .leased(let connection, let usedStreams, let maxStreams, let keepAlive): precondition(usedStreams >= returnedStreams) @@ -409,7 +409,11 @@ extension PoolStateMachine { self.state = .leased(connection, usedStreams: newUsedStreams, maxStreams: maxStreams, keepAlive: keepAlive) return .leased(availableStreams: availableStreams) } - case .backingOff, .starting, .idle, .closing, .closed: + + case .closing: + return nil + + case .backingOff, .starting, .idle, .closed: preconditionFailure("Invalid state: \(self.state)") } } @@ -587,10 +591,21 @@ extension PoolStateMachine { runningKeepAlive: keepAlive.isRunning ) - case .leased, .closed: + case .leased, .closed, .closing: return nil - case .backingOff, .starting, .closing: + case .backingOff, .starting: + preconditionFailure("Invalid state: \(self.state)") + } + } + + @inlinable + mutating func destroyFailedConnection() { + switch self.state { + case .starting: + self.state = .closed + + case .idle, .leased, .closed, .closing, .backingOff: preconditionFailure("Invalid state: \(self.state)") } } diff --git a/Sources/ConnectionPoolModule/PoolStateMachine.swift b/Sources/ConnectionPoolModule/PoolStateMachine.swift index 8d995fa2..1b58722b 100644 --- a/Sources/ConnectionPoolModule/PoolStateMachine.swift +++ b/Sources/ConnectionPoolModule/PoolStateMachine.swift @@ -88,8 +88,10 @@ struct PoolStateMachine< case runKeepAlive(Connection, TimerCancellationToken?) case cancelTimers(TinyFastSequence) case closeConnection(Connection, Max2Sequence) - case shutdown(Shutdown) - + /// Start process of shutting down the connection pool. Close connections, cancel timers. + case initiateShutdown(Shutdown) + /// All connections have been closed, the pool event stream can be ended. + case cancelEventStreamAndFinalCleanup([TimerCancellationToken]) case none } @@ -256,11 +258,16 @@ struct PoolStateMachine< @inlinable mutating func connectionEstablished(_ connection: Connection, maxStreams: UInt16) -> Action { switch self.poolState { - case .running, .shuttingDown(graceful: true): + case .running: let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) return self.handleAvailableConnection(index: index, availableContext: context) - case .shuttingDown(graceful: false), .shutDown: - return .init(request: .none, connection: .closeConnection(connection, [])) + + case .shuttingDown: + let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams) + return self.handleAvailableConnection(index: index, availableContext: context) + + case .shutDown: + fatalError("Connection pool is not running") } } @@ -326,7 +333,18 @@ struct PoolStateMachine< return .init(request: .none, connection: .scheduleTimers(.init(timer))) case .shuttingDown(graceful: false), .shutDown: - return .none() + let timerToCancel = self.connections.destroyFailedConnection(request.connectionID) + let connectionAction: ConnectionAction + if self.connections.isEmpty { + self.poolState = .shutDown + connectionAction = .cancelEventStreamAndFinalCleanup(timerToCancel.map {[$0]} ?? []) + } else { + connectionAction = .cancelTimers(timerToCancel.map {[$0]} ?? []) + } + return .init( + request: .none, + connection: connectionAction + ) } } @@ -348,7 +366,17 @@ struct PoolStateMachine< return .init(request: .none, connection: .makeConnection(request, timers)) case .cancelTimers(let timers): - return .init(request: .none, connection: .cancelTimers(.init(timers))) + let connectionAction: ConnectionAction + if self.connections.isEmpty { + self.poolState = .shutDown + connectionAction = .cancelEventStreamAndFinalCleanup(.init(timers)) + } else { + connectionAction = .cancelTimers(.init(timers)) + } + return .init( + request: .none, + connection: connectionAction + ) } case .shuttingDown(graceful: false), .shutDown: @@ -403,7 +431,7 @@ struct PoolStateMachine< case .running, .shuttingDown(graceful: true): self.cacheNoMoreConnectionsAllowed = false - let closedConnectionAction = self.connections.connectionClosed(connection.id) + let closedConnectionAction = self.connections.connectionClosed(connection.id, shuttingDown: false) let connectionAction: ConnectionAction if let newRequest = closedConnectionAction.newConnectionRequest { @@ -414,7 +442,19 @@ struct PoolStateMachine< return .init(request: .none, connection: connectionAction) - case .shuttingDown(graceful: false), .shutDown: + case .shuttingDown(graceful: false): + let closedConnectionAction = self.connections.connectionClosed(connection.id, shuttingDown: true) + + let connectionAction: ConnectionAction + if self.connections.isEmpty { + self.poolState = .shutDown + connectionAction = .cancelEventStreamAndFinalCleanup(.init(closedConnectionAction.timersToCancel)) + } else { + connectionAction = .cancelTimers(closedConnectionAction.timersToCancel) + } + + return .init(request: .none, connection: connectionAction) + case .shutDown: return .none() } } @@ -442,13 +482,17 @@ struct PoolStateMachine< var shutdown = ConnectionAction.Shutdown() self.connections.triggerForceShutdown(&shutdown) - if shutdown.connections.isEmpty { + if self.connections.isEmpty, shutdown.connections.isEmpty { self.poolState = .shutDown + return .init( + request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown), + connection: .cancelEventStreamAndFinalCleanup(shutdown.timersToCancel) + ) } return .init( request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown), - connection: .shutdown(shutdown) + connection: .initiateShutdown(shutdown) ) case .shuttingDown: @@ -481,6 +525,22 @@ struct PoolStateMachine< return .none() case .idle(_, let newIdle): + if case .shuttingDown = self.poolState { + switch self.connections.closeConnection(at: index) { + case .close(let closeAction): + return .init( + request: .none, + connection: .closeConnection(closeAction.connection, closeAction.timersToCancel) + ) + case .cancelTimers(let timers): + return .init( + request: .none, + connection: .cancelTimers(.init(timers)) + ) + case .doNothing: + return .none() + } + } let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers) return .init( @@ -569,6 +629,9 @@ extension PoolStateMachine { @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {} +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +extension PoolStateMachine.PoolState: Equatable {} + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable { @usableFromInline @@ -582,7 +645,9 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo return lhsConn === rhsConn && lhsToken == rhsToken case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)): return lhsConn === rhsConn && lhsTimers == rhsTimers - case (.shutdown(let lhs), .shutdown(let rhs)): + case (.initiateShutdown(let lhs), .initiateShutdown(let rhs)): + return lhs == rhs + case (.cancelEventStreamAndFinalCleanup(let lhs), .cancelEventStreamAndFinalCleanup(let rhs)): return lhs == rhs case (.cancelTimers(let lhs), .cancelTimers(let rhs)): return lhs == rhs diff --git a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift index f3664242..6ef6f6f4 100644 --- a/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift +++ b/Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift @@ -846,6 +846,202 @@ import Testing } } } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testForceShutdown() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 1 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + await factory.nextConnectAttempt { connectionID in + return 1 + } + let lease = try await pool.leaseConnection() + pool.releaseConnection(lease.connection) + + pool.triggerForceShutdown() + + for connection in factory.runningConnections { + try await connection.signalToClose + connection.closeIfClosing() + } + } + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testForceShutdownWithLeasedConnection() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 1 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + await factory.nextConnectAttempt { connectionID in + return 1 + } + let lease = try await pool.leaseConnection() + + pool.triggerForceShutdown() + + pool.releaseConnection(lease.connection) + + for connection in factory.runningConnections { + try await connection.signalToClose + connection.closeIfClosing() + } + } + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testForceShutdownWithActiveRequest() async throws { + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 0 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + let connectionAttemptWaiter = Future(of: Void.self) + let triggerShutdownWaiter = Future(of: Void.self) + let leaseFailedWaiter = Future(of: Void.self) + + taskGroup.addTask { + await #expect(throws: ConnectionPoolError.poolShutdown) { + try await pool.leaseConnection() + } + leaseFailedWaiter.yield(value: ()) + } + + taskGroup.addTask { + try await factory.nextConnectAttempt { connectionID in + connectionAttemptWaiter.yield(value: ()) + try await triggerShutdownWaiter.success + return 1 + } + } + try await connectionAttemptWaiter.success + + pool.triggerForceShutdown() + + triggerShutdownWaiter.yield(value: ()) + + try await leaseFailedWaiter.success + + for connection in factory.runningConnections { + try await connection.signalToClose + connection.closeIfClosing() + } + } + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testForceShutdownWithConnectionBackingOff() async throws { + struct CreateConnectionFailed: Error {} + let clock = MockClock() + let factory = MockConnectionFactory() + let keepAliveDuration = Duration.seconds(30) + let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self) + + var mutableConfig = ConnectionPoolConfiguration() + mutableConfig.minimumConnectionCount = 0 + mutableConfig.maximumConnectionSoftLimit = 4 + mutableConfig.maximumConnectionHardLimit = 4 + mutableConfig.idleTimeout = .seconds(10) + let config = mutableConfig + + let pool = ConnectionPool( + configuration: config, + idGenerator: ConnectionIDGenerator(), + requestType: ConnectionRequest.self, + keepAliveBehavior: keepAlive, + observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self), + clock: clock + ) { + try await factory.makeConnection(id: $0, for: $1) + } + + await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + await pool.run() + } + + taskGroup.addTask { + await #expect(throws: ConnectionPoolError.poolShutdown) { + try await pool.leaseConnection() + } + } + + _ = try? await factory.nextConnectAttempt { connectionID in + throw CreateConnectionFailed() + } + + pool.triggerForceShutdown() + } + } } struct ConnectionFuture: ConnectionRequestProtocol { diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift index 6bfe0f39..87d39da9 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachine+ConnectionGroupTests.swift @@ -119,7 +119,7 @@ import Testing #expect(closeAction.connection === newConnection) #expect(connections.stats == .init(closing: 1, availableStreams: 0)) - let closeContext = connections.connectionClosed(newConnection.id) + let closeContext = connections.connectionClosed(newConnection.id, shuttingDown: false) #expect(closeContext.connectionsStarting == 0) #expect(connections.isEmpty) #expect(connections.stats == .init()) diff --git a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift index c748de28..a1ae9f8b 100644 --- a/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift +++ b/Tests/ConnectionPoolModuleTests/PoolStateMachineTests.swift @@ -170,7 +170,7 @@ typealias TestPoolStateMachine = PoolStateMachine< let shutdownAction = stateMachine.triggerForceShutdown() #expect(shutdownAction.request == .failRequests(.init(), .poolShutdown)) - #expect(shutdownAction.connection == .shutdown(.init())) + #expect(shutdownAction.connection == .initiateShutdown(.init())) } @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) @@ -390,4 +390,121 @@ typealias TestPoolStateMachine = PoolStateMachine< } } + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testTriggerForceShutdownWithIdleConnections() { + var configuration = PoolConfiguration() + configuration.minimumConnectionCount = 1 + configuration.maximumConnectionSoftLimit = 2 + configuration.maximumConnectionHardLimit = 2 + configuration.keepAliveDuration = .seconds(2) + configuration.idleTimeoutDuration = .seconds(4) + + + var stateMachine = TestPoolStateMachine( + configuration: configuration, + generator: .init(), + timerCancellationTokenType: MockTimerCancellationToken.self + ) + + // refill pool + let requests = stateMachine.refillConnections() + #expect(requests.count == 1) + + // make connection 1 + let connection = MockConnection(id: 0) + let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) + #expect(createdAction.request == .none) + let connection1KeepAliveTimer = TestPoolStateMachine.Timer(.init(timerID: 0, connectionID: 0, usecase: .keepAlive), duration: .seconds(2)) + #expect(createdAction.connection == .scheduleTimers([connection1KeepAliveTimer])) + #expect(stateMachine.timerScheduled(connection1KeepAliveTimer, cancelContinuation: MockTimerCancellationToken(connection1KeepAliveTimer)) == .none) + + let shutdownAction = stateMachine.triggerForceShutdown() + var shutdown = TestPoolStateMachine.ConnectionAction.Shutdown() + shutdown.connections = [connection] + shutdown.timersToCancel = [MockTimerCancellationToken(connection1KeepAliveTimer)] + #expect(shutdownAction.connection == .initiateShutdown(shutdown)) + + let closedAction = stateMachine.connectionClosed(connection) + #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) + + #expect(stateMachine.poolState == .shutDown) + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testTriggerForceShutdownWithLeasedConnections() { + var configuration = PoolConfiguration() + configuration.minimumConnectionCount = 1 + configuration.maximumConnectionSoftLimit = 2 + configuration.maximumConnectionHardLimit = 2 + configuration.keepAliveDuration = .seconds(2) + configuration.idleTimeoutDuration = .seconds(4) + + + var stateMachine = TestPoolStateMachine( + configuration: configuration, + generator: .init(), + timerCancellationTokenType: MockTimerCancellationToken.self + ) + + // refill pool + let requests = stateMachine.refillConnections() + #expect(requests.count == 1) + + // make connection 1 + let connection = MockConnection(id: 0) + let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) + #expect(createdAction.request == .none) + let connection1KeepAliveTimer = TestPoolStateMachine.Timer(.init(timerID: 0, connectionID: 0, usecase: .keepAlive), duration: .seconds(2)) + #expect(createdAction.connection == .scheduleTimers([connection1KeepAliveTimer])) + #expect(stateMachine.timerScheduled(connection1KeepAliveTimer, cancelContinuation: MockTimerCancellationToken(connection1KeepAliveTimer)) == .none) + + let request = MockRequest(connectionType: MockConnection.self) + let leaseAction = stateMachine.leaseConnection(request) + #expect(leaseAction.request == .leaseConnection(.init(element: request), connection)) + #expect(leaseAction.connection == .cancelTimers([MockTimerCancellationToken(connection1KeepAliveTimer)])) + + let shutdownAction = stateMachine.triggerForceShutdown() + var shutdown = TestPoolStateMachine.ConnectionAction.Shutdown() + shutdown.connections = [connection] + #expect(shutdownAction.connection == .initiateShutdown(shutdown)) + + let closedAction = stateMachine.connectionClosed(connection) + #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) + + #expect(stateMachine.poolState == .shutDown) + } + + @available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) + @Test func testTriggerForceShutdownWithInProgessRequest() { + var configuration = PoolConfiguration() + configuration.minimumConnectionCount = 1 + configuration.maximumConnectionSoftLimit = 2 + configuration.maximumConnectionHardLimit = 2 + configuration.keepAliveDuration = .seconds(2) + configuration.idleTimeoutDuration = .seconds(4) + + var stateMachine = TestPoolStateMachine( + configuration: configuration, + generator: .init(), + timerCancellationTokenType: MockTimerCancellationToken.self + ) + + // refill pool + let requests = stateMachine.refillConnections() + #expect(requests.count == 1) + + let shutdownAction = stateMachine.triggerForceShutdown() + #expect(shutdownAction.connection == .initiateShutdown(.init())) + + // make connection 1 + let connection = MockConnection(id: 0) + let createdAction = stateMachine.connectionEstablished(connection, maxStreams: 1) + #expect(createdAction.request == .none) + #expect(createdAction.connection == .closeConnection(connection, [])) + + let closedAction = stateMachine.connectionClosed(connection) + #expect(closedAction.connection == .cancelEventStreamAndFinalCleanup([])) + + #expect(stateMachine.poolState == .shutDown) + } }