[BUG] dask_cudf.read_parquet is unable to read a list of parquet files from a remote cloud bucket. #8319

Closed
akaanirban opened this issue May 21, 2021 · 4 comments
Labels: bug


Describe the bug
dask_cudf.read_parquet is unable to read a list of parquet files from a remote bucket, even though it is able to enumerate the matching files in the bucket.

Steps/Code to reproduce bug

from dask_cuda import LocalCUDACluster
import dask_cudf
import dask
from dask.distributed import Client, wait

cluster = LocalCUDACluster(ip="")
client = Client(cluster)

workers = client.has_what().keys()
file = 'gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.10*.parquet'
df = dask_cudf.read_parquet(file, npartitions=len(workers), chunksize=25e6)
df.head()

This throws a FileNotFoundError because it tries to read the files from the local filesystem instead of the remote bucket:

---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
<ipython-input-20-c42a15b2c7cf> in <module>
----> 1 df.head()

/opt/conda/envs/rapids/lib/python3.8/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
   1047             Whether to compute the result, default is True.
   1048         """
-> 1049         return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
   1050 
   1051     def _head(self, n, npartitions, compute, safe):

/opt/conda/envs/rapids/lib/python3.8/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
   1080 
   1081         if compute:
-> 1082             result = result.compute()
   1083         return result
   1084 

/opt/conda/envs/rapids/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    282         dask.base.compute
    283         """
--> 284         (result,) = compute(self, traverse=False, **kwargs)
    285         return result
    286 

/opt/conda/envs/rapids/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    564         postcomputes.append(x.__dask_postcompute__())
    565 
--> 566     results = schedule(dsk, keys, **kwargs)
    567     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    568 

/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2664                     should_rejoin = False
   2665             try:
-> 2666                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2667             finally:
   2668                 for f in futures.values():

/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1973             else:
   1974                 local_worker = None
-> 1975             return self.sync(
   1976                 self._gather,
   1977                 futures,

/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    841             return future
    842         else:
--> 843             return sync(
    844                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    845             )

/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    351     if error[0]:
    352         typ, exc, tb = error[0]
--> 353         raise exc.with_traceback(tb)
    354     else:
    355         return result[0]

/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/utils.py in f()
    334             if callback_timeout is not None:
    335                 future = asyncio.wait_for(future, callback_timeout)
--> 336             result[0] = yield future
    337         except Exception as exc:
    338             error[0] = sys.exc_info()

/opt/conda/envs/rapids/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/opt/conda/envs/rapids/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1838                             exc = CancelledError(key)
   1839                         else:
-> 1840                             raise exception.with_traceback(traceback)
   1841                         raise exc
   1842                     if errors == "skip":

/opt/conda/envs/rapids/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in read_parquet_part()
    379 
    380     if isinstance(part, list):
--> 381         dfs = [
    382             func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
    383             for (rg, kw) in part

/opt/conda/envs/rapids/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in <listcomp>()
    380     if isinstance(part, list):
    381         dfs = [
--> 382             func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
    383             for (rg, kw) in part
    384         ]

/opt/conda/envs/rapids/lib/python3.8/site-packages/dask_cudf/io/parquet.py in read_partition()
     69         else:
     70             with fs.open(path, mode="rb") as f:
---> 71                 df = cudf.read_parquet(
     72                     f,
     73                     engine="cudf",

/opt/conda/envs/rapids/lib/python3.8/site-packages/cudf/io/parquet.py in read_parquet()
    249 
    250     if engine == "cudf":
--> 251         return libparquet.read_parquet(
    252             filepaths_or_buffers,
    253             columns=columns,

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

cudf/_lib/parquet.pyx in cudf._lib.parquet.read_parquet()

cudf/_lib/io/utils.pyx in cudf._lib.io.utils.make_source_info()

FileNotFoundError: [Errno 2] No such file or directory: ['anaconda-public-data/nyc-taxi/nyc.parquet/part.10.parquet']

Expected behavior
This should behave like dask_cudf.read_csv, which seems to work:

workers = client.has_what().keys()
file = 'gs://anaconda-public-data/nyc-taxi/csv/2014/yellow_tripdata_2014-1*csv'
df = dask_cudf.read_csv(file, npartitions=len(workers), chunksize=25e6)
df.head()
vendor_id | pickup_datetime | dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | rate_code | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | surcharge | mta_tax | tip_amount | tolls_amount | total_amount
CMT | 2014-10-01 08:55:07 | 2014-10-01 09:11:03 | 1 | 2.2 | -74.005867 | 40.737570 | 1 | Y | -74.015534 | 40.708277 | CRD | 12.0 | 0.0 | 0.5 | 1.0 | 0.0 | 13.5
CMT | 2014-10-01 10:51:17 | 2014-10-01 11:26:11 | 1 | 15.7 | -73.873193 | 40.774056 | 1 | Y | -73.999846 | 40.631132 | CRD | 45.5 | 0.0 | 0.5 | 9.2 | 0.0 | 55.2
CMT | 2014-10-01 02:03:03 | 2014-10-01 02:06:55 | 1 | 1.0 | 0.000000 | 0.000000 | 1 | N | 0.000000 | 0.000000 | CRD | 5.0 | 0.5 | 0.5 | 1.0 | 0.0 | 7.0
CMT | 2014-10-01 00:06:35 | 2014-10-01 00:17:05 | 2 | 2.5 | -73.987151 | 40.732922 | 1 | N | -73.991831 | 40.758148 | CRD | 10.0 | 0.5 | 0.5 | 2.2 | 0.0 | 13.2
CMT | 2014-10-01 01:34:13 | 2014-10-01 01:47:02 | 1 | 4.2 | -73.983267 | 40.726577 | 1 | N | -73.937556 | 40.716380 | CRD | 15.0

dask.dataframe.read_parquet also works.
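
For comparison, here is a minimal CPU-only sketch of the equivalent dask.dataframe call (assuming pyarrow and gcsfs are available in the environment; the glob is the same one used above):

import dask.dataframe as dd

# CPU-side comparison: dask.dataframe resolves the gs:// glob through fsspec/gcsfs
# and reads the matched files with its parquet engine.
path = "gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.10*.parquet"
ddf = dd.read_parquet(path)
print(ddf.head())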

Environment overview

  • Environment location: Docker, Cloud (Azure) using dask-kubernetes
  • Method of cuDF install: Docker
    • Tested with the rapidsai/rapidsai:0.19-cuda11.2-runtime-ubuntu18.04-py3.8 image.

Environment details
cuDF versions in both the pods and the local client, from conda list | grep cudf:

cudf                      0.19.2          cuda_11.2_py38_gab3b3f653a_0    rapidsai
cudf_kafka                0.19.2          py38_gab3b3f653a_0    rapidsai
dask-cudf                 0.19.2          py38_gab3b3f653a_0    rapidsai
libcudf                   0.19.2          cuda11.2_gab3b3f653a_0    rapidsai
libcudf_kafka             0.19.2            gab3b3f653a_0    rapidsai

Additional context
Because of the wildcard, it should read the following files from the remote bucket (the sketch after the list shows one way to confirm the expansion):

gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.10.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.100.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.101.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.102.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.103.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.104.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.105.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.106.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.107.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.108.parquet
gs://anaconda-public-data/nyc-taxi/nyc.parquet/part.109.parquet
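
The expansion can be double-checked by globbing the bucket directly with gcsfs; this is only a sketch and assumes gcsfs is installed and the public bucket allows anonymous access:

import gcsfs

# List the objects matched by the same wildcard passed to read_parquet.
fs = gcsfs.GCSFileSystem(token="anon")
matches = fs.glob("anaconda-public-data/nyc-taxi/nyc.parquet/part.10*.parquet")
print(matches)  # should show the eleven part.10x.parquet paths listed above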
akaanirban added the Needs Triage and bug labels on May 21, 2021
The github-actions bot added this to Needs prioritizing in Bug Squashing on May 21, 2021

ayushdg (Member) commented May 22, 2021

Could you share the fsspec version in the conda environment? I suspect this might be related to #8275.
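
For reference, the installed version can also be checked from Python directly (a minimal check, assuming fsspec is importable in the same environment the workers use):

import fsspec

# Print the fsspec version that will be imported at read time.
print(fsspec.__version__)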

akaanirban (Author):

This is the output I get:

(rapids) root@cf56b55ceac7:~# conda list |grep fsspec
fsspec                    2021.5.0                 pypi_0    pypi

It appears that the vanilla container image rapidsai/rapidsai:0.19-cuda11.2-runtime-ubuntu18.04-py3.8 ships fsspec 2021.4.0; fsspec must have been updated to 2021.5.0 by something I installed while running the Jupyter notebooks. My bad.

But yes, #8275 looks like exactly what is happening.

akaanirban (Author) commented May 24, 2021

Update: installing the dask/gcsfs libraries installs gcsfs 2021.5.0, which requires fsspec==2021.5.0; the latter is what causes the reads to be attempted from the local filesystem.

I have forcibly installed fsspec==2021.4.0 everywhere, including the containers, and that seems to be working.
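
For anyone hitting the same behavior before upgrading to a fixed release, the pin can be applied with something along these lines (an assumed workaround, not an official recommendation):

pip install fsspec==2021.4.0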

kkraus14 (Collaborator):

Fixed by #8275

Bug Squashing automation moved this from Needs prioritizing to Closed on May 24, 2021
bdice removed the Needs Triage label on Mar 4, 2024