Pass kwargs from read_parquet() to the underlying engines. #18216

Merged
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.21.1.txt
@@ -85,6 +85,7 @@ I/O
- Bug in :func:`read_csv` for handling null values in index columns when specifying ``na_filter=False`` (:issue:`5239`)
- Bug in :meth:`DataFrame.to_csv` when the table had ``MultiIndex`` columns, and a list of strings was passed in for ``header`` (:issue:`5539`)
- :func:`read_parquet` now allows specifying the columns to read from a parquet file (:issue:`18154`)
- :func:`read_parquet` now allows specifying kwargs, which are passed through to the respective engine (:issue:`18216`)

Plotting
^^^^^^^^
13 changes: 7 additions & 6 deletions pandas/io/parquet.py
@@ -76,9 +76,10 @@ def write(self, df, path, compression='snappy',
table, path, compression=compression,
coerce_timestamps=coerce_timestamps, **kwargs)

def read(self, path, columns=None):
def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
return self.api.parquet.read_table(path, columns=columns).to_pandas()
return self.api.parquet.read_table(path, columns=columns,
**kwargs).to_pandas()


class FastParquetImpl(object):
@@ -115,9 +116,9 @@ def write(self, df, path, compression='snappy', **kwargs):
self.api.write(path, df,
compression=compression, **kwargs)

def read(self, path, columns=None):
def read(self, path, columns=None, **kwargs):
path, _, _ = get_filepath_or_buffer(path)
return self.api.ParquetFile(path).to_pandas(columns=columns)
return self.api.ParquetFile(path).to_pandas(columns=columns, **kwargs)


def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
@@ -175,7 +176,7 @@ def to_parquet(df, path, engine='auto', compression='snappy', **kwargs):
if df.columns.inferred_type not in valid_types:
raise ValueError("parquet must have string column names")

return impl.write(df, path, compression=compression)
return impl.write(df, path, compression=compression, **kwargs)


def read_parquet(path, engine='auto', columns=None, **kwargs):
@@ -205,4 +206,4 @@ def read_parquet(path, engine='auto', columns=None, **kwargs):
"""

impl = get_engine(engine)
return impl.read(path, columns=columns)
return impl.read(path, columns=columns, **kwargs)
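
With kwargs now forwarded, engine-specific read options can be passed straight through read_parquet(). A minimal usage sketch (the option names are the engines' own, not part of this PR: use_pandas_metadata is assumed to be accepted by pyarrow.parquet.read_table and filters by fastparquet's ParquetFile.to_pandas in the installed versions):

import pandas as pd

df = pd.DataFrame({'a': [0, 1, 2], 'b': list('xyz')})
df.to_parquet('data.parquet', engine='pyarrow', compression=None)

# forwarded to pyarrow.parquet.read_table()
result = pd.read_parquet('data.parquet', engine='pyarrow',
                         use_pandas_metadata=True)

# forwarded to fastparquet.ParquetFile.to_pandas()
result = pd.read_parquet('data.parquet', engine='fastparquet',
                         filters=[('a', '==', 0)])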
54 changes: 35 additions & 19 deletions pandas/tests/io/test_parquet.py
@@ -105,7 +105,7 @@ def test_options_py(df_compat, pa):
with pd.option_context('io.parquet.engine', 'pyarrow'):
df.to_parquet(path)

result = read_parquet(path, compression=None)
Contributor: is there a reason you are removing the kw?

Author: Yes, compression only exists for writes: you specify the compression when writing the data, and on read you decompress with whatever algorithm was used to write the file. Before my patch the kw was silently dropped; now it causes an exception, because neither backend accepts it on read.
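
A minimal sketch of the behavior described above, assuming pyarrow's read_table rejects unknown keyword arguments:

import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3]})
df.to_parquet('data.parquet', engine='pyarrow', compression=None)

# 'compression' is a write-only option; with kwargs now forwarded it reaches
# pyarrow.parquet.read_table(), which is expected to raise TypeError for the
# unknown keyword instead of silently ignoring it.
pd.read_parquet('data.parquet', engine='pyarrow', compression=None)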

result = read_parquet(path)
tm.assert_frame_equal(result, df)


@@ -118,7 +118,7 @@ def test_options_fp(df_compat, fp):
with pd.option_context('io.parquet.engine', 'fastparquet'):
df.to_parquet(path, compression=None)

result = read_parquet(path, compression=None)
result = read_parquet(path)
tm.assert_frame_equal(result, df)


@@ -130,7 +130,7 @@ def test_options_auto(df_compat, fp, pa):
with pd.option_context('io.parquet.engine', 'auto'):
df.to_parquet(path)

result = read_parquet(path, compression=None)
result = read_parquet(path)
tm.assert_frame_equal(result, df)


@@ -162,7 +162,7 @@ def test_cross_engine_pa_fp(df_cross_compat, pa, fp):
with tm.ensure_clean() as path:
df.to_parquet(path, engine=pa, compression=None)

result = read_parquet(path, engine=fp, compression=None)
result = read_parquet(path, engine=fp)
tm.assert_frame_equal(result, df)


@@ -174,7 +174,7 @@ def test_cross_engine_fp_pa(df_cross_compat, pa, fp):
with tm.ensure_clean() as path:
df.to_parquet(path, engine=fp, compression=None)

result = read_parquet(path, engine=pa, compression=None)
result = read_parquet(path, engine=pa)
tm.assert_frame_equal(result, df)


@@ -188,19 +188,23 @@ def check_error_on_write(self, df, engine, exc):
with tm.ensure_clean() as path:
to_parquet(df, path, engine, compression=None)

def check_round_trip(self, df, engine, expected=None, **kwargs):

def check_round_trip(self, df, engine, expected=None,
Contributor: I would refactor this helper function to have the following signature:

def check_round_trip(self, df, engine, expected=None, write_kwargs=None, read_kwargs=None)

Author: Yeah, this is definitely the way to go. The tests only worked until now because pyarrow.parquet.write_table ignores extra kwargs and the fastparquet implementation did not pass kwargs through to the write method; otherwise they would already have failed on the (read-only) parameter columns.
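
A hypothetical sketch of the failure mode with a single shared kwargs dict, assuming the 0.21-era signatures shown in this diff (file name and data are made up):

import pandas as pd

df = pd.DataFrame({'int': [1, 2, 3], 'string': list('abc')})
shared = {'compression': None, 'columns': ['string']}

# Each call receives an option meant for the other direction: the read-only
# 'columns' kwarg reaches the write path and the write-only 'compression'
# kwarg reaches the read path, so the calls are expected to raise once both
# engines forward **kwargs.
df.to_parquet('data.parquet', 'fastparquet', **shared)
result = pd.read_parquet('data.parquet', 'fastparquet', **shared)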

write_kwargs=None, read_kwargs=None):
if write_kwargs is None:
write_kwargs = {}
if read_kwargs is None:
read_kwargs = {}
with tm.ensure_clean() as path:
df.to_parquet(path, engine, **kwargs)
result = read_parquet(path, engine, **kwargs)
df.to_parquet(path, engine, **write_kwargs)
result = read_parquet(path, engine, **read_kwargs)

if expected is None:
expected = df
tm.assert_frame_equal(result, expected)

# repeat
to_parquet(df, path, engine, **kwargs)
result = pd.read_parquet(path, engine, **kwargs)
to_parquet(df, path, engine, **write_kwargs)
result = pd.read_parquet(path, engine, **read_kwargs)

if expected is None:
expected = df
@@ -222,7 +226,7 @@ def test_columns_dtypes(self, engine):

# unicode
df.columns = [u'foo', u'bar']
self.check_round_trip(df, engine, compression=None)
self.check_round_trip(df, engine, write_kwargs={'compression': None})

def test_columns_dtypes_invalid(self, engine):

@@ -246,7 +250,7 @@ def test_columns_dtypes_invalid(self, engine):
def test_write_with_index(self, engine):

df = pd.DataFrame({'A': [1, 2, 3]})
self.check_round_trip(df, engine, compression=None)
self.check_round_trip(df, engine, write_kwargs={'compression': None})

# non-default index
for index in [[2, 3, 4],
@@ -280,7 +284,8 @@ def test_compression(self, engine, compression):
pytest.importorskip('brotli')

df = pd.DataFrame({'A': [1, 2, 3]})
self.check_round_trip(df, engine, compression=compression)
self.check_round_trip(df, engine,
write_kwargs={'compression': compression})

def test_read_columns(self, engine):
# GH18154
@@ -289,7 +294,8 @@ def test_read_columns(self, engine):

expected = pd.DataFrame({'string': list('abc')})
self.check_round_trip(df, engine, expected=expected,
compression=None, columns=["string"])
write_kwargs={'compression': None},
read_kwargs={'columns': ['string']})


class TestParquetPyArrow(Base):
@@ -377,7 +383,7 @@ def test_basic(self, fp):
'timedelta': pd.timedelta_range('1 day', periods=3),
})

self.check_round_trip(df, fp, compression=None)
self.check_round_trip(df, fp, write_kwargs={'compression': None})

@pytest.mark.skip(reason="not supported")
def test_duplicate_columns(self, fp):
@@ -390,7 +396,8 @@ def test_duplicate_columns(self, fp):
def test_bool_with_none(self, fp):
df = pd.DataFrame({'a': [True, None, False]})
expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16')
self.check_round_trip(df, fp, expected=expected, compression=None)
self.check_round_trip(df, fp, expected=expected,
write_kwargs={'compression': None})

def test_unsupported(self, fp):

@@ -406,7 +413,7 @@ def test_categorical(self, fp):
if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"):
pytest.skip("CategoricalDtype not supported for older fp")
df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
self.check_round_trip(df, fp, compression=None)
self.check_round_trip(df, fp, write_kwargs={'compression': None})

def test_datetime_tz(self, fp):
# doesn't preserve tz
@@ -416,4 +423,13 @@ def test_datetime_tz(self, fp):
# warns on the coercion
with catch_warnings(record=True):
self.check_round_trip(df, fp, df.astype('datetime64[ns]'),
compression=None)
write_kwargs={'compression': None})

def test_filter_row_groups(self, fp):
d = {'a': list(range(0, 3))}
df = pd.DataFrame(d)
with tm.ensure_clean() as path:
df.to_parquet(path, fp, compression=None,
row_group_offsets=1)
result = read_parquet(path, fp, filters=[('a', '==', 0)])
assert len(result) == 1