hive partitioning predicate isn't applied before reading #17045

Open
lmocsi opened this issue Jun 18, 2024 · 11 comments
Labels
A-io-partitioning (Area: reading/writing (Hive) partitioned files), enhancement (New feature or an improvement of an existing feature), performance (Performance issues or improvements)

Comments

@lmocsi

lmocsi commented Jun 18, 2024

Description

We have a hive-partitioned Parquet dataset, partitioned on CALENDAR_DATE.
If I read it with scan_pyarrow_dataset, the query completes in 8 seconds.
If I read it with scan_parquet, it takes 2 minutes 10 seconds.

import polars as pl
import pyarrow.dataset as ds

parq_path = '/my_path/'
ext = '/**/*.parquet'

tr = pl.scan_pyarrow_dataset(ds.dataset(parq_path+"my_transaction", partitioning='hive'))
df2 = (tr.filter((pl.col('CALENDAR_DATE').is_between(pl.lit('2023-12-17'), pl.lit('2024-06-18'))) &
                     (pl.col('CRED_FL') == 'I') &
                     (pl.col('SEC_ID') > 3) &
                     (pl.col('AD_LE_ID') == 2905) &
                     (~pl.col('PART_ID').is_in([5086634, 2149316, 6031676])) &
                     (pl.col('AD_TR_ID') != 7010)
                     )
             .select('PART_ID').unique()
             .rename({'PART_ID':'PARTY_ID'})
             .with_columns(pl.lit(1).alias('LU_NEXT_FL'))
             .collect()
         )

Runs in 8 seconds with the following explain plan:

 WITH_COLUMNS:
 [dyn int: 1.alias("LU_NEXT_FL")] 
  RENAME
    UNIQUE[maintain_order: false, keep_strategy: Any] BY None
      simple π 1/6 ["PART_ID"]
          PYTHON SCAN 
          PROJECT 6/27 COLUMNS
          SELECTION: [([([([([([(col("AD_LE_ID")) == (2905)]) & ([(col("CRED_FL")) == (String(I))])]) & ([(col("AD_TR_ID")) != (7010)])]) & (col("PART_ID").is_in([Series]).not())]) & (col("CALENDAR_DATE").is_between([String(2023-12-17), String(2024-06-18)]))]) & ([(col("SEC_ID")) > (3)])]

But if I access the data source like this:

tr = pl.scan_parquet(parq_path+"my_transaction"+ext)

Then it runs in 2 minutes 10 seconds with the following explain plan:

 WITH_COLUMNS:
 [dyn int: 1.alias("LU_NEXT_FL")] 
  RENAME
    UNIQUE[maintain_order: false, keep_strategy: Any] BY None
      simple π 1/6 ["PART_ID"]
          Parquet SCAN 1984 files: first file: /mypath/my_transaction/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-00000-35c18ead-ef01-4535-a3df-f9e09af6c12b.c000.snappy.parquet
          PROJECT 6/27 COLUMNS
          SELECTION: [([([([([(col("CALENDAR_DATE").is_between([String(2023-12-17), String(2024-06-18)])) & ([(col("AD_TR_ID")) != (7010)])]) & ([(col("AD_LE_ID")) == (2905)])]) & ([(col("CRED_FL")) == (String(I))])]) & ([(col("SEC_ID")) > (3)])]) & (col("PART_ID").is_in([Series]).not())]

Could it be that the order of the filtering criteria determines whether partition filtering is applied?
It would be nice to be able to use scan_parquet instead of scan_pyarrow_dataset.
The performance difference is present in older versions (e.g. 0.20.3, 0.20.31) as well.
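
As a diagnostic sketch (same paths and column names as above, otherwise untested): isolating the partition-column predicate into its own filter() call and comparing the resulting explain plan with the ones above would show whether the order of the predicates affects partition pruning.

tr = pl.scan_parquet(parq_path + "my_transaction" + ext)
df3 = (tr
       # partition-column predicate first, on its own
       .filter(pl.col('CALENDAR_DATE').is_between(pl.lit('2023-12-17'), pl.lit('2024-06-18')))
       # remaining, non-partition predicates
       .filter((pl.col('CRED_FL') == 'I') &
               (pl.col('SEC_ID') > 3) &
               (pl.col('AD_LE_ID') == 2905) &
               (~pl.col('PART_ID').is_in([5086634, 2149316, 6031676])) &
               (pl.col('AD_TR_ID') != 7010))
       .select('PART_ID').unique()
       .rename({'PART_ID': 'PARTY_ID'})
       .with_columns(pl.lit(1).alias('LU_NEXT_FL'))
      )
print(df3.explain())  # check whether the Parquet SCAN line still lists all files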

@lmocsi lmocsi added the enhancement New feature or an improvement of an existing feature label Jun 18, 2024
@alexander-beedie alexander-beedie added the performance Performance issues or improvements label Jun 18, 2024
@ritchie46
Member

Could it be that the order of the filtering criteria determines whether partition filtering is applied?

I think this is the case. We should keep hive partitioned predicates separate.

@nameexhaustion another data point.
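
Roughly, "separate" would mean something like this sketch (plain Python with a hypothetical split_predicates helper, not the actual optimizer code): partition the conjuncts of the filter by whether they only reference hive partition columns, so the partition-only part can be checked against the directory names before any file is opened.

import polars as pl

HIVE_COLS = {'CALENDAR_DATE'}  # assumed set of hive partition columns

def split_predicates(predicates: list[pl.Expr]) -> tuple[list[pl.Expr], list[pl.Expr]]:
    # split conjuncts into (hive-only, remaining) based on the columns they reference
    hive_only, rest = [], []
    for p in predicates:
        (hive_only if set(p.meta.root_names()) <= HIVE_COLS else rest).append(p)
    return hive_only, rest

hive_preds, other_preds = split_predicates([
    pl.col('CALENDAR_DATE').is_between(pl.lit('2023-12-17'), pl.lit('2024-06-18')),
    pl.col('CRED_FL') == 'I',
    pl.col('SEC_ID') > 3,
])
print([p.meta.root_names() for p in hive_preds])   # [['CALENDAR_DATE']]
print([p.meta.root_names() for p in other_preds])  # [['CRED_FL'], ['SEC_ID']]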

@ritchie46 ritchie46 changed the title Scan_pyarrow_dataset is still 16x faster than scan_parquet in 1.0.0b1 Scan_pyarrow_dataset is 16x faster than scan_parquet in 1.0.0b1 when reading hive partitioned dataset Jun 18, 2024
@ritchie46 ritchie46 changed the title Scan_pyarrow_dataset is 16x faster than scan_parquet in 1.0.0b1 when reading hive partitioned dataset hive partitioning predicate isn't applied before reading Jun 18, 2024
@ritchie46
Member

Hmm... We did some checks and we do apply predicate pushdown based on hive partitions. Can you create a reproducible example that shows the difference? That would help us fix it.

@lmocsi
Author

lmocsi commented Jun 18, 2024

Create a what? A test dataset?
There is a similar one that can be created: apache/arrow#39768
But I do not know whether the problem can be reproduced on that.
How could I test it on the real data?

@deanm0000
Collaborator

@ritchie46 See #13908 and subsequently #14244

@lmocsi it would be more helpful if you had brought up those old issues from the start. Did it work in 0.20.7, the first version that included the above PR?

@stinodego stinodego added the A-io-partitioning Area: reading/writing (Hive) partitioned files label Jun 18, 2024
@lmocsi
Author

lmocsi commented Jun 18, 2024

The performance difference is present in older versions (e.g. 0.20.3) as well.
I did not go further back in time.

@lmocsi
Author

lmocsi commented Jun 18, 2024

Could it be that the datetime formatting in the partition directory names is confusing scan_parquet?
I am thinking of the 00%3A00%3A00 part, e.g.:
/my_path/my_table/CALENDAR_DATE=2019-01-01 00%3A00%3A00/part-00000-35c18ead-whatever.c000.snappy.parquet
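
A quick standard-library check of what value such a directory name decodes to:

from urllib.parse import unquote

dirname = 'CALENDAR_DATE=2019-01-01 00%3A00%3A00'
key, _, value = dirname.partition('=')
print(key, '->', unquote(value))  # CALENDAR_DATE -> 2019-01-01 00:00:00

So scan_parquet would have to match a datetime-formatted partition value such as '2019-01-01 00:00:00' against the plain date strings used in the filter, which is one place where pruning could plausibly fall back to reading every file.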

@ritchie46
Member

Create a what? A test dataset?

A full reproducible example that shows the problem, i.e. test data and the query that goes with it.

@lmocsi
Author

lmocsi commented Jun 18, 2024

Create a what? A test dataset?

A full reproducible example that shows the problem, i.e. test data and the query that goes with it.

I tried, but it is not clear which part of the dataset the error comes from.
Unfortunately I cannot disclose the dataset.
Is there a way I can test what you would be testing on a full reproducible example?

@ritchie46
Member

Try to create some fake data with the same hive partition schema. It is probably only the hive partitions and the query that matter.

@lmocsi
Author

lmocsi commented Jun 19, 2024

See the sample data creation and reading code below:

#create data
import polars as pl
from faker import Faker
import random as rnd
from datetime import datetime,date
import pyarrow.dataset as ds

print(f"polars version: {pl.__version__}")

def ido():
    return datetime.now().strftime('%Y.%m.%d. %H:%M:%S')

fake = Faker()

print(ido(),'Started')

path = '/mypath/'
dflen = 10000000
df = pl.DataFrame({'ID': pl.Series(fake.unique.random_int(min=13127924000, max=14127924000) for i in range(dflen)),
                   'BA_ID': pl.Series(fake.unique.random_int(min=2, max=2585456410) for i in range(dflen)),
                   'PART_ID': pl.Series(fake.unique.random_int(min=2163520, max=16320804) for i in range(dflen)),
                   'CU_ID': pl.Series(rnd.choice([1096, 3342, 3374, 4272, 3098, 3099]) for i in range(dflen)),
                   'DEA_ID': pl.Series(fake.unique.random_int(min=996000, max=53237133) for i in range(dflen)),
                   'AM_CY': pl.Series(fake.pyfloat(min_value=10000.0, max_value=990000.0, right_digits=1) for i in range(dflen)),
                   'CR_FL': pl.Series(rnd.choice(['Y', 'N']) for i in range(dflen)),
                   'PA_COM': pl.Series(rnd.choice(["######", None]) for i in range(dflen)),
                   'CO_TE': pl.Series(rnd.choice(["######", "A:Techn. part######", " -5755.00 MAD -5755.0"]) for i in range(dflen)),
                   'OT_AC': pl.Series(rnd.choice(["121223234545565678788989", "111122224444555577778888",  None]) for i in range(dflen)),
                   'OP_AC_NA': pl.Series(rnd.choice(["Donald Arthur Biden###########", "Joe William Trump#############",  None]) for i in range(dflen)),
                   'DWS_ID': pl.Series(rnd.choice([198, 1395, 5121, 2473]) for i in range(dflen)),
                   'ADT_ID': pl.Series(rnd.choice([570, 1309, 1680, 1798, 1916, 13856, 355136]) for i in range(dflen)),
                   'ADC_ID': pl.Series(rnd.choice([1019, 1134, 1455]) for i in range(dflen)),
                   'ADK_ID': pl.Series(rnd.choice([2058, 2185, 160279, 240274]) for i in range(dflen)),
                   'ABDO_ID': pl.Series(rnd.choice([2, 31248967]) for i in range(dflen)),
                   'ADS_ID': pl.Series(rnd.choice([1271, 1265, 1399, 1342, 1652, 1266]) for i in range(dflen)),
                   'INT_FL': pl.Series(rnd.choice(['Y', None]) for i in range(dflen)),
                   'MN_DIR': pl.Series(rnd.choice(['INT', 'DOM']) for i in range(dflen)),
                   'ADC_ID': pl.Series(rnd.choice([2, 2688, 2689, 24605]) for i in range(dflen)),  # note: duplicate key, overrides the earlier 'ADC_ID' entry
                   'ADO_ID': pl.Series(rnd.choice([2, 3126]) for i in range(dflen)),
                   'REF': pl.Series(rnd.choice(['12345679801','AD789789_12345645','DAS7894561230315','12345678','81051314_239_02_01_00_4566']) for i in range(dflen)),
                   'SEC_ID': pl.Series(rnd.choice([2, 93708]) for i in range(dflen)),
                   'ADL_ID': pl.Series(rnd.choice([2, 1125, 1134, 1364, 20834]) for i in range(dflen)),
                   'CH_ID': pl.Series(rnd.choice([50141, 50016, 49904, 49838, None]) for i in range(dflen)),
                   #'CALENDAR_DATE': pl.Series(fake.date_between_dates(date(2023,1,1),date(2024,4,30)) for i in range(dflen)), # calendar_date:date -> scan_parquet reads it very fast
                   'CALENDAR_DATE': pl.Series(fake.date_between_dates(datetime(2023,1,1),datetime(2024,4,30)) for i in range(dflen)),
                  }).with_columns(AM=pl.col('AM_CY'))

print(ido(),dflen,'records created')

ds.write_dataset(
        df.to_arrow(),
        path+'my_transaction',
        format="parquet",
        partitioning=["CALENDAR_DATE"],
        partitioning_flavor="hive",
        existing_data_behavior="delete_matching",
    )

print(ido(),'finished')
# 2024.06.19. 17:26:12 Started
# 2024.06.19. 17:41:46 10000000 records created
# 2024.06.19. 17:43:02 finished

Then reading the data shows the difference: scan_pyarrow_dataset runs in 2 seconds, while scan_parquet runs in 21 seconds:

import polars as pl
from datetime import datetime
import pyarrow.dataset as ds

print(f"polars version: {pl.__version__}")

def ido():
    return datetime.now().strftime('%Y.%m.%d. %H:%M:%S')

parq_path = '/mypath/'
ext = '/**/*.parquet'

tr = pl.scan_pyarrow_dataset(ds.dataset(parq_path+"my_transaction", partitioning='hive'))
df = (tr.filter((pl.col('CALENDAR_DATE').is_between(pl.lit('2023-12-01'), pl.lit('2023-12-31'))) &
                   (pl.col('CR_FL') == 'I') &
                   (pl.col('SEC_ID') > 3) &
                   (pl.col('ADL_ID') == 2905) &
                   (~pl.col('PART_ID').is_in([5086634, 2149316, 6031676])) &
                   (pl.col('ADT_ID') != 7010)
                   )
           .select('PART_ID').unique()
           .rename({'PART_ID':'PARTY_ID'})
           .with_columns(pl.lit(1).alias('LU_NEXT_FL'))
         )
print(ido(),'scan_pyarrow_dataset started') # 2 sec
df.collect()
print(ido(),'scan_pyarrow_dataset finished')

# explain plan:
# WITH_COLUMNS:
#  [dyn int: 1.alias("LU_NEXT_FL")] 
#   RENAME
#     UNIQUE[maintain_order: false, keep_strategy: Any] BY None
#       simple π 1/6 ["PART_ID"]
#           PYTHON SCAN 
#           PROJECT 6/26 COLUMNS
#           SELECTION: [([([([([([(col("ADL_ID")) == (2905)]) & (col("PART_ID").is_in([Series]).not())]) & ([(col("SEC_ID")) > (3)])]) & ([(col("ADT_ID")) != (7010)])]) & ([(col("CR_FL")) == (String(I))])]) & (col("CALENDAR_DATE").is_between([String(2023-12-01), String(2023-12-31)]))]


tr2 = pl.scan_parquet(parq_path+"my_transaction"+ext)
df2 = (tr2.filter((pl.col('CALENDAR_DATE').is_between(pl.lit('2023-12-01'), pl.lit('2023-12-31'))) &
                   (pl.col('CR_FL') == 'I') &
                   (pl.col('SEC_ID') > 3) &
                   (pl.col('ADL_ID') == 2905) &
                   (~pl.col('PART_ID').is_in([5086634, 2149316, 6031676])) &
                   (pl.col('ADT_ID') != 7010)
                   )
           .select('PART_ID').unique()
           .rename({'PART_ID':'PARTY_ID'})
           .with_columns(pl.lit(1).alias('LU_NEXT_FL'))
         )
print(ido(),'scan_parquet started') # 21 sec
df2.collect()
print(ido(),'scan_parquet finished')

# explain plan:
 # WITH_COLUMNS:
 # [dyn int: 1.alias("LU_NEXT_FL")] 
 #  RENAME
 #    UNIQUE[maintain_order: false, keep_strategy: Any] BY None
 #      simple π 1/6 ["PART_ID"]
 #          Parquet SCAN 485 files: first file: /mypath/my_transaction/CALENDAR_DATE=2023-01-01/part-0.parquet
 #          PROJECT 6/26 COLUMNS
 #          SELECTION: [([([([([([(col("ADL_ID")) == (2905)]) & ([(col("ADT_ID")) != (7010)])]) & (col("CALENDAR_DATE").is_between([String(2023-12-01), String(2023-12-31)]))]) & ([(col("SEC_ID")) > (3)])]) & (col("PART_ID").is_in([Series]).not())]) & ([(col("CR_FL")) == (String(I))])]

@lmocsi
Author

lmocsi commented Jun 19, 2024

One small addition: in the data creation part the CALENDAR_DATE field should be calculated like this:

   'CALENDAR_DATE': pl.Series(fake.date_time_between_dates(datetime(2023,1,1),datetime(2024,4,30)) for i in range(dflen)).dt.truncate('1d'),
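
A quick way to check which directory naming the creation script actually produces (sketch, assuming the same path variable as above):

import os

# The real dataset's directories look like 'CALENDAR_DATE=2019-01-01 00%3A00%3A00';
# listing a few generated directories shows whether the fake data reproduces the
# URL-encoded time component or ends up with plain dates instead.
print(sorted(os.listdir(path + 'my_transaction'))[:3])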
