From 3373fdef9fbcc27a6b8363bd719a86ce85841bfb Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Wed, 5 Feb 2020 19:00:28 -0500 Subject: [PATCH 1/7] wip --- Examples/Docs/Package.resolved | 34 +++++++ Examples/Docs/Package.swift | 7 +- .../Docs/Sources/AsyncExamples/main.swift | 95 +++++++++++++++++++ .../{DocsExamples => SyncExamples}/main.swift | 14 +-- 4 files changed, 140 insertions(+), 10 deletions(-) create mode 100644 Examples/Docs/Package.resolved create mode 100644 Examples/Docs/Sources/AsyncExamples/main.swift rename Examples/Docs/Sources/{DocsExamples => SyncExamples}/main.swift (90%) diff --git a/Examples/Docs/Package.resolved b/Examples/Docs/Package.resolved new file mode 100644 index 000000000..c43ba0fd4 --- /dev/null +++ b/Examples/Docs/Package.resolved @@ -0,0 +1,34 @@ +{ + "object": { + "pins": [ + { + "package": "MongoSwift", + "repositoryURL": "https://github.com/mongodb/mongo-swift-driver", + "state": { + "branch": "SWIFT-673/async-changestream-2", + "revision": "f4ba57e15db4d6b786dd4a4aa840074814394a31", + "version": null + } + }, + { + "package": "Nimble", + "repositoryURL": "https://github.com/Quick/Nimble.git", + "state": { + "branch": null, + "revision": "f8657642dfdec9973efc79cc68bcef43a653a2bc", + "version": "8.0.2" + } + }, + { + "package": "swift-nio", + "repositoryURL": "https://github.com/apple/swift-nio", + "state": { + "branch": null, + "revision": "4409b57d4c0c40d41ac2b320fccf02e4d451e3db", + "version": "2.13.0" + } + } + ] + }, + "version": 1 +} diff --git a/Examples/Docs/Package.swift b/Examples/Docs/Package.swift index 16f53de38..5d181ade6 100644 --- a/Examples/Docs/Package.swift +++ b/Examples/Docs/Package.swift @@ -1,12 +1,13 @@ -// swift-tools-version:4.2 +// swift-tools-version:5.0 import PackageDescription let package = Package( name: "DocsExamples", dependencies: [ - .package(url: "https://github.com/mongodb/mongo-swift-driver", .upToNextMajor(from: "0.1.0")) + .package(url: "https://github.com/mongodb/mongo-swift-driver", .branch(from: "master")) ], targets: [ - .target(name: "DocsExamples", dependencies: ["MongoSwift"]) + .target(name: "SyncExamples", dependencies: ["MongoSwiftSync"]), + .target(name: "AsyncExamples", dependencies: ["MongoSwift"]) ] ) diff --git a/Examples/Docs/Sources/AsyncExamples/main.swift b/Examples/Docs/Sources/AsyncExamples/main.swift new file mode 100644 index 000000000..929f3fef2 --- /dev/null +++ b/Examples/Docs/Sources/AsyncExamples/main.swift @@ -0,0 +1,95 @@ +import Foundation +import MongoSwift +import NIO + +// swiftlint:disable force_unwrapping + +/// Examples used for the MongoDB documentation on Causal Consistency. +/// - SeeAlso: https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#examples +private func causalConsistency() throws { + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let client1 = try MongoClient(using: elg) + + // Start Causal Consistency Example 1 + let s1 = client1.startSession(options: ClientSessionOptions(causalConsistency: true)) + let currentDate = Date() + var dbOptions = DatabaseOptions( + readConcern: ReadConcern(.majority), + writeConcern: try WriteConcern(w: .majority, wtimeoutMS: 1000) + ) + let items = client1.db("test", options: dbOptions).collection("items") + let result1 = items.updateOne( + filter: ["sku": "111", "end": .null], + update: ["$set": ["end": .datetime(currentDate)]], + session: s1 + ).flatMap { _ in + items.insertOne(["sku": "nuts-111", "name": "Pecans", "start": .datetime(currentDate)], session: s1) + } + // End Causal Consistency Example 1 + + let client2 = try MongoClient(using: elg) + + // Start Causal Consistency Example 2 + let result2 = client2.withSession(options: ClientSessionOptions(causalConsistency: true)) { s2 in + // The cluster and operation times are guaranteed to be non-nil since we already used s1 for operations above. + s2.advanceClusterTime(to: s1.clusterTime!) + s2.advanceOperationTime(to: s1.operationTime!) + + dbOptions.readPreference = ReadPreference(.secondary) + let items2 = client2.db("test", options: dbOptions).collection("items") + for item in try items2.find(["end": .null], session: s2) { + print(item) + } + } + // End Causal Consistency Example 2 +} + +/// Examples used for the MongoDB documentation on Change Streams. +/// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/ +// private func changeStreams() throws { +// let client = try MongoClient() +// let db = client.db("example") + +// // The following examples assume that you have connected to a MongoDB replica set and have +// // accessed a database that contains an inventory collection. + +// do { +// // Start Changestream Example 1 +// let inventory = db.collection("inventory") +// let cursor = try inventory.watch() +// let next = cursor.next() +// // End Changestream Example 1 +// } + +// do { +// // Start Changestream Example 2 +// let inventory = db.collection("inventory") +// let cursor = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) +// let next = cursor.next() +// // End Changestream Example 2 +// } + +// do { +// // Start Changestream Example 3 +// let inventory = db.collection("inventory") +// let cursor = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) +// let next = cursor.next() + +// let resumeToken = cursor.resumeToken +// let resumedCursor = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) +// let nextAfterResume = resumedCursor.next() +// // End Changestream Example 3 +// } + +// do { +// // Start Changestream Example 4 +// let pipeline: [Document] = [ +// ["$match": ["fullDocument.username": "alice"]], +// ["$addFields": ["newField": "this is an added field!"]] +// ] +// let inventory = db.collection("inventory") +// let cursor = try inventory.watch(pipeline, withEventType: Document.self) +// let next = cursor.next() +// // End Changestream Example 4 +// } +// } diff --git a/Examples/Docs/Sources/DocsExamples/main.swift b/Examples/Docs/Sources/SyncExamples/main.swift similarity index 90% rename from Examples/Docs/Sources/DocsExamples/main.swift rename to Examples/Docs/Sources/SyncExamples/main.swift index 0b430f455..203856a3d 100644 --- a/Examples/Docs/Sources/DocsExamples/main.swift +++ b/Examples/Docs/Sources/SyncExamples/main.swift @@ -1,5 +1,5 @@ import Foundation -import MongoSwift +import MongoSwiftSync // swiftlint:disable force_unwrapping @@ -9,7 +9,7 @@ private func causalConsistency() throws { let client1 = try MongoClient() // Start Causal Consistency Example 1 - let s1 = try client1.startSession(options: ClientSessionOptions(causalConsistency: true)) + let s1 = client1.startSession(options: ClientSessionOptions(causalConsistency: true)) let currentDate = Date() var dbOptions = DatabaseOptions( readConcern: ReadConcern(.majority), @@ -54,7 +54,7 @@ private func changeStreams() throws { // Start Changestream Example 1 let inventory = db.collection("inventory") let cursor = try inventory.watch() - let next = try cursor.nextOrError() + let next = cursor.next() // End Changestream Example 1 } @@ -62,7 +62,7 @@ private func changeStreams() throws { // Start Changestream Example 2 let inventory = db.collection("inventory") let cursor = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) - let next = try cursor.nextOrError() + let next = cursor.next() // End Changestream Example 2 } @@ -70,11 +70,11 @@ private func changeStreams() throws { // Start Changestream Example 3 let inventory = db.collection("inventory") let cursor = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) - let next = try cursor.nextOrError() + let next = cursor.next() let resumeToken = cursor.resumeToken let resumedCursor = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) - let nextAfterResume = try resumedCursor.nextOrError() + let nextAfterResume = resumedCursor.next() // End Changestream Example 3 } @@ -86,7 +86,7 @@ private func changeStreams() throws { ] let inventory = db.collection("inventory") let cursor = try inventory.watch(pipeline, withEventType: Document.self) - let next = try cursor.nextOrError() + let next = cursor.next() // End Changestream Example 4 } } From 26408cad8fb1d0e26c632a3ecb4134399ba98198 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Fri, 21 Feb 2020 18:30:26 -0500 Subject: [PATCH 2/7] update async examples --- Examples/Docs/Package.resolved | 34 ---- Examples/Docs/Package.swift | 3 +- .../Docs/Sources/AsyncExamples/main.swift | 149 ++++++++++++------ 3 files changed, 101 insertions(+), 85 deletions(-) delete mode 100644 Examples/Docs/Package.resolved diff --git a/Examples/Docs/Package.resolved b/Examples/Docs/Package.resolved deleted file mode 100644 index c43ba0fd4..000000000 --- a/Examples/Docs/Package.resolved +++ /dev/null @@ -1,34 +0,0 @@ -{ - "object": { - "pins": [ - { - "package": "MongoSwift", - "repositoryURL": "https://github.com/mongodb/mongo-swift-driver", - "state": { - "branch": "SWIFT-673/async-changestream-2", - "revision": "f4ba57e15db4d6b786dd4a4aa840074814394a31", - "version": null - } - }, - { - "package": "Nimble", - "repositoryURL": "https://github.com/Quick/Nimble.git", - "state": { - "branch": null, - "revision": "f8657642dfdec9973efc79cc68bcef43a653a2bc", - "version": "8.0.2" - } - }, - { - "package": "swift-nio", - "repositoryURL": "https://github.com/apple/swift-nio", - "state": { - "branch": null, - "revision": "4409b57d4c0c40d41ac2b320fccf02e4d451e3db", - "version": "2.13.0" - } - } - ] - }, - "version": 1 -} diff --git a/Examples/Docs/Package.swift b/Examples/Docs/Package.swift index 5d181ade6..4fcc83df3 100644 --- a/Examples/Docs/Package.swift +++ b/Examples/Docs/Package.swift @@ -4,7 +4,8 @@ import PackageDescription let package = Package( name: "DocsExamples", dependencies: [ - .package(url: "https://github.com/mongodb/mongo-swift-driver", .branch(from: "master")) + .package(url: "https://github.com/mongodb/mongo-swift-driver", .upToNextMajor(from: "1.0.0"), + .package(url: "https://github.com/apple/swift-nio", .upToNextMajor(from: "2.0.0")) ], targets: [ .target(name: "SyncExamples", dependencies: ["MongoSwiftSync"]), diff --git a/Examples/Docs/Sources/AsyncExamples/main.swift b/Examples/Docs/Sources/AsyncExamples/main.swift index 929f3fef2..cf43f3f32 100644 --- a/Examples/Docs/Sources/AsyncExamples/main.swift +++ b/Examples/Docs/Sources/AsyncExamples/main.swift @@ -9,6 +9,10 @@ import NIO private func causalConsistency() throws { let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) let client1 = try MongoClient(using: elg) + defer { + client1.syncShutdown() + try? elg.syncShutdownGracefully() + } // Start Causal Consistency Example 1 let s1 = client1.startSession(options: ClientSessionOptions(causalConsistency: true)) @@ -30,15 +34,19 @@ private func causalConsistency() throws { let client2 = try MongoClient(using: elg) // Start Causal Consistency Example 2 - let result2 = client2.withSession(options: ClientSessionOptions(causalConsistency: true)) { s2 in + let result2: EventLoopFuture = + client2.withSession(options: ClientSessionOptions(causalConsistency: true)) { s2 in // The cluster and operation times are guaranteed to be non-nil since we already used s1 for operations above. s2.advanceClusterTime(to: s1.clusterTime!) s2.advanceOperationTime(to: s1.operationTime!) dbOptions.readPreference = ReadPreference(.secondary) let items2 = client2.db("test", options: dbOptions).collection("items") - for item in try items2.find(["end": .null], session: s2) { - print(item) + + return items2.find(["end": .null], session: s2).flatMap { cursor in + cursor.forEach { item in + print(item) + } } } // End Causal Consistency Example 2 @@ -46,50 +54,91 @@ private func causalConsistency() throws { /// Examples used for the MongoDB documentation on Change Streams. /// - SeeAlso: https://docs.mongodb.com/manual/changeStreams/ -// private func changeStreams() throws { -// let client = try MongoClient() -// let db = client.db("example") - -// // The following examples assume that you have connected to a MongoDB replica set and have -// // accessed a database that contains an inventory collection. - -// do { -// // Start Changestream Example 1 -// let inventory = db.collection("inventory") -// let cursor = try inventory.watch() -// let next = cursor.next() -// // End Changestream Example 1 -// } - -// do { -// // Start Changestream Example 2 -// let inventory = db.collection("inventory") -// let cursor = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) -// let next = cursor.next() -// // End Changestream Example 2 -// } - -// do { -// // Start Changestream Example 3 -// let inventory = db.collection("inventory") -// let cursor = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) -// let next = cursor.next() - -// let resumeToken = cursor.resumeToken -// let resumedCursor = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) -// let nextAfterResume = resumedCursor.next() -// // End Changestream Example 3 -// } - -// do { -// // Start Changestream Example 4 -// let pipeline: [Document] = [ -// ["$match": ["fullDocument.username": "alice"]], -// ["$addFields": ["newField": "this is an added field!"]] -// ] -// let inventory = db.collection("inventory") -// let cursor = try inventory.watch(pipeline, withEventType: Document.self) -// let next = cursor.next() -// // End Changestream Example 4 -// } -// } +private func changeStreams() throws { + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let client = try MongoClient(using: elg) + let db = client.db("example") + + // The following examples assume that you have connected to a MongoDB replica set and have + // accessed a database that contains an inventory collection. + + do { + // Start Changestream Example 1 + let inventory = db.collection("inventory") + let futureCursor = inventory.watch() + + // Option 1: retrieve next document via next() + let next = futureCursor.flatMap { cursor in + cursor.next() + } + + // Option 2: register a callback to execute for each document + let result = futureCursor.flatMap { cursor in + cursor.forEach { event in + // process event + } + } + // End Changestream Example 1 + } + + do { + // Start Changestream Example 2 + let inventory = db.collection("inventory") + let futureCursor = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) + + // Option 1: use next() to iterate + let next = futureCursor.flatMap { cursor in + cursor.next() + } + + // Option 2: register a callback to execute for each document + let result = futureCursor.flatMap { cursor in + cursor.forEach { event in + // process event + } + } + // End Changestream Example 2 + } + + do { + // Start Changestream Example 3 + let inventory = db.collection("inventory") + let futureCursor = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) + let next = futureCursor.flatMap { $0.next() } + + let resumeToken = futureCursor.and(next).map { cursor, _ in + cursor.resumeToken + } + + let resumedCursor = resumeToken.flatMap { token in + inventory.watch(options: ChangeStreamOptions(resumeAfter: token)) + } + let nextAfterResume = resumedCursor.flatMap { cursor in + cursor.next() + } + // End Changestream Example 3 + } + + do { + // Start Changestream Example 4 + let pipeline: [Document] = [ + ["$match": ["fullDocument.username": "alice"]], + ["$addFields": ["newField": "this is an added field!"]] + ] + let inventory = db.collection("inventory") + let futureCursor = inventory.watch(pipeline, withEventType: Document.self) + + // Option 1: use next() to iterate + let next = futureCursor.flatMap { cursor in + cursor.next() + } + + // Option 2: register a callback to execute for each document + let result = futureCursor.flatMap { cursor in + cursor.forEach { event in + // process event + } + } + // End Changestream Example 4 + } +} From ac9d0531d615fcb78bb915200f0a040d49f8947c Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Fri, 21 Feb 2020 18:34:06 -0500 Subject: [PATCH 3/7] format/lint --- Examples/Docs/Package.swift | 2 +- Examples/Docs/Sources/AsyncExamples/main.swift | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Examples/Docs/Package.swift b/Examples/Docs/Package.swift index 4fcc83df3..6c8d90391 100644 --- a/Examples/Docs/Package.swift +++ b/Examples/Docs/Package.swift @@ -4,7 +4,7 @@ import PackageDescription let package = Package( name: "DocsExamples", dependencies: [ - .package(url: "https://github.com/mongodb/mongo-swift-driver", .upToNextMajor(from: "1.0.0"), + .package(url: "https://github.com/mongodb/mongo-swift-driver", .upToNextMajor(from: "1.0.0")), .package(url: "https://github.com/apple/swift-nio", .upToNextMajor(from: "2.0.0")) ], targets: [ diff --git a/Examples/Docs/Sources/AsyncExamples/main.swift b/Examples/Docs/Sources/AsyncExamples/main.swift index cf43f3f32..e01f58d6d 100644 --- a/Examples/Docs/Sources/AsyncExamples/main.swift +++ b/Examples/Docs/Sources/AsyncExamples/main.swift @@ -34,8 +34,8 @@ private func causalConsistency() throws { let client2 = try MongoClient(using: elg) // Start Causal Consistency Example 2 - let result2: EventLoopFuture = - client2.withSession(options: ClientSessionOptions(causalConsistency: true)) { s2 in + let options = ClientSessionOptions(causalConsistency: true) + let result2: EventLoopFuture = client2.withSession(options: options) { s2 in // The cluster and operation times are guaranteed to be non-nil since we already used s1 for operations above. s2.advanceClusterTime(to: s1.clusterTime!) s2.advanceOperationTime(to: s1.operationTime!) @@ -76,6 +76,7 @@ private func changeStreams() throws { let result = futureCursor.flatMap { cursor in cursor.forEach { event in // process event + print(event) } } // End Changestream Example 1 @@ -95,6 +96,7 @@ private func changeStreams() throws { let result = futureCursor.flatMap { cursor in cursor.forEach { event in // process event + print(event) } } // End Changestream Example 2 @@ -137,6 +139,7 @@ private func changeStreams() throws { let result = futureCursor.flatMap { cursor in cursor.forEach { event in // process event + print(event) } } // End Changestream Example 4 From 052145a17b111301a4fc485533318593ca516cd9 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Mon, 24 Feb 2020 16:55:34 -0500 Subject: [PATCH 4/7] address comments --- .../Docs/Sources/AsyncExamples/main.swift | 51 ++++++++++--------- Examples/Docs/Sources/SyncExamples/main.swift | 22 ++++---- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/Examples/Docs/Sources/AsyncExamples/main.swift b/Examples/Docs/Sources/AsyncExamples/main.swift index e01f58d6d..d0d20c428 100644 --- a/Examples/Docs/Sources/AsyncExamples/main.swift +++ b/Examples/Docs/Sources/AsyncExamples/main.swift @@ -65,15 +65,15 @@ private func changeStreams() throws { do { // Start Changestream Example 1 let inventory = db.collection("inventory") - let futureCursor = inventory.watch() + let futureChangeStream = inventory.watch() // Option 1: retrieve next document via next() - let next = futureCursor.flatMap { cursor in + let next = futureChangeStream.flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document - let result = futureCursor.flatMap { cursor in + let result = futureChangeStream.flatMap { cursor in cursor.forEach { event in // process event print(event) @@ -85,16 +85,16 @@ private func changeStreams() throws { do { // Start Changestream Example 2 let inventory = db.collection("inventory") - let futureCursor = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) + let futureChangeStream = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) // Option 1: use next() to iterate - let next = futureCursor.flatMap { cursor in - cursor.next() + let next = futureChangeStream.flatMap { changeStream in + changeStream.next() } // Option 2: register a callback to execute for each document - let result = futureCursor.flatMap { cursor in - cursor.forEach { event in + let result = futureChangeStream.flatMap { changeStream in + changeStream.forEach { event in // process event print(event) } @@ -105,18 +105,21 @@ private func changeStreams() throws { do { // Start Changestream Example 3 let inventory = db.collection("inventory") - let futureCursor = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) - let next = futureCursor.flatMap { $0.next() } - let resumeToken = futureCursor.and(next).map { cursor, _ in - cursor.resumeToken - } - - let resumedCursor = resumeToken.flatMap { token in - inventory.watch(options: ChangeStreamOptions(resumeAfter: token)) - } - let nextAfterResume = resumedCursor.flatMap { cursor in - cursor.next() + inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) + .flatMap { changeStream in + changeStream.next().map { _ in + changeStream.resumeToken + }.always { _ in + _ = changeStream.kill() + } + }.flatMap { resumeToken in + inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in + newStream.forEach { event in + // process event + print(event) + } + } } // End Changestream Example 3 } @@ -128,16 +131,16 @@ private func changeStreams() throws { ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") - let futureCursor = inventory.watch(pipeline, withEventType: Document.self) + let futureChangeStream = inventory.watch(pipeline, withEventType: Document.self) // Option 1: use next() to iterate - let next = futureCursor.flatMap { cursor in - cursor.next() + let next = futureChangeStream.flatMap { changeStream in + changeStream.next() } // Option 2: register a callback to execute for each document - let result = futureCursor.flatMap { cursor in - cursor.forEach { event in + let result = futureChangeStream.flatMap { changeStream in + changeStream.forEach { event in // process event print(event) } diff --git a/Examples/Docs/Sources/SyncExamples/main.swift b/Examples/Docs/Sources/SyncExamples/main.swift index 203856a3d..09bf4c467 100644 --- a/Examples/Docs/Sources/SyncExamples/main.swift +++ b/Examples/Docs/Sources/SyncExamples/main.swift @@ -53,28 +53,28 @@ private func changeStreams() throws { do { // Start Changestream Example 1 let inventory = db.collection("inventory") - let cursor = try inventory.watch() - let next = cursor.next() + let changeStream = try inventory.watch() + let next = changeStream.next() // End Changestream Example 1 } do { // Start Changestream Example 2 let inventory = db.collection("inventory") - let cursor = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) - let next = cursor.next() + let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) + let next = changeStream.next() // End Changestream Example 2 } do { // Start Changestream Example 3 let inventory = db.collection("inventory") - let cursor = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) - let next = cursor.next() + let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) + let next = changeStream.next() - let resumeToken = cursor.resumeToken - let resumedCursor = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) - let nextAfterResume = resumedCursor.next() + let resumeToken = changeStream.resumeToken + let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)) + let nextAfterResume = resumedChangeStream.next() // End Changestream Example 3 } @@ -85,8 +85,8 @@ private func changeStreams() throws { ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") - let cursor = try inventory.watch(pipeline, withEventType: Document.self) - let next = cursor.next() + let changeStream = try inventory.watch(pipeline, withEventType: Document.self) + let next = changeStream.next() // End Changestream Example 4 } } From 4034bbb543aee3e53a75a0f7f847cc24120790d0 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Mon, 24 Feb 2020 17:00:16 -0500 Subject: [PATCH 5/7] add SwiftNIO dependency explicitly --- Examples/Docs/Package.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Examples/Docs/Package.swift b/Examples/Docs/Package.swift index 6c8d90391..888654b95 100644 --- a/Examples/Docs/Package.swift +++ b/Examples/Docs/Package.swift @@ -9,6 +9,6 @@ let package = Package( ], targets: [ .target(name: "SyncExamples", dependencies: ["MongoSwiftSync"]), - .target(name: "AsyncExamples", dependencies: ["MongoSwift"]) + .target(name: "AsyncExamples", dependencies: ["MongoSwift", "SwiftNIO"]) ] ) From cb4ec3d5718cbbe26d2d2b125765192a78fe8c31 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Mon, 24 Feb 2020 18:03:27 -0500 Subject: [PATCH 6/7] format --- .../Docs/Sources/AsyncExamples/main.swift | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Examples/Docs/Sources/AsyncExamples/main.swift b/Examples/Docs/Sources/AsyncExamples/main.swift index d0d20c428..e8695f788 100644 --- a/Examples/Docs/Sources/AsyncExamples/main.swift +++ b/Examples/Docs/Sources/AsyncExamples/main.swift @@ -107,20 +107,20 @@ private func changeStreams() throws { let inventory = db.collection("inventory") inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) - .flatMap { changeStream in - changeStream.next().map { _ in - changeStream.resumeToken - }.always { _ in - _ = changeStream.kill() - } - }.flatMap { resumeToken in - inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in - newStream.forEach { event in - // process event - print(event) + .flatMap { changeStream in + changeStream.next().map { _ in + changeStream.resumeToken + }.always { _ in + _ = changeStream.kill() + } + }.flatMap { resumeToken in + inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in + newStream.forEach { event in + // process event + print(event) + } } } - } // End Changestream Example 3 } From 6fdbfea2985ec572bc23dbf9c16a9ce8d3de2249 Mon Sep 17 00:00:00 2001 From: Kaitlin Mahar Date: Wed, 26 Feb 2020 12:52:43 -0500 Subject: [PATCH 7/7] fix NIO dependency, get rid of futureChangeStream variables --- Examples/Docs/Package.swift | 2 +- .../Docs/Sources/AsyncExamples/main.swift | 29 +++++++++---------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/Examples/Docs/Package.swift b/Examples/Docs/Package.swift index 888654b95..d628ac726 100644 --- a/Examples/Docs/Package.swift +++ b/Examples/Docs/Package.swift @@ -9,6 +9,6 @@ let package = Package( ], targets: [ .target(name: "SyncExamples", dependencies: ["MongoSwiftSync"]), - .target(name: "AsyncExamples", dependencies: ["MongoSwift", "SwiftNIO"]) + .target(name: "AsyncExamples", dependencies: ["MongoSwift", "NIO"]) ] ) diff --git a/Examples/Docs/Sources/AsyncExamples/main.swift b/Examples/Docs/Sources/AsyncExamples/main.swift index e8695f788..edf07eb7a 100644 --- a/Examples/Docs/Sources/AsyncExamples/main.swift +++ b/Examples/Docs/Sources/AsyncExamples/main.swift @@ -65,15 +65,14 @@ private func changeStreams() throws { do { // Start Changestream Example 1 let inventory = db.collection("inventory") - let futureChangeStream = inventory.watch() // Option 1: retrieve next document via next() - let next = futureChangeStream.flatMap { cursor in + let next = inventory.watch().flatMap { cursor in cursor.next() } // Option 2: register a callback to execute for each document - let result = futureChangeStream.flatMap { cursor in + let result = inventory.watch().flatMap { cursor in cursor.forEach { event in // process event print(event) @@ -85,20 +84,21 @@ private func changeStreams() throws { do { // Start Changestream Example 2 let inventory = db.collection("inventory") - let futureChangeStream = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) // Option 1: use next() to iterate - let next = futureChangeStream.flatMap { changeStream in - changeStream.next() - } + let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) + .flatMap { changeStream in + changeStream.next() + } // Option 2: register a callback to execute for each document - let result = futureChangeStream.flatMap { changeStream in - changeStream.forEach { event in - // process event - print(event) + let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup)) + .flatMap { changeStream in + changeStream.forEach { event in + // process event + print(event) + } } - } // End Changestream Example 2 } @@ -131,15 +131,14 @@ private func changeStreams() throws { ["$addFields": ["newField": "this is an added field!"]] ] let inventory = db.collection("inventory") - let futureChangeStream = inventory.watch(pipeline, withEventType: Document.self) // Option 1: use next() to iterate - let next = futureChangeStream.flatMap { changeStream in + let next = inventory.watch(pipeline, withEventType: Document.self).flatMap { changeStream in changeStream.next() } // Option 2: register a callback to execute for each document - let result = futureChangeStream.flatMap { changeStream in + let result = inventory.watch(pipeline, withEventType: Document.self).flatMap { changeStream in changeStream.forEach { event in // process event print(event)