Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ DerivedData
Package.resolved
.swiftpm
Tests/LinuxMain.swift
.vscode/
6 changes: 5 additions & 1 deletion Sources/ConnectionPoolModule/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,16 @@ public final class ConnectionPool<
self.closeConnection(connection)
self.cancelTimers(timers)

case .shutdown(let cleanup):
case .initiateShutdown(let cleanup):
for connection in cleanup.connections {
self.closeConnection(connection)
}
self.cancelTimers(cleanup.timersToCancel)

case .cancelEventStreamAndFinalCleanup(let timersToCancel):
self.cancelTimers(timersToCancel)
self.eventContinuation.finish()

case .none:
break
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ extension PoolStateMachine {
self.keepAliveReducesAvailableStreams = keepAliveReducesAvailableStreams
}

@usableFromInline
var isEmpty: Bool {
self.connections.isEmpty
}
Expand Down Expand Up @@ -397,7 +398,9 @@ extension PoolStateMachine {
return nil
}

let connectionInfo = self.connections[index].release(streams: streams)
guard let connectionInfo = self.connections[index].release(streams: streams) else {
return nil
}
self.stats.availableStreams += streams
self.stats.leasedStreams -= streams
switch connectionInfo {
Expand Down Expand Up @@ -514,6 +517,53 @@ extension PoolStateMachine {
)
}

@usableFromInline
enum CloseConnectionAction {
case close(CloseAction)
case cancelTimers(Max2Sequence<TimerCancellationToken>)
case doNothing
}
/// Closes the connection at the given index.
@inlinable
mutating func closeConnection(at index: Int) -> CloseConnectionAction {
guard let closeAction = self.connections[index].close() else {
return .doNothing // no action to take
}

self.stats.runningKeepAlive -= closeAction.runningKeepAlive ? 1 : 0
self.stats.availableStreams -= closeAction.maxStreams - closeAction.usedStreams

switch closeAction.previousConnectionState {
case .idle:
self.stats.idle -= 1
self.stats.closing += 1

case .leased:
self.stats.leased -= 1
self.stats.closing += 1

case .closing:
break

case .backingOff:
self.stats.backingOff -= 1
}

if let connection = closeAction.connection {
return .close(CloseAction(
connection: connection,
timersToCancel: closeAction.cancelTimers
))
} else {
// if there is no connection we should delete this now
var timersToCancel = closeAction.cancelTimers
if let cancellationTimer = self.swapForDeletion(index: index) {
timersToCancel.append(cancellationTimer)
}
return .cancelTimers(timersToCancel)
}
}

@inlinable
mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> CloseAction? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
Expand All @@ -532,6 +582,16 @@ extension PoolStateMachine {
return self.closeConnectionIfIdle(at: index)
}

@inlinable
mutating func destroyFailedConnection(_ connectionID: Connection.ID) -> TimerCancellationToken? {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
preconditionFailure("Failing a connection we don't have a record of.")
}

self.connections[index].destroyFailedConnection()
return self.swapForDeletion(index: index)
}

/// Information around the failed/closed connection.
@usableFromInline
struct ClosedAction {
Expand Down Expand Up @@ -567,7 +627,7 @@ extension PoolStateMachine {
/// supplied index after this. If nil is returned the connection was closed by the state machine and was
/// therefore already removed.
@inlinable
mutating func connectionClosed(_ connectionID: Connection.ID) -> ClosedAction {
mutating func connectionClosed(_ connectionID: Connection.ID, shuttingDown: Bool) -> ClosedAction {
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
preconditionFailure("All connections that have been created should say goodbye exactly once!")
}
Expand Down Expand Up @@ -597,7 +657,7 @@ extension PoolStateMachine {
}

let newConnectionRequest: ConnectionRequest?
if self.connections.count < self.minimumConcurrentConnections {
if !shuttingDown, self.connections.count < self.minimumConcurrentConnections {
newConnectionRequest = self.createNewConnection()
} else {
newConnectionRequest = .none
Expand All @@ -613,18 +673,19 @@ extension PoolStateMachine {
// MARK: Shutdown

mutating func triggerForceShutdown(_ cleanup: inout ConnectionAction.Shutdown) {
for var connectionState in self.connections {
guard let closeAction = connectionState.close() else {
continue
}
for index in self.connections.indices {
switch closeConnection(at: index) {
case .close(let closeAction):
cleanup.connections.append(closeAction.connection)
cleanup.timersToCancel.append(contentsOf: closeAction.timersToCancel)

case .cancelTimers(let timers):
cleanup.timersToCancel.append(contentsOf: timers)

if let connection = closeAction.connection {
cleanup.connections.append(connection)
case .doNothing:
break
}
cleanup.timersToCancel.append(contentsOf: closeAction.cancelTimers)
}

self.connections = []
}

// MARK: - Private functions -
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ extension PoolStateMachine {
}

@inlinable
mutating func release(streams returnedStreams: UInt16) -> ConnectionAvailableInfo {
mutating func release(streams returnedStreams: UInt16) -> ConnectionAvailableInfo? {
switch self.state {
case .leased(let connection, let usedStreams, let maxStreams, let keepAlive):
precondition(usedStreams >= returnedStreams)
Expand All @@ -409,7 +409,11 @@ extension PoolStateMachine {
self.state = .leased(connection, usedStreams: newUsedStreams, maxStreams: maxStreams, keepAlive: keepAlive)
return .leased(availableStreams: availableStreams)
}
case .backingOff, .starting, .idle, .closing, .closed:

case .closing:
return nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we keep a record of all the connections during the shutdown It is now possible to catch a connection in the closing state when release is called after triggerForceShutdown


case .backingOff, .starting, .idle, .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}
Expand Down Expand Up @@ -587,10 +591,21 @@ extension PoolStateMachine {
runningKeepAlive: keepAlive.isRunning
)

case .leased, .closed:
case .leased, .closed, .closing:
return nil

case .backingOff, .starting, .closing:
case .backingOff, .starting:
preconditionFailure("Invalid state: \(self.state)")
}
}

@inlinable
mutating func destroyFailedConnection() {
switch self.state {
case .starting:
self.state = .closed

case .idle, .leased, .closed, .closing, .backingOff:
preconditionFailure("Invalid state: \(self.state)")
}
}
Expand Down
89 changes: 77 additions & 12 deletions Sources/ConnectionPoolModule/PoolStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ struct PoolStateMachine<
case runKeepAlive(Connection, TimerCancellationToken?)
case cancelTimers(TinyFastSequence<TimerCancellationToken>)
case closeConnection(Connection, Max2Sequence<TimerCancellationToken>)
case shutdown(Shutdown)

/// Start process of shutting down the connection pool. Close connections, cancel timers.
case initiateShutdown(Shutdown)
/// All connections have been closed, the pool event stream can be ended.
case cancelEventStreamAndFinalCleanup([TimerCancellationToken])
case none
}

Expand Down Expand Up @@ -256,11 +258,16 @@ struct PoolStateMachine<
@inlinable
mutating func connectionEstablished(_ connection: Connection, maxStreams: UInt16) -> Action {
switch self.poolState {
case .running, .shuttingDown(graceful: true):
case .running:
let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams)
return self.handleAvailableConnection(index: index, availableContext: context)
case .shuttingDown(graceful: false), .shutDown:
return .init(request: .none, connection: .closeConnection(connection, []))

case .shuttingDown:
let (index, context) = self.connections.newConnectionEstablished(connection, maxStreams: maxStreams)
return self.handleAvailableConnection(index: index, availableContext: context)

case .shutDown:
fatalError("Connection pool is not running")
}
}

Expand Down Expand Up @@ -326,7 +333,18 @@ struct PoolStateMachine<
return .init(request: .none, connection: .scheduleTimers(.init(timer)))

case .shuttingDown(graceful: false), .shutDown:
return .none()
let timerToCancel = self.connections.destroyFailedConnection(request.connectionID)
let connectionAction: ConnectionAction
if self.connections.isEmpty {
self.poolState = .shutDown
connectionAction = .cancelEventStreamAndFinalCleanup(timerToCancel.map {[$0]} ?? [])
} else {
connectionAction = .cancelTimers(timerToCancel.map {[$0]} ?? [])
}
return .init(
request: .none,
connection: connectionAction
)
}
}

Expand All @@ -348,7 +366,17 @@ struct PoolStateMachine<
return .init(request: .none, connection: .makeConnection(request, timers))

case .cancelTimers(let timers):
return .init(request: .none, connection: .cancelTimers(.init(timers)))
let connectionAction: ConnectionAction
if self.connections.isEmpty {
self.poolState = .shutDown
connectionAction = .cancelEventStreamAndFinalCleanup(.init(timers))
} else {
connectionAction = .cancelTimers(.init(timers))
}
return .init(
request: .none,
connection: connectionAction
)
}

case .shuttingDown(graceful: false), .shutDown:
Expand Down Expand Up @@ -403,7 +431,7 @@ struct PoolStateMachine<
case .running, .shuttingDown(graceful: true):
self.cacheNoMoreConnectionsAllowed = false

let closedConnectionAction = self.connections.connectionClosed(connection.id)
let closedConnectionAction = self.connections.connectionClosed(connection.id, shuttingDown: false)

let connectionAction: ConnectionAction
if let newRequest = closedConnectionAction.newConnectionRequest {
Expand All @@ -414,7 +442,19 @@ struct PoolStateMachine<

return .init(request: .none, connection: connectionAction)

case .shuttingDown(graceful: false), .shutDown:
case .shuttingDown(graceful: false):
let closedConnectionAction = self.connections.connectionClosed(connection.id, shuttingDown: true)

let connectionAction: ConnectionAction
if self.connections.isEmpty {
self.poolState = .shutDown
connectionAction = .cancelEventStreamAndFinalCleanup(.init(closedConnectionAction.timersToCancel))
} else {
connectionAction = .cancelTimers(closedConnectionAction.timersToCancel)
}

return .init(request: .none, connection: connectionAction)
case .shutDown:
return .none()
}
}
Expand Down Expand Up @@ -442,13 +482,17 @@ struct PoolStateMachine<
var shutdown = ConnectionAction.Shutdown()
self.connections.triggerForceShutdown(&shutdown)

if shutdown.connections.isEmpty {
if self.connections.isEmpty, shutdown.connections.isEmpty {
self.poolState = .shutDown
return .init(
request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown),
connection: .cancelEventStreamAndFinalCleanup(shutdown.timersToCancel)
)
}

return .init(
request: .failRequests(self.requestQueue.removeAll(), ConnectionPoolError.poolShutdown),
connection: .shutdown(shutdown)
connection: .initiateShutdown(shutdown)
)

case .shuttingDown:
Expand Down Expand Up @@ -481,6 +525,22 @@ struct PoolStateMachine<
return .none()

case .idle(_, let newIdle):
if case .shuttingDown = self.poolState {
switch self.connections.closeConnection(at: index) {
case .close(let closeAction):
return .init(
request: .none,
connection: .closeConnection(closeAction.connection, closeAction.timersToCancel)
)
case .cancelTimers(let timers):
return .init(
request: .none,
connection: .cancelTimers(.init(timers))
)
case .doNothing:
return .none()
}
}
let timers = self.connections.parkConnection(at: index, hasBecomeIdle: newIdle).map(self.mapTimers)

return .init(
Expand Down Expand Up @@ -569,6 +629,9 @@ extension PoolStateMachine {
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.Action: Equatable where TimerCancellationToken: Equatable, Request: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.PoolState: Equatable {}

@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationToken: Equatable {
@usableFromInline
Expand All @@ -582,7 +645,9 @@ extension PoolStateMachine.ConnectionAction: Equatable where TimerCancellationTo
return lhsConn === rhsConn && lhsToken == rhsToken
case (.closeConnection(let lhsConn, let lhsTimers), .closeConnection(let rhsConn, let rhsTimers)):
return lhsConn === rhsConn && lhsTimers == rhsTimers
case (.shutdown(let lhs), .shutdown(let rhs)):
case (.initiateShutdown(let lhs), .initiateShutdown(let rhs)):
return lhs == rhs
case (.cancelEventStreamAndFinalCleanup(let lhs), .cancelEventStreamAndFinalCleanup(let rhs)):
return lhs == rhs
case (.cancelTimers(let lhs), .cancelTimers(let rhs)):
return lhs == rhs
Expand Down
Loading
Loading