Skip to content
Merged
76 changes: 75 additions & 1 deletion Sources/MongoSwift/ClientSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,80 @@ public final class ClientSession {
/// started the libmongoc session.
internal var id: Document?

/// The server ID of the mongos this session is pinned to. A server ID of 0 indicates that the session is unpinned.
internal var serverId: UInt32? {
switch self.state {
case .notStarted, .ended:
return nil
case let .started(session, _):
return mongoc_client_session_get_server_id(session)
}
}

/// Enum tracking the state of the transaction associated with this session.
internal enum TransactionState: String, Decodable {
/// There is no transaction in progress.
case none
/// A transaction has been started, but no operation has been sent to the server.
case starting
/// A transaction is in progress.
case inProgress
/// The transaction was committed.
case committed
/// The transaction was aborted.
case aborted

fileprivate var mongocTransactionState: mongoc_transaction_state_t {
switch self {
case .none:
return MONGOC_TRANSACTION_NONE
case .starting:
return MONGOC_TRANSACTION_STARTING
case .inProgress:
return MONGOC_TRANSACTION_IN_PROGRESS
case .committed:
return MONGOC_TRANSACTION_COMMITTED
case .aborted:
return MONGOC_TRANSACTION_ABORTED
}
}

fileprivate init(mongocTransactionState: mongoc_transaction_state_t) {
switch mongocTransactionState {
case MONGOC_TRANSACTION_NONE:
self = .none
case MONGOC_TRANSACTION_STARTING:
self = .starting
case MONGOC_TRANSACTION_IN_PROGRESS:
self = .inProgress
case MONGOC_TRANSACTION_COMMITTED:
self = .committed
case MONGOC_TRANSACTION_ABORTED:
self = .aborted
default:
fatalError("Unexpected transaction state: \(mongocTransactionState)")
}
}
}

/// The transaction state of this session.
internal var transactionState: TransactionState? {
switch self.state {
case .notStarted, .ended:
return nil
case let .started(session, _):
return TransactionState(mongocTransactionState: mongoc_client_session_get_transaction_state(session))
}
}

/// Indicates whether or not the session is in a transaction.
internal var inTransaction: Bool {
if let transactionState = self.transactionState {
return transactionState != .none
}
return false
}

/// The most recent cluster time seen by this session. This value will be nil if either of the following are true:
/// - No operations have been executed using this session and `advanceClusterTime` has not been called.
/// - This session has been ended.
Expand Down Expand Up @@ -243,7 +317,7 @@ public final class ClientSession {
* - SeeAlso:
* - https://docs.mongodb.com/manual/core/transactions/
*/
public func startTransaction(_ options: TransactionOptions?) -> EventLoopFuture<Void> {
public func startTransaction(options: TransactionOptions? = nil) -> EventLoopFuture<Void> {
switch self.state {
case .notStarted, .started:
let operation = StartTransactionOperation(options: options)
Expand Down
12 changes: 5 additions & 7 deletions Sources/MongoSwift/MongoClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@ import NIO
import NIOConcurrencyHelpers

/// Options to use when creating a `MongoClient`.
public struct ClientOptions: CodingStrategyProvider, Decodable {
// swiftlint:disable redundant_optional_initialization

public struct ClientOptions: CodingStrategyProvider {
/// Specifies the `DataCodingStrategy` to use for BSON encoding/decoding operations performed by this client and any
/// databases or collections that derive from it.
public var dataCodingStrategy: DataCodingStrategy? = nil
public var dataCodingStrategy: DataCodingStrategy?

/// Specifies the `DateCodingStrategy` to use for BSON encoding/decoding operations performed by this client and any
/// databases or collections that derive from it.
public var dateCodingStrategy: DateCodingStrategy? = nil
public var dateCodingStrategy: DateCodingStrategy?

/// The maximum number of connections that may be associated with a connection pool created by this client at a
/// given time. This includes in-use and available connections. Defaults to 100.
Expand All @@ -22,7 +20,7 @@ public struct ClientOptions: CodingStrategyProvider, Decodable {
public var readConcern: ReadConcern?

/// Specifies a ReadPreference to use for the client.
public var readPreference: ReadPreference? = nil
public var readPreference: ReadPreference?

/// Determines whether the client should retry supported read operations (on by default).
public var retryReads: Bool?
Expand Down Expand Up @@ -65,7 +63,7 @@ public struct ClientOptions: CodingStrategyProvider, Decodable {

/// Specifies the `UUIDCodingStrategy` to use for BSON encoding/decoding operations performed by this client and any
/// databases or collections that derive from it.
public var uuidCodingStrategy: UUIDCodingStrategy? = nil
public var uuidCodingStrategy: UUIDCodingStrategy?

// swiftlint:enable redundant_optional_initialization

Expand Down
25 changes: 23 additions & 2 deletions Sources/MongoSwift/MongoCollection+BulkWrite.swift
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,13 @@ internal struct BulkWriteOperation<T: Codable>: Operation {
let opts = try encodeOptions(options: options, session: session)
var insertedIds: [Int: BSON] = [:]

if session?.inTransaction == true && self.options?.writeConcern != nil {
throw InvalidArgumentError(
message: "Cannot specify a write concern on an individual helper in a " +
"transaction. Instead specify it when starting the transaction."
)
}

let (serverId, isAcknowledged): (UInt32, Bool) =
try self.collection.withMongocCollection(from: connection) { collPtr in
guard let bulk = mongoc_collection_create_bulk_operation_with_opts(collPtr, opts?._bson) else {
Expand All @@ -214,8 +221,22 @@ internal struct BulkWriteOperation<T: Codable>: Operation {
mongoc_bulk_operation_execute(bulk, replyPtr, &error)
}

let writeConcern = WriteConcern(from: mongoc_bulk_operation_get_write_concern(bulk))
return (serverId, writeConcern.isAcknowledged)
var writeConcernAcknowledged: Bool
if session?.inTransaction == true {
// Bulk write operations in transactions must get their write concern from the session, not from
// the `BulkWriteOptions` passed to the `bulkWrite` helper. `libmongoc` surfaces this
// implementation detail by nulling out the write concern stored on the bulk write. To sidestep
// this, we can only call `mongoc_bulk_operation_get_write_concern` out of a transaction.
//
// In a transaction, default to writeConcernAcknowledged = true. This is acceptable because
// transactions do not support unacknowledged writes.
writeConcernAcknowledged = true
} else {
let writeConcern = WriteConcern(from: mongoc_bulk_operation_get_write_concern(bulk))
writeConcernAcknowledged = writeConcern.isAcknowledged
}

return (serverId, writeConcernAcknowledged)
}

let result = try BulkWriteResult(reply: reply, insertedIds: insertedIds)
Expand Down
12 changes: 4 additions & 8 deletions Sources/MongoSwift/MongoCollection+Read.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,24 @@ extension MongoCollection {
}

/**
* Gets an estimate of the count of documents in this collection using collection metadata.
* Gets an estimate of the count of documents in this collection using collection metadata. This operation cannot
* be used in a transaction.
*
* - Parameters:
* - options: Optional `EstimatedDocumentCountOptions` to use when executing the command
* - session: Optional `ClientSession` to use when executing this command
*
* - Returns:
* An `EventLoopFuture<Int>`. On success, contains an estimate of the count of documents in this collection.
*
* If the future fails, the error is likely one of the following:
* - `CommandError` if an error occurs that prevents the command from executing.
* - `InvalidArgumentError` if the options passed in form an invalid combination.
* - `LogicError` if the provided session is inactive.
* - `LogicError` if this collection's parent client has already been closed.
* - `EncodingError` if an error occurs while encoding the options to BSON.
*/
public func estimatedDocumentCount(
options: EstimatedDocumentCountOptions? = nil,
session: ClientSession? = nil
) -> EventLoopFuture<Int> {
public func estimatedDocumentCount(options: EstimatedDocumentCountOptions? = nil) -> EventLoopFuture<Int> {
let operation = EstimatedDocumentCountOperation(collection: self, options: options)
return self._client.operationExecutor.execute(operation, client: self._client, session: session)
return self._client.operationExecutor.execute(operation, client: self._client, session: nil)
}

/**
Expand Down
37 changes: 33 additions & 4 deletions Sources/MongoSwift/MongoError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public struct InternalError: RuntimeError {
/// An error thrown when encountering a connection or socket related error.
/// May contain labels providing additional information on the nature of the error.
public struct ConnectionError: RuntimeError, LabeledError {
internal let message: String
public let message: String

public let errorLabels: [String]?

Expand Down Expand Up @@ -173,11 +173,36 @@ public struct WriteConcernFailure: Codable {
/// A description of the error.
public let message: String

/// Labels that may describe the context in which this error was thrown.
public let errorLabels: [String]?

private enum CodingKeys: String, CodingKey {
case code
case codeName
case details = "errInfo"
case message = "errmsg"
case errorLabels
}

// TODO: can remove this once SERVER-36755 is resolved
public init(from decoder: Decoder) throws {
let container = try decoder.container(keyedBy: CodingKeys.self)
self.code = try container.decode(ServerErrorCode.self, forKey: .code)
self.message = try container.decode(String.self, forKey: .message)
self.codeName = try container.decodeIfPresent(String.self, forKey: .codeName) ?? ""
self.details = try container.decodeIfPresent(Document.self, forKey: .details)
self.errorLabels = try container.decodeIfPresent([String].self, forKey: .errorLabels)
}

// TODO: can remove this once SERVER-36755 is resolved
internal init(
code: ServerErrorCode, codeName: String, details: Document?, message: String, errorLabels: [String]? = nil
) {
self.code = code
self.codeName = codeName
self.message = message
self.details = details
self.errorLabels = errorLabels
}
}

Expand Down Expand Up @@ -279,6 +304,7 @@ internal func extractMongoError(error bsonError: bson_error_t, reply: Document?
// if the reply is nil or writeErrors or writeConcernErrors aren't present, then this is likely a commandError.
guard let serverReply: Document = reply,
!(serverReply["writeErrors"]?.arrayValue ?? []).isEmpty ||
!(serverReply["writeConcernError"]?.documentValue?.keys ?? []).isEmpty ||
!(serverReply["writeConcernErrors"]?.arrayValue ?? []).isEmpty else {
return parseMongocError(bsonError, reply: reply)
}
Expand Down Expand Up @@ -390,11 +416,14 @@ internal func extractBulkWriteError<T: Codable>(

/// Extracts a `WriteConcernError` from a server reply.
private func extractWriteConcernError(from reply: Document) throws -> WriteConcernFailure? {
guard let writeConcernErrors = reply["writeConcernErrors"]?.arrayValue?.compactMap({ $0.documentValue }),
!writeConcernErrors.isEmpty else {
if let writeConcernErrors = reply["writeConcernErrors"]?.arrayValue?.compactMap({ $0.documentValue }),
!writeConcernErrors.isEmpty {
return try BSONDecoder().decode(WriteConcernFailure.self, from: writeConcernErrors[0])
} else if let writeConcernError = reply["writeConcernError"]?.documentValue {
return try BSONDecoder().decode(WriteConcernFailure.self, from: writeConcernError)
} else {
return nil
}
return try BSONDecoder().decode(WriteConcernFailure.self, from: writeConcernErrors[0])
}

/// Internal function used by write methods performing single writes that are implemented via the bulk API. If the
Expand Down
2 changes: 0 additions & 2 deletions Sources/MongoSwift/MongoSwiftVersion.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Generated using Sourcery 0.16.1 — https://github.com/krzysztofzablocki/Sourcery
// DO NOT EDIT


// swiftlint:disable:previous vertical_whitespace
internal let MongoSwiftVersionString = "1.0.0-rc0"
2 changes: 1 addition & 1 deletion Sources/MongoSwiftSync/ClientSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public final class ClientSession {
* - https://docs.mongodb.com/manual/core/transactions/
*/
public func startTransaction(options: TransactionOptions? = nil) throws {
try self.asyncSession.startTransaction(options).wait()
try self.asyncSession.startTransaction(options: options).wait()
}

/**
Expand Down
11 changes: 4 additions & 7 deletions Sources/MongoSwiftSync/MongoCollection+Read.swift
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,16 @@ extension MongoCollection {
}

/**
* Gets an estimate of the count of documents in this collection using collection metadata.
* Gets an estimate of the count of documents in this collection using collection metadata. This operation cannot
* be used in a transaction.
*
* - Parameters:
* - options: Optional `EstimatedDocumentCountOptions` to use when executing the command
* - session: Optional `ClientSession` to use when executing this command
*
* - Returns: an estimate of the count of documents in this collection
*/
public func estimatedDocumentCount(
options: EstimatedDocumentCountOptions? = nil,
session: ClientSession? = nil
) throws -> Int {
try self.asyncColl.estimatedDocumentCount(options: options, session: session?.asyncSession).wait()
public func estimatedDocumentCount(options: EstimatedDocumentCountOptions? = nil) throws -> Int {
try self.asyncColl.estimatedDocumentCount(options: options).wait()
}

/**
Expand Down
7 changes: 7 additions & 0 deletions Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ extension SyncMongoClientTests {
]
}

extension TransactionsTests {
static var allTests = [
("testTransactions", testTransactions),
]
}

extension WriteConcernTests {
static var allTests = [
("testWriteConcernType", testWriteConcernType),
Expand Down Expand Up @@ -408,5 +414,6 @@ XCTMain([
testCase(SyncChangeStreamTests.allTests),
testCase(SyncClientSessionTests.allTests),
testCase(SyncMongoClientTests.allTests),
testCase(TransactionsTests.allTests),
testCase(WriteConcernTests.allTests),
])
Loading