Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Adding to_parquet and write_partition definitions to dask_cudf #3369

Merged
merged 3 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
- PR #3340 Make all benchmarks use cudf base fixture to initialize RMM pool
- PR #3337 Fix Java to pad validity buffers to 64-byte boundary
- PR #3357 Disabling `column_view` iterators for non fixed-width types
- PR #3369 Add write_partition to dask_cudf to fix to_parquet bug


# cuDF 0.10.0 (16 Oct 2019)
Expand Down
6 changes: 6 additions & 0 deletions python/dask_cudf/dask_cudf/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ def drop(df, keep_keys):
results = [p for i, p in enumerate(parts) if uniques[i]]
return from_delayed(results, meta=self._meta).reset_index()

def to_parquet(self, path, *args, **kwargs):
""" Calls dask.dataframe.io.to_parquet with CudfEngine backend """
from dask_cudf.io import to_parquet

return to_parquet(self, path, *args, **kwargs)

@derived_from(pd.DataFrame)
def var(
self,
Expand Down
2 changes: 1 addition & 1 deletion python/dask_cudf/dask_cudf/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
from .orc import read_orc

try:
from .parquet import read_parquet
from .parquet import read_parquet, to_parquet
except ImportError:
pass
53 changes: 53 additions & 0 deletions python/dask_cudf/dask_cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,56 @@ def read_partition(

return df

@staticmethod
def write_partition(
df,
path,
fs,
filename,
partition_on,
return_metadata,
fmd=None,
compression=None,
index_cols=None,
**kwargs,
):
# TODO: Replace `pq.write_table` with gpu-accelerated
# write after cudf.io.to_parquet is supported.

md_list = []
preserve_index = False
if index_cols:
df = df.set_index(index_cols)
preserve_index = True

# NOTE: `to_arrow` does not accept `schema` argument
t = df.to_arrow(preserve_index=preserve_index)
if partition_on:
pq.write_to_dataset(
t,
path,
partition_cols=partition_on,
filesystem=fs,
metadata_collector=md_list,
**kwargs,
)
else:
with fs.open(fs.sep.join([path, filename]), "wb") as fil:
pq.write_table(
t,
fil,
compression=compression,
metadata_collector=md_list,
**kwargs,
)
if md_list:
md_list[0].set_file_path(filename)
# Return the schema needed to write the metadata
if return_metadata:
return [{"schema": t.schema, "meta": md_list[0]}]
else:
return []


def read_parquet(path, **kwargs):
""" Read parquet files into a Dask DataFrame
Expand All @@ -112,3 +162,6 @@ class to support full functionality.
if isinstance(columns, str):
columns = [columns]
return dd.read_parquet(path, columns=columns, engine=CudfEngine, **kwargs)


to_parquet = partial(dd.to_parquet, engine=CudfEngine)
11 changes: 11 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ def test_roundtrip_from_dask(tmpdir):
assert_eq(ddf[["y"]], ddf2)


def test_roundtrip_from_dask_cudf(tmpdir):
tmpdir = str(tmpdir)
gddf = dask_cudf.from_dask_dataframe(ddf)
gddf.to_parquet(tmpdir)

# NOTE: Need `.compute()` to resolve correct index
# name after `from_dask_dataframe`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this. Is there something wrong with our metadata?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also a bit confused by this. It seems that gddf = ddf.map_partitions(cudf.from_pandas) will result in a dask_cudf dataframe with the _meta not having the same index name ans the original dask dataframe. Is it reasonable for me to raise a separate issue?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can find a quick resolution to that I think that that would be ideal. If it ends up being more complicated then sure, let's wait.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can find a quick resolution to that I think that that would be ideal.

Okay - Looks like the index name is dropped in iloc calls for cudf dataframes (and not for pandas). I'll see if the fix is simple

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mrocklin My idea for a simple fix doesn't seem to work - Lets address the index-name issue separately so I can focus on higher priority items in the short term.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, sounds good. Happy to merge on passed tests

gddf2 = dask_cudf.read_parquet(tmpdir)
assert_eq(gddf.compute(), gddf2)


def test_roundtrip_from_pandas(tmpdir):
fn = str(tmpdir.join("test.parquet"))

Expand Down