[Data] Postpone reader.get_read_tasks until execution #38373
Conversation
@@ -1413,12 +1413,6 @@ def test_unsupported_pyarrow_versions_check_disabled(
    except ImportError as e:
        pytest.fail(f"_check_pyarrow_version failed unexpectedly: {e}")

    # Test read_parquet.
PyArrow 5 does not support pickling the Parquet reader class. Since we no longer support PyArrow 5, remove this test code. Already verified that this is not an issue for PyArrow 6+.
should we just remove the PyArrow 5 CI?
We already removed it. CI only tests PyArrow 6 and 12; this test manually installs 5.
@@ -384,6 +387,7 @@ def read_datasource(
    # Compute the number of blocks the read will return. If the number of blocks is
    # expected to be less than the requested parallelism, boost the number of blocks
    # by adding an additional split into `k` pieces to each read task.
    additional_split_factor = None
One small issue: for the new code path, `get_read_tasks` has actually created the read tasks, but the read tasks are only used for the following calculations and then discarded. I'm wondering if we can move the following calculation code into the reader, so that here we only create the reader, not the read tasks. We also wouldn't need to expose this `additional_split_factor` to the operator.
Yes, that's the main wart here. The other code path, `LazyBlockList`, has all of its code paths depending on `List[ReadTask]`, so I didn't spend time refactoring `LazyBlockList`. Shall we just delete `DatasetPipeline` and `LazyBlockList`/`BlockList` during 2.8?
That's fine. Can you leave a TODO here?
SG, added.
python/ray/data/_default_config.py
Outdated
@@ -0,0 +1,2 @@
# Default file shuffler class to use.
I plan to move all of the default config values (https://github.com/ray-project/ray/blob/master/python/ray/data/context.py#L17-L133) to this file. But that involves more code changes, because other code paths use them directly. I plan to do it in a separate PR.
@@ -513,3 +513,20 @@ def unify_block_metadata_schema(
        # return the first schema.
        return schemas_to_unify[0]
    return None


def get_attribute_from_class_name(class_name: str) -> Any:
Checked online; it looks like this is the recommended way to do it: https://stackoverflow.com/questions/452969/does-python-have-an-equivalent-to-java-class-forname .
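For reference, a minimal sketch of the `importlib`-based approach that the Stack Overflow answer describes (the PR's actual helper body isn't shown in this thread, so treat this as illustrative, not Ray's exact code):

```python
import importlib
from typing import Any


def get_attribute_from_class_name(class_name: str) -> Any:
    """Resolve a fully qualified name like "pkg.module.Class" to the
    attribute it refers to (Python's rough analogue of Java's Class.forName)."""
    module_name, _, attribute_name = class_name.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attribute_name)


# Example: resolve a stdlib class from its dotted path.
cls = get_attribute_from_class_name("collections.OrderedDict")
```

Note this imports the module as a side effect, which is why deferring the resolution (rather than importing the class eagerly at config time) also helps avoid cyclic imports.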
    paths: List[str],
    file_sizes: List[int],
    reader_args: Dict[str, Any],
) -> Tuple[List[str], List[int]]:
Add a docstring for this?
Sorry, but this file has been deleted.
python/ray/data/read_api.py
Outdated
@@ -370,6 +372,7 @@ def read_datasource(
        min_safe_parallelism,
        inmemory_size,
        read_tasks,
        reader,
    ) = ray.get(
        get_read_tasks.remote(
How is `get_read_tasks` being postponed if it's still being called here?
Added a comment below; this is needed for the `LazyBlockList` code path.
        super().__init__(f"Read{datasource.get_name()}{suffix}", None, ray_remote_args)
        self._datasource = datasource
        self._estimated_num_blocks = estimated_num_blocks
        self._read_tasks = read_tasks
        self._reader = reader
Where does `self._reader` get used here?
@@ -42,7 +42,10 @@ def _plan_read_op(op: Read) -> PhysicalOperator:
    """

    def get_input_data() -> List[RefBundle]:
-        read_tasks = op._read_tasks
+        read_tasks = op._reader.get_read_tasks(op._parallelism)
Where does `self._reader` get used here?
@amogkam - this is used in the planner here.
@@ -30,4 +34,4 @@ def fusable(self) -> bool:
        as fusion would prevent the blocks from being dispatched to multiple processes
        for parallel processing in downstream operators.
        """
-        return self._estimated_num_blocks == len(self._read_tasks)
+        return self._parallelism == self._estimated_num_blocks
Just `self._additional_split_factor is None`?
If `_additional_split_factor == 1`, we can still do the fusion, right?
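The check being discussed can be sketched as follows; `ReadOp` here is a hypothetical stand-in for the actual Read logical operator (attribute names assumed from the thread), not Ray's real class:

```python
from typing import Optional


class ReadOp:
    """Hypothetical stand-in for the Read logical operator."""

    def __init__(self, additional_split_factor: Optional[int]) -> None:
        self._additional_split_factor = additional_split_factor

    def fusable(self) -> bool:
        # No extra splitting (None) and a no-op split factor of 1 both mean
        # each read task yields exactly one block, so fusing with a downstream
        # operator does not reduce available parallelism.
        return self._additional_split_factor in (None, 1)


assert ReadOp(None).fusable()
assert ReadOp(1).fusable()
assert not ReadOp(2).fusable()
```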
    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        from bson.objectid import ObjectId

        self._create_client()
Nit: this would be clearer as `client = self._get_or_create_client()`.
updated.
python/ray/data/_default_config.py
Outdated
@@ -0,0 +1,2 @@
# Default file shuffler class to use.
DEFAULT_FILE_SHUFFLER = "ray.data.datasource.file_shuffler.SequentialFileShuffler"
Can we store the concrete class here, not just a string?
I prefer to store the class name here, because at runtime different objects will have different parameters. For example, Spark does this: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2317-L2322 .
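A minimal sketch of the rationale: the config stores only the class *name*, and each call site resolves it and supplies its own constructor arguments. `DEFAULT_SHUFFLER_CLASS_NAME` and `load_class` are illustrative names, and `random.Random` is just a stdlib stand-in so the example runs outside Ray:

```python
import importlib

# The config stores the fully qualified class name as a string, mirroring
# DEFAULT_FILE_SHUFFLER in the PR. "random.Random" is only a stand-in here.
DEFAULT_SHUFFLER_CLASS_NAME = "random.Random"


def load_class(class_name: str):
    """Resolve a dotted path like "pkg.module.Class" to the class object."""
    module_name, _, attr = class_name.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


# Resolution happens only at use time, so each call site can pass its own
# constructor arguments -- the reason for storing a string, not an instance.
shuffler_cls = load_class(DEFAULT_SHUFFLER_CLASS_NAME)
shuffler = shuffler_cls(42)  # per-use constructor arguments
```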
python/ray/data/context.py
Outdated
@@ -204,6 +206,7 @@ def __init__(
        self.use_ray_tqdm = use_ray_tqdm
        self.use_legacy_iter_batches = use_legacy_iter_batches
        self.enable_progress_bars = enable_progress_bars
        self.file_shuffler = file_shuffler
Nit: there is no need to add this `__init__` argument.

@property
def file_shuffler_cls(self):
    # import here to avoid cyclic dependencies.
    return DEFAULT_FILE_SHUFFLER
I guess so; we can change the whole `__init__` later. Let's do it in a separate PR? Just following the other configs here.
from typing import Any, Dict, List, Tuple


class FileShuffler:
Maybe call it `FileMetadataShuffler`. The current name sounds like it is shuffling the files themselves.
Sure, updated.
Force-pushed from 2fb516a to 214c438.
    def __init__(self, reader_args: Dict[str, Any]):
        self._reader_args = reader_args

    def shuffle_files(
Nit: similar to the class name update, should we update the method name and docstrings to something like `shuffle_file_metadatas` or `shuffle_metadatas`? Otherwise it could be confused with shuffling the files themselves instead of their metadata.
Hmm, I guess it's probably fine, given we already have `FileMetadataShuffler` as the class name.
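Putting the snippets above together, a minimal sketch of the shuffler hook being discussed; the class and method names follow the diff fragments in this thread, but the final API in Ray may differ, and `RandomFileMetadataShuffler` is an illustrative subclass, not code from the PR:

```python
import random
from typing import Any, Dict, List, Tuple


class FileMetadataShuffler:
    """Shuffles file *metadata* (paths and sizes), not file contents."""

    def __init__(self, reader_args: Dict[str, Any]):
        self._reader_args = reader_args

    def shuffle_files(
        self, paths: List[str], file_sizes: List[int]
    ) -> Tuple[List[str], List[int]]:
        # Sequential default: return the metadata unchanged.
        return paths, file_sizes


class RandomFileMetadataShuffler(FileMetadataShuffler):
    """Illustrative subclass: shuffle paths and sizes together so the two
    lists stay aligned entry-for-entry."""

    def shuffle_files(
        self, paths: List[str], file_sizes: List[int]
    ) -> Tuple[List[str], List[int]]:
        pairs = list(zip(paths, file_sizes))
        random.shuffle(pairs)
        shuffled_paths, shuffled_sizes = zip(*pairs)
        return list(shuffled_paths), list(shuffled_sizes)


paths, sizes = RandomFileMetadataShuffler({}).shuffle_files(
    ["a.parquet", "b.parquet"], [10, 20]
)
```

Keeping paths and sizes zipped through the shuffle is what motivates the follow-up interface change to a single list of tuples, which makes misaligned lists unrepresentable.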
Signed-off-by: Cheng Su <scnju13@gmail.com>
This is a followup of #38373, to change interface of `FileMetadataShuffler` to take tuple instead of two lists. This guarantees the paths and sizes are the same length from API perspective. Signed-off-by: Cheng Su <scnju13@gmail.com>
…38373) This PR is to postpone `reader.get_read_tasks()` (i.e. generate the `List[ReadTask]`) until Dataset is executed. Also introduce a hook to allow post processing input files inside `reader.get_read_tasks()`, so we can have custom logic to do post processing of input files, before returning the `List[ReadTask]`. Signed-off-by: Cheng Su <scnju13@gmail.com> Signed-off-by: harborn <gangsheng.wu@intel.com>
Why are these changes needed?
This PR postpones `reader.get_read_tasks()` (i.e. generating the `List[ReadTask]`) until the Dataset is executed. It also introduces a hook to allow post-processing of input files inside `reader.get_read_tasks()`, so we can run custom logic on the input files before returning the `List[ReadTask]`.

Related issue number
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I have added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.