Skip to content

Commit

Permalink
Timeout connection requests when in a deadlock. (#67)
Browse files Browse the repository at this point in the history
* Add deadlock timeout & test

* Add XCTFail case

* Cleanup

* Review comments

* Review comments
  • Loading branch information
MrLotU committed Jun 30, 2020
1 parent 8e2cad8 commit 7457413
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 2 deletions.
5 changes: 5 additions & 0 deletions Sources/AsyncKit/ConnectionPool/ConnectionPoolError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,8 @@ public enum ConnectionPoolError: Error {
/// The connection pool has shutdown.
case shutdown
}

public enum ConnectionPoolTimeoutError: Error {
/// The connection request timed out.
case connectionRequestTimeout
}
22 changes: 20 additions & 2 deletions Sources/AsyncKit/ConnectionPool/EventLoopConnectionPool.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import struct NIO.CircularBuffer
import struct NIO.TimeAmount
import struct Logging.Logger
import struct Foundation.UUID

/// Holds a collection of active connections that can be requested and later released
/// back into the pool.
Expand All @@ -24,6 +26,9 @@ public final class EventLoopConnectionPool<Source> where Source: ConnectionPoolS
/// Max connections for this storage.
private let maxConnections: Int

/// Timeout for requesting a new connection.
private let requestTimeout: TimeAmount

/// This pool's event loop.
public let eventLoop: EventLoop

Expand Down Expand Up @@ -55,16 +60,20 @@ public final class EventLoopConnectionPool<Source> where Source: ConnectionPoolS
/// - source: Creates new connections when needed.
/// - maxConnections: Limits the number of connections that can be open.
/// Defaults to 1.
/// - requestTimeout: Timeout for requesting a new connection.
/// Defaults to 10 seconds.
/// - logger: For lifecycle logs.
/// - on: Event loop.
public init(
source: Source,
maxConnections: Int,
requestTimeout: TimeAmount = .seconds(10),
logger: Logger = .init(label: "codes.vapor.pool"),
on eventLoop: EventLoop
) {
self.source = source
self.maxConnections = maxConnections
self.requestTimeout = requestTimeout
self.logger = logger
self.eventLoop = eventLoop
self.available = .init(initialCapacity: maxConnections)
Expand Down Expand Up @@ -198,8 +207,17 @@ public final class EventLoopConnectionPool<Source> where Source: ConnectionPoolS
logger.debug("Connection pool exhausted on this event loop, adding request to waitlist")
let promise = eventLoop.makePromise(of: Source.Connection.self)
self.waiters.append((logger, promise))
// return waiter
return promise.futureResult

let task = eventLoop.scheduleTask(in: self.requestTimeout) { [weak self] in
guard let self = self else { return }
logger.error("Connection request timed out. This might indicate a connection deadlock in your application.")
if let idx = self.waiters.firstIndex(where: { _, p in return p.futureResult === promise.futureResult }) {
self.waiters.remove(at: idx)
}
promise.fail(ConnectionPoolTimeoutError.connectionRequestTimeout)
}

return promise.futureResult.always { _ in task.cancel() }
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import struct Logging.Logger
import struct NIO.TimeAmount
import class NIOConcurrencyHelpers.Lock
import Dispatch

Expand Down Expand Up @@ -53,11 +54,14 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
/// - source: Creates new connections when needed.
/// - maxConnectionsPerEventLoop: Limits the number of connections that can be open per event loop.
/// Defaults to 1.
/// - requestTimeout: Timeout for requesting a new connection.
/// Defaults to 10 seconds.
/// - logger: For lifecycle logs.
/// - on: Event loop group.
public init(
source: Source,
maxConnectionsPerEventLoop: Int = 1,
requestTimeout: TimeAmount = .seconds(10),
logger: Logger = .init(label: "codes.vapor.pool"),
on eventLoopGroup: EventLoopGroup
) {
Expand All @@ -70,6 +74,7 @@ public final class EventLoopGroupConnectionPool<Source> where Source: Connection
self.storage = .init(uniqueKeysWithValues: eventLoopGroup.makeIterator().map { ($0.key, .init(
source: source,
maxConnections: maxConnectionsPerEventLoop,
requestTimeout: requestTimeout,
logger: logger,
on: $0
)) })
Expand Down
21 changes: 21 additions & 0 deletions Tests/AsyncKitTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,27 @@ final class ConnectionPoolTests: XCTestCase {
// pass
}
}

// https://github.com/vapor/async-kit/issues/63
func testDeadlock() {
let foo = FooDatabase()
let pool = EventLoopConnectionPool(
source: foo,
maxConnections: 1,
requestTimeout: .milliseconds(100),
on: self.eventLoopGroup.next()
)
defer { try! pool.close().wait() }
_ = pool.requestConnection()
let start = Date()
let a = pool.requestConnection()
XCTAssertThrowsError(try a.wait(), "Connection should have deadlocked and thrown ConnectionPoolTimeoutError.connectionRequestTimeout") { (error) in
let interval = Date().timeIntervalSince(start)
XCTAssertGreaterThan(interval, 0.1)
XCTAssertLessThan(interval, 0.2)
XCTAssertEqual(error as? ConnectionPoolTimeoutError, ConnectionPoolTimeoutError.connectionRequestTimeout)
}
}

func testPerformance() {
guard performance(expected: 0.088) else { return }
Expand Down

0 comments on commit 7457413

Please sign in to comment.