Skip to content

Commit

Permalink
Add test to lease multiple connections at once (#440)
Browse files Browse the repository at this point in the history
- Add test to lease multiple connections at once
- Rename `Waiter` to `Future`
- Rename `Waiter.Result` to `Future.Success`
  • Loading branch information
fabianfett committed Nov 13, 2023
1 parent d5d16e3 commit e178163
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 14 deletions.
86 changes: 84 additions & 2 deletions Tests/ConnectionPoolModuleTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ final class ConnectionPoolTests: XCTestCase {
_ = try await pool.leaseConnection()
}

let connectionAttemptWaiter = Waiter(of: Void.self)
let connectionAttemptWaiter = Future(of: Void.self)

taskGroup.addTask {
try await factory.nextConnectAttempt { connectionID in
Expand All @@ -410,7 +410,7 @@ final class ConnectionPoolTests: XCTestCase {
}
}

try await connectionAttemptWaiter.result
try await connectionAttemptWaiter.success
leaseTask.cancel()

let taskResult = await leaseTask.result
Expand All @@ -427,5 +427,87 @@ final class ConnectionPoolTests: XCTestCase {
}
}
}

func testLeasingMultipleConnectionsAtOnceWorks() async throws {
let clock = MockClock()
let factory = MockConnectionFactory<MockClock>()
let keepAliveDuration = Duration.seconds(30)
let keepAlive = MockPingPongBehavior(keepAliveFrequency: keepAliveDuration, connectionType: MockConnection.self)

var mutableConfig = ConnectionPoolConfiguration()
mutableConfig.minimumConnectionCount = 4
mutableConfig.maximumConnectionSoftLimit = 4
mutableConfig.maximumConnectionHardLimit = 4
mutableConfig.idleTimeout = .seconds(10)
let config = mutableConfig

let pool = ConnectionPool(
configuration: config,
idGenerator: ConnectionIDGenerator(),
requestType: ConnectionFuture.self,
keepAliveBehavior: keepAlive,
observabilityDelegate: NoOpConnectionPoolMetrics(connectionIDType: MockConnection.ID.self),
clock: clock
) {
try await factory.makeConnection(id: $0, for: $1)
}

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
await pool.run()
}

// create 4 persisted connections
for _ in 0..<4 {
await factory.nextConnectAttempt { connectionID in
return 1
}
}

// create 4 connection requests
let requests = (0..<4).map { ConnectionFuture(id: $0) }

// lease 4 connections at once
pool.leaseConnections(requests)
var connections = [MockConnection]()

for request in requests {
let connection = try await request.future.success
connections.append(connection)
}

// Ensure that we got 4 distinct connections
XCTAssertEqual(Set(connections.lazy.map(\.id)).count, 4)

// release all 4 leased connections
for connection in connections {
pool.releaseConnection(connection)
}

// shutdown
taskGroup.cancelAll()
for connection in factory.runningConnections {
connection.closeIfClosing()
}
}
}
}

struct ConnectionFuture: ConnectionRequestProtocol {
let id: Int
let future: Future<MockConnection>

init(id: Int) {
self.id = id
self.future = Future(of: MockConnection.self)
}

func complete(with result: Result<MockConnection, ConnectionPoolError>) {
switch result {
case .success(let success):
self.future.yield(value: success)
case .failure(let failure):
self.future.yield(error: failure)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ final class MockConnectionFactory<Clock: _Concurrency.Clock> where Clock.Duratio

func makeConnection(
id: Int,
for pool: ConnectionPool<MockConnection, Int, ConnectionIDGenerator, ConnectionRequest<MockConnection>, Int, MockPingPongBehavior<MockConnection>, NoOpConnectionPoolMetrics<Int>, Clock>
for pool: ConnectionPool<MockConnection, Int, ConnectionIDGenerator, some ConnectionRequestProtocol, Int, MockPingPongBehavior<MockConnection>, NoOpConnectionPoolMetrics<Int>, Clock>
) async throws -> ConnectionAndMetadata<MockConnection> {
// we currently don't support cancellation when creating a connection
let result = try await withCheckedThrowingContinuation { (checkedContinuation: CheckedContinuation<(MockConnection, UInt16), any Error>) in
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
import Atomics
@testable import _ConnectionPoolModule

final class Waiter<Result: Sendable>: Sendable {
/// This is a `Future` type that shall make writing tests a bit simpler. I'm well aware, that this is a pattern
/// that should not be embraced with structured concurrency. However writing all tests in full structured
/// concurrency is an effort, that isn't worth the endgoals in my view.
final class Future<Success: Sendable>: Sendable {
struct State: Sendable {

var result: Swift.Result<Result, any Error>? = nil
var continuations: [(Int, CheckedContinuation<Result, any Error>)] = []
var result: Swift.Result<Success, any Error>? = nil
var continuations: [(Int, CheckedContinuation<Success, any Error>)] = []

}

let waiterID = ManagedAtomic(0)
let stateBox: NIOLockedValueBox<State> = NIOLockedValueBox(State())

init(of: Result.Type) {}
init(of: Success.Type) {}

enum GetAction {
case fail(any Error)
case succeed(Result)
case succeed(Success)
case none
}

var result: Result {
var success: Success {
get async throws {
let waiterID = self.waiterID.loadThenWrappingIncrement(ordering: .relaxed)

return try await withTaskCancellationHandler {
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Result, any Error>) in
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Success, any Error>) in
let action = self.stateBox.withLockedValue { state -> GetAction in
if Task.isCancelled {
return .fail(CancellationError())
Expand Down Expand Up @@ -56,7 +59,7 @@ final class Waiter<Result: Sendable>: Sendable {
}
}
} onCancel: {
let cont = self.stateBox.withLockedValue { state -> CheckedContinuation<Result, any Error>? in
let cont = self.stateBox.withLockedValue { state -> CheckedContinuation<Success, any Error>? in
guard state.result == nil else { return nil }

guard let contIndex = state.continuations.firstIndex(where: { $0.0 == waiterID }) else {
Expand All @@ -71,10 +74,10 @@ final class Waiter<Result: Sendable>: Sendable {
}
}

func yield(value: Result) {
func yield(value: Success) {
let continuations = self.stateBox.withLockedValue { state in
guard state.result == nil else {
return [(Int, CheckedContinuation<Result, any Error>)]().lazy.map(\.1)
return [(Int, CheckedContinuation<Success, any Error>)]().lazy.map(\.1)
}
state.result = .success(value)

Expand All @@ -92,7 +95,7 @@ final class Waiter<Result: Sendable>: Sendable {
func yield(error: any Error) {
let continuations = self.stateBox.withLockedValue { state in
guard state.result == nil else {
return [(Int, CheckedContinuation<Result, any Error>)]().lazy.map(\.1)
return [(Int, CheckedContinuation<Success, any Error>)]().lazy.map(\.1)
}
state.result = .failure(error)

Expand Down

0 comments on commit e178163

Please sign in to comment.