-
Notifications
You must be signed in to change notification settings - Fork 651
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT-#7004: use generators when returning from _deploy_ray_func remote function. #7005
Conversation
for r in result: | ||
for item in [r, len(r), len(r.columns), ip]: | ||
yield item |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that the tip to use generators only helps in cases where each iteration of the yielding for-loop actually creates some objects/allocates new memory. In this for loop we already have all the objects computed (dfs in result
) and the memory for all the results was already allocated.
Can we instead use generators in split_result_of_axis_func_pandas
function that actually creates a list of resulting dataframes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we should use generators here and in split_result_of_axis_func_pandas
too.
@@ -80,7 +80,8 @@ def split_result_of_axis_func_pandas(axis, num_splits, result, length_list=None) | |||
Splitted dataframe represented by list of frames. | |||
""" | |||
if num_splits == 1: | |||
return [result] | |||
yield result | |||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why return?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To not execute the rest of code, and exit the function. Or I will have to put the rest of code in an else block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
else branch is good for me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
ip = get_node_ip_address() | ||
if isinstance(result, pandas.DataFrame): | ||
return result, len(result), len(result.columns), ip | ||
elif all(isinstance(r, pandas.DataFrame) for r in result): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we check if all parts of result are dataframes. What are the scenarios where the result would be heterogeneous( composed of dataframes and non dataframes)?
One possibility I can think of is results could have errors, in this scenario I think it would it be sufficient to send [r, None, None, ip]
for the errors and send[r, len(r), len(r.columns), ip]
for the results that are dataframes. Would this suffice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this would suffice.
@@ -80,7 +80,8 @@ def split_result_of_axis_func_pandas(axis, num_splits, result, length_list=None) | |||
Splitted dataframe represented by list of frames. | |||
""" | |||
if num_splits == 1: | |||
return [result] | |||
yield result | |||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To not execute the rest of code, and exit the function. Or I will have to put the rest of code in an else block.
@arunjose696, once you fix all CI jobs, please convert this PR to ready for review and put some time and memory measurements in the PR description. |
…ay_func remote function. Signed-off-by: arunjose696 <arunjose696@gmail.com>
Signed-off-by: arunjose696 <arunjose696@gmail.com>
Signed-off-by: arunjose696 <arunjose696@gmail.com>
<style> </style>
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@AndreyPavlenko, since you are also going to use generators in remote functions for virtual partitions in #6991, can you look at the changes in this PR? How do they affect your changes and should we merge them? |
@@ -510,7 +526,18 @@ def deploy_func_between_two_axis_partitions( | |||
with warnings.catch_warnings(): | |||
warnings.filterwarnings("ignore", category=FutureWarning) | |||
result = func(lt_frame, rt_frame, *f_args, **f_kwargs) | |||
return split_result_of_axis_func_pandas(axis, num_splits, result) | |||
if return_generator: | |||
return generate_result_of_axis_func_pandas( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return generate_result_of_axis_func_pandas( | |
yield from generate_result_of_axis_func_pandas( |
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This wouldnt work because using yeild in a function would turn it to a generator.
We do not require generators but lists for some branches of if , For the backends such as dask as we try to return a list of partitions, but as there is yield statement in the function a generator would still be returned and thus partitions would be empty when materialized.
https://stackoverflow.com/questions/26595895/return-and-yield-in-the-same-function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean not just yield
but yield from
. Would it work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checked with yield from
as well still the function returns a generator when called so the code fails for dask.
CI is failing |
Signed-off-by: arunjose696 <arunjose696@gmail.com>
In #6991 _deploy_ray_func() is not used. A different approach is used there. The virtual partition's apply() functions do not split the resulting frame, but return a list of lazy partitions instead. Each partition has a deferred function, that should get the required piece of the df, i.e. the partition0 receives df.iloc[0:10], partition1 - df.iloc[10:20] ... and so on. These functions are executed lazy. It allows to do not split the entire frame if only a subset of the frame is required in the subsequent operations. |
@YarShev @arunjose696 In what minimal version of Ray did this feature appear? It seems we have implicitly increased it. UPD: Generators are supported starting from ray 2.1.0: https://github.com/ray-project/ray/releases/tag/ray-2.1.0 |
@anmyachev, oh, good catch! It seems true that generators are supported starting from ray 2.1.0. We started using generators since we introduced lazy execution for block partitions. It looks like we would have to change a lot of code. @AndreyPavlenko, do you think how much it takes for us to put a check for the ray version to either use generator or not? Or we can just update a minimal supported Ray version? |
What do these changes do?
Using yield in _deploy_ray_func to return generator instead of list.
flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
docs/development/architecture.rst
is up-to-date