diff --git a/Sources/MongoSwift/ChangeStream.swift b/Sources/MongoSwift/ChangeStream.swift index a5a71e7d3..55204cf9c 100644 --- a/Sources/MongoSwift/ChangeStream.swift +++ b/Sources/MongoSwift/ChangeStream.swift @@ -6,6 +6,8 @@ import NIO private struct MongocChangeStream: MongocCursorWrapper { internal let pointer: OpaquePointer + internal static var isLazy: Bool { return false } + fileprivate init(stealing ptr: OpaquePointer) { self.pointer = ptr } @@ -63,11 +65,16 @@ public class ChangeStream: CursorProtocol { /// The cursor this change stream is wrapping. private let wrappedCursor: Cursor - /// Process an event before returning it to the user. + /// Process an event before returning it to the user, or does nothing and returns nil if the provided event is nil. private func processEvent(_ event: Document?) throws -> T? { guard let event = event else { return nil } + return try self.processEvent(event) + } + + /// Process an event before returning it to the user. + private func processEvent(_ event: Document) throws -> T { // Update the resumeToken with the `_id` field from the document. guard let resumeToken = event["_id"]?.documentValue else { throw InternalError(message: "_id field is missing from the change stream document.") @@ -167,6 +174,27 @@ public class ChangeStream: CursorProtocol { } } + /** + * Consolidate the currently available results of the change stream into an array of type `T`. + * + * Since `toArray` will only fetch the currently available results, it may return more data if it is called again + * while the change stream is still alive. + * + * - Returns: + * An `EventLoopFuture<[T]>` evaluating to the results currently available in this change stream, or an error. + * + * If the future evaluates to an error, that error is likely one of the following: + * - `CommandError` if an error occurs while fetching more results from the server. + * - `LogicError` if this function is called after the change stream has died. + * - `LogicError` if this function is called and the session associated with this change stream is inactive. + * - `DecodingError` if an error occurs decoding the server's responses. + */ + public func toArray() -> EventLoopFuture<[T]> { + return self.client.operationExecutor.execute { + try self.wrappedCursor.toArray().map(self.processEvent) + } + } + /** * Kill this change stream. * diff --git a/Sources/MongoSwift/CursorCommon.swift b/Sources/MongoSwift/CursorCommon.swift index 8073395c6..8752df0a8 100644 --- a/Sources/MongoSwift/CursorCommon.swift +++ b/Sources/MongoSwift/CursorCommon.swift @@ -40,6 +40,11 @@ internal protocol CursorProtocol { */ func tryNext() -> EventLoopFuture + /// Retrieves all the documents currently available in this cursor. If the cursor is not tailable, exhausts it. If + /// the cursor is tailable or is a change stream, this method may return more data if it is called again while the + /// cursor is still alive. + func toArray() -> EventLoopFuture<[T]> + /** * Kills this cursor. * @@ -67,6 +72,9 @@ internal protocol MongocCursorWrapper { /// The underlying libmongoc pointer. var pointer: OpaquePointer { get } + /// Indicates whether this type lazily sends its corresponding initial command to the server. + static var isLazy: Bool { get } + /// Method wrapping the appropriate libmongoc "error" function (e.g. `mongoc_cursor_error_document`). func errorDocument(bsonError: inout bson_error_t, replyPtr: UnsafeMutablePointer) -> Bool @@ -96,10 +104,28 @@ internal class Cursor { /// The state of this cursor. private var state: State + /// Used to store a cached next value to return, if one exists. + private enum CachedDocument { + /// Indicates that the associated value is the next value to return. This value may be nil. + case cached(Document?) + /// Indicates that there is no value cached. + case none + + /// Get the contents of the cache and clear it. + fileprivate mutating func clear() -> CachedDocument { + let copy = self + self = .none + return copy + } + } + + /// Tracks the caching status of this cursor. + private var cached: CachedDocument + /// The type of this cursor. Useful for indicating whether or not it is tailable. private let type: CursorType - /// Lock used to synchronize usage of the internal state. + /// Lock used to synchronize usage of the internal state: specifically the `state` and `cached` properties. /// This lock should only be acquired in the bodies of non-private methods. private let lock: Lock @@ -202,16 +228,34 @@ internal class Cursor { self.type = type self.lock = Lock() self.isClosing = NIOAtomic.makeAtomic(value: false) + self.cached = .none // If there was an error constructing the cursor, throw it. if let error = self.getMongocError() { self.close() throw error } + + // if this type lazily sends its initial command, retrieve and cache the first document so that we start I/O. + if CursorKind.isLazy { + self.cached = try .cached(self.tryNext()) + } } /// Whether the cursor is alive. internal var isAlive: Bool { + return self.lock.withLock { + self._isAlive + } + } + + /// Checks whether the cursor is alive. Meant for private use only. + /// This property should only be read while the lock is held. + private var _isAlive: Bool { + if case .cached = self.cached { + return true + } + switch self.state { case .open: return true @@ -224,11 +268,20 @@ internal class Cursor { /// This method is blocking and should only be run via the executor. internal func next() throws -> Document? { return try self.lock.withLock { - guard self.isAlive else { + guard self._isAlive else { throw ClosedCursorError } + + if case let .cached(result) = self.cached.clear() { + // If there are no more results forthcoming after clearing the cache, or the cache had a non-nil + // result in it, return that. + if !self._isAlive || result != nil { + return result + } + } + // Keep trying until either the cursor is killed or a notification has been sent by close - while self.isAlive && !self.isClosing.load() { + while self._isAlive && !self.isClosing.load() { if let doc = try self.getNextDocument() { return doc } @@ -241,16 +294,31 @@ internal class Cursor { /// This method is blocking and should only be run via the executor. internal func tryNext() throws -> Document? { return try self.lock.withLock { - try self.getNextDocument() + if case let .cached(result) = self.cached.clear() { + return result + } + return try self.getNextDocument() } } /// Retreive all the currently available documents in the result set. /// This will not exhaust the cursor. /// This method is blocking and should only be run via the executor. - internal func all() throws -> [Document] { + internal func toArray() throws -> [Document] { return try self.lock.withLock { + guard self._isAlive else { + throw ClosedCursorError + } + var results: [Document] = [] + if case let .cached(result) = self.cached.clear(), let unwrappedResult = result { + results.append(unwrappedResult) + } + // the only value left was the cached one + guard self._isAlive else { + return results + } + while let result = try self.getNextDocument() { results.append(result) } @@ -264,6 +332,7 @@ internal class Cursor { internal func kill() { self.isClosing.store(true) self.lock.withLock { + self.cached = .none self.close() } self.isClosing.store(false) diff --git a/Sources/MongoSwift/MongoCollection+Indexes.swift b/Sources/MongoSwift/MongoCollection+Indexes.swift index 97b6878e9..35fcc7aa0 100644 --- a/Sources/MongoSwift/MongoCollection+Indexes.swift +++ b/Sources/MongoSwift/MongoCollection+Indexes.swift @@ -408,7 +408,7 @@ extension MongoCollection { */ public func listIndexNames(session _: ClientSession? = nil) -> EventLoopFuture<[String]> { return self.listIndexes().flatMap { cursor in - cursor.all() + cursor.toArray() }.flatMapThrowing { models in try models.map { model in guard let name = model.options?.name else { diff --git a/Sources/MongoSwift/MongoCursor.swift b/Sources/MongoSwift/MongoCursor.swift index 2b53efb83..a6f548eea 100644 --- a/Sources/MongoSwift/MongoCursor.swift +++ b/Sources/MongoSwift/MongoCursor.swift @@ -7,6 +7,8 @@ import NIOConcurrencyHelpers internal struct MongocCursor: MongocCursorWrapper { internal let pointer: OpaquePointer + internal static var isLazy: Bool { return true } + internal init(referencing pointer: OpaquePointer) { self.pointer = pointer } @@ -39,24 +41,6 @@ public class MongoCursor: CursorProtocol { /// Decoder from the client, database, or collection that created this cursor. internal let decoder: BSONDecoder - /// Used to store a cached next value to return, if one exists. - private enum CachedDocument { - /// Indicates that the associated value is the next value to return. This value may be nil. - case cached(T?) - /// Indicates that there is no value cached. - case none - - /// Get the contents of the cache and clear it. - fileprivate mutating func clear() -> CachedDocument { - let copy = self - self = .none - return copy - } - } - - /// Tracks the caching status of this cursor. - private var cached: CachedDocument - /** * Initializes a new `MongoCursor` instance. Not meant to be instantiated directly by a user. When `forceIO` is * true, this initializer will force a connection to the server if one is not already established. @@ -75,7 +59,6 @@ public class MongoCursor: CursorProtocol { ) throws { self.client = client self.decoder = decoder - self.cached = .none self.wrappedCursor = try Cursor( mongocCursor: MongocCursor(referencing: cursorPtr), @@ -83,17 +66,6 @@ public class MongoCursor: CursorProtocol { session: session, type: cursorType ?? .nonTailable ) - - let next = try self.decode(result: self.wrappedCursor.tryNext()) - self.cached = .cached(next) - } - - /// Close this cursor - /// - /// This method should only be called while the lock is held. - internal func blockingKill() { - self.cached = .none - self.wrappedCursor.kill() } /// Asserts that the cursor was closed. @@ -111,12 +83,7 @@ public class MongoCursor: CursorProtocol { /// Decodes the given document to the generic type. private func decode(doc: Document) throws -> T { - do { - return try self.decoder.decode(T.self, from: doc) - } catch { - self.blockingKill() - throw error - } + return try self.decoder.decode(T.self, from: doc) } /** @@ -130,9 +97,6 @@ public class MongoCursor: CursorProtocol { * This cursor will be dead as soon as `next` returns `nil` or an error, regardless of the `CursorType`. */ public var isAlive: Bool { - if case .cached = self.cached { - return true - } return self.wrappedCursor.isAlive } @@ -147,37 +111,6 @@ public class MongoCursor: CursorProtocol { } } - /** - * Consolidate the currently available results of the cursor into an array of type `T`. - * - * If this cursor is not tailable, this method will exhaust it. - * - * If this cursor is tailable, `all` will only fetch the currently available results, and it - * may return more data if it is called again while the cursor is still alive. - * - * - Returns: - * An `EventLoopFuture<[T]>` evaluating to the results currently available to this cursor or an error. - * - * If the future evaluates to an error, that error is likely one of the following: - * - `CommandError` if an error occurs while fetching more results from the server. - * - `LogicError` if this function is called after the cursor has died. - * - `LogicError` if this function is called and the session associated with this cursor is inactive. - * - `DecodingError` if an error occurs decoding the server's responses. - */ - internal func all() -> EventLoopFuture<[T]> { - return self.client.operationExecutor.execute { - var results: [T] = [] - if case let .cached(result) = self.cached.clear(), let unwrappedResult = result { - results.append(unwrappedResult) - } - // If the cursor still could have more results after clearing the cache, collect them too. - if self.isAlive { - results += try self.wrappedCursor.all().map { try self.decode(doc: $0) } - } - return results - } - } - /** * Attempt to get the next `T` from the cursor, returning `nil` if there are no results. * @@ -200,10 +133,7 @@ public class MongoCursor: CursorProtocol { */ public func tryNext() -> EventLoopFuture { return self.client.operationExecutor.execute { - if case let .cached(result) = self.cached.clear() { - return result - } - return try self.decode(result: self.wrappedCursor.tryNext()) + try self.decode(result: self.wrappedCursor.tryNext()) } } @@ -226,15 +156,30 @@ public class MongoCursor: CursorProtocol { */ public func next() -> EventLoopFuture { return self.client.operationExecutor.execute { - if case let .cached(result) = self.cached.clear() { - // If there are no more results forthcoming after clearing the cache, or the cache had a non-nil - // result in it, return that. - if !self.isAlive || result != nil { - return result - } - } - // Otherwise iterate until a result is received. - return try self.decode(result: self.wrappedCursor.next()) + try self.decode(result: self.wrappedCursor.next()) + } + } + + /** + * Consolidate the currently available results of the cursor into an array of type `T`. + * + * If this cursor is not tailable, this method will exhaust it. + * + * If this cursor is tailable, `toArray` will only fetch the currently available results, and it + * may return more data if it is called again while the cursor is still alive. + * + * - Returns: + * An `EventLoopFuture<[T]>` evaluating to the results currently available in this cursor, or an error. + * + * If the future evaluates to an error, that error is likely one of the following: + * - `CommandError` if an error occurs while fetching more results from the server. + * - `LogicError` if this function is called after the cursor has died. + * - `LogicError` if this function is called and the session associated with this cursor is inactive. + * - `DecodingError` if an error occurs decoding the server's responses. + */ + public func toArray() -> EventLoopFuture<[T]> { + return self.client.operationExecutor.execute { + try self.wrappedCursor.toArray().map { try self.decode(doc: $0) } } } @@ -249,7 +194,7 @@ public class MongoCursor: CursorProtocol { */ public func kill() -> EventLoopFuture { return self.client.operationExecutor.execute { - self.blockingKill() + self.wrappedCursor.kill() } } } diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index bdc7f395b..378e69270 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -13,6 +13,7 @@ extension AsyncMongoCursorTests { ("testTailableAwaitAsyncCursor", testTailableAwaitAsyncCursor), ("testTailableAsyncCursor", testTailableAsyncCursor), ("testAsyncNext", testAsyncNext), + ("testCursorToArray", testCursorToArray), ] } @@ -49,6 +50,7 @@ extension ChangeStreamTests { ("testChangeStreamNext", testChangeStreamNext), ("testChangeStreamError", testChangeStreamError), ("testChangeStreamEmpty", testChangeStreamEmpty), + ("testChangeStreamToArray", testChangeStreamToArray), ] } diff --git a/Tests/MongoSwiftTests/ChangeStreamTests.swift b/Tests/MongoSwiftTests/ChangeStreamTests.swift index 2425ffeb6..435283499 100644 --- a/Tests/MongoSwiftTests/ChangeStreamTests.swift +++ b/Tests/MongoSwiftTests/ChangeStreamTests.swift @@ -90,4 +90,36 @@ final class ChangeStreamTests: MongoSwiftTestCase { expect(try nextFuture.wait()).to(beNil()) } } + + func testChangeStreamToArray() throws { + guard MongoSwiftTestCase.topologyType != .single else { + print(unsupportedTopologyMessage(testName: self.name)) + return + } + try self.withTestClient { client in + let db = client.db(type(of: self).testDatabase) + try? db.collection(self.getCollectionName()).drop().wait() + let coll = try db.createCollection(self.getCollectionName()).wait() + + let stream = try coll.watch().wait() + expect(stream.isAlive).to(beTrue()) + + // initially, no events, but stream should stay alive + expect(try stream.toArray().wait()).to(beEmpty()) + expect(stream.isAlive).to(beTrue()) + + // we should get back single event now via toArray + _ = try coll.insertOne(["x": 1]).wait() + let results = try stream.toArray().wait() + expect(results[0].fullDocument?["x"]).to(equal(1)) + expect(stream.isAlive).to(beTrue()) + + // no more events, but stream should stay alive + expect(try stream.toArray().wait()).to(beEmpty()) + expect(stream.isAlive).to(beTrue()) + + try stream.kill().wait() + expect(stream.isAlive).to(beFalse()) + } + } } diff --git a/Tests/MongoSwiftTests/MongoCursorTests.swift b/Tests/MongoSwiftTests/MongoCursorTests.swift index 85c0b1bda..f55aaaf81 100644 --- a/Tests/MongoSwiftTests/MongoCursorTests.swift +++ b/Tests/MongoSwiftTests/MongoCursorTests.swift @@ -27,7 +27,7 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // insert and read out one document _ = try coll.insertOne(doc1).wait() cursor = try coll.find().wait() - var results = try cursor.all().wait() + let results = try cursor.toArray().wait() expect(results).to(haveCount(1)) expect(results[0]).to(equal(doc1)) // cursor should be closed now that its exhausted @@ -35,17 +35,6 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // iterating a dead cursor should error expect(try cursor.next().wait()).to(throwError()) - // iterating after calling all should error. - _ = try coll.insertMany([doc2, doc3]).wait() - cursor = try coll.find().wait() - results = try cursor.all().wait() - expect(results).to(haveCount(3)) - expect(results).to(equal([doc1, doc2, doc3])) - // cursor should be closed now that its exhausted - expect(cursor.isAlive).to(beFalse()) - // iterating dead cursor should error - expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) - cursor = try coll.find(options: FindOptions(batchSize: 1)).wait() expect(try cursor.next().wait()).toNot(throwError()) @@ -106,7 +95,7 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // for each doc we insert, check that it arrives in the cursor next, // and that the cursor is still alive afterward let checkNextResult: (Document) throws -> Void = { doc in - let results = try cursor.all().wait() + let results = try cursor.toArray().wait() expect(results).to(haveCount(1)) expect(results[0]).to(equal(doc)) expect(cursor.isAlive).to(beTrue()) @@ -164,4 +153,57 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) } } + + func testCursorToArray() throws { + // normal cursor + try self.withTestNamespace { _, _, coll in + // query empty collection + var cursor = try coll.find().wait() + expect(try cursor.toArray().wait()).to(equal([])) + expect(cursor.isAlive).to(beFalse()) + // iterating dead cursor should error + expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) + + // iterating after calling toArray should error. + _ = try coll.insertMany([doc1, doc2, doc3]).wait() + cursor = try coll.find().wait() + var results = try cursor.toArray().wait() + expect(results).to(equal([doc1, doc2, doc3])) + // cursor should be closed now that its exhausted + expect(cursor.isAlive).to(beFalse()) + // iterating dead cursor should error + expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) + + // calling toArray on a closed cursor should error. + cursor = try coll.find().wait() + results = try cursor.toArray().wait() + expect(results).to(haveCount(3)) + expect(try cursor.toArray().wait()).to(throwError()) + } + + // tailable cursor + let collOptions = CreateCollectionOptions(capped: true, max: 3, size: 1000) + try self.withTestNamespace(collectionOptions: collOptions) { _, _, coll in + let cursorOpts = FindOptions(cursorType: .tailable) + + var cursor = try coll.find(options: cursorOpts).wait() + defer { try? cursor.kill().wait() } + + expect(try cursor.toArray().wait()).to(beEmpty()) + // no documents matched initial query, so cursor is dead + expect(cursor.isAlive).to(beFalse()) + expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) + + // insert a doc so something matches initial query + _ = try coll.insertOne(doc1).wait() + cursor = try coll.find(options: cursorOpts).wait() + expect(try cursor.toArray().wait()).to(equal([doc1])) + expect(cursor.isAlive).to(beTrue()) + + // newly inserted docs will be returned by toArray + _ = try coll.insertMany([doc2, doc3]).wait() + expect(try cursor.toArray().wait()).to(equal([doc2, doc3])) + expect(cursor.isAlive).to(beTrue()) + } + } }