Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion Sources/MongoSwift/ChangeStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import NIO
private struct MongocChangeStream: MongocCursorWrapper {
internal let pointer: OpaquePointer

internal static var isLazy: Bool { return false }

fileprivate init(stealing ptr: OpaquePointer) {
self.pointer = ptr
}
Expand Down Expand Up @@ -63,11 +65,16 @@ public class ChangeStream<T: Codable>: CursorProtocol {
/// The cursor this change stream is wrapping.
private let wrappedCursor: Cursor<MongocChangeStream>

/// Process an event before returning it to the user.
/// Process an event before returning it to the user, or does nothing and returns nil if the provided event is nil.
private func processEvent(_ event: Document?) throws -> T? {
guard let event = event else {
return nil
}
return try self.processEvent(event)
}

/// Process an event before returning it to the user.
private func processEvent(_ event: Document) throws -> T {
// Update the resumeToken with the `_id` field from the document.
guard let resumeToken = event["_id"]?.documentValue else {
throw InternalError(message: "_id field is missing from the change stream document.")
Expand Down Expand Up @@ -167,6 +174,27 @@ public class ChangeStream<T: Codable>: CursorProtocol {
}
}

/**
* Consolidate the currently available results of the change stream into an array of type `T`.
*
* Since `toArray` will only fetch the currently available results, it may return more data if it is called again
* while the change stream is still alive.
*
* - Returns:
* An `EventLoopFuture<[T]>` evaluating to the results currently available in this change stream, or an error.
*
* If the future evaluates to an error, that error is likely one of the following:
* - `CommandError` if an error occurs while fetching more results from the server.
* - `LogicError` if this function is called after the change stream has died.
* - `LogicError` if this function is called and the session associated with this change stream is inactive.
* - `DecodingError` if an error occurs decoding the server's responses.
*/
public func toArray() -> EventLoopFuture<[T]> {
return self.client.operationExecutor.execute {
try self.wrappedCursor.toArray().map(self.processEvent)
}
}

/**
* Kill this change stream.
*
Expand Down
79 changes: 74 additions & 5 deletions Sources/MongoSwift/CursorCommon.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ internal protocol CursorProtocol {
*/
func tryNext() -> EventLoopFuture<T?>

/// Retrieves all the documents currently available in this cursor. If the cursor is not tailable, exhausts it. If
/// the cursor is tailable or is a change stream, this method may return more data if it is called again while the
/// cursor is still alive.
func toArray() -> EventLoopFuture<[T]>

/**
* Kills this cursor.
*
Expand Down Expand Up @@ -67,6 +72,9 @@ internal protocol MongocCursorWrapper {
/// The underlying libmongoc pointer.
var pointer: OpaquePointer { get }

/// Indicates whether this type lazily sends its corresponding initial command to the server.
static var isLazy: Bool { get }

/// Method wrapping the appropriate libmongoc "error" function (e.g. `mongoc_cursor_error_document`).
func errorDocument(bsonError: inout bson_error_t, replyPtr: UnsafeMutablePointer<BSONPointer?>) -> Bool

Expand Down Expand Up @@ -96,10 +104,28 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
/// The state of this cursor.
private var state: State

/// Used to store a cached next value to return, if one exists.
private enum CachedDocument {
/// Indicates that the associated value is the next value to return. This value may be nil.
case cached(Document?)
/// Indicates that there is no value cached.
case none

/// Get the contents of the cache and clear it.
fileprivate mutating func clear() -> CachedDocument {
let copy = self
self = .none
return copy
}
}

/// Tracks the caching status of this cursor.
private var cached: CachedDocument

/// The type of this cursor. Useful for indicating whether or not it is tailable.
private let type: CursorType

/// Lock used to synchronize usage of the internal state.
/// Lock used to synchronize usage of the internal state: specifically the `state` and `cached` properties.
/// This lock should only be acquired in the bodies of non-private methods.
private let lock: Lock

Expand Down Expand Up @@ -202,16 +228,34 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
self.type = type
self.lock = Lock()
self.isClosing = NIOAtomic.makeAtomic(value: false)
self.cached = .none

// If there was an error constructing the cursor, throw it.
if let error = self.getMongocError() {
self.close()
throw error
}

// if this type lazily sends its initial command, retrieve and cache the first document so that we start I/O.
if CursorKind.isLazy {
self.cached = try .cached(self.tryNext())
}
}

/// Whether the cursor is alive.
internal var isAlive: Bool {
return self.lock.withLock {
self._isAlive
}
}

/// Checks whether the cursor is alive. Meant for private use only.
/// This property should only be read while the lock is held.
private var _isAlive: Bool {
if case .cached = self.cached {
return true
}

switch self.state {
case .open:
return true
Expand All @@ -224,11 +268,20 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
/// This method is blocking and should only be run via the executor.
internal func next() throws -> Document? {
return try self.lock.withLock {
guard self.isAlive else {
guard self._isAlive else {
throw ClosedCursorError
}

if case let .cached(result) = self.cached.clear() {
// If there are no more results forthcoming after clearing the cache, or the cache had a non-nil
// result in it, return that.
if !self._isAlive || result != nil {
return result
}
}

// Keep trying until either the cursor is killed or a notification has been sent by close
while self.isAlive && !self.isClosing.load() {
while self._isAlive && !self.isClosing.load() {
if let doc = try self.getNextDocument() {
return doc
}
Expand All @@ -241,16 +294,31 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
/// This method is blocking and should only be run via the executor.
internal func tryNext() throws -> Document? {
return try self.lock.withLock {
try self.getNextDocument()
if case let .cached(result) = self.cached.clear() {
return result
}
return try self.getNextDocument()
}
}

/// Retreive all the currently available documents in the result set.
/// This will not exhaust the cursor.
/// This method is blocking and should only be run via the executor.
internal func all() throws -> [Document] {
internal func toArray() throws -> [Document] {
return try self.lock.withLock {
guard self._isAlive else {
throw ClosedCursorError
}

var results: [Document] = []
if case let .cached(result) = self.cached.clear(), let unwrappedResult = result {
results.append(unwrappedResult)
}
// the only value left was the cached one
guard self._isAlive else {
return results
}

while let result = try self.getNextDocument() {
results.append(result)
}
Expand All @@ -264,6 +332,7 @@ internal class Cursor<CursorKind: MongocCursorWrapper> {
internal func kill() {
self.isClosing.store(true)
self.lock.withLock {
self.cached = .none
self.close()
}
self.isClosing.store(false)
Expand Down
2 changes: 1 addition & 1 deletion Sources/MongoSwift/MongoCollection+Indexes.swift
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ extension MongoCollection {
*/
public func listIndexNames(session _: ClientSession? = nil) -> EventLoopFuture<[String]> {
return self.listIndexes().flatMap { cursor in
cursor.all()
cursor.toArray()
}.flatMapThrowing { models in
try models.map { model in
guard let name = model.options?.name else {
Expand Down
113 changes: 29 additions & 84 deletions Sources/MongoSwift/MongoCursor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import NIOConcurrencyHelpers
internal struct MongocCursor: MongocCursorWrapper {
internal let pointer: OpaquePointer

internal static var isLazy: Bool { return true }

internal init(referencing pointer: OpaquePointer) {
self.pointer = pointer
}
Expand Down Expand Up @@ -39,24 +41,6 @@ public class MongoCursor<T: Codable>: CursorProtocol {
/// Decoder from the client, database, or collection that created this cursor.
internal let decoder: BSONDecoder

/// Used to store a cached next value to return, if one exists.
private enum CachedDocument {
/// Indicates that the associated value is the next value to return. This value may be nil.
case cached(T?)
/// Indicates that there is no value cached.
case none

/// Get the contents of the cache and clear it.
fileprivate mutating func clear() -> CachedDocument {
let copy = self
self = .none
return copy
}
}

/// Tracks the caching status of this cursor.
private var cached: CachedDocument

/**
* Initializes a new `MongoCursor` instance. Not meant to be instantiated directly by a user. When `forceIO` is
* true, this initializer will force a connection to the server if one is not already established.
Expand All @@ -75,25 +59,13 @@ public class MongoCursor<T: Codable>: CursorProtocol {
) throws {
self.client = client
self.decoder = decoder
self.cached = .none

self.wrappedCursor = try Cursor(
mongocCursor: MongocCursor(referencing: cursorPtr),
connection: connection,
session: session,
type: cursorType ?? .nonTailable
)

let next = try self.decode(result: self.wrappedCursor.tryNext())
self.cached = .cached(next)
}

/// Close this cursor
///
/// This method should only be called while the lock is held.
internal func blockingKill() {
self.cached = .none
self.wrappedCursor.kill()
}

/// Asserts that the cursor was closed.
Expand All @@ -111,12 +83,7 @@ public class MongoCursor<T: Codable>: CursorProtocol {

/// Decodes the given document to the generic type.
private func decode(doc: Document) throws -> T {
do {
return try self.decoder.decode(T.self, from: doc)
} catch {
self.blockingKill()
throw error
}
return try self.decoder.decode(T.self, from: doc)
}

/**
Expand All @@ -130,9 +97,6 @@ public class MongoCursor<T: Codable>: CursorProtocol {
* This cursor will be dead as soon as `next` returns `nil` or an error, regardless of the `CursorType`.
*/
public var isAlive: Bool {
if case .cached = self.cached {
return true
}
return self.wrappedCursor.isAlive
}

Expand All @@ -147,37 +111,6 @@ public class MongoCursor<T: Codable>: CursorProtocol {
}
}

/**
* Consolidate the currently available results of the cursor into an array of type `T`.
*
* If this cursor is not tailable, this method will exhaust it.
*
* If this cursor is tailable, `all` will only fetch the currently available results, and it
* may return more data if it is called again while the cursor is still alive.
*
* - Returns:
* An `EventLoopFuture<[T]>` evaluating to the results currently available to this cursor or an error.
*
* If the future evaluates to an error, that error is likely one of the following:
* - `CommandError` if an error occurs while fetching more results from the server.
* - `LogicError` if this function is called after the cursor has died.
* - `LogicError` if this function is called and the session associated with this cursor is inactive.
* - `DecodingError` if an error occurs decoding the server's responses.
*/
internal func all() -> EventLoopFuture<[T]> {
return self.client.operationExecutor.execute {
var results: [T] = []
if case let .cached(result) = self.cached.clear(), let unwrappedResult = result {
results.append(unwrappedResult)
}
// If the cursor still could have more results after clearing the cache, collect them too.
if self.isAlive {
results += try self.wrappedCursor.all().map { try self.decode(doc: $0) }
}
return results
}
}

/**
* Attempt to get the next `T` from the cursor, returning `nil` if there are no results.
*
Expand All @@ -200,10 +133,7 @@ public class MongoCursor<T: Codable>: CursorProtocol {
*/
public func tryNext() -> EventLoopFuture<T?> {
return self.client.operationExecutor.execute {
if case let .cached(result) = self.cached.clear() {
return result
}
return try self.decode(result: self.wrappedCursor.tryNext())
try self.decode(result: self.wrappedCursor.tryNext())
}
}

Expand All @@ -226,15 +156,30 @@ public class MongoCursor<T: Codable>: CursorProtocol {
*/
public func next() -> EventLoopFuture<T?> {
return self.client.operationExecutor.execute {
if case let .cached(result) = self.cached.clear() {
// If there are no more results forthcoming after clearing the cache, or the cache had a non-nil
// result in it, return that.
if !self.isAlive || result != nil {
return result
}
}
// Otherwise iterate until a result is received.
return try self.decode(result: self.wrappedCursor.next())
try self.decode(result: self.wrappedCursor.next())
}
}

/**
* Consolidate the currently available results of the cursor into an array of type `T`.
*
* If this cursor is not tailable, this method will exhaust it.
*
* If this cursor is tailable, `toArray` will only fetch the currently available results, and it
* may return more data if it is called again while the cursor is still alive.
*
* - Returns:
* An `EventLoopFuture<[T]>` evaluating to the results currently available in this cursor, or an error.
*
* If the future evaluates to an error, that error is likely one of the following:
* - `CommandError` if an error occurs while fetching more results from the server.
* - `LogicError` if this function is called after the cursor has died.
* - `LogicError` if this function is called and the session associated with this cursor is inactive.
* - `DecodingError` if an error occurs decoding the server's responses.
*/
public func toArray() -> EventLoopFuture<[T]> {
return self.client.operationExecutor.execute {
try self.wrappedCursor.toArray().map { try self.decode(doc: $0) }
}
}

Expand All @@ -249,7 +194,7 @@ public class MongoCursor<T: Codable>: CursorProtocol {
*/
public func kill() -> EventLoopFuture<Void> {
return self.client.operationExecutor.execute {
self.blockingKill()
self.wrappedCursor.kill()
}
}
}
Loading