Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions Sources/MongoSwift/APM.swift
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,8 @@ private func publishEvent<T: MongoSwiftEvent>(type: T.Type, eventPtr: OpaquePoin

/// An extension of `ConnectionPool` to add monitoring capability for commands and server discovery and monitoring.
extension ConnectionPool {
/// Internal function to install monitoring callbacks for this pool.
/// Internal function to install monitoring callbacks for this pool. **This method may only be called before any
/// connections are checked out from the pool.**
internal func initializeMonitoring(client: MongoClient) {
guard let callbacks = mongoc_apm_callbacks_new() else {
fatalError("failed to initialize new mongoc_apm_callbacks_t")
Expand All @@ -722,13 +723,6 @@ extension ConnectionPool {
mongoc_apm_set_server_heartbeat_succeeded_cb(callbacks, serverHeartbeatSucceeded)
mongoc_apm_set_server_heartbeat_failed_cb(callbacks, serverHeartbeatFailed)

// we can pass the MongoClient as unretained because the callbacks are stored on clientHandle, so if the
// callback is being executed, this pool and therefore its parent `MongoClient` must still be valid.
switch self.state {
case let .open(pool):
mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque())
case .closed:
fatalError("ConnectionPool was already closed")
}
self.setAPMCallbacks(callbacks: callbacks, client: client)
}
}
163 changes: 129 additions & 34 deletions Sources/MongoSwift/ConnectionPool.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import CLibMongoC
import Foundation
import NIOConcurrencyHelpers

/// A connection to the database.
internal class Connection {
Expand All @@ -13,27 +15,60 @@ internal class Connection {
}

deinit {
switch self.pool.state {
case let .open(pool):
mongoc_client_pool_push(pool, self.clientHandle)
case .closed:
assertionFailure("ConnectionPool was already closed")
do {
try self.pool.checkIn(self)
} catch {
assertionFailure("Failed to check connection back in: \(error)")
}
}
}

extension NSCondition {
fileprivate func withLock<T>(_ body: () throws -> T) rethrows -> T {
self.lock()
defer { self.unlock() }
return try body()
}
}

/// A pool of one or more connections.
internal class ConnectionPool {
/// Represents the state of a `ConnectionPool`.
internal enum State {
/// Indicates that the `ConnectionPool` is open and using the associated pointer to a `mongoc_client_pool_t`.
case open(pool: OpaquePointer)
/// Indicates that the `ConnectionPool` is in the process of closing. Connections can be checked back in, but
/// no new connections can be checked out.
case closing(pool: OpaquePointer)
/// Indicates that the `ConnectionPool` has been closed and contains no connections.
case closed
}

/// The state of this `ConnectionPool`.
internal private(set) var state: State
private var state: State
/// The number of connections currently checked out of the pool.
private var connCount = 0
/// Lock over `state` and `connCount`.
private let stateLock = NSCondition()

/// Internal helper for testing purposes that returns whether the pool is in the `closing` state.
internal var isClosing: Bool {
self.stateLock.withLock {
guard case .closing = self.state else {
return false
}
return true
}
}

/// Internal helper for testing purposes that returns the number of connections currently checked out from the pool.
internal var checkedOutConnections: Int {
self.stateLock.withLock {
self.connCount
}
}

internal static let PoolClosedError = LogicError(message: "ConnectionPool was already closed")

/// Initializes the pool using the provided `ConnectionString` and options.
internal init(from connString: ConnectionString, options: ClientOptions?) throws {
Expand All @@ -56,7 +91,7 @@ internal class ConnectionPool {

self.state = .open(pool: pool)
if let options = options {
try self.setTLSOptions(options)
self.setTLSOptions(options)
}
}

Expand All @@ -67,38 +102,79 @@ internal class ConnectionPool {
}
}

/// Closes the pool, cleaning up underlying resources. This method blocks as it sends `endSessions` to the server.
internal func shutdown() {
switch self.state {
case let .open(pool):
mongoc_client_pool_destroy(pool)
case .closed:
return
/// Closes the pool, cleaning up underlying resources. **This method blocks until all connections are returned to
/// the pool.**
internal func close() throws {
try self.stateLock.withLock {
switch self.state {
case let .open(pool):
self.state = .closing(pool: pool)
case .closing, .closed:
throw Self.PoolClosedError
}

while self.connCount > 0 {
// wait for signal from checkIn().
self.stateLock.wait()
}

switch self.state {
case .open, .closed:
throw InternalError(message: "ConnectionPool in unexpected state \(self.state) during close()")
case let .closing(pool):
mongoc_client_pool_destroy(pool)
self.state = .closed
}
}
self.state = .closed
}

/// Checks out a connection. This connection will return itself to the pool when its reference count drops to 0.
/// This method will block until a connection is available.
/// This method will block until a connection is available. Throws an error if the pool is in the process of
/// closing or has finished closing.
internal func checkOut() throws -> Connection {
switch self.state {
case let .open(pool):
return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self)
case .closed:
throw InternalError(message: "ConnectionPool was already closed")
try self.stateLock.withLock {
switch self.state {
case let .open(pool):
self.connCount += 1
return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self)
case .closing, .closed:
throw Self.PoolClosedError
}
}
}

/// Checks out a connection from the pool, or returns `nil` if none are currently available.
/// Checks out a connection from the pool, or returns `nil` if none are currently available. Throws an error if the
/// pool is not open. This method may block waiting on the state lock as well as libmongoc's locks and thus must be
// run within the thread pool.
internal func tryCheckOut() throws -> Connection? {
switch self.state {
case let .open(pool):
guard let handle = mongoc_client_pool_try_pop(pool) else {
return nil
try self.stateLock.withLock {
switch self.state {
case let .open(pool):
guard let handle = mongoc_client_pool_try_pop(pool) else {
return nil
}
self.connCount += 1
return Connection(clientHandle: handle, pool: self)
case .closing, .closed:
throw Self.PoolClosedError
}
}
}

/// Checks a connection into the pool. Accepts the connection if the pool is still open or in the process of
/// closing; throws an error if the pool has already finished closing. This method may block waiting on the state
/// lock as well as libmongoc's locks, and thus must be run within the thread pool.
fileprivate func checkIn(_ connection: Connection) throws {
try self.stateLock.withLock {
switch self.state {
case let .open(pool), let .closing(pool):
mongoc_client_pool_push(pool, connection.clientHandle)
self.connCount -= 1
// signal to close() that we are updating the count.
self.stateLock.signal()
case .closed:
throw Self.PoolClosedError
}
return Connection(clientHandle: handle, pool: self)
case .closed:
throw InternalError(message: "ConnectionPool was already closed")
}
}

Expand All @@ -109,9 +185,9 @@ internal class ConnectionPool {
return try body(connection)
}

// Sets TLS/SSL options that the user passes in through the client level. This must be called from
// the ConnectionPool init before the pool is used.
private func setTLSOptions(_ options: ClientOptions) throws {
// Sets TLS/SSL options that the user passes in through the client level. **This must only be called from
// the ConnectionPool initializer**.
private func setTLSOptions(_ options: ClientOptions) {
// return early so we don't set an empty options struct on the libmongoc pool. doing so will make libmongoc
// attempt to use TLS for connections.
guard options.tls == true ||
Expand Down Expand Up @@ -147,11 +223,14 @@ internal class ConnectionPool {
if let invalidHosts = options.tlsAllowInvalidHostnames {
opts.allow_invalid_hostname = invalidHosts
}

// lock isn't needed as this is called before pool is in use.
switch self.state {
case let .open(pool):
mongoc_client_pool_set_ssl_opts(pool, &opts)
case .closed:
throw InternalError(message: "ConnectionPool was already closed")
case .closing, .closed:
// if we get here, we must have called this method outside of `ConnectionPool.init`.
fatalError("ConnectionPool in unexpected state \(self.state) while setting TLS options")
}
}

Expand Down Expand Up @@ -187,6 +266,22 @@ internal class ConnectionPool {
return ConnectionString(copying: uri)
}
}

/// Sets the provided APM callbacks on this pool, using the provided client as the "context" value. **This method
/// may only be called before any connections are checked out of the pool.** Ideally this code would just live in
/// `ConnectionPool.init`. However, the client we accept here has to be fully initialized before we can pass it
/// as the context. In order for it to be fully initialized its pool must exist already.
internal func setAPMCallbacks(callbacks: OpaquePointer, client: MongoClient) {
// lock isn't needed as this is called before pool is in use.
switch self.state {
case let .open(pool):
mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque())
case .closing, .closed:
// this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`.
// unless we have a bug it's impossible that the pool is already closed.
fatalError("ConnectionPool in unexpected state \(self.state) while setting APM callbacks")
}
}
}

extension String {
Expand Down
68 changes: 38 additions & 30 deletions Sources/MongoSwift/MongoClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,10 @@ public struct DatabaseOptions: CodingStrategyProvider {
// sourcery: skipSyncExport
/// A MongoDB Client providing an asynchronous, SwiftNIO-based API.
public class MongoClient {
/// The pool of connections backing this client.
internal let connectionPool: ConnectionPool

/// Executor responsible for executing operations on behalf of this client and its child objects.
internal let operationExecutor: OperationExecutor

/// Default size for a client's NIOThreadPool.
Expand All @@ -167,8 +169,10 @@ public class MongoClient {
/// Default maximum size for connection pools created by this client.
internal static let defaultMaxConnectionPoolSize = 100

/// Indicates whether this client has been closed.
internal private(set) var isClosed = false
/// Indicates whether this client has been closed. A lock around this variable is not needed because:
/// - This value is only modified on success of `ConnectionPool.close()`. That method will succeed exactly once.
/// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete.
private var wasClosed = false

/// Handlers for command monitoring events.
internal var commandEventHandlers: [CommandEventHandler]
Expand All @@ -182,9 +186,6 @@ public class MongoClient {
/// A unique identifier for this client. Sets _id to the generator's current value and increments the generator.
internal let _id = clientIdGenerator.add(1)

/// Error thrown when user attempts to use a closed client.
internal static let ClosedClientError = LogicError(message: "MongoClient was already closed")

/// Encoder whose options are inherited by databases derived from this client.
public let encoder: BSONEncoder

Expand Down Expand Up @@ -255,37 +256,53 @@ public class MongoClient {
}

deinit {
assert(self.isClosed, "MongoClient was not closed before deinitialization")
assert(
self.wasClosed,
"MongoClient was not closed before deinitialization. " +
"Please call `close()` or `syncClose()` when the client is no longer needed."
)
}

/// Shuts this `MongoClient` down, closing all connection to the server and cleaning up internal state.
/// Call this method exactly once when you are finished using the client. You must ensure that all operations
/// using the client have completed before calling this. The returned future must be fulfilled before the
/// `EventLoopGroup` provided to this client's constructor is shut down.
public func shutdown() -> EventLoopFuture<Void> {
self.operationExecutor.execute {
self.connectionPool.shutdown()
self.isClosed = true
/**
* Closes this `MongoClient`, closing all connections to the server and cleaning up internal state.
*
* Call this method exactly once when you are finished using the client. You must ensure that all operations using
* the client have completed before calling this.
*
* The returned future will not be fulfilled until all cursors and change streams created from this client have been
* been killed, and all sessions created from this client have been ended.
*
* The returned future must be fulfilled before the `EventLoopGroup` provided to this client's constructor is shut
* down.
*/
public func close() -> EventLoopFuture<Void> {
let closeResult = self.operationExecutor.execute {
try self.connectionPool.close()
}
.flatMap {
self.operationExecutor.close()
self.operationExecutor.shutdown()
}
closeResult.whenComplete { _ in
self.wasClosed = true
}

return closeResult
}

/**
* Shuts this `MongoClient` down in a blocking fashion, closing all connections to the server and cleaning up
* internal state.
*
* Call this method exactly once when you are finished using the client. You must ensure that all operations
* using the client have completed before calling this.
* using the client have completed before calling this. This method will block until all cursors and change streams
* created from this client have been killed, and all sessions created from this client have been ended.
*
* This method must complete before the `EventLoopGroup` provided to this client's constructor is shut down.
*/
public func syncShutdown() {
self.connectionPool.shutdown()
self.isClosed = true
// TODO: SWIFT-349 log any errors encountered here.
try? self.operationExecutor.syncClose()
public func syncClose() throws {
try self.connectionPool.close()
try self.operationExecutor.syncShutdown()
self.wasClosed = true
}

/// Starts a new `ClientSession` with the provided options. When you are done using this session, you must call
Expand Down Expand Up @@ -602,15 +619,6 @@ public class MongoClient {
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
}

/// Executes an `Operation` using this `MongoClient` and an optionally provided session.
internal func executeOperation<T: Operation>(
_ operation: T,
using connection: Connection? = nil,
session: ClientSession? = nil
) throws -> T.OperationResult {
try self.operationExecutor.execute(operation, using: connection, client: self, session: session).wait()
}

/// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block
/// and is for testing purposes only**.
internal func getMongocReadConcern() throws -> ReadConcern? {
Expand Down
Loading