Skip to content

Commit

Permalink
Add Test: Lease connection after shutdown has started fails (#441)
Browse files Browse the repository at this point in the history
  • Loading branch information
fabianfett committed Nov 14, 2023
1 parent e178163 commit dc94503
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 17 deletions.
116 changes: 116 additions & 0 deletions Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,122 @@ final class ConnectionPoolTests: XCTestCase {
}
}
}

func testLeasingConnectionAfterShutdownIsInvokedFails() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 4
mutableConfig.maximumConnectionSoftLimit = 4
mutableConfig.maximumConnectionHardLimit = 4
mutableConfig.idleTimeout = .seconds(10)
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionRequest<MockConnection>.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

// create 4 persisted connections
for _ in 0..<4 {
await factory.nextConnectAttempt { connectionID in
return 1
}
}

// shutdown
taskGroup.cancelAll()

do {
_ = try await pool.leaseConnection()
XCTFail("Expected a failure")
} catch {
print("failed")
XCTAssertEqual(error as? ConnectionPoolError, .poolShutdown)
}

print("will close connections: \(factory.runningConnections)")
for connection in factory.runningConnections {
try await connection.signalToClose
connection.closeIfClosing()
}
}
}

func testLeasingConnectionsAfterShutdownIsInvokedFails() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 4
mutableConfig.maximumConnectionSoftLimit = 4
mutableConfig.maximumConnectionHardLimit = 4
mutableConfig.idleTimeout = .seconds(10)
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionFuture.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

// create 4 persisted connections
for _ in 0..<4 {
await factory.nextConnectAttempt { connectionID in
return 1
}
}

// shutdown
taskGroup.cancelAll()

// create 4 connection requests
let requests = (0..<4).map { ConnectionFuture(id: $0) }

// lease 4 connections at once
pool.leaseConnections(requests)

for request in requests {
do {
_ = try await request.future.success
XCTFail("Expected a failure")
} catch {
XCTAssertEqual(error as? ConnectionPoolError, .poolShutdown)
}
}

for connection in factory.runningConnections {
try await connection.signalToClose
connection.closeIfClosing()
}
}
}
}

struct ConnectionFuture: ConnectionRequestProtocol {
Expand Down
66 changes: 49 additions & 17 deletions Tests/ConnectionPoolModuleTests/Mocks/MockConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,59 @@ import DequeModule
@testable import _ConnectionPoolModule

// Sendability enforced through the lock
final class MockConnection: PooledConnection, @unchecked Sendable {
final class MockConnection: PooledConnection, Sendable {
typealias ID = Int

let id: ID

private enum State {
case running([@Sendable ((any Error)?) -> ()])
case running([CheckedContinuation<Void, any Error>], [@Sendable ((any Error)?) -> ()])
case closing([@Sendable ((any Error)?) -> ()])
case closed
}

private let lock = NIOLock()
private var _state = State.running([])
private let lock: NIOLockedValueBox<State> = NIOLockedValueBox(.running([], []))

init(id: Int) {
self.id = id
}

var signalToClose: Void {
get async throws {
try await withCheckedThrowingContinuation { continuation in
let runRightAway = self.lock.withLockedValue { state -> Bool in
switch state {
case .running(var continuations, let callbacks):
continuations.append(continuation)
state = .running(continuations, callbacks)
return false

case .closing, .closed:
return true
}
}

if runRightAway {
continuation.resume()
}
}
}
}

func onClose(_ closure: @escaping @Sendable ((any Error)?) -> ()) {
let enqueued = self.lock.withLock { () -> Bool in
switch self._state {
let enqueued = self.lock.withLockedValue { state -> Bool in
switch state {
case .closed:
return false

case .running(var callbacks):
case .running(let continuations, var callbacks):
callbacks.append(closure)
self._state = .running(callbacks)
state = .running(continuations, callbacks)
return true

case .closing(var callbacks):
callbacks.append(closure)
self._state = .closing(callbacks)
state = .closing(callbacks)
return true
}
}
Expand All @@ -44,25 +65,30 @@ final class MockConnection: PooledConnection, @unchecked Sendable {
}

func close() {
self.lock.withLock {
switch self._state {
case .running(let callbacks):
self._state = .closing(callbacks)
let continuations = self.lock.withLockedValue { state -> [CheckedContinuation<Void, any Error>] in
switch state {
case .running(let continuations, let callbacks):
state = .closing(callbacks)
return continuations

case .closing, .closed:
break
return []
}
}

for continuation in continuations {
continuation.resume()
}
}

func closeIfClosing() {
let callbacks = self.lock.withLock { () -> [@Sendable ((any Error)?) -> ()] in
switch self._state {
let callbacks = self.lock.withLockedValue { state -> [@Sendable ((any Error)?) -> ()] in
switch state {
case .running, .closed:
return []

case .closing(let callbacks):
self._state = .closed
state = .closed
return callbacks
}
}
Expand All @@ -73,3 +99,9 @@ final class MockConnection: PooledConnection, @unchecked Sendable {
}
}

extension MockConnection: CustomStringConvertible {
var description: String {
let state = self.lock.withLockedValue { $0 }
return "MockConnection(id: \(self.id), state: \(state))"
}
}

0 comments on commit dc94503

Please sign in to comment.