[BUG] dask dataframe to_parquet fails with missing StringIndex attribute #2637

Closed · beckernick opened this issue Aug 20, 2019 · 1 comment

Labels: bug (Something isn't working), dask (Dask issue), Python (Affects Python cuDF API)

beckernick commented Aug 20, 2019

Built from source in a 0.10 nightly development container, calling to_parquet on a dask_cudf dataframe fails with the traceback below. PyArrow appears to expect a pandas-style index (one implementing get_level_values) from Dask's write path, which cuDF's StringIndex does not provide; calling to_parquet on a plain cuDF dataframe succeeds.

import cudf
import pandas as pd
import dask_cudf
import dask.dataframe as dd

df = cudf.DataFrame({'a':range(50), 'b':range(50)})
pdf = df.to_pandas()

ddf = dask_cudf.from_cudf(df, 3)
dpdf = dd.from_pandas(pdf, 3)

dpdf.to_parquet('example_parquet1') # works fine
ddf.to_parquet('example_parquet2') # fails
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-13-dacf95e1101c> in <module>
     11 
     12 dpdf.to_parquet('example_parquet1') # works fine
---> 13 ddf.to_parquet('example_parquet2') # fails

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
   3627         from .io import to_parquet
   3628 
-> 3629         return to_parquet(self, path, *args, **kwargs)
   3630 
   3631     @derived_from(pd.DataFrame)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, ignore_divisions, partition_on, storage_options, write_metadata_file, compute, **kwargs)
    481 
    482     if compute:
--> 483         out = out.compute()
    484     return out
    485 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    173         dask.base.compute
    174         """
--> 175         (result,) = compute(self, traverse=False, **kwargs)
    176         return result
    177 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    444     keys = [x.__dask_keys__() for x in collections]
    445     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446     results = schedule(dsk, keys, **kwargs)
    447     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    448 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     80         get_id=_thread_get_id,
     81         pack_exception=pack_exception,
---> 82         **kwargs
     83     )
     84 

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    489                         _execute_task(task, data)  # Re-execute locally
    490                     else:
--> 491                         raise_exception(exc, tb)
    492                 res, worker_id = loads(res_info)
    493                 state["cache"][key] = res

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/compatibility.py in reraise(exc, tb)
    128         if exc.__traceback__ is not tb:
    129             raise exc.with_traceback(tb)
--> 130         raise exc
    131 
    132     import pickle as cPickle

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    231     try:
    232         task, data = loads(task_info)
--> 233         result = _execute_task(task, data)
    234         id = get_id()
    235         result = dumps((result, id))

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/compatibility.py in apply(func, args, kwargs)
    105     def apply(func, args, kwargs=None):
    106         if kwargs:
--> 107             return func(*args, **kwargs)
    108         else:
    109             return func(*args)

/opt/conda/envs/rapids/lib/python3.7/site-packages/dask/dataframe/io/parquet/arrow.py in write_partition(df, path, fs, filename, partition_on, return_metadata, fmd, compression, index_cols, schema, **kwargs)
    410             df = df.set_index(index_cols)
    411             preserve_index = True
--> 412         t = pa.Table.from_pandas(df, preserve_index=preserve_index, schema=schema)
    413         if partition_on:
    414             pq.write_to_dataset(

/opt/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.Table.from_pandas()

/opt/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/pandas_compat.py in dataframe_to_arrays(df, schema, preserve_index, nthreads, columns, safe)
    458      columns_to_convert,
    459      convert_fields) = _get_columns_to_convert(df, schema, preserve_index,
--> 460                                                columns)
    461 
    462     # NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether

/opt/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/pandas_compat.py in _get_columns_to_convert(df, schema, preserve_index, columns)
    331 
    332     index_levels = (
--> 333         _get_index_level_values(df.index) if preserve_index is not False
    334         else []
    335     )

/opt/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/pandas_compat.py in _get_index_level_values(index)
    404 def _get_index_level_values(index):
    405     n = len(getattr(index, 'levels', [index]))
--> 406     return [index.get_level_values(i) for i in range(n)]
    407 
    408 

/opt/conda/envs/rapids/lib/python3.7/site-packages/pyarrow/pandas_compat.py in <listcomp>(.0)
    404 def _get_index_level_values(index):
    405     n = len(getattr(index, 'levels', [index]))
--> 406     return [index.get_level_values(i) for i in range(n)]
    407 
    408 

AttributeError: 'StringIndex' object has no attribute 'get_level_values'
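
For context, the failing frame is pyarrow's pandas_compat._get_index_level_values, which assumes every index implements the pandas get_level_values accessor. Plain pandas indexes provide it (for MultiIndex compatibility), but cuDF's StringIndex does not, hence the AttributeError. A minimal pandas-only sketch of the assumption, for illustration:

import pandas as pd

# pyarrow treats any index as a (possibly single-level) MultiIndex:
idx = pd.Index(['a', 'b', 'c'])
n = len(getattr(idx, 'levels', [idx]))                 # a flat Index has no .levels, so n == 1
levels = [idx.get_level_values(i) for i in range(n)]   # works on pandas Index and MultiIndex
print(levels)

Until a fix lands, one possible workaround (untested sketch, reusing ddf from the reproducer above) is to route the write through pandas partitions:

# untested: convert each cudf partition to pandas before dask writes it
ddf.map_partitions(lambda part: part.to_pandas()).to_parquet('example_parquet2')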
beckernick commented:

This was resolved by #3369. Closing.
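
For anyone hitting this on an older build: after upgrading to a version that includes the fix, the reproducer above should run cleanly. A quick round-trip check (sketch, assuming current dask_cudf read/write APIs):

import cudf
import dask_cudf

df = cudf.DataFrame({'a': range(50), 'b': range(50)})
ddf = dask_cudf.from_cudf(df, 3)
ddf.to_parquet('example_parquet2')                    # no longer raises once the fix is in
print(dask_cudf.read_parquet('example_parquet2').head())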
