
[BUG] cudf.read_parquet takes too much time (due to cudaMallocHost overhead, etc.) to load zstd-compressed parquet files with a few thousand to millions of rows #15481

Open
pmixer opened this issue Apr 8, 2024 · 5 comments
Labels
bug Something isn't working

Comments

pmixer commented Apr 8, 2024

Describe the bug
This is a performance improvement proposal for cudf's parquet file reading efficiency.

Steps/Code to reproduce bug

import pandas as pd

df = pd.DataFrame({'jnac': [None] * 1000})
df.to_parquet('/dev/shm/jnac.parquet', compression='ZSTD')

# cd to /dev/shm now

import cudf
import pandas
import pyarrow.parquet

import time

# timing is not precise, but the difference is obvious enough that rough timing suffices for now (a more careful helper is sketched after this snippet)

ts = time.time(); tb = cudf.read_parquet('/dev/shm/jnac.parquet'); te = time.time()
time.sleep(1)
ts = time.time(); tb = cudf.read_parquet('/dev/shm/jnac.parquet'); te = time.time()
print(te - ts)

ts = time.time(); tb = pandas.read_parquet('/dev/shm/jnac.parquet'); te = time.time()
time.sleep(1)
ts = time.time(); tb = pandas.read_parquet('/dev/shm/jnac.parquet'); te = time.time()
print(te - ts)

ts = time.time(); tb = pyarrow.parquet.read_table('/dev/shm/jnac.parquet'); te = time.time()
time.sleep(1)
ts = time.time(); tb = pyarrow.parquet.read_table('/dev/shm/jnac.parquet'); te = time.time()
print(te - ts)

Expected behavior

>>> ts = time.time(); tb = cudf.read_parquet('jnac.parquet'); te = time.time()
>>> print(te - ts)
0.006829023361206055
>>>
>>> ts = time.time(); tb = pandas.read_parquet('jnac.parquet'); te = time.time()
>>> time.sleep(1)

>>> ts = time.time(); tb = pandas.read_parquet('jnac.parquet'); te = time.time()
>>> print(te - ts)
0.003950357437133789
>>>
>>> ts = time.time(); tb = pyarrow.parquet.read_table('jnac.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pyarrow.parquet.read_table('jnac.parquet'); te = time.time()
>>> print(te - ts)
0.0013420581817626953
>>>

Environment overview (please complete the following information)
internal T4 node, py3.9, cudf 24.02.02

Additional context

It simply takes too much time to process the entries, especially for cudf when the row count is only 1K (although the latency is similar for 10M rows of NA).

pmixer added the bug label on Apr 8, 2024
pmixer commented Apr 8, 2024

cudf-na-issue
read_jnac_parquet_and_nsys_file.zip
The attached nsys trace shows what's happening under the hood: the GPU kernel trace is very sparse, but the CUDA API calls take very long...

pmixer commented Apr 9, 2024

There might be some trick to avoid the long first-run cudaHostAlloc call, which I haven't figured out yet; the code below likely only handles GPU-side memory pre-allocation.

import rmm

# rmm.reinitialize(pool_allocator=True, initial_pool_size= 4 * 10 ** 9)
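
Note that rmm.reinitialize only pre-allocates device memory. One workaround I can think of (a sketch, not a confirmed cudf API; it simply pays the one-time pinned-host allocation up front) is a throwaway warm-up read at process start:

import cudf

# hypothetical warm-up: the first read triggers the slow cudaMallocHost /
# CUDA-context setup once, so later reads on the hot path skip it;
# 'warmup.parquet' is any small file of your own
cudf.DataFrame({'x': [0]}).to_parquet('/dev/shm/warmup.parquet')
_ = cudf.read_parquet('/dev/shm/warmup.parquet')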

pmixer commented Apr 9, 2024

I also tried a 1M-row all (same) integer column and a 1M-row all (same) string column; cudf.read_parquet still suffers from the perf issue, very likely due to the long cudaMallocHost call.

import pandas
df = pandas.DataFrame({'j2333c': [2333] * 1000000})
df.to_parquet('/dev/shm/j2333c.parquet', compression='ZSTD')

>>> import cudf
>>> import pandas
>>> import pyarrow.parquet
>>> 
>>> import time
>>> 
>>> # timing is not precise, but the difference is obvious enough that rough timing suffices for now
>>> 
>>> ts = time.time(); tb = cudf.read_parquet('j2333c.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = cudf.read_parquet('j2333c.parquet'); te = time.time()
>>> print(te - ts)
0.08919477462768555
>>> 
>>> ts = time.time(); tb = pandas.read_parquet('j2333c.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pandas.read_parquet('j2333c.parquet'); te = time.time()
>>> print(te - ts)
0.026215314865112305
>>> 
>>> ts = time.time(); tb = pyarrow.parquet.read_table('j2333c.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pyarrow.parquet.read_table('j2333c.parquet'); te = time.time()
>>> print(te - ts)
0.014030933380126953


>>> import cudf
>>> import pandas
>>> import pyarrow.parquet
>>> 
>>> import time
>>> 
>>> import rmm
>>> 
>>> rmm.reinitialize(pool_allocator=True, initial_pool_size= 4 * 10 ** 9)
>>> 
>>> # timing is not precise, but the difference is obvious enough that rough timing suffices for now
>>> 
>>> ts = time.time(); tb = cudf.read_parquet('j2333c.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = cudf.read_parquet('j2333c.parquet'); te = time.time()

>>> print(te - ts)
0.08475613594055176
>>> 
>>> ts = time.time(); tb = pandas.read_parquet('j2333c.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pandas.read_parquet('j2333c.parquet'); te = time.time()
>>> print(te - ts)
0.025774002075195312
>>> 
>>> ts = time.time(); tb = pyarrow.parquet.read_table('j2333c.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pyarrow.parquet.read_table('j2333c.parquet'); te = time.time()
>>> print(te - ts)
0.011544227600097656

import pandas
df = pandas.DataFrame({'jstrc': ['2333'] * 1000000})
df.to_parquet('/dev/shm/jstrc.parquet', compression='ZSTD')

>>> import cudf
>>> import pandas
>>> import pyarrow.parquet
>>> 
>>> import time
>>> 
>>> import rmm
>>> 
>>> # rmm.reinitialize(pool_allocator=True, initial_pool_size= 4 * 10 ** 9)
>>> 
>>> # timing is not precise, but the difference is obvious enough that rough timing suffices for now
>>> 
>>> ts = time.time(); tb = cudf.read_parquet('/dev/shm/jstrc.parquet'); te = time.time()
>>> time.sleep(1)

>>> ts = time.time(); tb = cudf.read_parquet('/dev/shm/jstrc.parquet'); te = time.time()
>>> print(te - ts)
0.08581995964050293
>>> 
>>> ts = time.time(); tb = pandas.read_parquet('/dev/shm/jstrc.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pandas.read_parquet('/dev/shm/jstrc.parquet'); te = time.time()
>>> print(te - ts)
0.057205915451049805
>>> 
>>> ts = time.time(); tb = pyarrow.parquet.read_table('/dev/shm/jstrc.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pyarrow.parquet.read_table('/dev/shm/jstrc.parquet'); te = time.time()
>>> print(te - ts)
0.022694826126098633

pmixer commented Apr 9, 2024

Well, since GPUs are throughput machines, increasing the row count from millions to a billion shows the advantage clearly:

>>> import cudf
>>> import pandas as pd
>>> 
>>> df = pd.DataFrame({'jnac': [None] * 1000000000})
>>> df.to_parquet('/dev/shm/jnac.parquet', compression='ZSTD')
>>> import cudf
>>> import pandas
>>> import pyarrow.parquet
>>> 
>>> import time
>>> 
>>> # timing is not precise, but the difference is obvious enough that rough timing suffices for now
>>> 
>>> ts = time.time(); tb = cudf.read_parquet('/dev/shm/jnac.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = cudf.read_parquet('/dev/shm/jnac.parquet'); te = time.time()
>>> print(te - ts)
0.15029525756835938
>>> 
>>> ts = time.time(); tb = pandas.read_parquet('/dev/shm/jnac.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pandas.read_parquet('/dev/shm/jnac.parquet'); te = time.time()
>>> print(te - ts)
7.30379843711853
>>> 
>>> ts = time.time(); tb = pyarrow.parquet.read_table('/dev/shm/jnac.parquet'); te = time.time()
>>> time.sleep(1)
>>> ts = time.time(); tb = pyarrow.parquet.read_table('/dev/shm/jnac.parquet'); te = time.time()
>>> print(te - ts)
1.51247239112854
>>> 

So the major problem now is how to resolve the issue for chunked tables with row counts in the millions.

pmixer changed the title from "[BUG] cudf.read_parquet takes too much time to load the zstd compressed all <NA> column parquet file" to "[BUG] cudf.read_parquet takes too much time (due to cudaMallocHost overhead, etc.) to load zstd-compressed parquet files with a few thousand to millions of rows" on Apr 9, 2024
etseidl commented Apr 16, 2024

TL;DR: There is a fixed overhead to using CUDA and cuDF, so very small files are not going to show any improvement, but you can cut down on the startup overhead somewhat, and there are other ways to improve efficiency.

I'll show some example traces on my RTX A6000 using the 1M integer example from above. First the output:

% python3 jnac.py 
0.00894021987915039
0.005298137664794922
0.0025205612182617188

and the associated trace
Screenshot from 2024-04-16 10-22-41
The area highlighted in green is the actual decode time and is around 21ms. You can see the time is dominated by CUDA initialization (54ms) and setup involved in using kvikio/cuFile (187ms).

The second read_parquet call is then much faster (9ms), and is dominated by the decode kernel (6.5ms).
Screenshot from 2024-04-16 10-25-18

Since the file is so small, we can skip using cuFile by setting the LIBCUDF_CUFILE_POLICY envvar to OFF. This has no impact on the measured read time, but greatly reduces the setup time.

% env LIBCUDF_CUFILE_POLICY=OFF python3 jnac.py 
0.009036540985107422
0.007204532623291016
0.0022492408752441406

Screenshot from 2024-04-16 10-29-08
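
If you prefer to set the policy from inside the script rather than the shell, the usual pattern (a sketch; the key point is assigning os.environ before the first cudf import, so libcudf sees the value when it initializes) is:

import os

# must be set before cudf is first imported
os.environ['LIBCUDF_CUFILE_POLICY'] = 'OFF'

import cudf
tb = cudf.read_parquet('/dev/shm/jnac.parquet')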

Another thing to notice from the above is that pandas is writing all 1M rows into a single page. But libcudf parallelizes parquet reads at the page level, so to see any improvement you'll want more pages. Parquet-mr and libcudf default to 20000 rows max per page, so using cudf to write the initial file has a measurable impact for the 1M row case.

import cudf
import pandas as pd

fname = '/dev/shm/jnac.parquet'  # same file the jnac.py script reads
df = pd.DataFrame({'jnac': [2333] * 1000000})
#df.to_parquet(fname, compression='ZSTD')  # pandas/pyarrow writer: one 1M-row page
cdf = cudf.from_pandas(df)
cdf.to_parquet(fname, compression='ZSTD')  # cudf writer: max 20000 rows per page by default
% env LIBCUDF_CUFILE_POLICY=OFF python3 jnac.py
0.002679586410522461
0.005636692047119141
0.0023217201232910156

Screenshot from 2024-04-16 10-31-34

Now the to_parquet() call bears the price of CUDA initialization. The decode time (again highlighted in green) has gone from 20ms to around 11ms, the decode kernel has gone from a single threadblock at 6.5ms to 50 threadblocks at 140us, and the second read_parquet is now down to around 2.6ms.
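
If the file has to be written by pandas/pyarrow rather than cudf, the writer-side page size can also be capped directly (a sketch; data_page_size is pyarrow's write_table option, in bytes, and the 64 KiB value here is an arbitrary illustration):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

df = pd.DataFrame({'jnac': [2333] * 1000000})
table = pa.Table.from_pandas(df)
# smaller data pages -> more pages per file -> more work items
# for libcudf's page-parallel decode
pq.write_table(table, '/dev/shm/jnac.parquet',
               compression='ZSTD', data_page_size=64 * 1024)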

In summary, we're seeing a pretty fixed cost of 50ms for CUDA setup, 190ms for cuFile setup, and around 10ms for cuDF setup (there are some buffer initializations and a stream pool to set up). Python adds its own latency too, and there's the actual file I/O to take into account. So the minimum time you'll see for a single file read is going to be something over 60ms. If you can read files in batches and amortize the startup penalty, you'll still only see performance on par with arrow for such small files. As you've discovered already, to really see the benefit you need files that are large enough to move the bottleneck from setup/IO to the actual compute kernels that will show the parallelization benefit.
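
As a concrete example of batching, cudf.read_parquet accepts a list of paths, so many small files can be read in one call that pays the setup cost once (a sketch; the directory and glob pattern are illustrative):

import glob
import cudf

# one call over the whole batch amortizes CUDA/cuDF startup
# across all files instead of paying it per file
paths = sorted(glob.glob('/dev/shm/batch/*.parquet'))
tb = cudf.read_parquet(paths)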
