From 8434e9123de0b6636bc2c3faba409e7eb93e820c Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Fri, 24 Apr 2020 14:18:38 -0400 Subject: [PATCH 01/11] close API updates --- Sources/MongoSwift/APM.swift | 12 +- Sources/MongoSwift/ConnectionPool.swift | 153 ++++++++++++++---- Sources/MongoSwift/MongoClient.swift | 91 ++++++++--- Sources/MongoSwift/Operations/Operation.swift | 15 +- Sources/MongoSwiftSync/MongoClient.swift | 2 +- Tests/LinuxMain.swift | 1 + Tests/MongoSwiftTests/AsyncTestUtils.swift | 4 +- Tests/MongoSwiftTests/MongoClientTests.swift | 51 +++++- 8 files changed, 251 insertions(+), 78 deletions(-) diff --git a/Sources/MongoSwift/APM.swift b/Sources/MongoSwift/APM.swift index 55cf41bd2..304156046 100644 --- a/Sources/MongoSwift/APM.swift +++ b/Sources/MongoSwift/APM.swift @@ -701,7 +701,8 @@ private func publishEvent(type: T.Type, eventPtr: OpaquePoin /// An extension of `ConnectionPool` to add monitoring capability for commands and server discovery and monitoring. extension ConnectionPool { - /// Internal function to install monitoring callbacks for this pool. + /// Internal function to install monitoring callbacks for this pool. **This method may only be called before any + /// connections are checked out from the pool.** internal func initializeMonitoring(client: MongoClient) { guard let callbacks = mongoc_apm_callbacks_new() else { fatalError("failed to initialize new mongoc_apm_callbacks_t") @@ -722,13 +723,6 @@ extension ConnectionPool { mongoc_apm_set_server_heartbeat_succeeded_cb(callbacks, serverHeartbeatSucceeded) mongoc_apm_set_server_heartbeat_failed_cb(callbacks, serverHeartbeatFailed) - // we can pass the MongoClient as unretained because the callbacks are stored on clientHandle, so if the - // callback is being executed, this pool and therefore its parent `MongoClient` must still be valid. - switch self.state { - case let .open(pool): - mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque()) - case .closed: - fatalError("ConnectionPool was already closed") - } + self.setAPMCallbacks(callbacks: callbacks, client: client) } } diff --git a/Sources/MongoSwift/ConnectionPool.swift b/Sources/MongoSwift/ConnectionPool.swift index 961b880cc..8bcbb5e1e 100644 --- a/Sources/MongoSwift/ConnectionPool.swift +++ b/Sources/MongoSwift/ConnectionPool.swift @@ -1,4 +1,5 @@ import CLibMongoC +import NIOConcurrencyHelpers /// A connection to the database. internal class Connection { @@ -13,11 +14,10 @@ internal class Connection { } deinit { - switch self.pool.state { - case let .open(pool): - mongoc_client_pool_push(pool, self.clientHandle) - case .closed: - assertionFailure("ConnectionPool was already closed") + do { + try self.pool.checkIn(self) + } catch { + assertionFailure("Failed to check connection back in: \(error)") } } } @@ -28,12 +28,38 @@ internal class ConnectionPool { internal enum State { /// Indicates that the `ConnectionPool` is open and using the associated pointer to a `mongoc_client_pool_t`. case open(pool: OpaquePointer) + /// Indicates that the `ConnectionPool` is in the process of closing. Connections can be checked back in, but + /// no new connections can be checked out. + case closing(pool: OpaquePointer) /// Indicates that the `ConnectionPool` has been closed and contains no connections. case closed } /// The state of this `ConnectionPool`. - internal private(set) var state: State + private var state: State + /// The number of connections currently checked out of the pool. + private var connCount = 0 + /// Lock over `state` and `connCount`. + private let stateLock = Lock() + + /// Internal helper for testing purposes that returns whether the pool is in the `closing` state. + internal var isClosing: Bool { + self.stateLock.withLock { + guard case .closing = self.state else { + return false + } + return true + } + } + + /// Internal helper for testing purposes that returns the number of connections currently checked out from the pool. + internal var checkedOutConnections: Int { + self.stateLock.withLock { + self.connCount + } + } + + private static let PoolClosedError = LogicError(message: "ConnectionPool was already closed") /// Initializes the pool using the provided `ConnectionString` and options. internal init(from connString: ConnectionString, options: ClientOptions?) throws { @@ -56,7 +82,7 @@ internal class ConnectionPool { self.state = .open(pool: pool) if let options = options { - try self.setTLSOptions(options) + self.setTLSOptions(options) } } @@ -67,38 +93,76 @@ internal class ConnectionPool { } } - /// Closes the pool, cleaning up underlying resources. This method blocks as it sends `endSessions` to the server. - internal func shutdown() { - switch self.state { - case let .open(pool): - mongoc_client_pool_destroy(pool) - case .closed: - return + /// Closes the pool, cleaning up underlying resources. **This method blocks until all connections are returned to + /// the pool.** + internal func close() throws { + try self.stateLock.withLock { + switch self.state { + case let .open(pool): + self.state = .closing(pool: pool) + case .closing, .closed: + throw Self.PoolClosedError + } + } + + // continually loop and wait to get all connections back before destroying the pool. release the lock on each + // iteration to allow other methods to acquire the lock. + var done = false + while !done { + try self.stateLock.withLock { + if self.connCount == 0 { + switch self.state { + case .open, .closed: + throw InternalError(message: "ConnectionPool in unexpected state during close()") + case let .closing(pool): + mongoc_client_pool_destroy(pool) + self.state = .closed + } + done = true + } + } } - self.state = .closed } /// Checks out a connection. This connection will return itself to the pool when its reference count drops to 0. /// This method will block until a connection is available. internal func checkOut() throws -> Connection { - switch self.state { - case let .open(pool): - return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self) - case .closed: - throw InternalError(message: "ConnectionPool was already closed") + try self.stateLock.withLock { + switch self.state { + case let .open(pool): + self.connCount += 1 + return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self) + case .closing, .closed: + throw Self.PoolClosedError + } } } /// Checks out a connection from the pool, or returns `nil` if none are currently available. internal func tryCheckOut() throws -> Connection? { - switch self.state { - case let .open(pool): - guard let handle = mongoc_client_pool_try_pop(pool) else { - return nil + try self.stateLock.withLock { + switch self.state { + case let .open(pool): + guard let handle = mongoc_client_pool_try_pop(pool) else { + return nil + } + self.connCount += 1 + return Connection(clientHandle: handle, pool: self) + case .closing, .closed: + throw Self.PoolClosedError + } + } + } + + fileprivate func checkIn(_ connection: Connection) throws { + try self.stateLock.withLock { + switch self.state { + case let .open(pool), let .closing(pool): + mongoc_client_pool_push(pool, connection.clientHandle) + self.connCount -= 1 + case .closed: + throw Self.PoolClosedError } - return Connection(clientHandle: handle, pool: self) - case .closed: - throw InternalError(message: "ConnectionPool was already closed") } } @@ -109,9 +173,9 @@ internal class ConnectionPool { return try body(connection) } - // Sets TLS/SSL options that the user passes in through the client level. This must be called from - // the ConnectionPool init before the pool is used. - private func setTLSOptions(_ options: ClientOptions) throws { + // Sets TLS/SSL options that the user passes in through the client level. **This must only be called from + // the ConnectionPool initializer**. + private func setTLSOptions(_ options: ClientOptions) { // return early so we don't set an empty options struct on the libmongoc pool. doing so will make libmongoc // attempt to use TLS for connections. guard options.tls == true || @@ -147,11 +211,15 @@ internal class ConnectionPool { if let invalidHosts = options.tlsAllowInvalidHostnames { opts.allow_invalid_hostname = invalidHosts } - switch self.state { - case let .open(pool): - mongoc_client_pool_set_ssl_opts(pool, &opts) - case .closed: - throw InternalError(message: "ConnectionPool was already closed") + + self.stateLock.withLock { + switch self.state { + case let .open(pool): + mongoc_client_pool_set_ssl_opts(pool, &opts) + case .closing, .closed: + // if we get here, we must have called this method outside of `ConnectionPool.init`. + fatalError("ConnectionPool unexpectedly in .closing or .closed state") + } } } @@ -187,6 +255,21 @@ internal class ConnectionPool { return ConnectionString(copying: uri) } } + + /// Sets the provided APM callbacks on this pool, using the provided client as the "context" value. **This method + /// may only be called before any connections are checked out of the pool.** + internal func setAPMCallbacks(callbacks: OpaquePointer, client: MongoClient) { + self.stateLock.withLock { + switch self.state { + case let .open(pool): + mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque()) + case .closing, .closed: + // this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`. + // unless we have a bug it's impossible that the pool is already closed. + fatalError("ConnectionPool unexpectedly in .closed state") + } + } + } } extension String { diff --git a/Sources/MongoSwift/MongoClient.swift b/Sources/MongoSwift/MongoClient.swift index cce6ab3cc..4a85e7fe2 100644 --- a/Sources/MongoSwift/MongoClient.swift +++ b/Sources/MongoSwift/MongoClient.swift @@ -157,8 +157,10 @@ public struct DatabaseOptions: CodingStrategyProvider { // sourcery: skipSyncExport /// A MongoDB Client providing an asynchronous, SwiftNIO-based API. public class MongoClient { + /// The pool of connections backing this client. internal let connectionPool: ConnectionPool + /// Executor responsible for executing operations on behalf of this client and its child objects. internal let operationExecutor: OperationExecutor /// Default size for a client's NIOThreadPool. @@ -167,8 +169,27 @@ public class MongoClient { /// Default maximum size for connection pools created by this client. internal static let defaultMaxConnectionPoolSize = 100 + /// Indicates the state of this `MongoClient`. + private enum State { + /// The client is open. + case open + /// The client is in the process of closing. + case closing + /// The client has finished closing. + case closed + } + /// Indicates whether this client has been closed. - internal private(set) var isClosed = false + private var state = State.open + + /// Lock over `state`. + private var stateLock = Lock() + + internal var isOpen: Bool { + self.stateLock.withLock { + self.state == .open + } + } /// Handlers for command monitoring events. internal var commandEventHandlers: [CommandEventHandler] @@ -255,21 +276,46 @@ public class MongoClient { } deinit { - assert(self.isClosed, "MongoClient was not closed before deinitialization") + assert( + self.state == .closed, + "MongoClient was not closed before deinitialization. " + + "Please call `close()` or `syncClose()` when the client is no longer needed." + ) } - /// Shuts this `MongoClient` down, closing all connection to the server and cleaning up internal state. + /// Closes this `MongoClient`, closing all connections to the server and cleaning up internal state. /// Call this method exactly once when you are finished using the client. You must ensure that all operations /// using the client have completed before calling this. The returned future must be fulfilled before the /// `EventLoopGroup` provided to this client's constructor is shut down. - public func shutdown() -> EventLoopFuture { - self.operationExecutor.execute { - self.connectionPool.shutdown() - self.isClosed = true + public func close() -> EventLoopFuture { + let stateError: Error? = self.stateLock.withLock { + switch self.state { + case .closing, .closed: + return Self.ClosedClientError + case .open: + self.state = .closing + return nil + } + } + + if let stateError = stateError { + return self.operationExecutor.makeFailedFuture(stateError) + } + + let closeResult = self.operationExecutor.execute { + try self.connectionPool.close() } .flatMap { - self.operationExecutor.close() + self.operationExecutor.shutdown() } + + closeResult.whenComplete { _ in + self.stateLock.withLock { + self.state = .closed + } + } + + return closeResult } /** @@ -281,11 +327,21 @@ public class MongoClient { * * This method must complete before the `EventLoopGroup` provided to this client's constructor is shut down. */ - public func syncShutdown() { - self.connectionPool.shutdown() - self.isClosed = true - // TODO: SWIFT-349 log any errors encountered here. - try? self.operationExecutor.syncClose() + public func syncClose() throws { + try self.stateLock.withLock { + switch self.state { + case .closing, .closed: + throw Self.ClosedClientError + case .open: + self.state = .closing + } + } + try self.connectionPool.close() + try self.operationExecutor.syncShutdown() + + self.stateLock.withLock { + self.state = .closed + } } /// Starts a new `ClientSession` with the provided options. When you are done using this session, you must call @@ -602,15 +658,6 @@ public class MongoClient { self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc)) } - /// Executes an `Operation` using this `MongoClient` and an optionally provided session. - internal func executeOperation( - _ operation: T, - using connection: Connection? = nil, - session: ClientSession? = nil - ) throws -> T.OperationResult { - try self.operationExecutor.execute(operation, using: connection, client: self, session: session).wait() - } - /// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block /// and is for testing purposes only**. internal func getMongocReadConcern() throws -> ReadConcern? { diff --git a/Sources/MongoSwift/Operations/Operation.swift b/Sources/MongoSwift/Operations/Operation.swift index c10722e27..22ae37c8b 100644 --- a/Sources/MongoSwift/Operations/Operation.swift +++ b/Sources/MongoSwift/Operations/Operation.swift @@ -33,7 +33,7 @@ internal class OperationExecutor { } /// Closes the executor's underlying thread pool. - internal func close() -> EventLoopFuture { + internal func shutdown() -> EventLoopFuture { let promise = self.eventLoopGroup.next().makePromise(of: Void.self) self.threadPool.shutdownGracefully { error in if let error = error { @@ -46,18 +46,17 @@ internal class OperationExecutor { } /// Closes the executor's underlying thread pool synchronously. - internal func syncClose() throws { + internal func syncShutdown() throws { try self.threadPool.syncShutdownGracefully() } internal func execute( _ operation: T, - using connection: Connection? = nil, client: MongoClient, session: ClientSession? ) -> EventLoopFuture { // early exit and don't attempt to use the thread pool if we've already closed the client. - guard !client.isClosed else { + guard client.isOpen else { return self.makeFailedFuture(MongoClient.ClosedClientError) } @@ -65,7 +64,7 @@ internal class OperationExecutor { let doOperation = { () -> ExecuteResult in // it's possible that the client was closed in between submitting this task and it being executed, so we // check again here. - guard !client.isClosed else { + guard client.isOpen else { throw MongoClient.ClosedClientError } @@ -73,8 +72,8 @@ internal class OperationExecutor { // 1. connection specifically provided for use with this operation // 2. if a session was provided, use its underlying connection // 3. a new connection from the pool, if available - guard let connection = try connection ?? - session?.getConnection(forUseWith: client) ?? + guard let connection = + try session?.getConnection(forUseWith: client) ?? client.connectionPool.tryCheckOut() else { return .resubmit } @@ -93,7 +92,7 @@ internal class OperationExecutor { case let .success(res): return self.makeSucceededFuture(res) case .resubmit: - return self.execute(operation, using: connection, client: client, session: session) + return self.execute(operation, client: client, session: session) } } diff --git a/Sources/MongoSwiftSync/MongoClient.swift b/Sources/MongoSwiftSync/MongoClient.swift index 69aa64387..e8ef4c234 100644 --- a/Sources/MongoSwiftSync/MongoClient.swift +++ b/Sources/MongoSwiftSync/MongoClient.swift @@ -53,7 +53,7 @@ public class MongoClient { deinit { do { - try self.asyncClient.shutdown().wait() + try self.asyncClient.syncClose() } catch { assertionFailure("Error closing async client: \(error)") } diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 41513ddbd..60d6066bf 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -160,6 +160,7 @@ extension MongoClientTests { ("testListDatabases", testListDatabases), ("testClientIdGeneration", testClientIdGeneration), ("testResubmittingToThreadPool", testResubmittingToThreadPool), + ("testConnectionPoolClose", testConnectionPoolClose), ] } diff --git a/Tests/MongoSwiftTests/AsyncTestUtils.swift b/Tests/MongoSwiftTests/AsyncTestUtils.swift index 6a6a5f61a..f64eeef9d 100644 --- a/Tests/MongoSwiftTests/AsyncTestUtils.swift +++ b/Tests/MongoSwiftTests/AsyncTestUtils.swift @@ -5,7 +5,7 @@ import TestsCommon import XCTest extension MongoClient { - fileprivate static func makeTestClient( + internal static func makeTestClient( _ uri: String = MongoSwiftTestCase.getConnectionString(), eventLoopGroup: EventLoopGroup, options: ClientOptions? = nil @@ -21,7 +21,7 @@ extension MongoClient { internal func syncCloseOrFail() { do { - try self.shutdown().wait() + try self.syncClose() } catch { XCTFail("Error closing test client: \(error)") } diff --git a/Tests/MongoSwiftTests/MongoClientTests.swift b/Tests/MongoSwiftTests/MongoClientTests.swift index 0dc2ac1d5..a871d9149 100644 --- a/Tests/MongoSwiftTests/MongoClientTests.swift +++ b/Tests/MongoSwiftTests/MongoClientTests.swift @@ -8,7 +8,7 @@ final class MongoClientTests: MongoSwiftTestCase { let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { elg.syncShutdownOrFail() } let client = try MongoClient(using: elg) - client.syncShutdown() + try client.syncClose() expect(try client.listDatabases().wait()).to(throwError(MongoClient.ClosedClientError)) } @@ -97,4 +97,53 @@ final class MongoClientTests: MongoSwiftTestCase { _ = try waitingOperations.map { try $0.wait() } } } + + func testConnectionPoolClose() throws { + let ns = MongoNamespace(db: "connPoolTest", collection: "foo") + + // clean up this test's namespace after we're done + defer { try? self.withTestNamespace(ns: ns) { _, _, _ in } } + + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { elg.syncShutdownOrFail() } + let client = try MongoClient.makeTestClient(eventLoopGroup: elg) + + // create a cursor + let collection = client.db(ns.db).collection(ns.collection!) + _ = try collection.insertMany([["x": 1], ["x": 2]]).wait() + let cursor = try collection.find().wait() + + // create a session + let session = client.startSession() + // run a command to trigger starting libmongoc session + _ = try client.listDatabases(session: session).wait() + + // start the client's closing process + let closeFuture = client.close() + + // the pool should enter the closing state, with 2 connections out + expect(client.connectionPool.isClosing).toEventually(beTrue()) + expect(client.connectionPool.checkedOutConnections).to(equal(2)) + + // cursor can still be used and successfully killed while closing occurs + expect(try cursor.next().wait()).toNot(throwError()) + try cursor.kill().wait() + + // still in closing state; got connection back from cursor + expect(client.connectionPool.isClosing).to(beTrue()) + expect(client.connectionPool.checkedOutConnections).to(equal(1)) + + // attempting to use session errors + expect(try client.listDatabases(session: session).wait()) + .to(throwError(MongoClient.ClosedClientError)) + // ending session succeeds + expect(try session.end().wait()).toNot(throwError()) + + // once session releases connection, the pool can close + expect(client.connectionPool.isClosing).toEventually(beFalse()) + expect(client.connectionPool.checkedOutConnections).to(equal(0)) + + // wait to ensure all resource cleanup happens correctly + try closeFuture.wait() + } } From 5d377f9faeff4c45b4256963f9a668e49c3d0ff8 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 28 Apr 2020 01:01:50 -0400 Subject: [PATCH 02/11] error message consistency --- Sources/MongoSwift/ConnectionPool.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/MongoSwift/ConnectionPool.swift b/Sources/MongoSwift/ConnectionPool.swift index 8bcbb5e1e..86694e0ec 100644 --- a/Sources/MongoSwift/ConnectionPool.swift +++ b/Sources/MongoSwift/ConnectionPool.swift @@ -113,7 +113,7 @@ internal class ConnectionPool { if self.connCount == 0 { switch self.state { case .open, .closed: - throw InternalError(message: "ConnectionPool in unexpected state during close()") + throw InternalError(message: "ConnectionPool in unexpected state \(self.state) during close()") case let .closing(pool): mongoc_client_pool_destroy(pool) self.state = .closed @@ -218,7 +218,7 @@ internal class ConnectionPool { mongoc_client_pool_set_ssl_opts(pool, &opts) case .closing, .closed: // if we get here, we must have called this method outside of `ConnectionPool.init`. - fatalError("ConnectionPool unexpectedly in .closing or .closed state") + fatalError("ConnectionPool in unexpected state \(self.state) while setting TLS options") } } } @@ -266,7 +266,7 @@ internal class ConnectionPool { case .closing, .closed: // this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`. // unless we have a bug it's impossible that the pool is already closed. - fatalError("ConnectionPool unexpectedly in .closed state") + fatalError("ConnectionPool in unexpected state \(self.state) while setting APM callbacks") } } } From 818c5adb62ec1e4f5f74d3f26e3b1a07217776e6 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 28 Apr 2020 01:04:50 -0400 Subject: [PATCH 03/11] comment --- Sources/MongoSwift/ConnectionPool.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/MongoSwift/ConnectionPool.swift b/Sources/MongoSwift/ConnectionPool.swift index 86694e0ec..87e24d9d7 100644 --- a/Sources/MongoSwift/ConnectionPool.swift +++ b/Sources/MongoSwift/ConnectionPool.swift @@ -257,7 +257,9 @@ internal class ConnectionPool { } /// Sets the provided APM callbacks on this pool, using the provided client as the "context" value. **This method - /// may only be called before any connections are checked out of the pool.** + /// may only be called before any connections are checked out of the pool.** Ideally this code would just live in + /// `ConnectionPool.init`. However, the client we accept here has to be fully initialized before we can pass it + /// as the context. In order for it to be fully initialized its pool must exist already. internal func setAPMCallbacks(callbacks: OpaquePointer, client: MongoClient) { self.stateLock.withLock { switch self.state { From fc6f97acf17e1d10e856947a7c2ad6c09255a89c Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 28 Apr 2020 01:09:43 -0400 Subject: [PATCH 04/11] Update MongoClient.swift --- Sources/MongoSwift/MongoClient.swift | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/Sources/MongoSwift/MongoClient.swift b/Sources/MongoSwift/MongoClient.swift index 4a85e7fe2..466a7d1a1 100644 --- a/Sources/MongoSwift/MongoClient.swift +++ b/Sources/MongoSwift/MongoClient.swift @@ -283,10 +283,18 @@ public class MongoClient { ) } - /// Closes this `MongoClient`, closing all connections to the server and cleaning up internal state. - /// Call this method exactly once when you are finished using the client. You must ensure that all operations - /// using the client have completed before calling this. The returned future must be fulfilled before the - /// `EventLoopGroup` provided to this client's constructor is shut down. + /** + * Closes this `MongoClient`, closing all connections to the server and cleaning up internal state. + * + * Call this method exactly once when you are finished using the client. You must ensure that all operations using + * the client have completed before calling this. + * + * The returned future will not be fulfilled until all cursors and change streams created from this client have been + * been killed, and all sessions created from this client have been ended. + * + * The returned future must be fulfilled before the `EventLoopGroup` provided to this client's constructor is shut + * down. + */ public func close() -> EventLoopFuture { let stateError: Error? = self.stateLock.withLock { switch self.state { @@ -323,7 +331,8 @@ public class MongoClient { * internal state. * * Call this method exactly once when you are finished using the client. You must ensure that all operations - * using the client have completed before calling this. + * using the client have completed before calling this. This method will block until all cursors and change streams + * created from this client have been killed, and all sessions created from this client have been ended. * * This method must complete before the `EventLoopGroup` provided to this client's constructor is shut down. */ From 9aeeaef643c2d06a2a984dc970d0e73a3859ff50 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 28 Apr 2020 01:22:39 -0400 Subject: [PATCH 05/11] Update MongoClientTests.swift --- Tests/MongoSwiftTests/MongoClientTests.swift | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Tests/MongoSwiftTests/MongoClientTests.swift b/Tests/MongoSwiftTests/MongoClientTests.swift index a871d9149..6f65360bb 100644 --- a/Tests/MongoSwiftTests/MongoClientTests.swift +++ b/Tests/MongoSwiftTests/MongoClientTests.swift @@ -125,6 +125,9 @@ final class MongoClientTests: MongoSwiftTestCase { expect(client.connectionPool.isClosing).toEventually(beTrue()) expect(client.connectionPool.checkedOutConnections).to(equal(2)) + // calling a method that will request a new connection errors + expect(try client.listDatabases().wait()).to(throwError(MongoClient.ClosedClientError)) + // cursor can still be used and successfully killed while closing occurs expect(try cursor.next().wait()).toNot(throwError()) try cursor.kill().wait() From f4c80eb93a58f91b5538c2ecbd806311715f88ec Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 28 Apr 2020 02:05:07 -0400 Subject: [PATCH 06/11] simplify --- Sources/MongoSwift/ConnectionPool.swift | 2 +- Sources/MongoSwift/MongoClient.swift | 58 ++----------------- Sources/MongoSwift/Operations/Operation.swift | 11 ---- Tests/MongoSwiftTests/MongoClientTests.swift | 11 ++-- 4 files changed, 10 insertions(+), 72 deletions(-) diff --git a/Sources/MongoSwift/ConnectionPool.swift b/Sources/MongoSwift/ConnectionPool.swift index 87e24d9d7..471abbfe8 100644 --- a/Sources/MongoSwift/ConnectionPool.swift +++ b/Sources/MongoSwift/ConnectionPool.swift @@ -59,7 +59,7 @@ internal class ConnectionPool { } } - private static let PoolClosedError = LogicError(message: "ConnectionPool was already closed") + internal static let PoolClosedError = LogicError(message: "ConnectionPool was already closed") /// Initializes the pool using the provided `ConnectionString` and options. internal init(from connString: ConnectionString, options: ClientOptions?) throws { diff --git a/Sources/MongoSwift/MongoClient.swift b/Sources/MongoSwift/MongoClient.swift index 466a7d1a1..8c8b51576 100644 --- a/Sources/MongoSwift/MongoClient.swift +++ b/Sources/MongoSwift/MongoClient.swift @@ -169,27 +169,8 @@ public class MongoClient { /// Default maximum size for connection pools created by this client. internal static let defaultMaxConnectionPoolSize = 100 - /// Indicates the state of this `MongoClient`. - private enum State { - /// The client is open. - case open - /// The client is in the process of closing. - case closing - /// The client has finished closing. - case closed - } - /// Indicates whether this client has been closed. - private var state = State.open - - /// Lock over `state`. - private var stateLock = Lock() - - internal var isOpen: Bool { - self.stateLock.withLock { - self.state == .open - } - } + private var wasClosed = false /// Handlers for command monitoring events. internal var commandEventHandlers: [CommandEventHandler] @@ -203,9 +184,6 @@ public class MongoClient { /// A unique identifier for this client. Sets _id to the generator's current value and increments the generator. internal let _id = clientIdGenerator.add(1) - /// Error thrown when user attempts to use a closed client. - internal static let ClosedClientError = LogicError(message: "MongoClient was already closed") - /// Encoder whose options are inherited by databases derived from this client. public let encoder: BSONEncoder @@ -277,7 +255,7 @@ public class MongoClient { deinit { assert( - self.state == .closed, + self.wasClosed, "MongoClient was not closed before deinitialization. " + "Please call `close()` or `syncClose()` when the client is no longer needed." ) @@ -296,31 +274,14 @@ public class MongoClient { * down. */ public func close() -> EventLoopFuture { - let stateError: Error? = self.stateLock.withLock { - switch self.state { - case .closing, .closed: - return Self.ClosedClientError - case .open: - self.state = .closing - return nil - } - } - - if let stateError = stateError { - return self.operationExecutor.makeFailedFuture(stateError) - } - let closeResult = self.operationExecutor.execute { try self.connectionPool.close() } .flatMap { self.operationExecutor.shutdown() } - closeResult.whenComplete { _ in - self.stateLock.withLock { - self.state = .closed - } + self.wasClosed = true } return closeResult @@ -337,20 +298,9 @@ public class MongoClient { * This method must complete before the `EventLoopGroup` provided to this client's constructor is shut down. */ public func syncClose() throws { - try self.stateLock.withLock { - switch self.state { - case .closing, .closed: - throw Self.ClosedClientError - case .open: - self.state = .closing - } - } try self.connectionPool.close() try self.operationExecutor.syncShutdown() - - self.stateLock.withLock { - self.state = .closed - } + self.wasClosed = true } /// Starts a new `ClientSession` with the provided options. When you are done using this session, you must call diff --git a/Sources/MongoSwift/Operations/Operation.swift b/Sources/MongoSwift/Operations/Operation.swift index 22ae37c8b..b89dcfcc2 100644 --- a/Sources/MongoSwift/Operations/Operation.swift +++ b/Sources/MongoSwift/Operations/Operation.swift @@ -55,19 +55,8 @@ internal class OperationExecutor { client: MongoClient, session: ClientSession? ) -> EventLoopFuture { - // early exit and don't attempt to use the thread pool if we've already closed the client. - guard client.isOpen else { - return self.makeFailedFuture(MongoClient.ClosedClientError) - } - // closure containing the work to run in the thread pool: obtain a connection and execute the operation. let doOperation = { () -> ExecuteResult in - // it's possible that the client was closed in between submitting this task and it being executed, so we - // check again here. - guard client.isOpen else { - throw MongoClient.ClosedClientError - } - // select a connection in following order of priority: // 1. connection specifically provided for use with this operation // 2. if a session was provided, use its underlying connection diff --git a/Tests/MongoSwiftTests/MongoClientTests.swift b/Tests/MongoSwiftTests/MongoClientTests.swift index 6f65360bb..25f4d58c0 100644 --- a/Tests/MongoSwiftTests/MongoClientTests.swift +++ b/Tests/MongoSwiftTests/MongoClientTests.swift @@ -9,7 +9,7 @@ final class MongoClientTests: MongoSwiftTestCase { defer { elg.syncShutdownOrFail() } let client = try MongoClient(using: elg) try client.syncClose() - expect(try client.listDatabases().wait()).to(throwError(MongoClient.ClosedClientError)) + expect(try client.listDatabases().wait()).to(throwError(errorType: ChannelError.self)) } func verifyPoolSize(_ client: MongoClient, size: Int) throws { @@ -102,7 +102,7 @@ final class MongoClientTests: MongoSwiftTestCase { let ns = MongoNamespace(db: "connPoolTest", collection: "foo") // clean up this test's namespace after we're done - defer { try? self.withTestNamespace(ns: ns) { _, _, _ in } } + // defer { try? self.withTestNamespace(ns: ns) { _, _, _ in } } let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { elg.syncShutdownOrFail() } @@ -126,7 +126,7 @@ final class MongoClientTests: MongoSwiftTestCase { expect(client.connectionPool.checkedOutConnections).to(equal(2)) // calling a method that will request a new connection errors - expect(try client.listDatabases().wait()).to(throwError(MongoClient.ClosedClientError)) + expect(try client.listDatabases().wait()).to(throwError(errorType: LogicError.self)) // cursor can still be used and successfully killed while closing occurs expect(try cursor.next().wait()).toNot(throwError()) @@ -136,9 +136,8 @@ final class MongoClientTests: MongoSwiftTestCase { expect(client.connectionPool.isClosing).to(beTrue()) expect(client.connectionPool.checkedOutConnections).to(equal(1)) - // attempting to use session errors - expect(try client.listDatabases(session: session).wait()) - .to(throwError(MongoClient.ClosedClientError)) + // attempting to use session succeeds + expect(try client.listDatabases(session: session).wait()).toNot(throwError()) // ending session succeeds expect(try session.end().wait()).toNot(throwError()) From 792e5d4cb499a0fd1dab65ed6fc0482a150e156b Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 28 Apr 2020 02:14:14 -0400 Subject: [PATCH 07/11] comment --- Sources/MongoSwift/MongoClient.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/MongoSwift/MongoClient.swift b/Sources/MongoSwift/MongoClient.swift index 8c8b51576..c52e000a3 100644 --- a/Sources/MongoSwift/MongoClient.swift +++ b/Sources/MongoSwift/MongoClient.swift @@ -169,7 +169,9 @@ public class MongoClient { /// Default maximum size for connection pools created by this client. internal static let defaultMaxConnectionPoolSize = 100 - /// Indicates whether this client has been closed. + /// Indicates whether this client has been closed. We don't need a lock because: + /// - This value is only modified on success of `ConnectionPool.close()`. That method will succeed exactly once. + /// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete. private var wasClosed = false /// Handlers for command monitoring events. From 12e51b20bb6fd6ea8ecddde9c1d0c13ca0d24295 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 28 Apr 2020 02:16:47 -0400 Subject: [PATCH 08/11] Update MongoClientTests.swift --- Tests/MongoSwiftTests/MongoClientTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/MongoSwiftTests/MongoClientTests.swift b/Tests/MongoSwiftTests/MongoClientTests.swift index 25f4d58c0..14e655c56 100644 --- a/Tests/MongoSwiftTests/MongoClientTests.swift +++ b/Tests/MongoSwiftTests/MongoClientTests.swift @@ -102,7 +102,7 @@ final class MongoClientTests: MongoSwiftTestCase { let ns = MongoNamespace(db: "connPoolTest", collection: "foo") // clean up this test's namespace after we're done - // defer { try? self.withTestNamespace(ns: ns) { _, _, _ in } } + defer { try? self.withTestNamespace(ns: ns) { _, _, _ in } } let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { elg.syncShutdownOrFail() } From 0131e710a3e263d7c91773a466d313dfc2484213 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 28 Apr 2020 13:00:28 -0400 Subject: [PATCH 09/11] update comments --- Sources/MongoSwift/ConnectionPool.swift | 8 ++++++-- Sources/MongoSwift/MongoClient.swift | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Sources/MongoSwift/ConnectionPool.swift b/Sources/MongoSwift/ConnectionPool.swift index 471abbfe8..467a9de65 100644 --- a/Sources/MongoSwift/ConnectionPool.swift +++ b/Sources/MongoSwift/ConnectionPool.swift @@ -125,7 +125,8 @@ internal class ConnectionPool { } /// Checks out a connection. This connection will return itself to the pool when its reference count drops to 0. - /// This method will block until a connection is available. + /// This method will block until a connection is available. Throws an error if the pool is in the process of + /// closing or has finished closing. internal func checkOut() throws -> Connection { try self.stateLock.withLock { switch self.state { @@ -138,7 +139,8 @@ internal class ConnectionPool { } } - /// Checks out a connection from the pool, or returns `nil` if none are currently available. + /// Checks out a connection from the pool, or returns `nil` if none are currently available. Throws an error if the + /// pool is not open. internal func tryCheckOut() throws -> Connection? { try self.stateLock.withLock { switch self.state { @@ -154,6 +156,8 @@ internal class ConnectionPool { } } + /// Checks a connection into the pool. Accepts the connection if the pool is still open or in the process of + /// closing; throws an error if the pool has already finished closing. fileprivate func checkIn(_ connection: Connection) throws { try self.stateLock.withLock { switch self.state { diff --git a/Sources/MongoSwift/MongoClient.swift b/Sources/MongoSwift/MongoClient.swift index c52e000a3..d1563c289 100644 --- a/Sources/MongoSwift/MongoClient.swift +++ b/Sources/MongoSwift/MongoClient.swift @@ -169,7 +169,7 @@ public class MongoClient { /// Default maximum size for connection pools created by this client. internal static let defaultMaxConnectionPoolSize = 100 - /// Indicates whether this client has been closed. We don't need a lock because: + /// Indicates whether this client has been closed. A lock around this variable is not needed because: /// - This value is only modified on success of `ConnectionPool.close()`. That method will succeed exactly once. /// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete. private var wasClosed = false From a15434150f3cfde005af2d1208aada643c71b37f Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Wed, 29 Apr 2020 19:13:26 -0400 Subject: [PATCH 10/11] comments 1 --- Sources/MongoSwift/ConnectionPool.swift | 34 ++++++++++++------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/Sources/MongoSwift/ConnectionPool.swift b/Sources/MongoSwift/ConnectionPool.swift index 467a9de65..05dfc0ad0 100644 --- a/Sources/MongoSwift/ConnectionPool.swift +++ b/Sources/MongoSwift/ConnectionPool.swift @@ -156,7 +156,7 @@ internal class ConnectionPool { } } - /// Checks a connection into the pool. Accepts the connection if the pool is still open or in the process of + /// Checks a connection into the pool. Accepts the connection if the pool is still open or in the process of /// closing; throws an error if the pool has already finished closing. fileprivate func checkIn(_ connection: Connection) throws { try self.stateLock.withLock { @@ -216,14 +216,13 @@ internal class ConnectionPool { opts.allow_invalid_hostname = invalidHosts } - self.stateLock.withLock { - switch self.state { - case let .open(pool): - mongoc_client_pool_set_ssl_opts(pool, &opts) - case .closing, .closed: - // if we get here, we must have called this method outside of `ConnectionPool.init`. - fatalError("ConnectionPool in unexpected state \(self.state) while setting TLS options") - } + // lock isn't needed as this is called before pool is in use. + switch self.state { + case let .open(pool): + mongoc_client_pool_set_ssl_opts(pool, &opts) + case .closing, .closed: + // if we get here, we must have called this method outside of `ConnectionPool.init`. + fatalError("ConnectionPool in unexpected state \(self.state) while setting TLS options") } } @@ -265,15 +264,14 @@ internal class ConnectionPool { /// `ConnectionPool.init`. However, the client we accept here has to be fully initialized before we can pass it /// as the context. In order for it to be fully initialized its pool must exist already. internal func setAPMCallbacks(callbacks: OpaquePointer, client: MongoClient) { - self.stateLock.withLock { - switch self.state { - case let .open(pool): - mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque()) - case .closing, .closed: - // this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`. - // unless we have a bug it's impossible that the pool is already closed. - fatalError("ConnectionPool in unexpected state \(self.state) while setting APM callbacks") - } + // lock isn't needed as this is called before pool is in use. + switch self.state { + case let .open(pool): + mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque()) + case .closing, .closed: + // this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`. + // unless we have a bug it's impossible that the pool is already closed. + fatalError("ConnectionPool in unexpected state \(self.state) while setting APM callbacks") } } } From 76221374c4298beb12aedde965b7cd0dca049ad7 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Wed, 29 Apr 2020 20:14:15 -0400 Subject: [PATCH 11/11] rest of comments --- Sources/MongoSwift/ConnectionPool.swift | 46 +++++++++++++++---------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/Sources/MongoSwift/ConnectionPool.swift b/Sources/MongoSwift/ConnectionPool.swift index 05dfc0ad0..3ccf05b89 100644 --- a/Sources/MongoSwift/ConnectionPool.swift +++ b/Sources/MongoSwift/ConnectionPool.swift @@ -1,4 +1,5 @@ import CLibMongoC +import Foundation import NIOConcurrencyHelpers /// A connection to the database. @@ -22,6 +23,14 @@ internal class Connection { } } +extension NSCondition { + fileprivate func withLock(_ body: () throws -> T) rethrows -> T { + self.lock() + defer { self.unlock() } + return try body() + } +} + /// A pool of one or more connections. internal class ConnectionPool { /// Represents the state of a `ConnectionPool`. @@ -40,7 +49,7 @@ internal class ConnectionPool { /// The number of connections currently checked out of the pool. private var connCount = 0 /// Lock over `state` and `connCount`. - private let stateLock = Lock() + private let stateLock = NSCondition() /// Internal helper for testing purposes that returns whether the pool is in the `closing` state. internal var isClosing: Bool { @@ -103,23 +112,18 @@ internal class ConnectionPool { case .closing, .closed: throw Self.PoolClosedError } - } - // continually loop and wait to get all connections back before destroying the pool. release the lock on each - // iteration to allow other methods to acquire the lock. - var done = false - while !done { - try self.stateLock.withLock { - if self.connCount == 0 { - switch self.state { - case .open, .closed: - throw InternalError(message: "ConnectionPool in unexpected state \(self.state) during close()") - case let .closing(pool): - mongoc_client_pool_destroy(pool) - self.state = .closed - } - done = true - } + while self.connCount > 0 { + // wait for signal from checkIn(). + self.stateLock.wait() + } + + switch self.state { + case .open, .closed: + throw InternalError(message: "ConnectionPool in unexpected state \(self.state) during close()") + case let .closing(pool): + mongoc_client_pool_destroy(pool) + self.state = .closed } } } @@ -140,7 +144,8 @@ internal class ConnectionPool { } /// Checks out a connection from the pool, or returns `nil` if none are currently available. Throws an error if the - /// pool is not open. + /// pool is not open. This method may block waiting on the state lock as well as libmongoc's locks and thus must be + // run within the thread pool. internal func tryCheckOut() throws -> Connection? { try self.stateLock.withLock { switch self.state { @@ -157,13 +162,16 @@ internal class ConnectionPool { } /// Checks a connection into the pool. Accepts the connection if the pool is still open or in the process of - /// closing; throws an error if the pool has already finished closing. + /// closing; throws an error if the pool has already finished closing. This method may block waiting on the state + /// lock as well as libmongoc's locks, and thus must be run within the thread pool. fileprivate func checkIn(_ connection: Connection) throws { try self.stateLock.withLock { switch self.state { case let .open(pool), let .closing(pool): mongoc_client_pool_push(pool, connection.clientHandle) self.connCount -= 1 + // signal to close() that we are updating the count. + self.stateLock.signal() case .closed: throw Self.PoolClosedError }