How do you close a change stream? #741

Closed
Zertz opened this issue Sep 10, 2022 · 6 comments
@Zertz

Zertz commented Sep 10, 2022

With Node.js, I can do this to start and stop a change stream:

const changeStream = client.db("database").collection("collection").watch();

changeStream.close();

With Rust, I can get a change stream cursor with this:

let mut change_stream = client.database("database").collection::<Document>("collection").watch(None, None).await?;

However, I'm very new to Rust and I can't figure out how to close the cursor. Any help would be greatly appreciated!

@patrickfreed
Contributor

patrickfreed commented Sep 15, 2022

In Rust's ownership model, things are said to be "dropped" when they go out of scope. Typically, this just means the memory associated with the things being dropped gets deallocated. However, you can implement the special Drop trait if you want to perform some custom cleanup before the type's data gets deallocated. That's exactly what we've done here: when a ChangeStream instance is dropped, it automatically gets closed.

In this example, the change stream gets closed when the result is returned from this function:

async fn one_event(coll: Collection<Document>) -> Result<ChangeStreamEvent<Document>> {
    let mut cs = coll.watch(None, None).await?;
    let event = cs.try_next().await?;
    Ok(event.unwrap())
}

You can verify this by attaching a CommandEventHandler to your Client and looking for a killCursors command, which handles closing change streams server side.

If you want to close a change stream before the end of some scope, you can also invoke the drop function:

let cs = coll.watch(None, None).await?;
// do some stuff with cs
drop(cs);
// do some other stuff without cs
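
To make the two cases above concrete without a running MongoDB deployment, here is a minimal sketch using only the standard library. `FakeStream` and `demo` are hypothetical names made up for illustration; `FakeStream` is not the driver's `ChangeStream`, it only mimics the "clean up in `Drop`" behavior described above by writing to a shared log.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for a change stream (not the driver's type).
struct FakeStream {
    name: &'static str,
    log: Rc<RefCell<Vec<String>>>,
}

impl Drop for FakeStream {
    // Runs automatically when the value goes out of scope or is passed to drop().
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("closed {}", self.name));
    }
}

/// Returns the order in which the events happened.
fn demo() -> Vec<String> {
    let log: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));

    {
        let _scoped = FakeStream { name: "scoped", log: Rc::clone(&log) };
        log.borrow_mut().push("using scoped".to_string());
    } // `_scoped` is dropped here, at the end of its block.

    let explicit = FakeStream { name: "explicit", log: Rc::clone(&log) };
    log.borrow_mut().push("using explicit".to_string());
    drop(explicit); // dropped early, before the function ends
    log.borrow_mut().push("after explicit".to_string());

    let result = log.borrow().clone();
    result
}

fn main() {
    for line in demo() {
        println!("{}", line);
    }
}
```

In this sketch, "closed scoped" is logged as soon as the inner block ends, and "closed explicit" is logged at the `drop` call rather than at the end of `demo`.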

This actually points to a gap in our documentation: this isn't mentioned anywhere in the manual or API docs. I opened #745 to address this. Thanks for filing this issue, and please let us know if you have any other questions!

@Zertz
Author

Zertz commented Sep 16, 2022

Thanks for the explanation @patrickfreed !

If I may borrow some more of your time, here's where I'm getting stuck. I'm using Tauri, which is basically the same as Electron but replaces Node.js with Rust.

I have a function that sets up a change stream and emits changes, which seems to work fine, but the web app needs to be able to say "hey, I'm not listening anymore, so you should stop that change stream":

pub async fn watch_documents(window: Window) -> Result<bool, Box<dyn Error>> {
  let client = client::get().await?;

  let mut change_stream = client
    .database("database")
    .collection::<Document>("collection")
    .watch(None, None).await?;

  while let Some(event) = change_stream.next().await.transpose()? {
    // Send changes to the web app
    window.emit("change", Payload { full_document: event.full_document }).unwrap();

    // Listen to the web app
    window.listen("unwatch", |event| {
      // 🤔 I would like to end the change stream here
    });
  }

  Ok(true)
}

@patrickfreed
Contributor

For that, I'd recommend moving the change stream into its own asynchronous task and sending it a message when it needs to close. When the task exits, the change stream will go out of scope and be dropped. The tokio crate provides a number of really useful channels that you can use for this. Since you'll only need to send a single message indicating the stream should close, you can use a oneshot channel:

use futures::TryStreamExt; // provides try_next()
use tokio::sync::oneshot;

let coll = client.database("ok").collection::<Document>("ok");
tokio::task::spawn(async move {
    let mut cs = coll.watch(None, None).await.unwrap();
    let (tx, rx) = oneshot::channel();

    // this is needed so we can poll the channel repeatedly in the loop.
    tokio::pin!(rx);
    loop {
        // this will execute whichever branch happens first,
        // i.e. whether a change stream event is received or a cancellation message is received.
        tokio::select! {
            e = cs.try_next() => {
                match e {
                    Ok(Some(event)) => {
                        window.emit("change", Payload { full_document: event.full_document }).unwrap();
                    }
                    Ok(None) => break,
                    Err(e) => todo!("handle this error: {:?}", e)
                }
            },
            _ = &mut rx => break,
        };
    }

    // Use once so that we can move the sender into the closure.
    // Conceptually, we only want to close the change stream once anyway.
    window.once("unwatch", |_| {
        let _ = tx.send(());
    });
});

Another thing to note is that creating a Client can be a pretty expensive operation, so it's best to share them throughout your application. In this example, I do that by passing in a Collection handle to the task.

@Zertz
Author

Zertz commented Sep 16, 2022

That looks fantastic, I would never have come up with that on my own!

I'll give it a try in the next couple of days, thank you so much!

@patrickfreed
Contributor

No problem! Happy to help. And yeah, this is a nifty pattern that you'll likely reach for frequently when writing async code (the "actor" pattern). I highly recommend this blog post about building them in tokio if you want to learn more.

@Zertz
Author

Zertz commented Sep 18, 2022

Your example worked great! I just had to move the window.once bit above the loop so that the unwatch event gets registered. It's a pretty neat pattern, sort of like event listeners with Node.
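
For readers who want to try the shape of this pattern without tokio, Tauri, or a database, here is a rough standard-library analog. It is a sketch under stated assumptions: an OS thread stands in for the async task, `std::sync::mpsc` stands in for the oneshot channel, a receive timeout stands in for "poll the stream again", and `FakeStream` and `spawn_watcher` are hypothetical names. The point it illustrates is the ordering noted above: the stop handle is created before the loop starts, and the stream is dropped when the worker exits.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a change stream; flips a flag when dropped.
struct FakeStream {
    closed: Arc<AtomicBool>,
}

impl Drop for FakeStream {
    fn drop(&mut self) {
        self.closed.store(true, Ordering::SeqCst);
    }
}

/// Starts a worker that owns the stream; returns the stop sender and the join handle.
fn spawn_watcher(closed: Arc<AtomicBool>) -> (mpsc::Sender<()>, thread::JoinHandle<u32>) {
    // The channel is created before the loop, so the caller can signal at any time.
    let (stop_tx, stop_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || {
        let _stream = FakeStream { closed };
        let mut polls = 0u32;
        loop {
            // Wait briefly for a stop message; a timeout means "no stop yet, keep going".
            match stop_rx.recv_timeout(Duration::from_millis(5)) {
                Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
                Err(RecvTimeoutError::Timeout) => polls += 1,
            }
        }
        // `_stream` is dropped when this closure returns, which "closes" it.
        polls
    });
    (stop_tx, handle)
}

fn main() {
    let closed = Arc::new(AtomicBool::new(false));
    let (stop_tx, handle) = spawn_watcher(Arc::clone(&closed));

    thread::sleep(Duration::from_millis(20));
    stop_tx.send(()).unwrap(); // plays the role of the "unwatch" event
    let polls = handle.join().unwrap();

    println!("polled {} times; closed: {}", polls, closed.load(Ordering::SeqCst));
}
```

Dropping the sender without sending also ends the loop here (the `Disconnected` arm), which mirrors how a dropped oneshot sender would wake the receiver in the tokio version.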
