Collect Dask results as they complete #1025

Conversation
This currently fails on …
Codecov Report

@@            Coverage Diff             @@
##           master    #1025      +/-   ##
==========================================
+ Coverage   94.15%   94.17%    +0.01%
==========================================
  Files          46       46
  Lines        6522     6540       +18
==========================================
+ Hits         6141     6159       +18
  Misses        381      381
==========================================

Continue to review full report at Codecov.
I looked a bit at the failure on Python 3 (the Python 2 failures can be ignored since this PR contains a non-Python-2-compatible change) and I can reproduce it locally. It manifests itself as a "hang" (sorry for the vague term) in the test at joblib/joblib/test/test_dask.py, lines 135 to 136 (commit e426cb7).
What I observed is that when the input data is changed slightly, the "hang" goes away and the test passes. I just pushed a commit to see whether that is also the case on the CI. I don't have any clue why this happens at the moment.
So this quirk is confirmed on the CI:
Side-comment: the "hang" was fully deterministic locally, i.e. it does not seem to be due to a race condition.
Right, I wonder if the imbalance of the data before was intentional in some way. Maybe the current implementation is not as robust to some bad user inputs.
Is Joblib still supporting Python 2? If so, I can try to switch to the older API.
I have not been following joblib very closely, but there is an ongoing PR for dropping Python 2: #1018. I would not worry too much about Python 2 support.
Thank you very much for this PR @mrocklin!
joblib/_dask.py (Outdated)

        cf_future.set_exception(exc)
    else:
        cf_future.set_result(result)
    self._callbacks.pop(future)(result)
Please correct me if I'm wrong, but the joblib callback contains a dask client.scatter call, which will be automatically turned into a coroutine if it runs in the IOLoop thread (such as here, as dask would detect that the call is made in a thread that should not be blocked). If we wanted the callback to be run in the IOLoop, we would need to make the whole joblib callback execution chain async, right?
Would it be possible/make sense to run such a _collect routine in a regular thread instead? This way the callback can remain blocking without blocking the IOLoop itself.
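A generic sketch of the "regular thread" option suggested here, assuming a distributed.Client is available; the names collect_in_thread and handle_result are illustrative, not joblib's API:

```python
# Sketch only: consume completed dask futures in a plain daemon thread so that
# a potentially blocking callback never runs on dask's IOLoop.
import threading
from distributed import Client, as_completed

def collect_in_thread(futures, handle_result):
    def _collect():
        for future in as_completed(futures):
            # handle_result may block (locks, iterator consumption); that is
            # fine here because we are not on the event loop thread.
            handle_result(future.result())
    thread = threading.Thread(target=_collect, daemon=True)
    thread.start()
    return thread

if __name__ == "__main__":
    client = Client(processes=False)
    futures = [client.submit(pow, i, 2) for i in range(5)]
    collect_in_thread(futures, print).join()
```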
Thanks for bringing this up. I hadn't thought of it before.
Currently I see scatter calls both in the constructor and in the apply_async call. My guess is that both of these will be called from the user's main thread, and not from the IOLoop. If that's the case then I think we should be safe. Dask generally runs the IOLoop in a separate thread.
You mention the callback function and I have to admit that I don't know what happens in that function. Does it call joblib code which might then call apply_async? If so then yes, we might want to queue up the callbacks to run in the main thread (or some other thread).
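To illustrate the threading model being described (a sketch, not joblib code): distributed runs its IOLoop in a background thread, so a synchronous scatter from the user's main thread simply blocks that thread, whereas, as noted above, the same call made from the loop thread is turned into an awaitable:

```python
# Sketch under the assumptions above: Client(processes=False) starts the IOLoop
# in a background thread, so blocking API calls are safe from the main thread.
from distributed import Client

client = Client(processes=False)

# Called from the user's main thread: blocks until the data has been scattered.
[data_future] = client.scatter([list(range(1000))])

async def scatter_from_loop(data):
    # If the same call is issued from the IOLoop thread, dask returns an
    # awaitable instead of blocking, so it must be awaited.
    return await client.scatter([data])
```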
Thanks for the fast feedback.

> You mention the callback function and I have to admit that I don't know what happens in that function.

This callback is actually in charge of dispatching a new task from the iterator given as input to the Parallel object. Thus, this callback calls apply_async, which itself triggers a scatter call.
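A highly simplified, hypothetical sketch of that chain (the real joblib code is more involved; FakeDaskBackend and completion_callback are made-up names): finishing one task triggers a callback that pulls the next item from the input iterator and dispatches it through apply_async, which on the dask backend leads to scatter and submit calls.

```python
# Hypothetical sketch of the dispatch chain described above; not joblib internals.
class FakeDaskBackend:
    """Stand-in for joblib's dask backend (illustrative only)."""
    def apply_async(self, func, args):
        # In the real backend this roughly corresponds to client.scatter(args)
        # followed by client.submit(func, *scattered_args).
        print(f"dispatching {func.__name__}{args}")

def completion_callback(backend, task_iterator):
    # Called when a previous task finishes: keep the pipeline full by
    # dispatching the next pending task, if any remain.
    try:
        func, args = next(task_iterator)
    except StopIteration:
        return
    backend.apply_async(func, args)

tasks = iter([(pow, (2, 10)), (pow, (3, 3))])
backend = FakeDaskBackend()
completion_callback(backend, tasks)  # dispatches pow(2, 10)
completion_callback(backend, tasks)  # dispatches pow(3, 3)
```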
OK, we're now always scattering from the event loop, so this should be resolved.
Oddly, we now sometimes scatter too frequently (maybe because we're now allowing some concurrency), so this introduces some inefficiency (see the modified test). I tried avoiding this with an asyncio.Lock, but that stopped things. I'm not yet sure why.
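For reference, a minimal sketch of the kind of asyncio.Lock guard that could serialize concurrent scatters; the names maybe_scatter and _scatter_cache are hypothetical, and this is not the code that was tried in the PR:

```python
# Sketch only: serialize access to a shared "already scattered" cache so that
# concurrent callbacks on the event loop do not scatter the same data twice.
# Assumes `client` is used from coroutines running on its event loop.
import asyncio

_scatter_cache = {}
_scatter_lock = asyncio.Lock()

async def maybe_scatter(client, key, data):
    async with _scatter_lock:            # one coroutine scatters at a time
        if key not in _scatter_cache:
            _scatter_cache[key] = await client.scatter([data])
        return _scatter_cache[key]
```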
Indeed, now the whole callback is run in the event loop. This callback runs scatter calls, which are made async (great!) and thus non-blocking, but it also runs a lot of non-async joblib code, and I'm afraid we hurt performance by running so much joblib code in the event loop (even though I'm not an async expert).
I coded an alternative here: pierreglaser@00158c8, where we should run much less joblib code inside the event loop. Not saying this is the way to go though.
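The alternative amounts to something like the following sketch (placeholder names; see the linked commit for the actual approach): keep the dask calls on the event loop, but hand the blocking joblib callback off to a regular thread so it cannot stall the IOLoop.

```python
# Sketch of offloading the blocking part of the callback to a worker thread.
# `on_task_done` and `joblib_callback` are placeholder names.
import asyncio
from concurrent.futures import ThreadPoolExecutor

_callback_pool = ThreadPoolExecutor(max_workers=1)

async def on_task_done(result, joblib_callback):
    loop = asyncio.get_running_loop()
    # joblib_callback may block on locks or on consuming the input iterator,
    # so run it in a thread rather than directly on the event loop.
    await loop.run_in_executor(_callback_pool, joblib_callback, result)
```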
Hrm, alternatively, could we call just the callback in another thread? For debugging reasons it's useful to keep Dask code running in the event loop; things tend to be smoother generally.
How much joblib work happens in the callback? What are these calls doing?
These calls are basically consuming new tasks from the original iterator passed as input to Parallel.__call__. So there's a fair amount of joblib code that this callback runs, including code inside with lock context managers, client.scatter calls to scatter the input, and eventually client.submit calls. So there is a bunch of dask code and a bunch of joblib code run by the callbacks.

> Hrm, alternatively could we call just the callback in another thread?

Doesn't this contradict the fact that we want to keep dask code in the event loop, since the callback contains dask code (client.submit, client.scatter)?
See here for the callback's "head of call stack".
@ogrisel do you have some thoughts on this?
I think that for 90% of our users the task-generating iterator will be fast to consume, so this should not be a problem for them.
But the iterator could also be a lazy data loader reading from a database or from a folder of large compressed files, in which case delegating to another thread is probably a good idea.
Maybe it would be worth experimenting with a synthetic, slow task iterator to see what this would mean for the current design of this PR? This experiment could probably be turned into an integration test, both for the dask backend and for the other thread- or process-based backends.
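A possible shape for such an experiment (a sketch, not part of the PR; delays and sizes are arbitrary): feed Parallel a deliberately slow generator and time it against the dask backend, to see whether consuming the iterator on the callback/event-loop path becomes a bottleneck.

```python
# Synthetic slow-producer benchmark sketch; parameters are arbitrary.
import time
from joblib import Parallel, delayed, parallel_backend
from distributed import Client

def slow_task_iterator(n_tasks, produce_delay=0.05):
    for i in range(n_tasks):
        time.sleep(produce_delay)   # simulate a lazy loader (database, disk, ...)
        yield delayed(pow)(i, 2)

if __name__ == "__main__":
    client = Client(processes=False)
    with parallel_backend("dask"):
        start = time.time()
        results = Parallel()(slow_task_iterator(50))
        print(f"{len(results)} results in {time.time() - start:.2f}s")
```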
The branch was force-pushed from 7c5999c to 24da328, and then from 24da328 to e58f8f4. The pushed commits collect results as they complete (see the PR description below), use the recommended dask.distributed imports, remove some Python 2 compatibility bits, and remove some additional state so that Dask futures can be cleaned up more quickly.
I'm not sure that I understand the test failures. Are they unrelated perhaps?
Probably. Feel free to push an empty commit to trigger the CI again.
Woo! That's really satisfying to see.
Do we want to go ahead with this, or wait until we resolve the issue about getting the next element from the iterator in a separate thread? As a warning, I'm pretty saturated this week and am unlikely to work more on this in the next few days. Hopefully future weeks are better, but it's hard to predict.
Since the slow input producer benchmark did not reveal any pathological issue, I would be in favor of postponing this change to the day we actually need it.
+1 for merge as is. Thank you very much @mrocklin and @pierreglaser for the benchmarks.
And thanks @lesteve as well!
I don't think I did much on this one, but it is great to see this merged! IMO this is the kind of issue at the intersection of two libraries that is fixable if you happen to have one expert on each library working hand-in-hand, but really tricky to fix otherwise.
Previously we would wait until joblib called lazy_result.get() before collecting results. This would trigger a transfer in the main thread, which would block things a bit.

Now we collect results as soon as they are available using dask.distributed.as_completed. This helps to reduce overhead in joblib a bit and improve overall bandwidth.
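A small standalone illustration of the dask.distributed.as_completed pattern described here (not the backend's internal code):

```python
# Results are handled as soon as each future finishes, instead of blocking on
# them in submission order.
from distributed import Client, as_completed

client = Client(processes=False)
futures = [client.submit(pow, i, 2) for i in range(10)]

for future in as_completed(futures):
    # Each result is transferred as soon as it is ready, so one slow task does
    # not delay collection of the others.
    print(future.result())
```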
Somewhat related to (but doesn't entirely fix) #1020 and dask/dask#5993
There are still performance issues. We have a lot of downtime in this process, but it's not happening in Dask (all of our profilers show that we're not spending a ton of time in Dask code). I suspect that something on the joblib end is blocking things, but profiling so far hasn't shown anything.