Skip to content

Commit

Permalink
SWIFT-668, SWIFT-669: Make MongoSwift.MongoClient async and implement…
Browse files Browse the repository at this point in the history
… MongoSwiftSync.MongoClient (#377)
  • Loading branch information
kmahar committed Dec 20, 2019
1 parent bf1ef6f commit 0d21107
Show file tree
Hide file tree
Showing 41 changed files with 429 additions and 420 deletions.
15 changes: 8 additions & 7 deletions .evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,11 @@ buildvariants:
then:
remove_tasks: ".3.6"

- matrix_name: "atlas-connect"
matrix_spec:
os-fully-featured: ubuntu-18.04
swift-version: "*"
display_name: "Atlas Connectivity ${swift-version} ${os-fully-featured}"
tasks:
- ".atlas-connect"
# TODO SWIFT-703: reenable
# - matrix_name: "atlas-connect"
# matrix_spec:
# os-fully-featured: ubuntu-18.04
# swift-version: "*"
# display_name: "Atlas Connectivity ${swift-version} ${os-fully-featured}"
# tasks:
# - ".atlas-connect"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ playground.xcworkspace
# Packages/
# Package.pins
.build/
.swiftpm/

# CocoaPods
#
Expand Down
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ jobs:
- stage: pre-tests
name: lint
os: osx
osx_image: xcode11.1
osx_image: xcode11.3
install: ./Tests/Scripts/install_dependencies.sh swiftlint && ./Tests/Scripts/install_dependencies.sh swiftformat
before_script: skip
script: ${PWD}/swiftlint/swiftlint --strict && ${PWD}/SwiftFormat/CommandLineTool/swiftformat --verbose --lint .

- stage: tests
os: osx
osx_image: xcode10.2
osx_image: xcode11.3
env: SWIFT_VERSION=5.0

- stage: tests
Expand All @@ -41,7 +41,7 @@ jobs:
- stage: post-tests
name: code coverage
os: osx
osx_image: xcode10.2
osx_image: xcode11.3
script: make coverage
after_success: bash <(curl -s https://codecov.io/bash)

Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ project:
ruby Tests/Scripts/add_json_files.rb

linuxmain:
$(SOURCERY) --sources Tests/ --templates Tests/LinuxMain.stencil --output Tests/LinuxMain.swift
$(SOURCERY) --sources Tests/ --exclude-sources Tests/DisabledTests/ --templates Tests/LinuxMain.stencil --output Tests/LinuxMain.swift

exports:
$(SOURCERY) --sources Sources/MongoSwift/ --templates Sources/MongoSwiftSync/Exports.stencil --output Sources/MongoSwiftSync/Exports.swift
Expand All @@ -45,9 +45,10 @@ lint:
swiftlint autocorrect
swiftlint

# MacOS only
coverage:
make project
xcodebuild -project MongoSwift.xcodeproj -scheme MongoSwift-Package -enableCodeCoverage YES build test
swift test --enable-code-coverage
xcrun llvm-cov export -format="lcov" .build/debug/MongoSwiftPackageTests.xctest/Contents/MacOS/MongoSwiftPackageTests -instr-profile .build/debug/codecov/default.profdata > info.lcov

clean:
rm -rf Packages
Expand Down
9 changes: 9 additions & 0 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@
"revision": "e503b50210dae2cf0220474c2f12087830150e9f",
"version": "2.0.0"
}
},
{
"package": "swift-nio",
"repositoryURL": "https://github.com/apple/swift-nio",
"state": {
"branch": null,
"revision": "3f04a5c056fa52e01b9ef43240988de792461e81",
"version": "2.11.1"
}
}
]
},
Expand Down
9 changes: 5 additions & 4 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ let package = Package(
dependencies: [
.package(url: "https://github.com/mongodb/swift-bson", .upToNextMajor(from: "2.0.0")),
.package(url: "https://github.com/mongodb/swift-mongoc", .upToNextMajor(from: "2.0.0")),
.package(url: "https://github.com/Quick/Nimble.git", .exact("8.0.2"))
.package(url: "https://github.com/Quick/Nimble.git", .exact("8.0.2")),
.package(url: "https://github.com/apple/swift-nio", .upToNextMajor(from: "2.0.0"))
],
targets: [
.target(name: "MongoSwift", dependencies: ["mongoc", "bson"]),
.target(name: "MongoSwift", dependencies: ["mongoc", "bson", "NIO"]),
.target(name: "MongoSwiftSync", dependencies: ["MongoSwift"]),
.target(name: "AtlasConnectivity", dependencies: ["MongoSwift"]),
.target(name: "AtlasConnectivity", dependencies: ["MongoSwiftSync"]),
.target(name: "TestsCommon", dependencies: ["MongoSwift", "Nimble", "mongoc"]),
.testTarget(name: "BSONTests", dependencies: ["MongoSwift", "TestsCommon", "Nimble", "mongoc"]),
.testTarget(name: "MongoSwiftTests", dependencies: ["MongoSwift", "TestsCommon", "Nimble", "mongoc"]),
.testTarget(name: "MongoSwiftTests", dependencies: ["MongoSwift", "TestsCommon", "Nimble", "NIO", "mongoc"]),
.testTarget(name: "MongoSwiftSyncTests", dependencies: ["MongoSwiftSync", "TestsCommon", "Nimble", "mongoc"])
]
)
2 changes: 1 addition & 1 deletion Sources/AtlasConnectivity/main.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Foundation
import MongoSwift
import MongoSwiftSync

private let configs = ["ATLAS_REPL", "ATLAS_SHRD", "ATLAS_FREE", "ATLAS_TLS11", "ATLAS_TLS12"]

Expand Down
12 changes: 8 additions & 4 deletions Sources/MongoSwift/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ internal class ConnectionPool {
}
}

/// Closes the pool if it has not been manually closed already.
deinit {
self.close()
guard case .closed = self.state else {
assertionFailure("ConnectionPool was not closed")
return
}
}

/// Closes the pool, cleaning up underlying resources.
/// Closes the pool, cleaning up underlying resources. This method blocks as it sends `endSessions` to the server.
internal func close() {
switch self.state {
case let .open(pool):
Expand All @@ -68,6 +70,7 @@ internal class ConnectionPool {
}

/// Checks out a connection. This connection will return itself to the pool when its reference count drops to 0.
/// This method will block until a connection is available.
internal func checkOut() throws -> Connection {
switch self.state {
case let .open(pool):
Expand All @@ -77,7 +80,8 @@ internal class ConnectionPool {
}
}

/// Executes the given closure using a connection from the pool.
/// Executes the given closure using a connection from the pool. This method will block until a connection is
/// available.
internal func withConnection<T>(body: (Connection) throws -> T) throws -> T {
let connection = try self.checkOut()
return try body(connection)
Expand Down
115 changes: 86 additions & 29 deletions Sources/MongoSwift/MongoClient.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Foundation
import mongoc
import NIO

/// Options to use when creating a `MongoClient`.
public struct ClientOptions: CodingStrategyProvider, Decodable {
Expand Down Expand Up @@ -53,6 +53,15 @@ public struct ClientOptions: CodingStrategyProvider, Decodable {
*/
public var serverMonitoring: Bool = false

/**
* `MongoSwift.MongoClient` provides an asynchronous API by running all blocking operations off of their
* originating threads in a thread pool. `MongoSwiftSync.MongoClient` is implemented as a wrapper of the async
* client which waits for each corresponding asynchronous operation to complete and then returns the result.
* This option specifies the size of the thread pool used by the asynchronous client, and determines the max
* number of concurrent operations that may be performed using a single client.
*/
public var threadPoolSize: Int? = MongoClient.defaultThreadPoolSize

/// Specifies the TLS/SSL options to use for database connections.
public var tlsOptions: TLSOptions? = nil

Expand Down Expand Up @@ -80,6 +89,7 @@ public struct ClientOptions: CodingStrategyProvider, Decodable {
retryReads: Bool? = nil,
retryWrites: Bool? = nil,
serverMonitoring: Bool = false,
threadPoolSize: Int = MongoClient.defaultThreadPoolSize,
tlsOptions: TLSOptions? = nil,
uuidCodingStrategy: UUIDCodingStrategy? = nil,
writeConcern: WriteConcern? = nil
Expand All @@ -93,6 +103,7 @@ public struct ClientOptions: CodingStrategyProvider, Decodable {
self.retryWrites = retryWrites
self.retryReads = retryReads
self.serverMonitoring = serverMonitoring
self.threadPoolSize = threadPoolSize
self.tlsOptions = tlsOptions
self.uuidCodingStrategy = uuidCodingStrategy
self.writeConcern = writeConcern
Expand Down Expand Up @@ -174,11 +185,18 @@ public struct TLSOptions {
}

// sourcery: skipSyncExport
/// A MongoDB Client.
/// A MongoDB Client providing an asynchronous, SwiftNIO-based API.
public class MongoClient {
internal let connectionPool: ConnectionPool

private let operationExecutor: OperationExecutor = DefaultOperationExecutor()
private let operationExecutor: OperationExecutor

// TODO: SWIFT-705 document size justification.
/// Default size for a client's NIOThreadPool.
public static let defaultThreadPoolSize = 5

/// Indicates whether this client has been closed.
internal private(set) var isClosed = false

/// If command and/or server monitoring is enabled, stores the NotificationCenter events are posted to.
internal let notificationCenter: NotificationCenter
Expand All @@ -189,6 +207,9 @@ public class MongoClient {
/// Counter for generating client _ids.
internal static let clientIdGenerator = Counter(label: "MongoClient ID generator")

/// Error thrown when user attempts to use a closed client.
internal static let ClosedClientError = LogicError(message: "MongoClient was already closed")

/// Encoder whose options are inherited by databases derived from this client.
public let encoder: BSONEncoder

Expand All @@ -205,12 +226,15 @@ public class MongoClient {
public let writeConcern: WriteConcern?

/**
* Create a new client connection to a MongoDB server. For options that included in both the connection string URI
* Create a new client for a MongoDB deployment. For options that included in both the connection string URI
* and the ClientOptions struct, the final value is set in descending order of priority: the value specified in
* ClientOptions (if non-nil), the value specified in the URI, or the default value if both are unset.
*
* - Parameters:
* - connectionString: the connection string to connect to.
* - eventLoopGroup: A SwiftNIO `EventLoopGroup` which the client will use for executing operations. It is the
* user's responsibility to ensure the group remains active for as long as the client does, and
* to ensure the group is properly shut down when it is no longer in use.
* - options: optional `ClientOptions` to use for this client
*
* - SeeAlso: https://docs.mongodb.com/manual/reference/connection-string/
Expand All @@ -220,12 +244,20 @@ public class MongoClient {
* - A `InvalidArgumentError` if the connection string specifies the use of TLS but libmongoc was not
* built with TLS support.
*/
public init(_ connectionString: String = "mongodb://localhost:27017", options: ClientOptions? = nil) throws {
public init(
_ connectionString: String = "mongodb://localhost:27017",
using eventLoopGroup: EventLoopGroup,
options: ClientOptions? = nil
) throws {
// Initialize mongoc. Repeated calls have no effect so this is safe to do every time.
initializeMongoc()

let connString = try ConnectionString(connectionString, options: options)
self.connectionPool = try ConnectionPool(from: connString, options: options?.tlsOptions)
self.operationExecutor = DefaultOperationExecutor(
eventLoopGroup: eventLoopGroup,
threadPoolSize: options?.threadPoolSize ?? MongoClient.defaultThreadPoolSize
)

let rc = connString.readConcern
if !rc.isDefault {
Expand Down Expand Up @@ -253,6 +285,23 @@ public class MongoClient {
)
}

deinit {
assert(self.isClosed, "MongoClient was not closed before deinitialization")
}

/// Closes this `MongoClient`. Call this method exactly once when you are finished using the client. You must
/// ensure that all operations using the client have completed before calling this. The returned future must be
/// fulfilled before the client goes out of scope.
public func close() -> EventLoopFuture<Void> {
return self.operationExecutor.execute {
self.connectionPool.close()
self.isClosed = true
}
.flatMap {
self.operationExecutor.close()
}
}

/**
* Starts a new `ClientSession` with the provided options.
*
Expand Down Expand Up @@ -287,7 +336,7 @@ public class MongoClient {
* on the "name", "sizeOnDisk", "empty", or "shards" fields of the output.
* - session: Optional `ClientSession` to use when executing this command.
*
* - Returns: A `[DatabaseSpecification]` containing the databases matching provided criteria.
* - Returns: An `EventLoopFuture<[DatabaseSpecification]>` containing the databases matching provided criteria.
*
* - Throws:
* - `LogicError` if the provided session is inactive.
Expand All @@ -298,16 +347,14 @@ public class MongoClient {
public func listDatabases(
_ filter: Document? = nil,
session: ClientSession? = nil
) throws -> [DatabaseSpecification] {
let operation = ListDatabasesOperation(
client: self,
filter: filter,
nameOnly: nil
)
guard case let .specs(result) = try self.executeOperation(operation, session: session) else {
throw InternalError(message: "Invalid result")
) -> EventLoopFuture<[DatabaseSpecification]> {
let operation = ListDatabasesOperation(client: self, filter: filter, nameOnly: nil)
return self.executeOperationAsync(operation, session: session).flatMapThrowing { result in
guard case let .specs(dbs) = result else {
throw InternalError(message: "Invalid result")
}
return dbs
}
return result
}

/**
Expand All @@ -317,16 +364,16 @@ public class MongoClient {
* - filter: Optional `Document` specifying a filter on the names of the returned databases.
* - session: Optional `ClientSession` to use when executing this command
*
* - Returns: An Array of `MongoDatabase`s that match the provided filter.
* - Returns: An `EventLoopFuture<[MongoDatabase]>` containing databases that match the provided filter.
*
* - Throws:
* - `LogicError` if the provided session is inactive.
*/
public func listMongoDatabases(
_ filter: Document? = nil,
session: ClientSession? = nil
) throws -> [MongoDatabase] {
return try self.listDatabaseNames(filter, session: session).map { self.db($0) }
) -> EventLoopFuture<[MongoDatabase]> {
return self.listDatabaseNames(filter, session: session).map { $0.map { self.db($0) } }
}

/**
Expand All @@ -336,21 +383,22 @@ public class MongoClient {
* - filter: Optional `Document` specifying a filter on the names of the returned databases.
* - session: Optional `ClientSession` to use when executing this command
*
* - Returns: A `[String]` containing names of databases that match the provided filter.
* - Returns: An `EventLoopFuture<[String]>` containing names of databases that match the provided filter.
*
* - Throws:
* - `LogicError` if the provided session is inactive.
*/
public func listDatabaseNames(_ filter: Document? = nil, session: ClientSession? = nil) throws -> [String] {
let operation = ListDatabasesOperation(
client: self,
filter: filter,
nameOnly: true
)
guard case let .names(result) = try self.executeOperation(operation, session: session) else {
throw InternalError(message: "Invalid result")
public func listDatabaseNames(
_ filter: Document? = nil,
session: ClientSession? = nil
) -> EventLoopFuture<[String]> {
let operation = ListDatabasesOperation(client: self, filter: filter, nameOnly: true)
return self.executeOperationAsync(operation, session: session).flatMapThrowing { result in
guard case let .names(names) = result else {
throw InternalError(message: "Invalid result")
}
return names
}
return result
}

/**
Expand Down Expand Up @@ -491,7 +539,16 @@ public class MongoClient {
using connection: Connection? = nil,
session: ClientSession? = nil
) throws -> T.OperationResult {
return try self.operationExecutor.execute(operation, using: connection, client: self, session: session)
return try self.operationExecutor.execute(operation, using: connection, client: self, session: session).wait()
}

/// Executes an `Operation` asynchronously using this `MongoClient` and an optionally provided session.
internal func executeOperationAsync<T: Operation>(
_ operation: T,
using connection: Connection? = nil,
session: ClientSession? = nil
) -> EventLoopFuture<T.OperationResult> {
return self.operationExecutor.execute(operation, using: connection, client: self, session: session)
}
}

Expand Down
Loading

0 comments on commit 0d21107

Please sign in to comment.