
Consider adding an easier way of synchronously shutting down a thread-pool. #688

Open
emilio opened this issue Sep 3, 2019 · 9 comments

Comments

@emilio
Contributor

emilio commented Sep 3, 2019

AFAICT ThreadPool::drop is async, so there's no easy way to ensure that all threads have shut down.

Ideally, I'd like the guarantee that all threads have shut down, including running TLS destructors, by the time ThreadPool::drop (or whatever API this would be) returns. This is so I can do deterministic leak-checking.

I think you can do something using exit_handler plus std::sync::Barrier or something like that, but without the TLS-destructors-ran guarantee. That'd only guarantee that all pre-existing jobs have finished.

cc @cuviper

@cuviper
Member

cuviper commented Sep 6, 2019

An existing option that should even solve your TLS is build_scoped, like:

ThreadPoolBuilder::new().build_scoped(
    |thread| thread.run(), // do other setup before running if you like
    |pool| pool.install(work),
)?;
// threads will be totally gone here

I think there's also room for something like ThreadPool::wait(self), but that may be complicated by the fact that we didn't require join handles from the custom spawn_handler. :/

@emilio
Contributor Author

emilio commented Sep 7, 2019

Right, in this case I want a global thread pool that on shutdown I can wait on and drop, though, so I think build_scoped doesn't work.

@cuviper
Member

cuviper commented Sep 9, 2019

in this case I want a global thread pool that on shutdown I can wait on and drop

Ah, that's a harder one, and now I remember where I was playing with this before -- #483.

@gsquire
Contributor

gsquire commented Mar 9, 2022

I was working through a similar issue of needing to await all the threads in a ThreadPool recently and Google pointed me to my own comment. 😄

Would using a WaitGroup help with your needs as well? I added a call to wait inside my type's Drop implementation to await all tasks that were run via spawn. I can submit a patch if this is something that would be beneficial for all users.

@MarijnS95

Is this something that is still being looked into?

My use-case is a rayon::ThreadPool with a bunch of long-running loop {} workers spawned via spawn_broadcast(), processing work received from an mpsc/crossbeam channel. Their output is a file on disk, so there's no synchronization back to the caller (i.e. no broadcast() which would block and yield the results from every thread).

However, at some point I'm done feeding work into the pool, and drop() the Sender. The workers are expected to yield messages from their Receiver until hitting a RecvError, and exit the thread gracefully (return from the broadcast Fn). At this point, for regular threads, I'd call JoinHandle::join() to sync this up, e.g. wait for all asynchronous thread processing to be complete.

Unfortunately there seems to be no such thing on rayon::ThreadPool. Dropping it both terminates my threads early (i.e. not all files were written as expected) and is non-blocking, meaning that code after the drop() that expects the threads to have exited (to clean up transitive resources they were using) now fails.

Or is there another API that I'm missing? It basically feels like I need an fn broadcast() that returns a JoinHandle of sorts rather than the result outright, so that I can stash it away in some state somewhere and retrieve + .join() it whenever I want to shut down.

@emilio
Contributor Author

emilio commented Sep 7, 2023

@MarijnS95

@emilio thanks for pointing that out, but ouch. I was looking for something more out of the box; manually defining the thread_spawn function is the same for me as calling thread::spawn() a few times in a loop and adding their JoinHandles to a Vec :)

@cuviper
Member

cuviper commented Sep 7, 2023

My use-case is a rayon::ThreadPool with a bunch of long-running loop {} workers spawned via spawn_broadcast(), processing work received from an mpsc/crossbeam channel. Their output is a file on disk, so there's no synchronization back to the caller (i.e. no broadcast() which would block and yield the results from every thread).

You could wrap this in a scope (or in_place_scope) and use its Scope::spawn_broadcast, and then the scope will block until all of those broadcasts return. That won't wait for thread join, but all your work should be done.

Or you could use a build_scoped pool, then all the threads will be fully joined as well.

If you're not doing anything else rayon-ish with these threads, then a plain std::thread::scope that spawns your worker threads might be more straightforward.

Unfortunately there seems to be no such thing on rayon::ThreadPool. Dropping it both terminates my threads early (i.e. not all files were written as expected) and is non-blocking, meaning that code after the drop() that expects the threads to have exited (to clean up transitive resources they were using) now fails.

I would expect that the threads are still running after that drop, because there's no Rust way to interrupt a thread in progress, but if you exit the program then all threads will be terminated.

@MarijnS95

MarijnS95 commented Sep 7, 2023

@cuviper thanks, yeah that must be what's happening, because I'm "waiting" on these threads right before app exit (and if no-one waits on them, they'll get killed when the process exits).

Indeed, std::thread seems to be more straightforward; was wondering if rayon could help "manage the pool" but it's simple enough to do manually.

4 participants