Skip to content

Commit 8434e91

Browse files
committed
close API updates
1 parent 973acb2 commit 8434e91

File tree

8 files changed

+251
-78
lines changed

8 files changed

+251
-78
lines changed

Sources/MongoSwift/APM.swift

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,8 @@ private func publishEvent<T: MongoSwiftEvent>(type: T.Type, eventPtr: OpaquePoin
701701

702702
/// An extension of `ConnectionPool` to add monitoring capability for commands and server discovery and monitoring.
703703
extension ConnectionPool {
704-
/// Internal function to install monitoring callbacks for this pool.
704+
/// Internal function to install monitoring callbacks for this pool. **This method may only be called before any
705+
/// connections are checked out from the pool.**
705706
internal func initializeMonitoring(client: MongoClient) {
706707
guard let callbacks = mongoc_apm_callbacks_new() else {
707708
fatalError("failed to initialize new mongoc_apm_callbacks_t")
@@ -722,13 +723,6 @@ extension ConnectionPool {
722723
mongoc_apm_set_server_heartbeat_succeeded_cb(callbacks, serverHeartbeatSucceeded)
723724
mongoc_apm_set_server_heartbeat_failed_cb(callbacks, serverHeartbeatFailed)
724725

725-
// we can pass the MongoClient as unretained because the callbacks are stored on clientHandle, so if the
726-
// callback is being executed, this pool and therefore its parent `MongoClient` must still be valid.
727-
switch self.state {
728-
case let .open(pool):
729-
mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque())
730-
case .closed:
731-
fatalError("ConnectionPool was already closed")
732-
}
726+
self.setAPMCallbacks(callbacks: callbacks, client: client)
733727
}
734728
}

Sources/MongoSwift/ConnectionPool.swift

Lines changed: 118 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import CLibMongoC
2+
import NIOConcurrencyHelpers
23

34
/// A connection to the database.
45
internal class Connection {
@@ -13,11 +14,10 @@ internal class Connection {
1314
}
1415

1516
deinit {
16-
switch self.pool.state {
17-
case let .open(pool):
18-
mongoc_client_pool_push(pool, self.clientHandle)
19-
case .closed:
20-
assertionFailure("ConnectionPool was already closed")
17+
do {
18+
try self.pool.checkIn(self)
19+
} catch {
20+
assertionFailure("Failed to check connection back in: \(error)")
2121
}
2222
}
2323
}
@@ -28,12 +28,38 @@ internal class ConnectionPool {
2828
internal enum State {
2929
/// Indicates that the `ConnectionPool` is open and using the associated pointer to a `mongoc_client_pool_t`.
3030
case open(pool: OpaquePointer)
31+
/// Indicates that the `ConnectionPool` is in the process of closing. Connections can be checked back in, but
32+
/// no new connections can be checked out.
33+
case closing(pool: OpaquePointer)
3134
/// Indicates that the `ConnectionPool` has been closed and contains no connections.
3235
case closed
3336
}
3437

3538
/// The state of this `ConnectionPool`.
36-
internal private(set) var state: State
39+
private var state: State
40+
/// The number of connections currently checked out of the pool.
41+
private var connCount = 0
42+
/// Lock over `state` and `connCount`.
43+
private let stateLock = Lock()
44+
45+
/// Internal helper for testing purposes that returns whether the pool is in the `closing` state.
46+
internal var isClosing: Bool {
47+
self.stateLock.withLock {
48+
guard case .closing = self.state else {
49+
return false
50+
}
51+
return true
52+
}
53+
}
54+
55+
/// Internal helper for testing purposes that returns the number of connections currently checked out from the pool.
56+
internal var checkedOutConnections: Int {
57+
self.stateLock.withLock {
58+
self.connCount
59+
}
60+
}
61+
62+
private static let PoolClosedError = LogicError(message: "ConnectionPool was already closed")
3763

3864
/// Initializes the pool using the provided `ConnectionString` and options.
3965
internal init(from connString: ConnectionString, options: ClientOptions?) throws {
@@ -56,7 +82,7 @@ internal class ConnectionPool {
5682

5783
self.state = .open(pool: pool)
5884
if let options = options {
59-
try self.setTLSOptions(options)
85+
self.setTLSOptions(options)
6086
}
6187
}
6288

@@ -67,38 +93,76 @@ internal class ConnectionPool {
6793
}
6894
}
6995

70-
/// Closes the pool, cleaning up underlying resources. This method blocks as it sends `endSessions` to the server.
71-
internal func shutdown() {
72-
switch self.state {
73-
case let .open(pool):
74-
mongoc_client_pool_destroy(pool)
75-
case .closed:
76-
return
96+
/// Closes the pool, cleaning up underlying resources. **This method blocks until all connections are returned to
97+
/// the pool.**
98+
internal func close() throws {
99+
try self.stateLock.withLock {
100+
switch self.state {
101+
case let .open(pool):
102+
self.state = .closing(pool: pool)
103+
case .closing, .closed:
104+
throw Self.PoolClosedError
105+
}
106+
}
107+
108+
// continually loop and wait to get all connections back before destroying the pool. release the lock on each
109+
// iteration to allow other methods to acquire the lock.
110+
var done = false
111+
while !done {
112+
try self.stateLock.withLock {
113+
if self.connCount == 0 {
114+
switch self.state {
115+
case .open, .closed:
116+
throw InternalError(message: "ConnectionPool in unexpected state during close()")
117+
case let .closing(pool):
118+
mongoc_client_pool_destroy(pool)
119+
self.state = .closed
120+
}
121+
done = true
122+
}
123+
}
77124
}
78-
self.state = .closed
79125
}
80126

81127
/// Checks out a connection. This connection will return itself to the pool when its reference count drops to 0.
82128
/// This method will block until a connection is available.
83129
internal func checkOut() throws -> Connection {
84-
switch self.state {
85-
case let .open(pool):
86-
return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self)
87-
case .closed:
88-
throw InternalError(message: "ConnectionPool was already closed")
130+
try self.stateLock.withLock {
131+
switch self.state {
132+
case let .open(pool):
133+
self.connCount += 1
134+
return Connection(clientHandle: mongoc_client_pool_pop(pool), pool: self)
135+
case .closing, .closed:
136+
throw Self.PoolClosedError
137+
}
89138
}
90139
}
91140

92141
/// Checks out a connection from the pool, or returns `nil` if none are currently available.
93142
internal func tryCheckOut() throws -> Connection? {
94-
switch self.state {
95-
case let .open(pool):
96-
guard let handle = mongoc_client_pool_try_pop(pool) else {
97-
return nil
143+
try self.stateLock.withLock {
144+
switch self.state {
145+
case let .open(pool):
146+
guard let handle = mongoc_client_pool_try_pop(pool) else {
147+
return nil
148+
}
149+
self.connCount += 1
150+
return Connection(clientHandle: handle, pool: self)
151+
case .closing, .closed:
152+
throw Self.PoolClosedError
153+
}
154+
}
155+
}
156+
157+
fileprivate func checkIn(_ connection: Connection) throws {
158+
try self.stateLock.withLock {
159+
switch self.state {
160+
case let .open(pool), let .closing(pool):
161+
mongoc_client_pool_push(pool, connection.clientHandle)
162+
self.connCount -= 1
163+
case .closed:
164+
throw Self.PoolClosedError
98165
}
99-
return Connection(clientHandle: handle, pool: self)
100-
case .closed:
101-
throw InternalError(message: "ConnectionPool was already closed")
102166
}
103167
}
104168

@@ -109,9 +173,9 @@ internal class ConnectionPool {
109173
return try body(connection)
110174
}
111175

112-
// Sets TLS/SSL options that the user passes in through the client level. This must be called from
113-
// the ConnectionPool init before the pool is used.
114-
private func setTLSOptions(_ options: ClientOptions) throws {
176+
// Sets TLS/SSL options that the user passes in through the client level. **This must only be called from
177+
// the ConnectionPool initializer**.
178+
private func setTLSOptions(_ options: ClientOptions) {
115179
// return early so we don't set an empty options struct on the libmongoc pool. doing so will make libmongoc
116180
// attempt to use TLS for connections.
117181
guard options.tls == true ||
@@ -147,11 +211,15 @@ internal class ConnectionPool {
147211
if let invalidHosts = options.tlsAllowInvalidHostnames {
148212
opts.allow_invalid_hostname = invalidHosts
149213
}
150-
switch self.state {
151-
case let .open(pool):
152-
mongoc_client_pool_set_ssl_opts(pool, &opts)
153-
case .closed:
154-
throw InternalError(message: "ConnectionPool was already closed")
214+
215+
self.stateLock.withLock {
216+
switch self.state {
217+
case let .open(pool):
218+
mongoc_client_pool_set_ssl_opts(pool, &opts)
219+
case .closing, .closed:
220+
// if we get here, we must have called this method outside of `ConnectionPool.init`.
221+
fatalError("ConnectionPool unexpectedly in .closing or .closed state")
222+
}
155223
}
156224
}
157225

@@ -187,6 +255,21 @@ internal class ConnectionPool {
187255
return ConnectionString(copying: uri)
188256
}
189257
}
258+
259+
/// Sets the provided APM callbacks on this pool, using the provided client as the "context" value. **This method
260+
/// may only be called before any connections are checked out of the pool.**
261+
internal func setAPMCallbacks(callbacks: OpaquePointer, client: MongoClient) {
262+
self.stateLock.withLock {
263+
switch self.state {
264+
case let .open(pool):
265+
mongoc_client_pool_set_apm_callbacks(pool, callbacks, Unmanaged.passUnretained(client).toOpaque())
266+
case .closing, .closed:
267+
// this method is called via `initializeMonitoring()`, which is called from `MongoClient.init`.
268+
// unless we have a bug it's impossible that the pool is already closed.
269+
fatalError("ConnectionPool unexpectedly in .closed state")
270+
}
271+
}
272+
}
190273
}
191274

192275
extension String {

Sources/MongoSwift/MongoClient.swift

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,10 @@ public struct DatabaseOptions: CodingStrategyProvider {
157157
// sourcery: skipSyncExport
158158
/// A MongoDB Client providing an asynchronous, SwiftNIO-based API.
159159
public class MongoClient {
160+
/// The pool of connections backing this client.
160161
internal let connectionPool: ConnectionPool
161162

163+
/// Executor responsible for executing operations on behalf of this client and its child objects.
162164
internal let operationExecutor: OperationExecutor
163165

164166
/// Default size for a client's NIOThreadPool.
@@ -167,8 +169,27 @@ public class MongoClient {
167169
/// Default maximum size for connection pools created by this client.
168170
internal static let defaultMaxConnectionPoolSize = 100
169171

172+
/// Indicates the state of this `MongoClient`.
173+
private enum State {
174+
/// The client is open.
175+
case open
176+
/// The client is in the process of closing.
177+
case closing
178+
/// The client has finished closing.
179+
case closed
180+
}
181+
170182
/// Indicates whether this client has been closed.
171-
internal private(set) var isClosed = false
183+
private var state = State.open
184+
185+
/// Lock over `state`.
186+
private var stateLock = Lock()
187+
188+
internal var isOpen: Bool {
189+
self.stateLock.withLock {
190+
self.state == .open
191+
}
192+
}
172193

173194
/// Handlers for command monitoring events.
174195
internal var commandEventHandlers: [CommandEventHandler]
@@ -255,21 +276,46 @@ public class MongoClient {
255276
}
256277

257278
deinit {
258-
assert(self.isClosed, "MongoClient was not closed before deinitialization")
279+
assert(
280+
self.state == .closed,
281+
"MongoClient was not closed before deinitialization. " +
282+
"Please call `close()` or `syncClose()` when the client is no longer needed."
283+
)
259284
}
260285

261-
/// Shuts this `MongoClient` down, closing all connection to the server and cleaning up internal state.
286+
/// Closes this `MongoClient`, closing all connections to the server and cleaning up internal state.
262287
/// Call this method exactly once when you are finished using the client. You must ensure that all operations
263288
/// using the client have completed before calling this. The returned future must be fulfilled before the
264289
/// `EventLoopGroup` provided to this client's constructor is shut down.
265-
public func shutdown() -> EventLoopFuture<Void> {
266-
self.operationExecutor.execute {
267-
self.connectionPool.shutdown()
268-
self.isClosed = true
290+
public func close() -> EventLoopFuture<Void> {
291+
let stateError: Error? = self.stateLock.withLock {
292+
switch self.state {
293+
case .closing, .closed:
294+
return Self.ClosedClientError
295+
case .open:
296+
self.state = .closing
297+
return nil
298+
}
299+
}
300+
301+
if let stateError = stateError {
302+
return self.operationExecutor.makeFailedFuture(stateError)
303+
}
304+
305+
let closeResult = self.operationExecutor.execute {
306+
try self.connectionPool.close()
269307
}
270308
.flatMap {
271-
self.operationExecutor.close()
309+
self.operationExecutor.shutdown()
272310
}
311+
312+
closeResult.whenComplete { _ in
313+
self.stateLock.withLock {
314+
self.state = .closed
315+
}
316+
}
317+
318+
return closeResult
273319
}
274320

275321
/**
@@ -281,11 +327,21 @@ public class MongoClient {
281327
*
282328
* This method must complete before the `EventLoopGroup` provided to this client's constructor is shut down.
283329
*/
284-
public func syncShutdown() {
285-
self.connectionPool.shutdown()
286-
self.isClosed = true
287-
// TODO: SWIFT-349 log any errors encountered here.
288-
try? self.operationExecutor.syncClose()
330+
public func syncClose() throws {
331+
try self.stateLock.withLock {
332+
switch self.state {
333+
case .closing, .closed:
334+
throw Self.ClosedClientError
335+
case .open:
336+
self.state = .closing
337+
}
338+
}
339+
try self.connectionPool.close()
340+
try self.operationExecutor.syncShutdown()
341+
342+
self.stateLock.withLock {
343+
self.state = .closed
344+
}
289345
}
290346

291347
/// Starts a new `ClientSession` with the provided options. When you are done using this session, you must call
@@ -602,15 +658,6 @@ public class MongoClient {
602658
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
603659
}
604660

605-
/// Executes an `Operation` using this `MongoClient` and an optionally provided session.
606-
internal func executeOperation<T: Operation>(
607-
_ operation: T,
608-
using connection: Connection? = nil,
609-
session: ClientSession? = nil
610-
) throws -> T.OperationResult {
611-
try self.operationExecutor.execute(operation, using: connection, client: self, session: session).wait()
612-
}
613-
614661
/// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block
615662
/// and is for testing purposes only**.
616663
internal func getMongocReadConcern() throws -> ReadConcern? {

0 commit comments

Comments
 (0)