Skip to content

Commit

Permalink
[Data] Change FileMetadataShuffler interface to take tuple (#38508)
Browse files Browse the repository at this point in the history
This is a follow-up to #38373 that changes the `FileMetadataShuffler` interface to take a single list of `(path, size)` tuples instead of two separate lists. This guarantees, at the API level, that the paths and sizes always have the same length.

Signed-off-by: Cheng Su <scnju13@gmail.com>
  • Loading branch information
c21 committed Aug 16, 2023
1 parent d46e4e8 commit bc37a91
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 13 deletions.
6 changes: 3 additions & 3 deletions python/ray/data/datasource/file_based_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,10 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
reader_args = self._reader_args
partitioning = self._partitioning

paths, file_sizes = self._file_metadata_shuffler.shuffle_files(
self._paths,
self._file_sizes,
paths_and_sizes = self._file_metadata_shuffler.shuffle_files(
list(zip(self._paths, self._file_sizes))
)
paths, file_sizes = list(map(list, zip(*paths_and_sizes)))

read_stream = self._delegate._read_stream
filesystem = _wrap_s3_serialization_workaround(self._filesystem)
Expand Down
17 changes: 7 additions & 10 deletions python/ray/data/datasource/file_metadata_shuffler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,23 @@ def __init__(self, reader_args: Dict[str, Any]):

def shuffle_files(
self,
paths: List[str],
file_sizes: List[int],
) -> Tuple[List[str], List[int]]:
paths_and_sizes: List[Tuple[str, int]],
) -> List[Tuple[str, int]]:
"""Shuffle files in the given paths and sizes.
Args:
paths: The file paths to shuffle.
file_sizes: The size of file paths, corresponding to `paths`.
paths_and_sizes: The file paths and file sizes to shuffle.
Returns:
The file paths and their size after shuffling.
The file paths and their sizes after shuffling.
"""
raise NotImplementedError


class SequentialFileMetadataShuffler(FileMetadataShuffler):
def shuffle_files(
self,
paths: List[str],
file_sizes: List[int],
) -> Tuple[List[str], List[int]]:
paths_and_sizes: List[Tuple[str, int]],
) -> List[Tuple[str, int]]:
"""Return files in the given paths and sizes sequentially."""
return (paths, file_sizes)
return paths_and_sizes

0 comments on commit bc37a91

Please sign in to comment.