Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented server api versioning #281

Closed
9 changes: 6 additions & 3 deletions Package.swift
Expand Up @@ -39,10 +39,10 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),

// 📚
.package(url: "https://github.com/orlandos-nl/NioDNS.git", from: "2.0.0"),
.package(url: "https://github.com/orlandos-nl/NioDNS.git", .branch("3.0")),

// 🔑
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.0.0"),
],
targets: [
.target(
Expand Down Expand Up @@ -77,7 +77,10 @@ let package = Package(
),
.testTarget(
name: "MongoCoreTests",
dependencies: ["MongoCore"]),
dependencies: [
"MongoCore",
.product(name: "NIOSSL", package: "swift-nio-ssl"),
]),
.testTarget(
name: "MongoKittenTests",
dependencies: ["MongoKitten"]),
Expand Down
19 changes: 13 additions & 6 deletions Sources/MongoClient/Cluster.swift
Expand Up @@ -14,6 +14,8 @@ public typealias _MongoPlatformEventLoopGroup = EventLoopGroup
#endif

public final class MongoCluster: MongoConnectionPool, @unchecked Sendable {
public let serverApi: ServerApi?

public private(set) var settings: ConnectionSettings {
didSet {
self.hosts = Set(settings.hosts)
Expand Down Expand Up @@ -88,27 +90,30 @@ public final class MongoCluster: MongoConnectionPool, @unchecked Sendable {

private init(
settings: ConnectionSettings,
logger: Logger
logger: Logger,
api: ServerApi? = nil
) {
self.settings = settings
self.pool = []
self.hosts = Set(settings.hosts)
self.logger = logger
self.serverApi = api
}

/// Connects to a cluster lazily, which means you don't know if the connection was successful until you start querying
///
/// This is useful when you need a cluster synchronously to query asynchronously
public convenience init(
lazyConnectingTo settings: ConnectionSettings,
logger: Logger = Logger(label: "org.openkitten.mongokitten.cluster")
logger: Logger = Logger(label: "org.openkitten.mongokitten.cluster"),
api: ServerApi? = nil
) throws {
guard settings.hosts.count > 0 else {
logger.error("No MongoDB servers were specified while creating a cluster")
throw MongoError(.cannotConnect, reason: .noHostSpecified)
}

self.init(settings: settings, logger: logger)
self.init(settings: settings, logger: logger, api: api)

Task {
// Kick off the connection process
Expand All @@ -121,14 +126,15 @@ public final class MongoCluster: MongoConnectionPool, @unchecked Sendable {
public convenience init(
connectingTo settings: ConnectionSettings,
allowFailure: Bool = false,
logger: Logger = Logger(label: "org.openkitten.mongokitten.cluster")
logger: Logger = Logger(label: "org.openkitten.mongokitten.cluster"),
api: ServerApi? = nil
) async throws {
guard settings.hosts.count > 0 else {
logger.error("No MongoDB servers were specified while creating a cluster")
throw MongoError(.cannotConnect, reason: .noHostSpecified)
}

self.init(settings: settings, logger: logger)
self.init(settings: settings, logger: logger, api: api)

// Resolve SRV hostnames
try await resolveSettings()
Expand Down Expand Up @@ -235,7 +241,8 @@ public final class MongoCluster: MongoConnectionPool, @unchecked Sendable {
logger: logger,
onGroup: group,
resolver: self.dns,
sessionManager: sessionManager
sessionManager: sessionManager,
api: serverApi
)
connection.slaveOk.store(slaveOk)

Expand Down
4 changes: 4 additions & 0 deletions Sources/MongoClient/Connection+Execute.swift
Expand Up @@ -95,6 +95,8 @@ extension MongoConnection {
] as Document, forKey: "lsid")
}

if let serverVersion = serverApi { serverVersion.propagateCommand(&command)}

return try await executeMessage(
OpQuery(
query: command,
Expand All @@ -119,6 +121,8 @@ extension MongoConnection {
] as Document, forKey: "lsid")
}

if let serverVersion = serverApi { serverVersion.propagateCommand(&command)}

// TODO: When retrying a write, don't resend transaction messages except commit & abort
if let transaction = transaction {
command.appendValue(transaction.number, forKey: "txnNumber")
Expand Down
16 changes: 11 additions & 5 deletions Sources/MongoClient/Connection.swift
Expand Up @@ -29,6 +29,8 @@ public struct MongoHandshakeResult {
}

public final actor MongoConnection: @unchecked Sendable {
public let serverApi: ServerApi?

/// The NIO channel
internal let channel: Channel
public nonisolated var logger: Logger { context.logger }
Expand Down Expand Up @@ -76,10 +78,11 @@ public final actor MongoConnection: @unchecked Sendable {
}

/// Creates a connection that can communicate with MongoDB over a channel
public init(channel: Channel, context: MongoClientContext, sessionManager: MongoSessionManager = .init()) {
public init(channel: Channel, context: MongoClientContext, sessionManager: MongoSessionManager = .init(), api: ServerApi? = nil) {
self.sessionManager = sessionManager
self.channel = channel
self.context = context
self.serverApi = api
}

public static func addHandlers(to channel: Channel, context: MongoClientContext) -> EventLoopFuture<Void> {
Expand All @@ -91,12 +94,13 @@ public final actor MongoConnection: @unchecked Sendable {
settings: ConnectionSettings,
logger: Logger = Logger(label: "org.openkitten.mongokitten.connection"),
resolver: Resolver? = nil,
clientDetails: MongoClientDetails? = nil
clientDetails: MongoClientDetails? = nil,
api: ServerApi? = nil
) async throws -> MongoConnection {
#if canImport(NIOTransportServices) && os(iOS)
return try await connect(settings: settings, logger: logger, onGroup: NIOTSEventLoopGroup(loopCount: 1), resolver: resolver, clientDetails: clientDetails)
#else
return try await connect(settings: settings, logger: logger, onGroup: MultiThreadedEventLoopGroup(numberOfThreads: 1), resolver: resolver, clientDetails: clientDetails)
return try await connect(settings: settings, logger: logger, onGroup: MultiThreadedEventLoopGroup(numberOfThreads: 1), resolver: resolver, clientDetails: clientDetails, api: api)
#endif
}

Expand All @@ -106,7 +110,8 @@ public final actor MongoConnection: @unchecked Sendable {
onGroup group: _MongoPlatformEventLoopGroup,
resolver: Resolver? = nil,
clientDetails: MongoClientDetails? = nil,
sessionManager: MongoSessionManager = .init()
sessionManager: MongoSessionManager = .init(),
api: ServerApi? = nil
) async throws -> MongoConnection {
let context = MongoClientContext(logger: logger)

Expand Down Expand Up @@ -156,7 +161,8 @@ public final actor MongoConnection: @unchecked Sendable {
let connection = MongoConnection(
channel: channel,
context: context,
sessionManager: sessionManager
sessionManager: sessionManager,
api: api
)

try await connection.authenticate(
Expand Down
1 change: 1 addition & 0 deletions Sources/MongoClient/ConnectionPool.swift
Expand Up @@ -30,6 +30,7 @@ public protocol MongoConnectionPool {
var wireVersion: WireVersion? { get async }
var sessionManager: MongoSessionManager { get }
var logger: Logger { get }
var serverApi: ServerApi? { get }
}

extension MongoConnection: MongoConnectionPool {
Expand Down
1 change: 1 addition & 0 deletions Sources/MongoClient/Exports.swift
@@ -1 +1,2 @@
@_exported import MongoCore
@_exported import BSON
6 changes: 5 additions & 1 deletion Sources/MongoClient/MongoSingleConnectionPool.swift
Expand Up @@ -3,6 +3,8 @@ import Logging
import MongoCore

public final actor MongoSingleConnectionPool: MongoConnectionPool {
public let serverApi: ServerApi?

public typealias BuildConnection = @Sendable () async throws -> MongoConnection

public var wireVersion: WireVersion? {
Expand All @@ -21,11 +23,13 @@ public final actor MongoSingleConnectionPool: MongoConnectionPool {
public init(
authenticationSource: String = "admin",
credentials: ConnectionSettings.Authentication = .unauthenticated,
buildConnection: @escaping BuildConnection
buildConnection: @escaping BuildConnection,
api: ServerApi? = nil
) {
self.authenticationSource = authenticationSource
self.credentials = credentials
self.buildConnection = buildConnection
self.serverApi = api
}

public func next(for request: ConnectionPoolRequest) async throws -> MongoConnection {
Expand Down
25 changes: 25 additions & 0 deletions Sources/MongoClient/ServerApi.swift
@@ -0,0 +1,25 @@
import Foundation
import MongoCore

public struct ServerApiVersion {
internal enum _Version: String {
case v1 = "1"
}

internal let _version: _Version

public static let V1 = ServerApiVersion(_version: .v1)
}


public struct ServerApi {
var version: ServerApiVersion
var strict: Bool? = false
var deprecationErrors: Bool? = false

func propagateCommand( _ command: inout Document) {
command.appendValue(self.version._version.rawValue, forKey: "apiVersion")
if let strict = self.strict { command.appendValue(strict, forKey: "apiStrict") }
if let deprecationErrors = self.deprecationErrors { command.appendValue(deprecationErrors, forKey: "apiDeprecationErrors") }
}
}
File renamed without changes.
18 changes: 10 additions & 8 deletions Sources/MongoKitten/MongoDatabase.swift
Expand Up @@ -39,15 +39,15 @@ public class MongoDatabase {
/// Connect to the database at the given `uri`
///
/// - parameter uri: A MongoDB URI that contains at least a database component
public static func lazyConnect(to uri: String) throws -> MongoDatabase {
try lazyConnect(to: ConnectionSettings(uri))
public static func lazyConnect(to uri: String, api: ServerApi? = nil) throws -> MongoDatabase {
try lazyConnect(to: ConnectionSettings(uri), api: api)
}

/// Connect to the database at the given `uri`
///
/// - parameter uri: A MongoDB URI that contains at least a database component
public static func connect(to uri: String) async throws -> MongoDatabase {
try await connect(to: ConnectionSettings(uri))
public static func connect(to uri: String, api: ServerApi? = nil) async throws -> MongoDatabase {
try await connect(to: ConnectionSettings(uri), api: api)
}

/// Connect to the database with the given settings _lazily_. You can also use `lazyConnect(_:on:)` to connect by using a connection string.
Expand All @@ -56,15 +56,16 @@ public class MongoDatabase {
///
/// - parameter settings: The connection settings, which must include a database name
public static func lazyConnect(
to settings: ConnectionSettings
to settings: ConnectionSettings,
api: ServerApi? = nil
) throws -> MongoDatabase {
let logger = Logger(label: "org.openkitten.mongokitten")
guard let targetDatabase = settings.targetDatabase else {
logger.critical("Cannot connect to MongoDB: No target database specified")
throw MongoKittenError(.cannotConnect, reason: .noTargetDatabaseSpecified)
}

let cluster = try MongoCluster(lazyConnectingTo: settings, logger: logger)
let cluster = try MongoCluster(lazyConnectingTo: settings, logger: logger, api: api)
return MongoDatabase(named: targetDatabase, pool: cluster)
}

Expand All @@ -74,15 +75,16 @@ public class MongoDatabase {
///
/// - parameter settings: The connection settings, which must include a database name
public static func connect(
to settings: ConnectionSettings
to settings: ConnectionSettings,
api: ServerApi? = nil
) async throws -> MongoDatabase {
let logger = Logger(label: "org.openkitten.mongokitten")
guard let targetDatabase = settings.targetDatabase else {
logger.critical("Cannot connect to MongoDB: No target database specified")
throw MongoKittenError(.cannotConnect, reason: .noTargetDatabaseSpecified)
}

let cluster = try await MongoCluster(connectingTo: settings, logger: logger)
let cluster = try await MongoCluster(connectingTo: settings, logger: logger, api: api)
return MongoDatabase(named: targetDatabase, pool: cluster)
}

Expand Down