Parallel hangs when using Dask as a back-end #875
Comments
I've conducted some more experiments with a varying number of workers (ranging from 1 to 10) and the results are the same (the problem is always there). The number of tasks executed is the same, so the problem seems more likely related to joblib than to Dask, IMO.

In one of those experiments, several batches (generally 6, see what follows) are dispatched, but only one completes (only one `BatchCompletionCallBack.__call__` is logged). This can be seen by counting the logs: everything seems to be executed in accordance with the batches processed (compare the logs that appear 6 times with the ones that appear only once).

Logs count:

```
 1 BatchCompletionCallBack.__call__ called
 1 Client.__init__ called
 1 Client._start called
 1 Client.start called
 1 Creating Dask client
 1 Creating joblib.Parallel(n_jobs=4, verbose=100, pre_dispatch=n_jobs, backend=dask)
 1 DaskDistributedBackend.configure called
 1 DaskDistributedBackend.effective_n_jobs called
 1 DaskDistributedBackend.start_call called
 1 Early stopping enabled
 1 Entering Dask context
 1 Parallel.__call__ called
 1 Parallel.__call__ dispatched initial batch
 1 Parallel.__call__: iterator created
 1 Parallel.__init__ called
 1 Parallel._dispatch: job of id 0 inserted to back-end
 1 Parallel._dispatch: job of id 1 inserted to back-end
 1 Parallel._dispatch: job of id 2 inserted to back-end
 1 Parallel._dispatch: job of id 3 inserted to back-end
 1 Parallel._dispatch: job of id 4 inserted to back-end
 1 Parallel._dispatch: job of id 5 inserted to back-end
 1 Parallel._dispatch: submitting job of id 0 to back-end of size 1
 1 Parallel._dispatch: submitting job of id 1 to back-end of size 1
 1 Parallel._dispatch: submitting job of id 2 to back-end of size 1
 1 Parallel._dispatch: submitting job of id 3 to back-end of size 1
 1 Parallel._dispatch: submitting job of id 4 to back-end of size 1
 1 Parallel._dispatch: submitting job of id 5 to back-end of size 1
 1 Parallel._dispatch: submitting job of id 6 to back-end of size 1
 1 Parallel._initialize_backend called
 1 Parallel.print_progress called
 1 Scheduler.__init__ called
 1 Scheduler._setup_logging called
 1 Scheduler.add_worker called
 1 Scheduler.finished called
 1 Scheduler.handle_worker called
 1 Worker.get_client called
 1 [Parallel(n_jobs=4)]: Done 1 tasks | elapsed: 2.3s
 1 [Parallel(n_jobs=4)]: Using backend DaskDistributedBackend with 8 concurrent workers.
 1 register_parallel_backend called
 2 Parallel._print called
 2 Scheduler.add_client called
 5 Parallel.__call__ dispatched one batch
 6 Appending tasks done
 6 Batch.__init__ called
 6 Batch.__reduce__ called
 6 Client._graph_to_futures called
 6 Client.submit called
 6 DaskDistributedBackend.apply_async: Added future to tasks
 6 DaskDistributedBackend.apply_async: Done submitting to the Client
 6 DaskDistributedBackend.apply_async: Executing callback_wrapper
 6 DaskDistributedBackend.apply_async: Returning future
 6 DaskDistributedBackend.apply_async: Submitting to Client
 6 Parallel.dispatch_one_batch: leaving the lock after self._dispatch
 6 Scheduler._add_to_memory called
 6 Scheduler._remove_from_processing called
 6 Scheduler.consumre_resources called
 6 Scheduler.handle_task_finished called
 6 Scheduler.release_resources called
 6 Scheduler.send_task_to_worker called
 6 Scheduler.update_graph called
 6 Scheduler.valid_workers called
 7 BatchCompletionCallBack.__init__ called
 7 BatchedCalls.__init__ : wraps 1 objects
 7 BatchedCalls.__init__ called
 7 DaskDistributedBackend._to_func_args called
 7 DaskDistributedBackend.apply_async called
 7 DaskDistributedBackend.get_nested_backend called
 7 Looping on func.items
 7 Parallel._dispatch called
 7 Parallel.dispatch: calling self._dispatch
 7 Parallel.dispatch_one_batch called
 7 Parallel.dispatch_one_batch: took the lock
 7 delayed called
 7 in '_dask._to_func_args'
 8 DaskDistributedBackend.__init__ called
10 Scheduler.worker_send called
12 Scheduler.decide_worker called
16 Scheduler.scatter called
16 Scheduler.update_data called
16 [f] = self.client.scatter([arg])
16 maybe to future called
17 Worker.get_worker called
24 Scheduler.transitions called
33 Scheduler.check_idle_saturated called
44 Scheduler.report_on_key called
70 Scheduler.report called
```

Here, a 7th batch is to be submitted, but its dispatch is not entirely done.
My guess is that a race condition is present on the lock. I'll run another experiment with even more logs on the lock.
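For reference, this is the kind of lock instrumentation I mean: a hypothetical wrapper (not the actual joblib patch) that logs each acquire/release together with the calling thread, so racing acquisitions show up in the output.

```python
import logging
import threading

logging.basicConfig(level=logging.DEBUG,
                    format="%(relativeCreated)d %(threadName)s %(message)s")


class LoggedRLock:
    """Hypothetical debugging wrapper: logs who takes and releases the lock."""

    def __init__(self, name="Parallel._lock"):
        self._name = name
        self._lock = threading.RLock()

    def acquire(self, *args, **kwargs):
        logging.debug("trying to acquire %s", self._name)
        acquired = self._lock.acquire(*args, **kwargs)
        logging.debug("acquired %s -> %s", self._name, acquired)
        return acquired

    def release(self):
        logging.debug("releasing %s", self._name)
        self._lock.release()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *exc):
        self.release()
        return False
```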
I've created a repository for a minimal reproducible example. I am trying to tweak things to reproduce the same behavior.
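For context, and not the contents of that repository, this is the general shape of the workload: the usual joblib-with-Dask pattern (`slow_task` and the sizes are placeholders).

```python
from time import sleep

import joblib
from dask.distributed import Client


def slow_task(i):
    # Placeholder task standing in for the real workload.
    sleep(1)
    return i * i


if __name__ == "__main__":
    client = Client()  # local cluster here; the real setup runs on a remote cluster
    with joblib.parallel_backend("dask"):
        results = joblib.Parallel(n_jobs=4, verbose=100, pre_dispatch="n_jobs")(
            joblib.delayed(slow_task)(i) for i in range(50)
        )
    print(results[:5])
```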
FYI @TomAugspurger
Thanks very much @jjerphan for investigating. I have been meaning to look into this issue for quite some time, but I have been busy with talks, reviews and the scikit-learn release. I hope I will find the time to review this soon, but unfortunately probably not this week nor the next either...
No problem @ogrisel. :) Here are some other logs that focus on the lock, especially on the race condition around the dispatch of jobs. Briefly, see the tail of the logs.
OK, so I am done with the reproducible setup. If you have a Kubernetes cluster, you can observe this behavior there. I have new problems now that are a bit different from the first ones I had (see those logs). I'll inspect this tomorrow.
I am trying to understand the mechanism and role of the RLock in `Parallel`. This lock is taken twice. I know nothing about using locks for concurrency. What are the advantages/goal of this mechanism? What is the typical design decision behind this? @ogrisel: do you have some clues about this?
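For my own understanding, here is a minimal standalone illustration of how `threading.RLock` behaves, independent of joblib: the owning thread can re-acquire it freely, while any other thread has to wait until it is fully released.

```python
import threading

rlock = threading.RLock()


def other_thread_attempt():
    # A different thread must wait until the owner has fully released the lock;
    # with a 1 s timeout this reports failure instead of blocking forever.
    acquired = rlock.acquire(timeout=1)
    print("other thread acquired:", acquired)
    if acquired:
        rlock.release()


# The same thread can take an RLock again without blocking (re-entrancy),
# which is why nested calls on the main thread are fine.
with rlock:
    with rlock:
        print("same thread: re-entered without blocking")

# While the main thread still holds the lock, another thread cannot get it.
with rlock:
    t = threading.Thread(target=other_thread_attempt)
    t.start()
    t.join()  # prints "other thread acquired: False" after the timeout
```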
I now have a setup that hangs. I have put the logs here. I am listing the problems and logs from my setup as they arise.
The dispatch functions can be called by the main thread.
Any data structure that manages the queue of tasks, pending results and collected results. It depends on each backend implementation.
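Concretely, something like this hypothetical sketch (made-up names, not an actual joblib backend): bookkeeping that both the dispatching thread and the completion callbacks mutate, so it has to be guarded by the backend's lock.

```python
import threading
from collections import deque


class HypotheticalBackendState:
    """Illustration only: the kind of shared bookkeeping a backend lock protects."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = deque()   # tasks submitted but not finished yet
        self._results = []        # results collected so far

    def submit(self, task):
        # Called from the dispatching (main) thread.
        with self._lock:
            self._pending.append(task)

    def task_done(self, task, result):
        # May be called from a callback thread (e.g. an event loop thread).
        with self._lock:
            self._pending.remove(task)
            self._results.append(result)
```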
I was thinking of something similar to this, but I hardly see how an additional lock can help.
Why is a nested backend used in this case?
I am trying to understand the logic of that part of the code.
I've tried a few things; it seems to be a subtle problem at the interface between joblib and distributed. I have some ideas but I don't want to be speculative. For now, I'll stop working on this issue, but if anyone is interested in investigating it, please feel free to contact me. :)
Fixed by #914.
Hello,

I have been using Dask through joblib and have had some problems with deadlocks in the execution of some tasks.

I am using `Parallel` with Dask as a backend. `Parallel.__call__` does not return.

After adding logs to some of the code in the `scikit-learn 0.20.x` distribution of joblib and in `distributed` master, what I observed is that tasks stop being submitted at one point. Apart from that, it seems that tasks on workers are executed properly.

Having inspected the `Parallel` code, I think I may have found a possible explanation for the deadlocks. The dispatch of tasks is made as follows in `Parallel.__call__`: in `Parallel.dispatch_one_batch`, `Parallel._dispatch` is called. As indicated in the docstring of `Parallel._dispatch`, "this method is not thread-safe: it should be only called indirectly via dispatch_one_batch". `_dispatch` uses `BatchCompletionCallBack` objects that are called by the backend's `apply_async`. In the case of `SequentialBackend`, the execution happens in the same thread, so there is no problem; but in the case of Dask, `DaskDistributedBackend.apply_async` registers this callback on the `tornado.IOLoop`, and as indicated, it "gets called in another thread", hence possibly producing a deadlock if the lock is already taken in the main thread.

I am not entirely sure this is a valid explanation. Also, I do not see a way to handle the callback differently in `DaskDistributedBackend`, nor do I understand all the moving parts of this connector. I am inspecting it currently. I'll be glad to have your thoughts on it, @ogrisel and @mrocklin.
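To illustrate the failure mode I have in mind, here is a toy, self-contained sketch (not joblib's actual code; `lock`, `done` and `dispatch` are placeholders): the main thread holds a lock while waiting for a completion that can only be signalled once a callback, running on another thread, has taken the same lock.

```python
import threading

lock = threading.RLock()      # stands in for Parallel._lock
done = threading.Event()      # stands in for "the batch completed"


def completion_callback():
    # Runs on another thread, like a callback registered on the IOLoop.
    # It needs the same lock before it can signal completion.
    with lock:
        done.set()


def dispatch():
    with lock:
        # The main thread keeps the lock while waiting for the completion
        # signal, so the callback thread can never acquire it: a deadlock.
        threading.Thread(target=completion_callback).start()
        done.wait(timeout=2)  # times out here instead of hanging forever


dispatch()
print("completed" if done.is_set() else "deadlocked (lock held while waiting)")
```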
While I can hardly give a minimal complete reproducible example with my setup (sorry), I think that the problem goes hand in hand with specific workloads; it can be present in a variety of setups and show up as a variety of errors that are hard to predict.
However, you will find logs that present the problem; I hope they are useful.
Please let me know if you need some more clues.
Setup:
Client-side logs