
Add apply apply_async methods #63

Closed
riyadparvez opened this issue Oct 16, 2022 · 8 comments · Fixed by #70
Assignees: sybrenjansen
Labels: enhancement (New feature or request)

Comments

@riyadparvez

The standard multiprocessing module supports the apply and apply_async methods. Can support for these methods be added?

@sybrenjansen
Owner

It's possible to add them, and even though it's not conceptually hard, it would require quite some engineering work. I'm currently swamped with other responsibilities, so if anyone wants to take a stab at it, that would be appreciated.

I might have some time later this year, but can't promise anything in the short term.

@sybrenjansen added the "help wanted (Extra attention is needed)" and "enhancement (New feature or request)" labels on Oct 17, 2022
@thedrow

thedrow commented Jan 8, 2023

@sybrenjansen Can you please describe what needs to be done?

@sybrenjansen
Owner

sybrenjansen commented Jan 10, 2023

@thedrow We would basically be implementing the multiprocessing versions of apply and apply_async, but adapted to how mpire is set up, of course.

The apply is very simple: it just calls apply_async and then immediately calls .get on the result object that is returned.
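A minimal sketch of that relationship, assuming the pool exposes an apply_async that returns an AsyncResult-like object with a .get method (names here are illustrative, not mpire's actual internals):

```python
# Sketch only: a method on the pool class. Assumes apply_async returns an
# AsyncResult-like object exposing .get(timeout=None), mirroring
# multiprocessing.pool.Pool. The timeout parameter is an illustrative extra.
def apply(self, func, args=(), kwargs=None, timeout=None):
    """Run func once in a worker and block until the result is available."""
    return self.apply_async(func, args, kwargs or {}).get(timeout=timeout)
```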

The apply_async is a bit more complicated.

  • First, we need to check if there are workers yet. If not, start them.
  • Second, we need to create a result class in which the future result is stored. The multiprocessing implementation also supports a callback and an error callback once the task is done. We basically want the same implementation as ApplyResult in the multiprocessing lib (see the sketch after this list).
  • The results object that is created once apply_async is called should not only be returned by apply_async, but it should also be stored in the WorkerPool somewhere. This is because we need to be able to set the result once the worker is done. Creating an apply_results dictionary in the WorkerPool init should suffice. The key is then the job identifier, which I'll explain below.
  • We also want to have a separate results queue for apply tasks. Otherwise this will interfere with map calls in between.
  • Currently, mpire assumes that each worker gets the same tasks. When that task changes, it uses WorkerComms.add_new_map_params to change the function of all workers. In the apply case, each call to apply can come with a different function and different parameters, so the worker class needs to handle this differently. I think it would be best to add a new add_apply_task to the WorkerComms class that adds the new params for a single worker only and then adds the task and a job identifier to the task queue for that worker. The job identifier is needed so that we can store the returned result in the right results object in the main thread. We also need to reset WorkerPool.map_params, because it's no longer valid; this ensures that once a map function is called, new map parameters are passed on to the workers.
  • The worker class then automatically picks up these new params. The only thing that still needs to happen is that the job identifier is handled correctly: it should be obtained, but not passed on to the actual function that the worker calls, and then returned together with the function results. So I think it's best to introduce a new kind of PILL, as I call it in mpire: an identifier that signals to the worker that this is an apply task and needs to be treated slightly differently.
  • The results of this task should be put in the new results queue as mentioned earlier.
  • When this is all done, the worker correctly receives the function parameters, executes the job to be done, and puts the results in the new results queue. The main thread then needs to listen to this results queue and put the results in the right results object.
  • For this, we can start a listener thread once the WorkerComms are initialized (i.e., in WorkerPool._start_workers). This thread should constantly listen to this new results queue for new results. If new results come in, it can use the job identifier to access the right results object from the apply_results dictionary and set the results. The results class then calls the right callback if provided. We can then also remove this object from the apply_results dictionary, as it's no longer needed.
  • Of course, all of this needs unit tests.
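To make two of these pieces concrete, here is a minimal sketch of an ApplyResult-like container with callback support and the listener thread that routes results back to the right object. All names (AsyncResult, apply_results, results_queue, the (job_id, success, value) tuple format) are illustrative assumptions, not mpire's actual internals:

```python
import threading


class AsyncResult:
    """Sketch of an ApplyResult-like object (names are illustrative)."""

    def __init__(self, callback=None, error_callback=None):
        self._callback = callback
        self._error_callback = error_callback
        self._ready = threading.Event()
        self._success = None
        self._value = None

    def _set(self, success, value):
        # Called by the listener thread once the worker has returned.
        self._success = success
        self._value = value
        self._ready.set()
        cb = self._callback if success else self._error_callback
        if cb is not None:
            cb(value)

    def ready(self):
        return self._ready.is_set()

    def get(self, timeout=None):
        if not self._ready.wait(timeout):
            raise TimeoutError("apply task did not finish in time")
        if self._success:
            return self._value
        raise self._value  # the worker sent back the exception


def results_listener(results_queue, apply_results):
    """Listener thread body: moves results from the apply results queue
    into the right AsyncResult, keyed by job identifier."""
    while True:
        item = results_queue.get()
        if item is None:  # poison pill: the pool is shutting down
            break
        job_id, success, value = item
        # Pop the result object from the dict; it's no longer needed there.
        apply_results.pop(job_id)._set(success, value)
```

The real implementation would also have to deal with worker restarts, timeouts, and shutdown ordering, but the routing idea stays the same.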

Then, there's one more worry. What if people call imap and then immediately call apply_async? Then the workers will be in conflict, as the apply task will overwrite the function to be executed by the worker. I think it makes the most sense to simply make this illegal: set an Event variable once a map function is called to indicate it is running, and reset it once all tasks have been processed. Then apply_async checks that variable and throws an error if it's set (see the sketch below).
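A minimal sketch of that guard, assuming a hypothetical _map_running event and _run_map_tasks helper on the pool (not mpire's actual attribute names):

```python
import threading


class WorkerPool:  # illustrative skeleton, not mpire's actual class body
    def __init__(self):
        self._map_running = threading.Event()  # hypothetical flag

    def map(self, func, iterable):
        self._map_running.set()
        try:
            return self._run_map_tasks(func, iterable)  # hypothetical helper
        finally:
            # Reset the flag once all map tasks have been processed.
            self._map_running.clear()

    def apply_async(self, func, args=(), kwargs=None):
        if self._map_running.is_set():
            raise RuntimeError(
                "apply_async cannot be used while a map function is running")
        ...  # queue the apply task as described above
```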

I think that's about it. Quite some work as I told you. But if anyone wants to take a stab at it, be my guest :)

@sybrenjansen
Owner

FYI: I'm currently taking a stab at this. I'm making some architectural changes in the process, which will make adding apply-like functions a lot simpler. It's quite a bit of work, though, so it will take a while.

@thedrow

thedrow commented Jan 29, 2023

That's great.
If you succeed, I'll consider adding an mpire-based concurrency pool for Celery and hopefully finally deprecating billiard.

@sybrenjansen self-assigned this on Jan 30, 2023
@sybrenjansen
Owner

@thedrow that's very cool. I'd better get to it then ;)

@sybrenjansen
Owner

Quick status update: I have the functionality working. I still need a bit of time to clean things up, write some additional docs, and get it reviewed. I expect to have it completely ready in a week or two.

@sybrenjansen removed the "help wanted (Extra attention is needed)" label on Mar 17, 2023
sybrenjansen added a commit that referenced this issue Mar 17, 2023
* Added the apply and apply_async functions (Fixes #63)
* When inside a Jupyter notebook, the progress bar will not automatically switch to a widget anymore. tqdm cannot always determine with certainty that someone is in a notebook rather than, e.g., a Jupyter console. Another reason is to avoid the many errors people get when they have widgets or JavaScript disabled (Fixes #71)
* The dashboard.connect_to_dashboard function now raises a ConnectionRefused error when the dashboard isn't running, instead of silently failing and deadlocking the next map call with a progress bar (Fixes #68)
* Added support for a progress bar without knowing the size of the iterable. It used to disable the progress bar when the size was unknown
* Changed how max_tasks_active is handled. It now applies to the number of tasks that are currently being processed, instead of the number of chunks of tasks, as you would expect from the name. Previously, when the chunk size was set to anything other than 1, the number of active tasks could be higher than max_tasks_active
* Updated some exception messages and docs (Fixes #69)
* Changed how worker results, restarts, timeouts, unexpected deaths, and exceptions are handled. They are now handled by individual threads such that the main thread is more responsive. The API is the same, so no user changes are needed
* Mixing multiple map calls now raises an error (see docs)
* Fixed a bug where calling a map function with a progress bar multiple times in a row didn't display the progress bar correctly
* Fixed a bug where the dashboard didn't show an error when an exit function raised an exception

---------

Co-authored-by: sybrenjansen <sybren.jansen@gmail.com>
@sybrenjansen
Owner

Released in v2.7.0 (@thedrow @riyadparvez)
