Use ray to parallelize read hdf 204 (#264)
* modin.pandas.io.read_parquet now partitions columns according to CPU cores, reading multiple columns at once using Ray

* blank line removed from EOF

* column splits recalculated according to @devin-petersohn's review

* Formatting for consistency and flake8

* issue #204: Use Ray to parallelize `read_hdf`, similar to the way `read_parquet` works

* reformatted by black

* merged from master
eavidan authored and devin-petersohn committed Nov 19, 2018
1 parent a0dd742 commit 919ac0d
Showing 1 changed file with 59 additions and 5 deletions.
64 changes: 59 additions & 5 deletions modin/pandas/io.py
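The heart of the change below is the column-chunking arithmetic: the column list is split with a ceiling division so the chunks cover every column without exceeding the per-partition quota. A minimal sketch of that arithmetic (the 10-column, 4-partition figures are made up for illustration):

    columns = ["c%d" % i for i in range(10)]  # hypothetical 10-column frame
    num_partitions = 4                        # assumed number of CPU cores
    # Ceiling division: 10 % 4 != 0, so 10 // 4 + 1 = 3 columns per chunk
    column_splits = (
        len(columns) // num_partitions
        if len(columns) % num_partitions == 0
        else len(columns) // num_partitions + 1
    )
    col_partitions = [
        columns[i : i + column_splits]
        for i in range(0, len(columns), column_splits)
    ]
    print([len(part) for part in col_partitions])  # [3, 3, 3, 1]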
@@ -511,11 +511,44 @@ def read_excel(
     return ray_frame
 
 
-def read_hdf(path_or_buf, key=None, mode="r"):
-    warnings.warn("Defaulting to Pandas implementation", UserWarning)
-    port_frame = pandas.read_hdf(path_or_buf, key, mode)
-    ray_frame = from_pandas(port_frame)
-    return ray_frame
+def read_hdf(path_or_buf, key=None, mode="r", columns=None):
+    if not columns:
+        empty_pd_df = pandas.read_hdf(path_or_buf, start=0, stop=0)
+        columns = empty_pd_df.columns
+
+    num_partitions = RayBlockPartitions._compute_num_partitions()
+    num_splits = min(len(columns), num_partitions)
+    # Each item in this list will be a list of column names of the original df
+    column_splits = (
+        len(columns) // num_partitions
+        if len(columns) % num_partitions == 0
+        else len(columns) // num_partitions + 1
+    )
+    col_partitions = [
+        columns[i : i + column_splits] for i in range(0, len(columns), column_splits)
+    ]
+    blk_partitions = np.array(
+        [
+            _read_hdf_columns._submit(
+                args=(path_or_buf, cols, num_splits, key, mode),
+                num_return_vals=num_splits + 1,
+            )
+            for cols in col_partitions
+        ]
+    ).T
+    remote_partitions = np.array(
+        [
+            [PandasOnRayRemotePartition(obj) for obj in row]
+            for row in blk_partitions[:-1]
+        ]
+    )
+    index_len = ray.get(blk_partitions[-1][0])
+    index = pandas.RangeIndex(index_len)
+    new_manager = PandasQueryCompiler(
+        RayBlockPartitions(remote_partitions), index, columns
+    )
+    df = DataFrame(query_compiler=new_manager)
+    return df
 
 
 def read_feather(path, nthreads=1):
@@ -666,3 +699,24 @@ def _read_parquet_columns(path, columns, num_splits, kwargs):
     df = pq.read_pandas(path, columns=columns, **kwargs).to_pandas()
     # Append the length of the index here to build it externally
     return split_result_of_axis_func_pandas(0, num_splits, df) + [len(df.index)]
+
+
+@ray.remote
+def _read_hdf_columns(path_or_buf, columns, num_splits, key, mode):
+    """Use a Ray task to read columns from HDF5 into a Pandas DataFrame.
+
+    Args:
+        path_or_buf: The path of the HDF5 file.
+        columns: The list of column names to read.
+        num_splits: The number of partitions to split the columns into.
+        key: The group identifier in the HDF5 store.
+        mode: The mode in which the file is opened.
+
+    Returns:
+        A list containing the split Pandas DataFrames and the Index as the last
+        element. If there is no `index_col` set, then we just return the length.
+        This is used to determine the total length of the DataFrame to build a
+        default Index.
+    """
+    df = pandas.read_hdf(path_or_buf, key, mode, columns=columns)
+    # Append the length of the index here to build it externally
+    return split_result_of_axis_func_pandas(0, num_splits, df) + [len(df.index)]
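The `num_return_vals=num_splits + 1` in the `_submit` call above asks Ray for one object ID per DataFrame split plus one extra for the index length. A minimal sketch of that multiple-return pattern using the era-appropriate public decorator form (`_submit` is the internal variant that lets the count vary per call; the function and data here are made up for illustration):

    import ray

    ray.init()

    @ray.remote(num_return_vals=3)  # the task yields three separate object IDs
    def split_in_three(seq):
        third = len(seq) // 3
        return seq[:third], seq[third : 2 * third], seq[2 * third :]

    first, second, rest = split_in_three.remote(list(range(9)))
    print(ray.get([first, second, rest]))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]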

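With the change applied, a caller can pull a column subset from an HDF5 file in parallel. A minimal usage sketch (the file name, key, and column names are hypothetical):

    import modin.pandas as pd

    # Each chunk of the requested columns is read by a separate Ray task, and
    # the pieces are stitched back into a single Modin DataFrame.
    df = pd.read_hdf("data.h5", key="table", columns=["a", "b", "c"])
    print(df.head())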