diff --git a/Sources/MongoSwift/APM.swift b/Sources/MongoSwift/APM.swift index 55cf41bd2..304156046 100644 --- a/Sources/MongoSwift/APM.swift +++ b/Sources/MongoSwift/APM.swift @@ -701,7 +701,8 @@ private func publishEvent(type: T.Type, eventPtr: OpaquePoin /// An extension of `ConnectionPool` to add monitoring capability for commands and server discovery and monitoring. extension ConnectionPool { - /// Internal function to install monitoring callbacks for this pool. + /// Internal function to install monitoring callbacks for this pool. **This method may only be called before any + /// connections are checked out from the pool.** internal func initializeMonitoring(client: MongoClient) { guard let callbacks = mongoc_apm_callbacks_new() else { fatalError("failed to initialize new mongoc_apm_callbacks_t") @@ -722,13 +723,6 @@ extension ConnectionPool { mongoc_apm_set_server_heartbeat_succeeded_cb(callbacks, serverHeartbeatSucceeded) mongoc_apm_set_server_heartbeat_failed_cb(callbacks, serverHeartbeatFailed) - // we can pass the MongoClient as unretained because the callbacks are stored on clientHandle, so if the - // callback is being executed, this pool and therefore its parent `MongoClient` must still be valid. - switch self.state { - case let .open(pool): - mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque()) - case .closed: - fatalError("ConnectionPool was already closed") - } + self.setAPMCallbacks(callbacks: callbacks, client: client) } } diff --git a/Sources/MongoSwift/ConnectionPool.swift b/Sources/MongoSwift/ConnectionPool.swift index 961b880cc..3ccf05b89 100644 --- a/Sources/MongoSwift/ConnectionPool.swift +++ b/Sources/MongoSwift/ConnectionPool.swift @@ -1,4 +1,6 @@ import CLibMongoC +import Foundation +import NIOConcurrencyHelpers /// A connection to the database. internal class Connection { @@ -13,27 +15,60 @@ internal class Connection { } deinit { - switch self.pool.state { - case let .open(pool): - mongoc_client_pool_push(pool, self.clientHandle) - case .closed: - assertionFailure("ConnectionPool was already closed") + do { + try self.pool.checkIn(self) + } catch { + assertionFailure("Failed to check connection back in: \(error)") } } } +extension NSCondition { + fileprivate func withLock(_ body: () throws -> T) rethrows -> T { + self.lock() + defer { self.unlock() } + return try body() + } +} + /// A pool of one or more connections. internal class ConnectionPool { /// Represents the state of a `ConnectionPool`. internal enum State { /// Indicates that the `ConnectionPool` is open and using the associated pointer to a `mongoc_client_pool_t`. case open(pool: OpaquePointer) + /// Indicates that the `ConnectionPool` is in the process of closing. Connections can be checked back in, but + /// no new connections can be checked out. + case closing(pool: OpaquePointer) /// Indicates that the `ConnectionPool` has been closed and contains no connections. case closed } /// The state of this `ConnectionPool`. - internal private(set) var state: State + private var state: State + /// The number of connections currently checked out of the pool. + private var connCount = 0 + /// Lock over `state` and `connCount`. + private let stateLock = NSCondition() + + /// Internal helper for testing purposes that returns whether the pool is in the `closing` state. + internal var isClosing: Bool { + self.stateLock.withLock { + guard case .closing = self.state else { + return false + } + return true + } + } + + /// Internal helper for testing purposes that returns the number of connections currently checked out from the pool. + internal var checkedOutConnections: Int { + self.stateLock.withLock { + self.connCount + } + } + + internal static let PoolClosedError = LogicError(message: "ConnectionPool was already closed") /// Initializes the pool using the provided `ConnectionString` and options. internal init(from connString: ConnectionString, options: ClientOptions?) throws { @@ -56,7 +91,7 @@ internal class ConnectionPool { self.state = .open(pool: pool) if let options = options { - try self.setTLSOptions(options) + self.setTLSOptions(options) } } @@ -67,38 +102,79 @@ internal class ConnectionPool { } } - /// Closes the pool, cleaning up underlying resources. This method blocks as it sends `endSessions` to the server. - internal func shutdown() { - switch self.state { - case let .open(pool): - mongoc_client_pool_destroy(pool) - case .closed: - return + /// Closes the pool, cleaning up underlying resources. **This method blocks until all connections are returned to + /// the pool.** + internal func close() throws { + try self.stateLock.withLock { + switch self.state { + case let .open(pool): + self.state = .closing(pool: pool) + case .closing, .closed: + throw Self.PoolClosedError + } + + while self.connCount > 0 { + // wait for signal from checkIn(). + self.stateLock.wait() + } + + switch self.state { + case .open, .closed: + throw InternalError(message: "ConnectionPool in unexpected state \(self.state) during close()") + case let .closing(pool): + mongoc_client_pool_destroy(pool) + self.state = .closed + } } - self.state = .closed } /// Checks out a connection. This connection will return itself to the pool when its reference count drops to 0. - /// This method will block until a connection is available. + /// This method will block until a connection is available. Throws an error if the pool is in the process of + /// closing or has finished closing. internal func checkOut() throws -> Connection { - switch self.state { - case let .open(pool): - return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self) - case .closed: - throw InternalError(message: "ConnectionPool was already closed") + try self.stateLock.withLock { + switch self.state { + case let .open(pool): + self.connCount += 1 + return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self) + case .closing, .closed: + throw Self.PoolClosedError + } } } - /// Checks out a connection from the pool, or returns `nil` if none are currently available. + /// Checks out a connection from the pool, or returns `nil` if none are currently available. Throws an error if the + /// pool is not open. This method may block waiting on the state lock as well as libmongoc's locks and thus must be + // run within the thread pool. internal func tryCheckOut() throws -> Connection? { - switch self.state { - case let .open(pool): - guard let handle = mongoc_client_pool_try_pop(pool) else { - return nil + try self.stateLock.withLock { + switch self.state { + case let .open(pool): + guard let handle = mongoc_client_pool_try_pop(pool) else { + return nil + } + self.connCount += 1 + return Connection(clientHandle: handle, pool: self) + case .closing, .closed: + throw Self.PoolClosedError + } + } + } + + /// Checks a connection into the pool. Accepts the connection if the pool is still open or in the process of + /// closing; throws an error if the pool has already finished closing. This method may block waiting on the state + /// lock as well as libmongoc's locks, and thus must be run within the thread pool. + fileprivate func checkIn(_ connection: Connection) throws { + try self.stateLock.withLock { + switch self.state { + case let .open(pool), let .closing(pool): + mongoc_client_pool_push(pool, connection.clientHandle) + self.connCount -= 1 + // signal to close() that we are updating the count. + self.stateLock.signal() + case .closed: + throw Self.PoolClosedError } - return Connection(clientHandle: handle, pool: self) - case .closed: - throw InternalError(message: "ConnectionPool was already closed") } } @@ -109,9 +185,9 @@ internal class ConnectionPool { return try body(connection) } - // Sets TLS/SSL options that the user passes in through the client level. This must be called from - // the ConnectionPool init before the pool is used. - private func setTLSOptions(_ options: ClientOptions) throws { + // Sets TLS/SSL options that the user passes in through the client level. **This must only be called from + // the ConnectionPool initializer**. + private func setTLSOptions(_ options: ClientOptions) { // return early so we don't set an empty options struct on the libmongoc pool. doing so will make libmongoc // attempt to use TLS for connections. guard options.tls == true || @@ -147,11 +223,14 @@ internal class ConnectionPool { if let invalidHosts = options.tlsAllowInvalidHostnames { opts.allow_invalid_hostname = invalidHosts } + + // lock isn't needed as this is called before pool is in use. switch self.state { case let .open(pool): mongoc_client_pool_set_ssl_opts(pool, &opts) - case .closed: - throw InternalError(message: "ConnectionPool was already closed") + case .closing, .closed: + // if we get here, we must have called this method outside of `ConnectionPool.init`. + fatalError("ConnectionPool in unexpected state \(self.state) while setting TLS options") } } @@ -187,6 +266,22 @@ internal class ConnectionPool { return ConnectionString(copying: uri) } } + + /// Sets the provided APM callbacks on this pool, using the provided client as the "context" value. **This method + /// may only be called before any connections are checked out of the pool.** Ideally this code would just live in + /// `ConnectionPool.init`. However, the client we accept here has to be fully initialized before we can pass it + /// as the context. In order for it to be fully initialized its pool must exist already. + internal func setAPMCallbacks(callbacks: OpaquePointer, client: MongoClient) { + // lock isn't needed as this is called before pool is in use. + switch self.state { + case let .open(pool): + mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque()) + case .closing, .closed: + // this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`. + // unless we have a bug it's impossible that the pool is already closed. + fatalError("ConnectionPool in unexpected state \(self.state) while setting APM callbacks") + } + } } extension String { diff --git a/Sources/MongoSwift/MongoClient.swift b/Sources/MongoSwift/MongoClient.swift index cce6ab3cc..d1563c289 100644 --- a/Sources/MongoSwift/MongoClient.swift +++ b/Sources/MongoSwift/MongoClient.swift @@ -157,8 +157,10 @@ public struct DatabaseOptions: CodingStrategyProvider { // sourcery: skipSyncExport /// A MongoDB Client providing an asynchronous, SwiftNIO-based API. public class MongoClient { + /// The pool of connections backing this client. internal let connectionPool: ConnectionPool + /// Executor responsible for executing operations on behalf of this client and its child objects. internal let operationExecutor: OperationExecutor /// Default size for a client's NIOThreadPool. @@ -167,8 +169,10 @@ public class MongoClient { /// Default maximum size for connection pools created by this client. internal static let defaultMaxConnectionPoolSize = 100 - /// Indicates whether this client has been closed. - internal private(set) var isClosed = false + /// Indicates whether this client has been closed. A lock around this variable is not needed because: + /// - This value is only modified on success of `ConnectionPool.close()`. That method will succeed exactly once. + /// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete. + private var wasClosed = false /// Handlers for command monitoring events. internal var commandEventHandlers: [CommandEventHandler] @@ -182,9 +186,6 @@ public class MongoClient { /// A unique identifier for this client. Sets _id to the generator's current value and increments the generator. internal let _id = clientIdGenerator.add(1) - /// Error thrown when user attempts to use a closed client. - internal static let ClosedClientError = LogicError(message: "MongoClient was already closed") - /// Encoder whose options are inherited by databases derived from this client. public let encoder: BSONEncoder @@ -255,21 +256,37 @@ public class MongoClient { } deinit { - assert(self.isClosed, "MongoClient was not closed before deinitialization") + assert( + self.wasClosed, + "MongoClient was not closed before deinitialization. " + + "Please call `close()` or `syncClose()` when the client is no longer needed." + ) } - /// Shuts this `MongoClient` down, closing all connection to the server and cleaning up internal state. - /// Call this method exactly once when you are finished using the client. You must ensure that all operations - /// using the client have completed before calling this. The returned future must be fulfilled before the - /// `EventLoopGroup` provided to this client's constructor is shut down. - public func shutdown() -> EventLoopFuture { - self.operationExecutor.execute { - self.connectionPool.shutdown() - self.isClosed = true + /** + * Closes this `MongoClient`, closing all connections to the server and cleaning up internal state. + * + * Call this method exactly once when you are finished using the client. You must ensure that all operations using + * the client have completed before calling this. + * + * The returned future will not be fulfilled until all cursors and change streams created from this client have been + * been killed, and all sessions created from this client have been ended. + * + * The returned future must be fulfilled before the `EventLoopGroup` provided to this client's constructor is shut + * down. + */ + public func close() -> EventLoopFuture { + let closeResult = self.operationExecutor.execute { + try self.connectionPool.close() } .flatMap { - self.operationExecutor.close() + self.operationExecutor.shutdown() + } + closeResult.whenComplete { _ in + self.wasClosed = true } + + return closeResult } /** @@ -277,15 +294,15 @@ public class MongoClient { * internal state. * * Call this method exactly once when you are finished using the client. You must ensure that all operations - * using the client have completed before calling this. + * using the client have completed before calling this. This method will block until all cursors and change streams + * created from this client have been killed, and all sessions created from this client have been ended. * * This method must complete before the `EventLoopGroup` provided to this client's constructor is shut down. */ - public func syncShutdown() { - self.connectionPool.shutdown() - self.isClosed = true - // TODO: SWIFT-349 log any errors encountered here. - try? self.operationExecutor.syncClose() + public func syncClose() throws { + try self.connectionPool.close() + try self.operationExecutor.syncShutdown() + self.wasClosed = true } /// Starts a new `ClientSession` with the provided options. When you are done using this session, you must call @@ -602,15 +619,6 @@ public class MongoClient { self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc)) } - /// Executes an `Operation` using this `MongoClient` and an optionally provided session. - internal func executeOperation( - _ operation: T, - using connection: Connection? = nil, - session: ClientSession? = nil - ) throws -> T.OperationResult { - try self.operationExecutor.execute(operation, using: connection, client: self, session: session).wait() - } - /// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block /// and is for testing purposes only**. internal func getMongocReadConcern() throws -> ReadConcern? { diff --git a/Sources/MongoSwift/Operations/Operation.swift b/Sources/MongoSwift/Operations/Operation.swift index c10722e27..b89dcfcc2 100644 --- a/Sources/MongoSwift/Operations/Operation.swift +++ b/Sources/MongoSwift/Operations/Operation.swift @@ -33,7 +33,7 @@ internal class OperationExecutor { } /// Closes the executor's underlying thread pool. - internal func close() -> EventLoopFuture { + internal func shutdown() -> EventLoopFuture { let promise = self.eventLoopGroup.next().makePromise(of: Void.self) self.threadPool.shutdownGracefully { error in if let error = error { @@ -46,35 +46,23 @@ internal class OperationExecutor { } /// Closes the executor's underlying thread pool synchronously. - internal func syncClose() throws { + internal func syncShutdown() throws { try self.threadPool.syncShutdownGracefully() } internal func execute( _ operation: T, - using connection: Connection? = nil, client: MongoClient, session: ClientSession? ) -> EventLoopFuture { - // early exit and don't attempt to use the thread pool if we've already closed the client. - guard !client.isClosed else { - return self.makeFailedFuture(MongoClient.ClosedClientError) - } - // closure containing the work to run in the thread pool: obtain a connection and execute the operation. let doOperation = { () -> ExecuteResult in - // it's possible that the client was closed in between submitting this task and it being executed, so we - // check again here. - guard !client.isClosed else { - throw MongoClient.ClosedClientError - } - // select a connection in following order of priority: // 1. connection specifically provided for use with this operation // 2. if a session was provided, use its underlying connection // 3. a new connection from the pool, if available - guard let connection = try connection ?? - session?.getConnection(forUseWith: client) ?? + guard let connection = + try session?.getConnection(forUseWith: client) ?? client.connectionPool.tryCheckOut() else { return .resubmit } @@ -93,7 +81,7 @@ internal class OperationExecutor { case let .success(res): return self.makeSucceededFuture(res) case .resubmit: - return self.execute(operation, using: connection, client: client, session: session) + return self.execute(operation, client: client, session: session) } } diff --git a/Sources/MongoSwiftSync/MongoClient.swift b/Sources/MongoSwiftSync/MongoClient.swift index 69aa64387..e8ef4c234 100644 --- a/Sources/MongoSwiftSync/MongoClient.swift +++ b/Sources/MongoSwiftSync/MongoClient.swift @@ -53,7 +53,7 @@ public class MongoClient { deinit { do { - try self.asyncClient.shutdown().wait() + try self.asyncClient.syncClose() } catch { assertionFailure("Error closing async client: \(error)") } diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 41513ddbd..60d6066bf 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -160,6 +160,7 @@ extension MongoClientTests { ("testListDatabases", testListDatabases), ("testClientIdGeneration", testClientIdGeneration), ("testResubmittingToThreadPool", testResubmittingToThreadPool), + ("testConnectionPoolClose", testConnectionPoolClose), ] } diff --git a/Tests/MongoSwiftTests/AsyncTestUtils.swift b/Tests/MongoSwiftTests/AsyncTestUtils.swift index 6a6a5f61a..f64eeef9d 100644 --- a/Tests/MongoSwiftTests/AsyncTestUtils.swift +++ b/Tests/MongoSwiftTests/AsyncTestUtils.swift @@ -5,7 +5,7 @@ import TestsCommon import XCTest extension MongoClient { - fileprivate static func makeTestClient( + internal static func makeTestClient( _ uri: String = MongoSwiftTestCase.getConnectionString(), eventLoopGroup: EventLoopGroup, options: ClientOptions? = nil @@ -21,7 +21,7 @@ extension MongoClient { internal func syncCloseOrFail() { do { - try self.shutdown().wait() + try self.syncClose() } catch { XCTFail("Error closing test client: \(error)") } diff --git a/Tests/MongoSwiftTests/MongoClientTests.swift b/Tests/MongoSwiftTests/MongoClientTests.swift index 0dc2ac1d5..14e655c56 100644 --- a/Tests/MongoSwiftTests/MongoClientTests.swift +++ b/Tests/MongoSwiftTests/MongoClientTests.swift @@ -8,8 +8,8 @@ final class MongoClientTests: MongoSwiftTestCase { let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { elg.syncShutdownOrFail() } let client = try MongoClient(using: elg) - client.syncShutdown() - expect(try client.listDatabases().wait()).to(throwError(MongoClient.ClosedClientError)) + try client.syncClose() + expect(try client.listDatabases().wait()).to(throwError(errorType: ChannelError.self)) } func verifyPoolSize(_ client: MongoClient, size: Int) throws { @@ -97,4 +97,55 @@ final class MongoClientTests: MongoSwiftTestCase { _ = try waitingOperations.map { try $0.wait() } } } + + func testConnectionPoolClose() throws { + let ns = MongoNamespace(db: "connPoolTest", collection: "foo") + + // clean up this test's namespace after we're done + defer { try? self.withTestNamespace(ns: ns) { _, _, _ in } } + + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { elg.syncShutdownOrFail() } + let client = try MongoClient.makeTestClient(eventLoopGroup: elg) + + // create a cursor + let collection = client.db(ns.db).collection(ns.collection!) + _ = try collection.insertMany([["x": 1], ["x": 2]]).wait() + let cursor = try collection.find().wait() + + // create a session + let session = client.startSession() + // run a command to trigger starting libmongoc session + _ = try client.listDatabases(session: session).wait() + + // start the client's closing process + let closeFuture = client.close() + + // the pool should enter the closing state, with 2 connections out + expect(client.connectionPool.isClosing).toEventually(beTrue()) + expect(client.connectionPool.checkedOutConnections).to(equal(2)) + + // calling a method that will request a new connection errors + expect(try client.listDatabases().wait()).to(throwError(errorType: LogicError.self)) + + // cursor can still be used and successfully killed while closing occurs + expect(try cursor.next().wait()).toNot(throwError()) + try cursor.kill().wait() + + // still in closing state; got connection back from cursor + expect(client.connectionPool.isClosing).to(beTrue()) + expect(client.connectionPool.checkedOutConnections).to(equal(1)) + + // attempting to use session succeeds + expect(try client.listDatabases(session: session).wait()).toNot(throwError()) + // ending session succeeds + expect(try session.end().wait()).toNot(throwError()) + + // once session releases connection, the pool can close + expect(client.connectionPool.isClosing).toEventually(beFalse()) + expect(client.connectionPool.checkedOutConnections).to(equal(0)) + + // wait to ensure all resource cleanup happens correctly + try closeFuture.wait() + } }