FEAT-#5809: New implementation of the Ray lazy execution queue #6731
Looks promising! I tried to test it on a simple example and the results are quite good. The second case is the worst one - it's when we need to return a temporary result for each node. It's a bit slower than the current master, but I think that it shouldn't occur that often.
Script to measure:
import modin.pandas as pd
import numpy as np
from timeit import default_timer as timer
import modin.config as cfg
N = 10_000_000
M = 10
# initialize workers and trigger import
pd.DataFrame(np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())).to_numpy()
df = pd.DataFrame({f"col{i}": np.arange(N) for i in range(M)})._query_compiler._modin_frame
df.wait_computations()
# NOTE: CASE 1 and CASE 3 delete `df`, so each case needs a freshly created `df`.
# CASE 1, completely lazy:
t1 = timer()
df1 = df.map(lambda df: df + 10)
df2 = df1.map(lambda df: df.astype(float))
df3 = df2.map(lambda df: df * 3)
df4 = df3.map(lambda df: df.fillna(0))
df5 = df4.map(lambda df: df / 10)
del df
del df1
del df2
del df3
del df4
df5.wait_computations()
print(timer() - t1)
# CASE 2, return temporary result for each step
t1 = timer()
df1 = df.map(lambda df: df + 10)
df2 = df1.map(lambda df: df.astype(float))
df3 = df2.map(lambda df: df * 3)
df4 = df3.map(lambda df: df.fillna(0))
df5 = df4.map(lambda df: df / 10)
# del df
# del df1
# del df2
# del df3
# del df4
df5.wait_computations()
print(timer() - t1)
# CASE 3, return temporary result for some steps
t1 = timer()
df1 = df.map(lambda df: df + 10)
df2 = df1.map(lambda df: df.astype(float))
df3 = df2.map(lambda df: df * 3)
df4 = df3.map(lambda df: df.fillna(0))
df5 = df4.map(lambda df: df / 10)
del df
# del df1
del df2
# del df3
del df4
df5.wait_computations()
print(timer() - t1)
I'll take a deeper dive into the code later, but as a first sketch it looks pretty good!
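To make the three cases above concrete, here is a minimal pure-Python sketch of the idea. It is not Modin's implementation: `Node`, its `subscribers` counter, `execute_chain`, and `build` are hypothetical names, and the sketch assumes that a fused chain only has to hand back an intermediate result for a step that something still references.

```python
class Node:
    """Hypothetical deferred node: a function applied to the parent's result."""

    def __init__(self, func, parent=None, data=None):
        self.func = func
        self.parent = parent
        self.data = data        # set for the source node, or once a result is kept
        self.subscribers = 0    # how many holders still want this node's result

    def subscribe(self):
        self.subscribers += 1


def execute_chain(last):
    """Run the pending chain in one pass; return how many results were kept."""
    # Walk back from `last` to the nearest node that already has data.
    chain = []
    node = last
    while node.data is None:
        chain.append(node)
        node = node.parent
    value = node.data
    kept = 0
    for node in reversed(chain):
        value = node.func(value)
        # Only the final node and still-referenced nodes keep their result.
        if node is last or node.subscribers > 0:
            node.data = value
            kept += 1
    return kept


def build(held_steps):
    """Build a 5-step chain; `held_steps` are the steps that stay referenced."""
    funcs = [lambda x: x + 10, float, lambda x: x * 3, lambda x: x, lambda x: x / 10]
    node = Node(None, data=1)
    nodes = []
    for func in funcs:
        node = Node(func, parent=node)
        nodes.append(node)
    for i in held_steps:
        nodes[i].subscribe()
    return nodes[-1]


case1 = execute_chain(build([]))            # all intermediates deleted
case2 = execute_chain(build([0, 1, 2, 3]))  # every intermediate still referenced
case3 = execute_chain(build([0, 2]))        # only df1 and df3 still referenced
```

Under these assumptions the number of kept results grows with the number of live intermediate frames, which matches the observation that the second case is the most expensive one.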
Has the problem mentioned here gone away with this PR?
Yes, it has. #6695 (comment)
@AndreyPavlenko could you rebase on master so that we can see the performance with the latest changes?
Done.
I see a slowdown on our workload of about 20-25 seconds. |
# If there are no subscribers, we still need the result here.
self.subscribe()
I don't really get it, we don't call .unsubscribe() at the end of the function, aren't we just adding a dummy reference here that will never be removed?
Also, shouldn't the subscribe call be under this condition to match the comment?
- # If there are no subscribers, we still need the result here.
- self.subscribe()
+ if self.subscribers == 0:
+     # If there are no subscribers, we still need the result here.
+     self.subscribe()
After the execution, this node will never be executed again and this counter has no impact.
Anyway, this new implementation is broken now. Working on it.
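The point about the counter can be illustrated with a small sketch. It is a toy model, not the PR's code: `DeferredResult`, its `guard` flag, and the `runs` counter are hypothetical, and it assumes, as the reply states, that a node executes at most once and afterwards only returns its cached result.

```python
class DeferredResult:
    """Hypothetical stand-in for a deferred node that executes at most once."""

    def __init__(self, func):
        self.func = func
        self.subscribers = 0
        self.executed = False
        self.result = None
        self.runs = 0  # how many times `func` was actually called

    def subscribe(self):
        self.subscribers += 1

    def get(self, guard=False):
        if not self.executed:
            # guard=False: unconditional subscribe (as in the diff).
            # guard=True: subscribe only when nobody else has (the suggestion).
            if not guard or self.subscribers == 0:
                self.subscribe()
            self.result = self.func()
            self.executed = True
            self.runs += 1
        # After the first execution the counter is never consulted again.
        return self.result


unguarded = DeferredResult(lambda: 42)
unguarded.subscribe()
guarded = DeferredResult(lambda: 42)
guarded.subscribe()

values = (unguarded.get(), unguarded.get(), guarded.get(guard=True))
counters = (unguarded.subscribers, guarded.subscribers)
```

In this model the two variants end up with different counter values but identical results and a single execution each, so the extra reference is harmless as long as nothing reads the counter after execution.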
Should be fixed now. There was an issue after #6883 introduced a new remote function that returns multiple values.
… queue Signed-off-by: Andrey Pavlenko <andrey.a.pavlenko@gmail.com>
Co-authored-by: Iaroslav Igoshev <Poolliver868@mail.ru>
if LazyExecution.get():
    return self.__constructor__(de)
@AndreyPavlenko when and in what form do you plan to add tests when LazyExecution is enabled?
I think we shouldn't overload the unit tests; instead, we could schedule weekly runs with this option enabled.
@AndreyPavlenko thank you for responding to all my comments, there were a lot of them.
LGTM!
@AndreyPavlenko thank you for the PR, great work!
For details, please read the description of the _DeferredExecution class.
What do these changes do?
flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
partition.add_to_apply_calls) #5809
docs/development/architecture.rst is up-to-date