Skip to content

Commit

Permalink
add ConnectionPool.close() + fix connect error bug (#35)
Browse files Browse the repository at this point in the history
* adds ConnectionPool.close() + fixes connect error bug

* add missing comments

* ensure no new connections can be created while waiting for connections to close

* add some missing inline comments

* fix docs typos

* update circle yml docker image

* catch ConnectionPoolError.closed specifically
  • Loading branch information
tanner0101 committed Mar 21, 2019
1 parent b575c85 commit 3d0892a
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 23 deletions.
86 changes: 75 additions & 11 deletions Sources/NIOKit/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public protocol ConnectionPoolSource {
public protocol ConnectionPoolItem: class {
/// If `true`, this connection has closed.
var isClosed: Bool { get }

/// Closes this connection.
func close() -> EventLoopFuture<Void>
}

/// Configuration options for `ConnectionPool`.
Expand All @@ -33,6 +36,12 @@ public struct ConnectionPoolConfig {
}
}

/// Errors thrown by `ConnectionPool`.
public enum ConnectionPoolError: Error {
/// The connection pool is closed.
case closed
}

/// Holds a collection of active connections that can be requested and later released
/// back into the pool.
///
Expand All @@ -56,6 +65,14 @@ public final class ConnectionPool<Source> where Source: ConnectionPoolSource {
/// Creates new connections when needed. See `ConnectionPoolSource`.
public let source: Source

/// This connection pool's event loop.
public var eventLoop: EventLoop {
return self.source.eventLoop
}

/// If `true`, this connection pool has been closed.
public private(set) var isClosed: Bool

// MARK: Private

/// All currently available connections.
Expand Down Expand Up @@ -86,6 +103,7 @@ public final class ConnectionPool<Source> where Source: ConnectionPoolSource {
self.available.reserveCapacity(config.maxConnections)
self.activeConnections = 0
self.waiters = .init(initialCapacity: 0)
self.isClosed = false
}

/// Fetches a pooled connection for the lifetime of the closure.
Expand Down Expand Up @@ -126,21 +144,31 @@ public final class ConnectionPool<Source> where Source: ConnectionPoolSource {
///
/// - returns: A future containing the requested connection.
public func requestConnection() -> EventLoopFuture<Source.Connection> {
guard !self.isClosed else {
return self.source.eventLoop.makeFailedFuture(ConnectionPoolError.closed)
}

if let conn = self.available.popLast() {
// check if it is still open
if !conn.isClosed {
// connection is still open, we can return it directly
return self.source.eventLoop.makeSucceededFuture(conn)
} else {
// connection is closed, we need to replace it
return self.source.makeConnection()
return self.source.makeConnection().flatMapErrorThrowing { error in
self.activeConnections -= 1
throw error
}
}
} else if self.activeConnections < self.config.maxConnections {
// all connections are busy, but we have room to open a new connection!
self.activeConnections += 1

// make the new connection
return self.source.makeConnection()
return self.source.makeConnection().flatMapErrorThrowing { error in
self.activeConnections -= 1
throw error
}
} else {
// connections are exhausted, we must wait for one to be returned
let promise = self.source.eventLoop.makePromise(of: Source.Connection.self)
Expand All @@ -158,15 +186,51 @@ public final class ConnectionPool<Source> where Source: ConnectionPoolSource {
/// - parameters:
/// - conn: Connection to release back to the pool.
public func releaseConnection(_ conn: Source.Connection) {
// add this connection back to the list of available
self.available.append(conn)

// now that we know a new connection is available, we should
// take this chance to fulfill one of the waiters
if !self.waiters.isEmpty {
self.requestConnection().cascade(
to: self.waiters.removeFirst()
)
if self.isClosed {
// this pool is closed and we are responsible for closing all
// of our connections
_ = conn.close()
} else {
// add this connection back to the list of available
self.available.append(conn)

// now that we know a new connection is available, we should
// take this chance to fulfill one of the waiters
if !self.waiters.isEmpty {
self.requestConnection().cascade(
to: self.waiters.removeFirst()
)
}
}
}

/// Closes the connection pool.
///
/// All available connections will be closed immediately.
/// Any connections currently in use will be closed when they are returned to the pool.
///
/// Once closed, the connection pool cannot be used to create new connections.
///
/// Connection pools must be closed before they deinitialize.
///
/// - returns: A future indicating close completion.
public func close() -> EventLoopFuture<Void> {
self.isClosed = true
return self.available.map { $0.close() }.flatten(on: self.eventLoop).map {
// inform any waiters that they will never be receiving a connection
while let waiter = self.waiters.popFirst() {
waiter.fail(ConnectionPoolError.closed)
}

// reset any variables to free up memory
self.available = []
self.activeConnections = 0
}
}

deinit {
if !self.isClosed {
assertionFailure("ConnectionPool deinitialized without being closed.")
}
}
}
21 changes: 19 additions & 2 deletions Sources/NIOKit/EventLoopFuture/Collection+Flatten.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import NIO

extension Collection {

/// Converts a collection of `EventLoopFuture`s to an `EventLoopFuture` that wraps an array with the future values.
///
/// Acts as a helper for the `EventLoop.flatten(_:[EventLoopFuture<Value>])` method.
Expand All @@ -12,7 +11,25 @@ extension Collection {
///
/// - parameter eventLoop: The event-loop to succeed the futures on.
/// - returns: The succeeded values in an array, wrapped in an `EventLoopFuture`.
public func flatten<Value>(on eventLoop: EventLoop) -> EventLoopFuture<[Value]> where Element == EventLoopFuture<Value> {
public func flatten<Value>(on eventLoop: EventLoop) -> EventLoopFuture<[Value]>
where Element == EventLoopFuture<Value>
{
return eventLoop.flatten(Array(self))
}
}

extension Array where Element == EventLoopFuture<Void> {
/// Converts a collection of `EventLoopFuture<Void>`s to an `EventLoopFuture<Void>`.
///
/// Acts as a helper for the `EventLoop.flatten(_:[EventLoopFuture<Value>])` method.
///
/// let futures = [el.future(1), el.future(2), el.future(3), el.future(4)]
/// let flattened = futures.flatten(on: el)
/// // flattened: EventLoopFuture<Void>
///
/// - parameter eventLoop: The event-loop to succeed the futures on.
/// - returns: The succeeded future.
public func flatten(on eventLoop: EventLoop) -> EventLoopFuture<Void> {
return .andAllSucceed(self, on: eventLoop)
}
}
71 changes: 64 additions & 7 deletions Tests/NIOKitTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public final class ConnectionPoolTests: XCTestCase {
func testPooling() throws {
let foo = FooDatabase()
let pool = ConnectionPool(config: .init(maxConnections: 2), source: foo)
defer { try! pool.close().wait() }

// make two connections
let connA = try pool.requestConnection().wait()
Expand Down Expand Up @@ -32,7 +33,7 @@ public final class ConnectionPoolTests: XCTestCase {
XCTAssertEqual(foo.connectionsCreated, 2)

// this time, close the connection before releasing it
connC!.close()
try connC!.close().wait()
pool.releaseConnection(connC!)
XCTAssert(connD !== connB)
XCTAssertEqual(connD?.isClosed, false)
Expand All @@ -42,6 +43,7 @@ public final class ConnectionPoolTests: XCTestCase {
func testFIFOWaiters() throws {
let foo = FooDatabase()
let pool = ConnectionPool(config: .init(maxConnections: 1), source: foo)
defer { try! pool.close().wait() }
// * User A makes a request for a connection, gets connection number 1.
let a_1 = pool.requestConnection()
let a = try a_1.wait()
Expand All @@ -67,6 +69,52 @@ public final class ConnectionPoolTests: XCTestCase {
XCTAssert(a === c)
}


func testConnectError() throws {
let db = ErrorDatabase()
let pool = ConnectionPool(config: .init(maxConnections: 1), source: db)
defer { try! pool.close().wait() }
do {
_ = try pool.requestConnection().wait()
XCTFail("should not have created connection")
} catch _ as ErrorDatabase.Error {
// pass
}

// test that we can still make another request even after a failed request
do {
_ = try pool.requestConnection().wait()
XCTFail("should not have created connection")
} catch _ as ErrorDatabase.Error {
// pass
}
}

func testPoolClose() throws {
let foo = FooDatabase()
let pool = ConnectionPool(config: .init(maxConnections: 1), source: foo)
let _ = try pool.requestConnection().wait()
let b = pool.requestConnection()
try pool.close().wait()
let c = pool.requestConnection()

// check that waiters are failed
do {
_ = try b.wait()
XCTFail("should not have created connection")
} catch ConnectionPoolError.closed {
// pass
}

// check that new requests fail
do {
_ = try c.wait()
XCTFail("should not have created connection")
} catch ConnectionPoolError.closed {
// pass
}
}

func testPerformance() {
guard performance(expected: 0.088) else { return }
let foo = FooDatabase()
Expand Down Expand Up @@ -95,12 +143,20 @@ public final class ConnectionPoolTests: XCTestCase {
}
}
}
}

private struct ErrorDatabase: ConnectionPoolSource {
enum Error: Swift.Error {
case test
}

var eventLoop: EventLoop {
return EmbeddedEventLoop()
}

public static let allTests = [
("testPooling", testPooling),
("testFIFOWaiters", testFIFOWaiters),
("testPerformance", testPerformance),
]
func makeConnection() -> EventLoopFuture<FooConnection> {
return self.eventLoop.makeFailedFuture(Error.test)
}
}

private final class FooDatabase: ConnectionPoolSource {
Expand All @@ -124,8 +180,9 @@ private final class FooConnection: ConnectionPoolItem {
self.isClosed = false
}

func close() {
func close() -> EventLoopFuture<Void> {
self.isClosed = true
return EmbeddedEventLoop().makeSucceededFuture(())
}
}

Expand Down
2 changes: 2 additions & 0 deletions Tests/NIOKitTests/XCTestManifests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ extension ConnectionPoolTests {
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__ConnectionPoolTests = [
("testConnectError", testConnectError),
("testFIFOWaiters", testFIFOWaiters),
("testPerformance", testPerformance),
("testPoolClose", testPoolClose),
("testPooling", testPooling),
]
}
Expand Down
6 changes: 3 additions & 3 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ version: 2
jobs:
linux:
docker:
- image: codevapor/swift:5.0
- image: vapor/swift:5.0
steps:
- checkout
- run: swift build
- run: swift test
linux-release:
docker:
- image: codevapor/swift:5.0
- image: vapor/swift:5.0
steps:
- checkout
- run: swift build -c release
linux-performance:
docker:
- image: codevapor/swift:5.0
- image: vapor/swift:5.0
steps:
- checkout
- run: swift test -c release -Xswiftc -enable-testing
Expand Down

0 comments on commit 3d0892a

Please sign in to comment.