From 0237c209959ff5d561fd79a5281eb367749c2a01 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Wed, 12 Feb 2020 19:23:03 -0500 Subject: [PATCH 1/9] wip --- Sources/MongoSwift/ChangeStream.swift | 40 +++++++++++++++++++++++---- Sources/MongoSwift/CursorCommon.swift | 7 ++++- Sources/MongoSwift/MongoCursor.swift | 12 +++++--- 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/Sources/MongoSwift/ChangeStream.swift b/Sources/MongoSwift/ChangeStream.swift index a5a71e7d3..000fe9dfb 100644 --- a/Sources/MongoSwift/ChangeStream.swift +++ b/Sources/MongoSwift/ChangeStream.swift @@ -64,10 +64,7 @@ public class ChangeStream: CursorProtocol { private let wrappedCursor: Cursor /// Process an event before returning it to the user. - private func processEvent(_ event: Document?) throws -> T? { - guard let event = event else { - return nil - } + private func processEvent(_ event: Document) throws -> T { // Update the resumeToken with the `_id` field from the document. guard let resumeToken = event["_id"]?.documentValue else { throw InternalError(message: "_id field is missing from the change stream document.") @@ -139,7 +136,10 @@ public class ChangeStream: CursorProtocol { */ public func next() -> EventLoopFuture { return self.client.operationExecutor.execute { - try self.processEvent(self.wrappedCursor.next()) + guard let next = try self.wrappedCursor.next() else { + return nil + } + return try self.processEvent(next) } } @@ -163,7 +163,35 @@ public class ChangeStream: CursorProtocol { */ public func tryNext() -> EventLoopFuture { return self.client.operationExecutor.execute { - try self.processEvent(self.wrappedCursor.tryNext()) + guard let next = try self.wrappedCursor.tryNext() else { + return nil + } + return try self.processEvent(next) + } + } + + /** + * Consolidate the currently available results of the change stream into an array of type `T`. + * + * Since `toArray` will only fetch the currently available results, it may return more data if it is called again + * while the change stream is still alive. + * + * - Returns: + * An `EventLoopFuture<[T]>` evaluating to the results currently available in this change stream, or an error. + * + * If the future evaluates to an error, that error is likely one of the following: + * - `CommandError` if an error occurs while fetching more results from the server. + * - `LogicError` if this function is called after the change stream has died. + * - `LogicError` if this function is called and the session associated with this change stream is inactive. + * - `DecodingError` if an error occurs decoding the server's responses. + */ + public func toArray() -> EventLoopFuture<[T]> { + guard self.isAlive else { + return self.client.operationExecutor.makeFailedFuture(ClosedCursorError) + } + + return self.client.operationExecutor.execute { + try self.wrappedCursor.toArray().map(self.processEvent) } } diff --git a/Sources/MongoSwift/CursorCommon.swift b/Sources/MongoSwift/CursorCommon.swift index 8073395c6..c1ede067c 100644 --- a/Sources/MongoSwift/CursorCommon.swift +++ b/Sources/MongoSwift/CursorCommon.swift @@ -40,6 +40,11 @@ internal protocol CursorProtocol { */ func tryNext() -> EventLoopFuture + /// Retrieves all the documents currently available in this cursor. If the cursor is not tailable, exhausts it. If + /// the cursor is tailable or is a change stream, this method may return more data if it is called again while the + /// cursor is still alive. + func toArray() -> EventLoopFuture<[T]> + /** * Kills this cursor. * @@ -248,7 +253,7 @@ internal class Cursor { /// Retreive all the currently available documents in the result set. /// This will not exhaust the cursor. /// This method is blocking and should only be run via the executor. - internal func all() throws -> [Document] { + internal func toArray() throws -> [Document] { return try self.lock.withLock { var results: [Document] = [] while let result = try self.getNextDocument() { diff --git a/Sources/MongoSwift/MongoCursor.swift b/Sources/MongoSwift/MongoCursor.swift index 2b53efb83..779878bee 100644 --- a/Sources/MongoSwift/MongoCursor.swift +++ b/Sources/MongoSwift/MongoCursor.swift @@ -152,11 +152,11 @@ public class MongoCursor: CursorProtocol { * * If this cursor is not tailable, this method will exhaust it. * - * If this cursor is tailable, `all` will only fetch the currently available results, and it + * If this cursor is tailable, `toArray` will only fetch the currently available results, and it * may return more data if it is called again while the cursor is still alive. * * - Returns: - * An `EventLoopFuture<[T]>` evaluating to the results currently available to this cursor or an error. + * An `EventLoopFuture<[T]>` evaluating to the results currently available in this cursor, or an error. * * If the future evaluates to an error, that error is likely one of the following: * - `CommandError` if an error occurs while fetching more results from the server. @@ -164,7 +164,11 @@ public class MongoCursor: CursorProtocol { * - `LogicError` if this function is called and the session associated with this cursor is inactive. * - `DecodingError` if an error occurs decoding the server's responses. */ - internal func all() -> EventLoopFuture<[T]> { + public func toArray() -> EventLoopFuture<[T]> { + guard self.isAlive else { + return self.client.operationExecutor.makeFailedFuture(ClosedCursorError) + } + return self.client.operationExecutor.execute { var results: [T] = [] if case let .cached(result) = self.cached.clear(), let unwrappedResult = result { @@ -172,7 +176,7 @@ public class MongoCursor: CursorProtocol { } // If the cursor still could have more results after clearing the cache, collect them too. if self.isAlive { - results += try self.wrappedCursor.all().map { try self.decode(doc: $0) } + results += try self.wrappedCursor.toArray().map { try self.decode(doc: $0) } } return results } From 34c5d294a20aa4f5430c34e1eac419d75fd66726 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Thu, 13 Feb 2020 12:39:15 -0500 Subject: [PATCH 2/9] wip --- Sources/MongoSwift/MongoCollection+Indexes.swift | 2 +- Sources/MongoSwiftSync/MongoCursor.swift | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/MongoSwift/MongoCollection+Indexes.swift b/Sources/MongoSwift/MongoCollection+Indexes.swift index 97b6878e9..35fcc7aa0 100644 --- a/Sources/MongoSwift/MongoCollection+Indexes.swift +++ b/Sources/MongoSwift/MongoCollection+Indexes.swift @@ -408,7 +408,7 @@ extension MongoCollection { */ public func listIndexNames(session _: ClientSession? = nil) -> EventLoopFuture<[String]> { return self.listIndexes().flatMap { cursor in - cursor.all() + cursor.toArray() }.flatMapThrowing { models in try models.map { model in guard let name = model.options?.name else { diff --git a/Sources/MongoSwiftSync/MongoCursor.swift b/Sources/MongoSwiftSync/MongoCursor.swift index cf1c7b7c2..4621f5163 100644 --- a/Sources/MongoSwiftSync/MongoCursor.swift +++ b/Sources/MongoSwiftSync/MongoCursor.swift @@ -91,7 +91,7 @@ public class MongoCursor: CursorProtocol { } /** - * Returns an array of type `T` from the results of this cursor. + * Returns an array of type `T` containing the results of this cursor. * * If this cursor is tailable, this method will block until the cursor is closed or exhausted. * @@ -102,7 +102,7 @@ public class MongoCursor: CursorProtocol { * - `LogicError` if this function is called and the session associated with this cursor is inactive. * - `DecodingError` if an error occurs decoding the server's response. */ - internal func _all() throws -> [T] { + internal func toArray() throws -> [T] { return try self.map { switch $0 { case let .success(t): From fc229a3dfebafd6b3d2c39797cf92a4eebb20b74 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Thu, 13 Feb 2020 15:48:54 -0500 Subject: [PATCH 3/9] Revert "wip" This reverts commit 34c5d294a20aa4f5430c34e1eac419d75fd66726. --- Sources/MongoSwift/MongoCollection+Indexes.swift | 2 +- Sources/MongoSwiftSync/MongoCursor.swift | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/MongoSwift/MongoCollection+Indexes.swift b/Sources/MongoSwift/MongoCollection+Indexes.swift index 35fcc7aa0..97b6878e9 100644 --- a/Sources/MongoSwift/MongoCollection+Indexes.swift +++ b/Sources/MongoSwift/MongoCollection+Indexes.swift @@ -408,7 +408,7 @@ extension MongoCollection { */ public func listIndexNames(session _: ClientSession? = nil) -> EventLoopFuture<[String]> { return self.listIndexes().flatMap { cursor in - cursor.toArray() + cursor.all() }.flatMapThrowing { models in try models.map { model in guard let name = model.options?.name else { diff --git a/Sources/MongoSwiftSync/MongoCursor.swift b/Sources/MongoSwiftSync/MongoCursor.swift index 4621f5163..cf1c7b7c2 100644 --- a/Sources/MongoSwiftSync/MongoCursor.swift +++ b/Sources/MongoSwiftSync/MongoCursor.swift @@ -91,7 +91,7 @@ public class MongoCursor: CursorProtocol { } /** - * Returns an array of type `T` containing the results of this cursor. + * Returns an array of type `T` from the results of this cursor. * * If this cursor is tailable, this method will block until the cursor is closed or exhausted. * @@ -102,7 +102,7 @@ public class MongoCursor: CursorProtocol { * - `LogicError` if this function is called and the session associated with this cursor is inactive. * - `DecodingError` if an error occurs decoding the server's response. */ - internal func toArray() throws -> [T] { + internal func _all() throws -> [T] { return try self.map { switch $0 { case let .success(t): From bf2d50b19c15a82a5e050bfe69ad6671dc346a64 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Thu, 13 Feb 2020 18:11:05 -0500 Subject: [PATCH 4/9] fix compile and tests --- Sources/MongoSwift/MongoCollection+Indexes.swift | 2 +- Tests/MongoSwiftTests/MongoCursorTests.swift | 14 ++++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/Sources/MongoSwift/MongoCollection+Indexes.swift b/Sources/MongoSwift/MongoCollection+Indexes.swift index 97b6878e9..35fcc7aa0 100644 --- a/Sources/MongoSwift/MongoCollection+Indexes.swift +++ b/Sources/MongoSwift/MongoCollection+Indexes.swift @@ -408,7 +408,7 @@ extension MongoCollection { */ public func listIndexNames(session _: ClientSession? = nil) -> EventLoopFuture<[String]> { return self.listIndexes().flatMap { cursor in - cursor.all() + cursor.toArray() }.flatMapThrowing { models in try models.map { model in guard let name = model.options?.name else { diff --git a/Tests/MongoSwiftTests/MongoCursorTests.swift b/Tests/MongoSwiftTests/MongoCursorTests.swift index 85c0b1bda..7f7da6857 100644 --- a/Tests/MongoSwiftTests/MongoCursorTests.swift +++ b/Tests/MongoSwiftTests/MongoCursorTests.swift @@ -27,7 +27,7 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // insert and read out one document _ = try coll.insertOne(doc1).wait() cursor = try coll.find().wait() - var results = try cursor.all().wait() + var results = try cursor.toArray().wait() expect(results).to(haveCount(1)) expect(results[0]).to(equal(doc1)) // cursor should be closed now that its exhausted @@ -35,10 +35,10 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // iterating a dead cursor should error expect(try cursor.next().wait()).to(throwError()) - // iterating after calling all should error. + // iterating after calling toArray should error. _ = try coll.insertMany([doc2, doc3]).wait() cursor = try coll.find().wait() - results = try cursor.all().wait() + results = try cursor.toArray().wait() expect(results).to(haveCount(3)) expect(results).to(equal([doc1, doc2, doc3])) // cursor should be closed now that its exhausted @@ -46,6 +46,12 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // iterating dead cursor should error expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) + // calling toArray on a closed cursor should error. + cursor = try coll.find().wait() + results = try cursor.toArray().wait() + expect(results).to(haveCount(3)) + expect(try cursor.toArray().wait()).to(throwError()) + cursor = try coll.find(options: FindOptions(batchSize: 1)).wait() expect(try cursor.next().wait()).toNot(throwError()) @@ -106,7 +112,7 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // for each doc we insert, check that it arrives in the cursor next, // and that the cursor is still alive afterward let checkNextResult: (Document) throws -> Void = { doc in - let results = try cursor.all().wait() + let results = try cursor.toArray().wait() expect(results).to(haveCount(1)) expect(results[0]).to(equal(doc)) expect(cursor.isAlive).to(beTrue()) From 0f6ca2a7d79fcfb2f742bfbeef4e0b8c0b1b2dcd Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Fri, 14 Feb 2020 14:48:05 -0500 Subject: [PATCH 5/9] add tests --- Tests/MongoSwiftTests/ChangeStreamTests.swift | 30 ++++++++ Tests/MongoSwiftTests/MongoCursorTests.swift | 72 ++++++++++++++----- 2 files changed, 84 insertions(+), 18 deletions(-) diff --git a/Tests/MongoSwiftTests/ChangeStreamTests.swift b/Tests/MongoSwiftTests/ChangeStreamTests.swift index 2425ffeb6..64906e4eb 100644 --- a/Tests/MongoSwiftTests/ChangeStreamTests.swift +++ b/Tests/MongoSwiftTests/ChangeStreamTests.swift @@ -90,4 +90,34 @@ final class ChangeStreamTests: MongoSwiftTestCase { expect(try nextFuture.wait()).to(beNil()) } } + + func testChangeStreamToArray() throws { + guard MongoSwiftTestCase.topologyType != .single else { + print(unsupportedTopologyMessage(testName: self.name)) + return + } + try self.withTestClient { client in + let db = client.db(type(of: self).testDatabase) + try? db.collection(self.getCollectionName()).drop().wait() + let coll = try db.createCollection(self.getCollectionName()).wait() + + let stream = try coll.watch().wait() + expect(stream.isAlive).to(beTrue()) + + expect(try stream.toArray().wait()).to(beEmpty()) + expect(stream.isAlive).to(beTrue()) + + _ = try coll.insertOne(["x": 1]).wait() + + let results = try stream.toArray().wait() + expect(results[0].fullDocument?["x"]).to(equal(1)) + expect(stream.isAlive).to(beTrue()) + + expect(try stream.toArray().wait()).to(beEmpty()) + expect(stream.isAlive).to(beTrue()) + + try stream.kill().wait() + expect(stream.isAlive).to(beFalse()) + } + } } diff --git a/Tests/MongoSwiftTests/MongoCursorTests.swift b/Tests/MongoSwiftTests/MongoCursorTests.swift index 7f7da6857..f55aaaf81 100644 --- a/Tests/MongoSwiftTests/MongoCursorTests.swift +++ b/Tests/MongoSwiftTests/MongoCursorTests.swift @@ -27,7 +27,7 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // insert and read out one document _ = try coll.insertOne(doc1).wait() cursor = try coll.find().wait() - var results = try cursor.toArray().wait() + let results = try cursor.toArray().wait() expect(results).to(haveCount(1)) expect(results[0]).to(equal(doc1)) // cursor should be closed now that its exhausted @@ -35,23 +35,6 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { // iterating a dead cursor should error expect(try cursor.next().wait()).to(throwError()) - // iterating after calling toArray should error. - _ = try coll.insertMany([doc2, doc3]).wait() - cursor = try coll.find().wait() - results = try cursor.toArray().wait() - expect(results).to(haveCount(3)) - expect(results).to(equal([doc1, doc2, doc3])) - // cursor should be closed now that its exhausted - expect(cursor.isAlive).to(beFalse()) - // iterating dead cursor should error - expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) - - // calling toArray on a closed cursor should error. - cursor = try coll.find().wait() - results = try cursor.toArray().wait() - expect(results).to(haveCount(3)) - expect(try cursor.toArray().wait()).to(throwError()) - cursor = try coll.find(options: FindOptions(batchSize: 1)).wait() expect(try cursor.next().wait()).toNot(throwError()) @@ -170,4 +153,57 @@ final class AsyncMongoCursorTests: MongoSwiftTestCase { expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) } } + + func testCursorToArray() throws { + // normal cursor + try self.withTestNamespace { _, _, coll in + // query empty collection + var cursor = try coll.find().wait() + expect(try cursor.toArray().wait()).to(equal([])) + expect(cursor.isAlive).to(beFalse()) + // iterating dead cursor should error + expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) + + // iterating after calling toArray should error. + _ = try coll.insertMany([doc1, doc2, doc3]).wait() + cursor = try coll.find().wait() + var results = try cursor.toArray().wait() + expect(results).to(equal([doc1, doc2, doc3])) + // cursor should be closed now that its exhausted + expect(cursor.isAlive).to(beFalse()) + // iterating dead cursor should error + expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) + + // calling toArray on a closed cursor should error. + cursor = try coll.find().wait() + results = try cursor.toArray().wait() + expect(results).to(haveCount(3)) + expect(try cursor.toArray().wait()).to(throwError()) + } + + // tailable cursor + let collOptions = CreateCollectionOptions(capped: true, max: 3, size: 1000) + try self.withTestNamespace(collectionOptions: collOptions) { _, _, coll in + let cursorOpts = FindOptions(cursorType: .tailable) + + var cursor = try coll.find(options: cursorOpts).wait() + defer { try? cursor.kill().wait() } + + expect(try cursor.toArray().wait()).to(beEmpty()) + // no documents matched initial query, so cursor is dead + expect(cursor.isAlive).to(beFalse()) + expect(try cursor.next().wait()).to(throwError(errorType: LogicError.self)) + + // insert a doc so something matches initial query + _ = try coll.insertOne(doc1).wait() + cursor = try coll.find(options: cursorOpts).wait() + expect(try cursor.toArray().wait()).to(equal([doc1])) + expect(cursor.isAlive).to(beTrue()) + + // newly inserted docs will be returned by toArray + _ = try coll.insertMany([doc2, doc3]).wait() + expect(try cursor.toArray().wait()).to(equal([doc2, doc3])) + expect(cursor.isAlive).to(beTrue()) + } + } } From 382871d95cdbabe70e39ac7288500a2b5eff6059 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Fri, 14 Feb 2020 14:52:10 -0500 Subject: [PATCH 6/9] Update ChangeStreamTests.swift --- Tests/MongoSwiftTests/ChangeStreamTests.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Tests/MongoSwiftTests/ChangeStreamTests.swift b/Tests/MongoSwiftTests/ChangeStreamTests.swift index 64906e4eb..435283499 100644 --- a/Tests/MongoSwiftTests/ChangeStreamTests.swift +++ b/Tests/MongoSwiftTests/ChangeStreamTests.swift @@ -104,15 +104,17 @@ final class ChangeStreamTests: MongoSwiftTestCase { let stream = try coll.watch().wait() expect(stream.isAlive).to(beTrue()) + // initially, no events, but stream should stay alive expect(try stream.toArray().wait()).to(beEmpty()) expect(stream.isAlive).to(beTrue()) + // we should get back single event now via toArray _ = try coll.insertOne(["x": 1]).wait() - let results = try stream.toArray().wait() expect(results[0].fullDocument?["x"]).to(equal(1)) expect(stream.isAlive).to(beTrue()) + // no more events, but stream should stay alive expect(try stream.toArray().wait()).to(beEmpty()) expect(stream.isAlive).to(beTrue()) From 07db27994c7a89fe6defbbe28fd951105f6026d1 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Fri, 14 Feb 2020 14:52:36 -0500 Subject: [PATCH 7/9] Update LinuxMain.swift --- Tests/LinuxMain.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index bdc7f395b..378e69270 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -13,6 +13,7 @@ extension AsyncMongoCursorTests { ("testTailableAwaitAsyncCursor", testTailableAwaitAsyncCursor), ("testTailableAsyncCursor", testTailableAsyncCursor), ("testAsyncNext", testAsyncNext), + ("testCursorToArray", testCursorToArray), ] } @@ -49,6 +50,7 @@ extension ChangeStreamTests { ("testChangeStreamNext", testChangeStreamNext), ("testChangeStreamError", testChangeStreamError), ("testChangeStreamEmpty", testChangeStreamEmpty), + ("testChangeStreamToArray", testChangeStreamToArray), ] } From f94eeac973767f76e368f87acd2fff4a982f8065 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 18 Feb 2020 15:49:18 -0500 Subject: [PATCH 8/9] move caching logic into CursorCommon --- Sources/MongoSwift/ChangeStream.swift | 22 +++-- Sources/MongoSwift/CursorCommon.swift | 66 ++++++++++++++- Sources/MongoSwift/MongoCursor.swift | 115 ++++++-------------------- 3 files changed, 99 insertions(+), 104 deletions(-) diff --git a/Sources/MongoSwift/ChangeStream.swift b/Sources/MongoSwift/ChangeStream.swift index 000fe9dfb..a4a4f6f4d 100644 --- a/Sources/MongoSwift/ChangeStream.swift +++ b/Sources/MongoSwift/ChangeStream.swift @@ -63,6 +63,14 @@ public class ChangeStream: CursorProtocol { /// The cursor this change stream is wrapping. private let wrappedCursor: Cursor + /// Process an event before returning it to the user, or does nothing and returns nil if the provided event is nil. + private func processEvent(_ event: Document?) throws -> T? { + guard let event = event else { + return nil + } + return try self.processEvent(event) + } + /// Process an event before returning it to the user. private func processEvent(_ event: Document) throws -> T { // Update the resumeToken with the `_id` field from the document. @@ -136,10 +144,7 @@ public class ChangeStream: CursorProtocol { */ public func next() -> EventLoopFuture { return self.client.operationExecutor.execute { - guard let next = try self.wrappedCursor.next() else { - return nil - } - return try self.processEvent(next) + try self.processEvent(self.wrappedCursor.next()) } } @@ -163,10 +168,7 @@ public class ChangeStream: CursorProtocol { */ public func tryNext() -> EventLoopFuture { return self.client.operationExecutor.execute { - guard let next = try self.wrappedCursor.tryNext() else { - return nil - } - return try self.processEvent(next) + try self.processEvent(self.wrappedCursor.tryNext()) } } @@ -186,10 +188,6 @@ public class ChangeStream: CursorProtocol { * - `DecodingError` if an error occurs decoding the server's responses. */ public func toArray() -> EventLoopFuture<[T]> { - guard self.isAlive else { - return self.client.operationExecutor.makeFailedFuture(ClosedCursorError) - } - return self.client.operationExecutor.execute { try self.wrappedCursor.toArray().map(self.processEvent) } diff --git a/Sources/MongoSwift/CursorCommon.swift b/Sources/MongoSwift/CursorCommon.swift index c1ede067c..bb7de88e3 100644 --- a/Sources/MongoSwift/CursorCommon.swift +++ b/Sources/MongoSwift/CursorCommon.swift @@ -101,10 +101,28 @@ internal class Cursor { /// The state of this cursor. private var state: State + /// Used to store a cached next value to return, if one exists. + private enum CachedDocument { + /// Indicates that the associated value is the next value to return. This value may be nil. + case cached(Document?) + /// Indicates that there is no value cached. + case none + + /// Get the contents of the cache and clear it. + fileprivate mutating func clear() -> CachedDocument { + let copy = self + self = .none + return copy + } + } + + /// Tracks the caching status of this cursor. + private var cached: CachedDocument + /// The type of this cursor. Useful for indicating whether or not it is tailable. private let type: CursorType - /// Lock used to synchronize usage of the internal state. + /// Lock used to synchronize usage of the internal state: specifically the `state` and `cached` properties. /// This lock should only be acquired in the bodies of non-private methods. private let lock: Lock @@ -207,16 +225,31 @@ internal class Cursor { self.type = type self.lock = Lock() self.isClosing = NIOAtomic.makeAtomic(value: false) + self.cached = .none // If there was an error constructing the cursor, throw it. if let error = self.getMongocError() { self.close() throw error } + + self.cached = try .cached(self.tryNext()) } /// Whether the cursor is alive. internal var isAlive: Bool { + return self.lock.withLock { + self._isAlive + } + } + + /// Checks whether the cursor is alive. Meant for private use only. + /// This property should only be read while the lock is held. + private var _isAlive: Bool { + if case .cached = self.cached { + return true + } + switch self.state { case .open: return true @@ -229,11 +262,20 @@ internal class Cursor { /// This method is blocking and should only be run via the executor. internal func next() throws -> Document? { return try self.lock.withLock { - guard self.isAlive else { + guard self._isAlive else { throw ClosedCursorError } + + if case let .cached(result) = self.cached.clear() { + // If there are no more results forthcoming after clearing the cache, or the cache had a non-nil + // result in it, return that. + if !self._isAlive || result != nil { + return result + } + } + // Keep trying until either the cursor is killed or a notification has been sent by close - while self.isAlive && !self.isClosing.load() { + while self._isAlive && !self.isClosing.load() { if let doc = try self.getNextDocument() { return doc } @@ -246,7 +288,10 @@ internal class Cursor { /// This method is blocking and should only be run via the executor. internal func tryNext() throws -> Document? { return try self.lock.withLock { - try self.getNextDocument() + if case let .cached(result) = self.cached.clear() { + return result + } + return try self.getNextDocument() } } @@ -255,7 +300,19 @@ internal class Cursor { /// This method is blocking and should only be run via the executor. internal func toArray() throws -> [Document] { return try self.lock.withLock { + guard self._isAlive else { + throw ClosedCursorError + } + var results: [Document] = [] + if case let .cached(result) = self.cached.clear(), let unwrappedResult = result { + results.append(unwrappedResult) + } + // the only value left was the cached one + guard self._isAlive else { + return results + } + while let result = try self.getNextDocument() { results.append(result) } @@ -269,6 +326,7 @@ internal class Cursor { internal func kill() { self.isClosing.store(true) self.lock.withLock { + self.cached = .none self.close() } self.isClosing.store(false) diff --git a/Sources/MongoSwift/MongoCursor.swift b/Sources/MongoSwift/MongoCursor.swift index 779878bee..57a2fff07 100644 --- a/Sources/MongoSwift/MongoCursor.swift +++ b/Sources/MongoSwift/MongoCursor.swift @@ -39,24 +39,6 @@ public class MongoCursor: CursorProtocol { /// Decoder from the client, database, or collection that created this cursor. internal let decoder: BSONDecoder - /// Used to store a cached next value to return, if one exists. - private enum CachedDocument { - /// Indicates that the associated value is the next value to return. This value may be nil. - case cached(T?) - /// Indicates that there is no value cached. - case none - - /// Get the contents of the cache and clear it. - fileprivate mutating func clear() -> CachedDocument { - let copy = self - self = .none - return copy - } - } - - /// Tracks the caching status of this cursor. - private var cached: CachedDocument - /** * Initializes a new `MongoCursor` instance. Not meant to be instantiated directly by a user. When `forceIO` is * true, this initializer will force a connection to the server if one is not already established. @@ -75,7 +57,6 @@ public class MongoCursor: CursorProtocol { ) throws { self.client = client self.decoder = decoder - self.cached = .none self.wrappedCursor = try Cursor( mongocCursor: MongocCursor(referencing: cursorPtr), @@ -83,17 +64,6 @@ public class MongoCursor: CursorProtocol { session: session, type: cursorType ?? .nonTailable ) - - let next = try self.decode(result: self.wrappedCursor.tryNext()) - self.cached = .cached(next) - } - - /// Close this cursor - /// - /// This method should only be called while the lock is held. - internal func blockingKill() { - self.cached = .none - self.wrappedCursor.kill() } /// Asserts that the cursor was closed. @@ -111,12 +81,7 @@ public class MongoCursor: CursorProtocol { /// Decodes the given document to the generic type. private func decode(doc: Document) throws -> T { - do { - return try self.decoder.decode(T.self, from: doc) - } catch { - self.blockingKill() - throw error - } + return try self.decoder.decode(T.self, from: doc) } /** @@ -130,9 +95,6 @@ public class MongoCursor: CursorProtocol { * This cursor will be dead as soon as `next` returns `nil` or an error, regardless of the `CursorType`. */ public var isAlive: Bool { - if case .cached = self.cached { - return true - } return self.wrappedCursor.isAlive } @@ -147,41 +109,6 @@ public class MongoCursor: CursorProtocol { } } - /** - * Consolidate the currently available results of the cursor into an array of type `T`. - * - * If this cursor is not tailable, this method will exhaust it. - * - * If this cursor is tailable, `toArray` will only fetch the currently available results, and it - * may return more data if it is called again while the cursor is still alive. - * - * - Returns: - * An `EventLoopFuture<[T]>` evaluating to the results currently available in this cursor, or an error. - * - * If the future evaluates to an error, that error is likely one of the following: - * - `CommandError` if an error occurs while fetching more results from the server. - * - `LogicError` if this function is called after the cursor has died. - * - `LogicError` if this function is called and the session associated with this cursor is inactive. - * - `DecodingError` if an error occurs decoding the server's responses. - */ - public func toArray() -> EventLoopFuture<[T]> { - guard self.isAlive else { - return self.client.operationExecutor.makeFailedFuture(ClosedCursorError) - } - - return self.client.operationExecutor.execute { - var results: [T] = [] - if case let .cached(result) = self.cached.clear(), let unwrappedResult = result { - results.append(unwrappedResult) - } - // If the cursor still could have more results after clearing the cache, collect them too. - if self.isAlive { - results += try self.wrappedCursor.toArray().map { try self.decode(doc: $0) } - } - return results - } - } - /** * Attempt to get the next `T` from the cursor, returning `nil` if there are no results. * @@ -204,10 +131,7 @@ public class MongoCursor: CursorProtocol { */ public func tryNext() -> EventLoopFuture { return self.client.operationExecutor.execute { - if case let .cached(result) = self.cached.clear() { - return result - } - return try self.decode(result: self.wrappedCursor.tryNext()) + try self.decode(result: self.wrappedCursor.tryNext()) } } @@ -230,15 +154,30 @@ public class MongoCursor: CursorProtocol { */ public func next() -> EventLoopFuture { return self.client.operationExecutor.execute { - if case let .cached(result) = self.cached.clear() { - // If there are no more results forthcoming after clearing the cache, or the cache had a non-nil - // result in it, return that. - if !self.isAlive || result != nil { - return result - } - } - // Otherwise iterate until a result is received. - return try self.decode(result: self.wrappedCursor.next()) + try self.decode(result: self.wrappedCursor.next()) + } + } + + /** + * Consolidate the currently available results of the cursor into an array of type `T`. + * + * If this cursor is not tailable, this method will exhaust it. + * + * If this cursor is tailable, `toArray` will only fetch the currently available results, and it + * may return more data if it is called again while the cursor is still alive. + * + * - Returns: + * An `EventLoopFuture<[T]>` evaluating to the results currently available in this cursor, or an error. + * + * If the future evaluates to an error, that error is likely one of the following: + * - `CommandError` if an error occurs while fetching more results from the server. + * - `LogicError` if this function is called after the cursor has died. + * - `LogicError` if this function is called and the session associated with this cursor is inactive. + * - `DecodingError` if an error occurs decoding the server's responses. + */ + public func toArray() -> EventLoopFuture<[T]> { + return self.client.operationExecutor.execute { + try self.wrappedCursor.toArray().map { try self.decode(doc: $0) } } } @@ -253,7 +192,7 @@ public class MongoCursor: CursorProtocol { */ public func kill() -> EventLoopFuture { return self.client.operationExecutor.execute { - self.blockingKill() + self.wrappedCursor.kill() } } } From 8f0008af8787d9c603cd7e29b3ee4c68c091d4ef Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Tue, 18 Feb 2020 17:04:10 -0500 Subject: [PATCH 9/9] only cache for cursors --- Sources/MongoSwift/ChangeStream.swift | 2 ++ Sources/MongoSwift/CursorCommon.swift | 8 +++++++- Sources/MongoSwift/MongoCursor.swift | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/Sources/MongoSwift/ChangeStream.swift b/Sources/MongoSwift/ChangeStream.swift index a4a4f6f4d..55204cf9c 100644 --- a/Sources/MongoSwift/ChangeStream.swift +++ b/Sources/MongoSwift/ChangeStream.swift @@ -6,6 +6,8 @@ import NIO private struct MongocChangeStream: MongocCursorWrapper { internal let pointer: OpaquePointer + internal static var isLazy: Bool { return false } + fileprivate init(stealing ptr: OpaquePointer) { self.pointer = ptr } diff --git a/Sources/MongoSwift/CursorCommon.swift b/Sources/MongoSwift/CursorCommon.swift index bb7de88e3..8752df0a8 100644 --- a/Sources/MongoSwift/CursorCommon.swift +++ b/Sources/MongoSwift/CursorCommon.swift @@ -72,6 +72,9 @@ internal protocol MongocCursorWrapper { /// The underlying libmongoc pointer. var pointer: OpaquePointer { get } + /// Indicates whether this type lazily sends its corresponding initial command to the server. + static var isLazy: Bool { get } + /// Method wrapping the appropriate libmongoc "error" function (e.g. `mongoc_cursor_error_document`). func errorDocument(bsonError: inout bson_error_t, replyPtr: UnsafeMutablePointer) -> Bool @@ -233,7 +236,10 @@ internal class Cursor { throw error } - self.cached = try .cached(self.tryNext()) + // if this type lazily sends its initial command, retrieve and cache the first document so that we start I/O. + if CursorKind.isLazy { + self.cached = try .cached(self.tryNext()) + } } /// Whether the cursor is alive. diff --git a/Sources/MongoSwift/MongoCursor.swift b/Sources/MongoSwift/MongoCursor.swift index 57a2fff07..a6f548eea 100644 --- a/Sources/MongoSwift/MongoCursor.swift +++ b/Sources/MongoSwift/MongoCursor.swift @@ -7,6 +7,8 @@ import NIOConcurrencyHelpers internal struct MongocCursor: MongocCursorWrapper { internal let pointer: OpaquePointer + internal static var isLazy: Bool { return true } + internal init(referencing pointer: OpaquePointer) { self.pointer = pointer }