Performance degradation in scan_parquet() vs. scan_pyarrow_dataset() #13908

Closed
2 tasks done
lmocsi opened this issue Jan 22, 2024 · 25 comments · Fixed by #14244
Labels
A-interop-arrow Area: interoperability with other Arrow implementations (such as pyarrow) A-io-parquet Area: reading/writing Parquet files bug Something isn't working P-medium Priority: medium python Related to Python Polars

Comments

lmocsi commented Jan 22, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

!pip install --upgrade polars==0.20.3
!pip install --upgrade pyarrow==15.0.0

import polars as pl
import pyarrow.dataset as ds
tr = pl.scan_pyarrow_dataset(ds.dataset(parq_path+"my_table", partitioning='hive')) # Wall time: 6.14 s
#tr = pl.scan_parquet(parq_path+"my_table/*/*.parquet") # Wall time: 1min 59s

df = (tr.filter((pl.col('CALENDAR_DATE').is_between(pl.lit('2023-07-21'), pl.lit('2024-01-22'))) &
                     (pl.col('CRE_FL') == 'I') &
                     (pl.col('SEC_ID') > 3) &
                     (pl.col('LE_ID') == 2905) &
                     (~pl.col('PARTY_ID').is_in([5086634, 2149316, 6031676])) &
                     (pl.col('TR_ID') != 7010)
                     )
             .select('PARTY_ID').unique()
             .rename({'PARTY_ID':'PART_ID'})
             .with_columns(pl.lit(1).alias('NEXT_FL'))
             .collect()
         )

Log output

No log.

Issue description

The above code with scan_pyarrow_dataset() runs in about 6 seconds.
The same code with scan_parquet() runs in 1 min 52 secs!!!
But both of them do this in less than 1.5 GB of RAM.

If I upgrade Polars to the current 0.20.5 version, the scan_pyarrow_dataset() variant runs out of 32 GB of memory, as if it were not able to filter on the partition column CALENDAR_DATE.
The scan_parquet() variant runs for roughly the same time as it does on Polars 0.20.3.

Expected behavior

scan_parquet() should complete in about the same time as scan_pyarrow_dataset() does.
scan_pyarrow_dataset() should still be able to filter on the partition column on Polars 0.20.5.

Installed versions

--------Version info---------
Polars:               0.20.3
Index type:           UInt32
Platform:             Linux-4.18.0-372.71.1.el8_6.x86_64-x86_64-with-glibc2.28
Python:               3.9.13 (main, Oct 13 2022, 21:15:33) 
[GCC 11.2.0]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          2.0.0
connectorx:           <not installed>
deltalake:            <not installed>
fsspec:               2022.02.0
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           3.8.0
numpy:                1.23.5
openpyxl:             3.0.9
pandas:               2.1.0
pyarrow:              15.0.0
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           1.4.27
xlsx2csv:             <not installed>
xlsxwriter:           3.1.3
lmocsi added the bug, needs triage, and python labels (Jan 22, 2024)
ritchie46 (Member)

Can you make a reproducible example? There is nothing here that we can test.

lmocsi (Author) commented Jan 22, 2024

I guess I cannot create / upload several GB-s of partitioned parquet file here...

deanm0000 (Collaborator)

Please run the examples with trace logging on. Which column(s) are part of the hive partitioning?

lmocsi (Author) commented Jan 23, 2024

Partitioned only on calendar_date.
How do I turn on trace logging?

ritchie46 (Member)

I guess I cannot create / upload several GB-s of partitioned parquet file here...

You can create dummy data in your script. And you likely don't need several GB to show the effect. Please make an effort to produce something we can reproduce if you want it fixed.

deanm0000 (Collaborator) commented Jan 23, 2024

Before your import polars line, run these, then do everything else:

import os
os.environ['POLARS_VERBOSE'] = '1'    # print verbose logs from the query engine
os.environ["RUST_BACKTRACE"] = "1"    # show Rust backtraces if something panics

You probably don't need the Rust one here, but it's included for reference.

stinodego added the A-io-parquet label (Jan 23, 2024)
lmocsi (Author) commented Jan 23, 2024

With logging turned on, I don't see any more detail, only this error:

Traceback (most recent call last):
File "/tmp/1000920000/ipykernel_1125/610731593.py", line 2, in
df = (tr.filter((pl.col('CALENDAR_DATE').is_between(pl.lit('2023-07-21'), pl.lit('2024-01-22')) &
File "/opt/conda/envs/Python-3.9-Premium/lib/python3.9/site-packages/polars/lazyframe/frame.py", line 1730, in collect
return wrap_df(ldf.collect())
polars.exceptions.ComputeError: ArrowMemoryError: malloc of size 2490368 failed

deanm0000 (Collaborator)

Are you saying that before it was just slow but now it's generating an error?

lmocsi (Author) commented Jan 23, 2024

Scan_parquet() was slow in 0.20.3 and is slow in 0.20.4/0.20.5
Scan_pyarrow_dataset() was fast in 0.20.3 and is running out of memory in 0.20.4/0.20.5 (as if unable to filter on hive-partitioned columns)

lmocsi (Author) commented Jan 24, 2024

The weird thing is that with synthetic data it is vice versa (a sketch of how such synthetic data might be generated is shown below):

  • scan_pyarrow_dataset runs the same query in 43 seconds, while
  • scan_parquet completes in 3.9 seconds...
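
For anyone trying to reproduce this, a minimal sketch of how a hive-partitioned dummy dataset might be generated (column names are taken from the query above; sizes and value ranges are made up):

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

rng = np.random.default_rng(0)
n = 1_000_000
dates = pd.date_range("2023-01-01", "2024-01-22", freq="D").strftime("%Y-%m-%d 00:00:00")

table = pa.table({
    "CALENDAR_DATE": rng.choice(dates.to_numpy(), n),   # becomes the hive partition column
    "PARTY_ID": rng.integers(1, 100_000, n),
    "CRE_FL": rng.choice(["I", "Y", "N"], n),
    "SEC_ID": rng.integers(0, 10, n),
})

# Writes one key=value sub-directory per CALENDAR_DATE value, i.e. hive-style partitioning.
pq.write_to_dataset(table, "my_table", partition_cols=["CALENDAR_DATE"])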

ion-elgreco (Contributor)

@lmocsi can you share the plan from .explain() on both the older and the newer Polars version?
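
For reference, a minimal way to capture the plan, using the query from the reproducible example (tr and the filter as defined above; sketch only):

plan = (
    tr.filter(pl.col('CALENDAR_DATE').is_between(pl.lit('2023-07-21'), pl.lit('2024-01-22')))
      .select('PARTY_ID')
      .explain()   # returns the optimized plan as a string instead of executing the query
)
print(plan)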

lmocsi (Author) commented Jan 29, 2024

With a synthetic dataset (created for 100000 customers, see here: apache/arrow#39768):
Polars==0.20.6 + scan_pyarrow_dataset:

Explain:
 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]
        FILTER [(col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)])) & ([(col("CREDIT_FL")) == (String(Y))])] FROM
          PYTHON SCAN 
          PROJECT 3/4 COLUMNS

df.shape: (100_000, 2)
Max memory occupation: ~13 GB
Final memory occupation: 9.7 GB


Polars==0.20.6 + scan_parquet:

Explain:
 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          Parquet SCAN 186 files: first file: my_table\\CALENDAR_DATE=2023-07-21%2000%3A00%3A00\\part-0.parquet
          PROJECT 3/4 COLUMNS
          SELECTION: [(col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)])) & ([(col("CREDIT_FL")) == (String(Y))])]

df.shape: (100_000, 2)
Max memory occupation: ~4.9 GB
Final memory occupation: 1.7 GB


Polars==0.20.3 + scan_pyarrow_dataset:

Explain:
 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          PYTHON SCAN 
          PROJECT 3/4 COLUMNS
          SELECTION: (((pa.compute.field(\'CALENDAR_DATE\') >= \'2023-07-21\') & (pa.compute.field(\'CALENDAR_DATE\') <= \'2024-01-22\')) & (pa.compute.field(\'CREDIT_FL\') == \'Y\'))

df.shape: (100_000, 2)
Max memory occupation: ~10.1 GB
Final memory occupation: 7 GB


Polars==0.20.3 + scan_parquet:

Explain:
 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          Parquet SCAN 185 files: first file: my_table\\CALENDAR_DATE=2023-07-21%2000%3A00%3A00\\part-0.parquet
          PROJECT 3/4 COLUMNS
          SELECTION: [([([(col("CALENDAR_DATE")) >= (String(2023-07-21))]) & ([(col("CALENDAR_DATE")) <= (String(2024-01-22))])]) & ([(col("CREDIT_FL")) == (String(Y))])]

df.shape: (100_000, 2)
Max memory occupation: ~6.4 GB
Final memory occupation: 6.4 GB

lmocsi (Author) commented Jan 31, 2024

With the real dataset:
Polars==0.20.6 + scan_pyarrow_dataset:

 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]
        FILTER col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)]) FROM


          PYTHON SCAN 
          PROJECT 2/27 COLUMNS

df.shape: -
Max memory occupation: <runs out of 32 GB>
Final memory occupation: <runs out of 32 GB>
Wall time: -


Polars==0.20.6 + scan_parquet:

 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          Parquet SCAN 1846 files: first file: /mypath/mytable/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-00000-35c18ead-ef01-4535-a3df-f9e09af6c12b.c000.snappy.parquet
          PROJECT 2/27 COLUMNS
          SELECTION: col("CALENDAR_DATE").is_between([String(2023-07-21), String(2024-01-22)])

df.shape: (1_075_661, 2)
Max memory occupation: 5.1 GB
Final memory occupation: 2.2 GB
Wall time: 53 s


Polars==0.20.3 + scan_pyarrow_dataset:

 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          PYTHON SCAN 
          PROJECT 2/27 COLUMNS
          SELECTION: ((pa.compute.field(\'CALENDAR_DATE\') >= \'2023-07-21\') & (pa.compute.field(\'CALENDAR_DATE\') <= \'2024-01-22\'))

df.shape: (1_075_661, 2)
Max memory occupation: 13.1 GB
Final memory occupation: 2.4 GB
Wall time: 14.1 s


Polars==0.20.3 + scan_parquet:

<no plan here since df.explain() is running for more than 1h 43 minutes!!!>

df.shape: (1_075_661, 2)
Max memory occupation: 11.9 GB
Final memory occupation: 2.2 GB
Wall time: 14.9 s

Lots of warnings here:
First, lots of lines like this:
"parquet file can be skipped, the statistics were sufficient to apply the predicate."

Then lots of lines like this:
"parquet file must be read, statistics not sufficient for predicate."

Then mixed lines like:
"parquet file must be read, statistics not sufficient for predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate."

And finally:
"hive partitioning: skipped 1661 files, first file : /mypath/mytable/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-00000-35c18ead-ef01-4535-a3df-f9e09af6c12b.c000.snappy.parquet"

ion-elgreco (Contributor) commented Jan 31, 2024

@lmocsi it looks like the filter in scan_pyarrow_dataset is not pushed down to pyarrow: the SELECTION should show the pyarrow.compute expressions in v0.20.6, but it doesn't.

@ritchie46 it seems there is no longer any filter pushdown to the pyarrow dataset; the filter only happens in Polars after everything is loaded into memory.

deanm0000 (Collaborator)

Is your calendar date stored as a string rather than as a date?

lmocsi (Author) commented Jan 31, 2024

Calendar date is returned as a string column, since that is the partition column.
Files in this partitioned parquet table look like this:
/mypath/mytable/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-00-35c18ead-ef01.c000.snappy.parquet

deanm0000 (Collaborator)

If you do scan_parquet on just one file, does it give you a column with

2019-01-01 00%3A00%3A00 or 2019-01-01 00:00:00?

Try doing something like:

tr = pl.scan_parquet(parq_path+"my_table/*/*.parquet")

df = (
    pl.select(
        CALENDAR_DATE=pl.date_range(pl.date(2023,7,21),pl.date(2024,1,11),'1d').dt.strftime("%Y-%m-%d") + "%2000%3A00%3A00"
        ).lazy()
    .join(tr, on='CALENDAR_DATE')
    .filter(         (pl.col('CRE_FL') == 'I') &
                     (pl.col('SEC_ID') > 3) &
                     (pl.col('LE_ID') == 2905) &
                     (~pl.col('PARTY_ID').is_in([5086634, 2149316, 6031676])) &
                     (pl.col('TR_ID') != 7010)
                     )
             .select('PARTY_ID').unique()
             .rename({'PARTY_ID':'PART_ID'})
             .with_columns(pl.lit(1).alias('NEXT_FL'))
             .collect()
)

lmocsi (Author) commented Feb 1, 2024

<polars==0.20.6>

This:

tr = pl.scan_parquet(parq_path+"my_table/*/*.parquet")
tr.limit(1).select(pl.col('CALENDAR_DATE')).collect().rows()

Gives me:
[('2019-01-01 00:00:00',)]

But the actual directory looks like this:
CALENDAR_DATE=2019-01-01 00%3A00%3A00
So there must be some conversion somewhere in between...

The above join statement leads to the kernel dying somewhere above 17 GB of RAM usage (in a 32 GB environment).
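
The conversion is just URL decoding of the hive directory names: ':' is percent-encoded as %3A on disk, and the value is decoded when the partition is parsed. A quick illustration using only the standard library:

from urllib.parse import quote, unquote

unquote("2019-01-01 00%3A00%3A00")       # -> '2019-01-01 00:00:00'
quote("2019-01-01 00:00:00", safe=" ")   # -> '2019-01-01 00%3A00%3A00'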

deanm0000 (Collaborator)

What about...

df = (
    tr
    .filter(
        (pl.col('CALENDAR_DATE').is_in(
            pl.select(
                CALENDAR_DATE=pl.date_range(pl.date(2023,7,21), pl.date(2024,1,11), '1d')
                .dt.strftime("%Y-%m-%d") + " 00:00:00"
            ).to_series().to_list()
        )) &
        (pl.col('CRE_FL') == 'I') &
        (pl.col('SEC_ID') > 3) &
        (pl.col('LE_ID') == 2905) &
        (~pl.col('PARTY_ID').is_in([5086634, 2149316, 6031676])) &
        (pl.col('TR_ID') != 7010)
    )
    .select('PARTY_ID').unique()
    .rename({'PARTY_ID': 'PART_ID'})
    .with_columns(pl.lit(1).alias('NEXT_FL'))
    .collect()
)

Ultimately, I think there's a good amount of potential improvement to be made in Polars' hive-partitioning optimization. While its current state isn't as robust as pyarrow's hive dataset scanner, I expect the effort to go into making those improvements rather than into strengthening the pyarrow linkage.

As such, assuming the last snippet doesn't work, I think you probably want to do more of the filtering directly in pyarrow.

So something like:

df = (
    pl.scan_pyarrow_dataset(
        ds.dataset(parq_path+"my_table", partitioning='hive')
        .filter(
            (ds.field('CALENDAR_DATE')>='2023-07-21') &
            (ds.field('CALENDAR_DATE')<='2024-01-22') &
            (ds.field('CRE_FL') == 'I') &
            (ds.field('SEC_ID') > 3) &
            (ds.field('LE_ID') == 2905) &
            (~ds.field('PARTY_ID').isin([5086634, 2149316, 6031676])) &
            (ds.field('TR_ID') != 7010)
        )
    )
    .select('PARTY_ID').unique()
    .rename({'PARTY_ID':'PART_ID'})
    .with_columns(pl.lit(1).alias('NEXT_FL'))
    .collect()
    )

deanm0000 added the needs triage, P-medium, and A-interop-arrow labels and removed the needs triage label (Feb 1, 2024)
lmocsi (Author) commented Feb 2, 2024

The .to_series().to_list() version runs fast using scan_parquet():

<polars==0.20.6>

 WITH_COLUMNS:
 [1.alias("NEXT_FL")]
  RENAME
    UNIQUE BY None
      FAST_PROJECT: [PARTY_ID]

          Parquet SCAN 175 files: first file: /mypath/myfile/CALENDAR_DATE=2023-07-21 00%3A00%3A00/part-00000-015dea05.c000.snappy.parquet
          PROJECT 3/27 COLUMNS
          SELECTION: [(col("CALENDAR_DATE").is_in([Series])) & ([(col("CRE_FL")) == (String(I))])]

Max memory occupation: 2.7 GB
Final memory occupation: 2.1 GB
Wall time: 15.1 s

But it is cumbersome to filter like that... :(
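
One way to make it less cumbersome might be a small helper that builds the partition-value strings for a date range (hypothetical; assumes the 'YYYY-MM-DD 00:00:00' values used above):

from datetime import date, timedelta

def calendar_date_values(start: date, end: date) -> list[str]:
    # One string per day, matching the partition values stored in CALENDAR_DATE.
    return [f"{start + timedelta(d):%Y-%m-%d} 00:00:00" for d in range((end - start).days + 1)]

# e.g. pl.col('CALENDAR_DATE').is_in(calendar_date_values(date(2023, 7, 21), date(2024, 1, 22)))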

deanm0000 (Collaborator)

Yeah, it only seems to know how to skip files on exact matches; otherwise it wants to read the file.

It also only works if all the files are 00:00:00; if you've got some 00:12:00, etc., it would miss those entirely.

Another thing you can do, which isn't any less code but is more resilient:

tr = pl.scan_parquet(parq_path+"my_table/*/*.parquet")
import json
df=(
    pl.scan_parquet(
        pl.select(paths=json.loads(tr.serialize())['Scan']['paths'])
        .explode('paths')
        .with_columns(
            CALENDAR_DATE=pl.col('paths').str.extract("CALENDAR_DATE=(.+?)/")
            .str.replace_all("%3A",":")
            .str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S"),
            )
        .filter(pl.col('CALENDAR_DATE').is_between(
            pl.datetime(2023,7,21), pl.datetime(2024,1,22)
            ))
        .get_column('paths')
        .to_list()
    )
    .filter(
        (pl.col('CRE_FL') == 'I') &
        (pl.col('SEC_ID') > 3) &
        (pl.col('LE_ID') == 2905) &
        (~pl.col('PARTY_ID').is_in([5086634, 2149316, 6031676])) &
        (pl.col('TR_ID') != 7010)
        )
    .select('PARTY_ID').unique()
    .rename({'PARTY_ID':'PART_ID'})
    .with_columns(pl.lit(1).alias('NEXT_FL'))
    .collect()
    )

In this way you do a first scan just to get the file list. You then use serialize and json.loads to extract that list, use a regex to extract your CALENDAR_DATE and strptime it to Datetime, and filter on that. That gives you the list of files that satisfies your date filter, and you then do another scan that only includes those files.
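
A plainer way to build the same file list, assuming local paths and using only the standard library (same regex idea as above; hypothetical sketch):

import glob
import re

paths = []
for p in glob.glob(parq_path + "my_table/*/*.parquet"):
    m = re.search(r"CALENDAR_DATE=([^/]+)", p)
    if m:
        d = m.group(1).replace("%3A", ":")[:10]   # e.g. '2023-07-21'
        if "2023-07-21" <= d <= "2024-01-22":
            paths.append(p)

tr = pl.scan_parquet(paths)   # scan only the partitions that pass the date filter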

ion-elgreco (Contributor)

@deanm0000 these are all workarounds though. Polars should be able to parse those characters properly

deanm0000 (Collaborator)

@deanm0000 these are all workarounds though. Polars should be able to parse those characters properly

Agreed, I'm not saying otherwise.

deanm0000 removed the needs triage label (Feb 2, 2024)
deanm0000 (Collaborator) commented Feb 2, 2024

I think what it needs would go here...

match function {
    FunctionExpr::Boolean(BooleanFunction::IsNull) => {
        let root = expr_to_leaf_column_name(&self.expr)?;
        match stats.get_stats(&root).ok() {
            Some(st) => match st.null_count() {
                Some(0) => Ok(false),
                _ => Ok(true),
            },
            None => Ok(true),
        }
    },
    #[cfg(feature = "is_in")]
    FunctionExpr::Boolean(BooleanFunction::IsIn) => {
        let should_read = || -> Option<bool> {
            let root = expr_to_leaf_column_name(&input[0]).ok()?;
            let Expr::Literal(LiteralValue::Series(input)) = &input[1] else {
                return None;
            };
            #[allow(clippy::explicit_auto_deref)]
            let input: &Series = &**input;
            let st = stats.get_stats(&root).ok()?;
            let min = st.to_min()?;
            let max = st.to_max()?;
            if max.get(0).unwrap() == min.get(0).unwrap() {
                let one_equals =
                    |value: &Series| Some(ChunkCompare::equal(input, value).ok()?.any());
                return one_equals(min);
            }
            let smaller = ChunkCompare::lt(input, min).ok()?;
            let bigger = ChunkCompare::gt(input, max).ok()?;
            Some(!(smaller | bigger).all())
        };
        Ok(should_read().unwrap_or(true))
    },
    _ => Ok(true),

We need another match arm for FunctionExpr::Boolean(BooleanFunction::IsBetween).
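
Until an IsBetween arm exists there, a possible user-level workaround (consistent with the 0.20.3 plans above, where the decomposed predicate was pruned by statistics and hive partitioning) is to spell the filter as two comparisons instead of is_between; sketch only:

df = (
    tr.filter(
        # The >=/<= pair maps onto binary predicates that the statistics evaluator
        # already handles, so partitions/row groups outside the range can be skipped.
        (pl.col('CALENDAR_DATE') >= pl.lit('2023-07-21')) &
        (pl.col('CALENDAR_DATE') <= pl.lit('2024-01-22'))
    )
    .select('PARTY_ID').unique()
    .collect()
)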

deanm0000 (Collaborator)

@ritchie46 see my previous comment. I think that's the fix but I don't know how to implement it.
