
Add support for dask dataframes #99

Merged: 7 commits merged into ray-project:master on May 14, 2021

Conversation

krfricke (Collaborator)

Closes #92

Note that the locality-aware scheduling cannot currently be tested reliably, for two reasons:

  1. We cannot guarantee that dask partitions end up living on a specific node (to set an initial state for redistribution)
  2. We cannot guarantee that tasks that determine the dask partition node are co-scheduled with the respective partition.

Once the ray.state.objects() API (or something similar) comes back, the second point should be easy to address, and the first will be easy to confirm.

@krfricke (Collaborator, Author)

Currently it seems that the problem lies with 1): ray memory shows that each partition lives on the head node after calling persist(). I'll try to find a solution to that.
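
For context, here is a minimal sketch of the kind of setup being discussed, assuming the Dask-on-Ray scheduler (ray.util.dask.ray_dask_get) and a toy dataframe; the PR's actual loader code may differ:

    import dask
    import dask.dataframe as dd
    import pandas as pd
    import ray
    from ray.util.dask import ray_dask_get

    ray.init()

    # Run Dask tasks on Ray so that persisted partitions become Ray objects.
    dask.config.set(scheduler=ray_dask_get)

    pdf = pd.DataFrame({"a": range(1000), "b": range(1000)})
    ddf = dd.from_pandas(pdf, npartitions=8)

    # persist() materializes the partitions eagerly. The problem described
    # above is that all of them may end up on the head node instead of
    # being spread across the cluster.
    ddf = ddf.persist()

    # Partition placement can then be inspected from the CLI with `ray memory`.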

@richardliaw requested a review from amogkam on May 13, 2021

# Dask does not support iloc() for row selection, so we have to
# compute a local pandas dataframe first
local_df = data.compute()
Collaborator

will this materialize the full distributed dataframe?

Collaborator (Author)

Yes. However, this part of the code concerns local loading, i.e. loading of either a single partition or centralized loading, where access to the whole dataframe is assumed anyway. This is the part where we end up with a pandas dataframe.
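
To illustrate the point about compute() (a simplified example, not the PR's actual loading code): calling it pulls every partition into the local process and returns a plain pandas dataframe, after which iloc-style row selection works:

    import dask.dataframe as dd
    import pandas as pd

    pdf = pd.DataFrame({"x": range(100)})
    ddf = dd.from_pandas(pdf, npartitions=4)

    # compute() materializes the full distributed dataframe locally and
    # returns a pandas DataFrame.
    local_df = ddf.compute()

    # pandas supports positional row selection, which dask.dataframe lacks.
    rows = local_df.iloc[[0, 5, 10]]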

Comment on lines +150 to +151
# Pass tuples here (integers can be misinterpreted as row numbers)
ip_to_parts[ip].append((pid, ))
Collaborator

hmm, why would it be misinterpreted?

Collaborator (Author)

In load_data(), we are doing this check:


        if indices is not None and len(indices) > 0 and isinstance(
                indices[0], Tuple):

Usually, numerical indices indicate the row numbers a worker should load. This is also valid for Dask (e.g. for centralized loading). However, in this case the numbers should indicate partition numbers. To avoid confusing these with row indices, we use a different type. IMO tuples are a good choice because they are immutable and add very little overhead.

Note that in Modin we use Ray ObjectIDs instead, but we don't have these for Dask at this point in the code.
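
A rough sketch of the disambiguation described above; load_data and its indices argument are simplified stand-ins for the actual implementation:

    from typing import List, Optional, Tuple, Union

    def load_data(
            indices: Optional[Union[List[int], List[Tuple[int]]]] = None):
        # Tuples mark dask partition IDs; plain integers mark row numbers.
        if indices is not None and len(indices) > 0 and isinstance(
                indices[0], tuple):
            partition_ids = [pid for (pid, ) in indices]
            print(f"Loading dask partitions: {partition_ids}")
        elif indices is not None:
            print(f"Loading rows: {indices}")
        else:
            print("Loading the full dataframe")

    load_data([0, 1, 2])        # interpreted as row numbers
    load_data([(0, ), (3, )])   # interpreted as partition IDs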

@richardliaw (Collaborator) left a comment

Generally looks great! The main question I have is about some of the implementation details.

@krfricke merged commit 888cdd1 into ray-project:master on May 14, 2021
@krfricke deleted the dask-df branch on May 14, 2021 16:09

Linked issue this pull request may close: Support Distributed Dask dataframes (#92)