
performance issue with tpch q7 after dropping columns and using sink_parquet #16694

Open
lostmygithubaccount opened this issue Jun 3, 2024 · 4 comments
Labels
bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), python (Related to Python Polars)

Comments

@lostmygithubaccount

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

This is slightly involved, but you should be able to copy/paste the snippets below after pip install 'ibis-framework[duckdb]' in addition to having Polars installed. I am on the latest release of Polars (0.20.31). This breaks down at sf=20 and works fine at sf=10.

A function to generate the data:

import os
import ibis

def get_data_dir(sf, n_partitions):
    return os.path.join("tpch_data", f"sf={sf}", f"n={n_partitions}")


def generate_data(sf, n_partitions):
    con = ibis.connect("duckdb://")
    con.raw_sql("set enable_progress_bar = false")

    data_directory = get_data_dir(sf, n_partitions)

    if not os.path.exists(data_directory):
        for i in range(n_partitions):
            con.raw_sql(f"call dbgen(sf={sf}, children={n_partitions}, step={i})")
            for table in con.list_tables():
                if i == 0:
                    os.makedirs(os.path.join(data_directory, table), exist_ok=True)

                con.table(table).to_parquet(
                    os.path.join(data_directory, table, f"{i:04d}.parquet")
                )

                con.drop_table(table)

Run it for sf=10 and sf=20:

n = 1

sfs = [10, 20]

for sf in sfs:
    generate_data(sf, n)

Now we can read the data:

import polars as pl

customer = pl.scan_parquet("tpch_data/sf=10/n=1/customer/*.parquet")
customer.head().collect()

You'll notice that Polars by default (and Ibis on the DuckDB/Polars backends) creates the hive-partitioned sf and n as columns in the data. This was throwing some things off, so in my longer get_polars_tables function I dropped those columns:

def get_polars_tables(sf, n_partitions, lazy=True):
    import os

    os.environ["POLARS_ACTIVATE_DECIMAL"] = (
        "1"  # https://github.com/pola-rs/polars/issues/16603#issuecomment-2141701041
    )

    import polars as pl
    import polars.selectors as ps

    data_directory = get_data_dir(sf, n_partitions)

    if lazy:
        customer = pl.scan_parquet(f"{data_directory}/customer/*.parquet")
        lineitem = pl.scan_parquet(f"{data_directory}/lineitem/*.parquet")
        nation = pl.scan_parquet(f"{data_directory}/nation/*.parquet")
        orders = pl.scan_parquet(f"{data_directory}/orders/*.parquet")
        part = pl.scan_parquet(f"{data_directory}/part/*.parquet")
        partsupp = pl.scan_parquet(f"{data_directory}/partsupp/*.parquet")
        region = pl.scan_parquet(f"{data_directory}/region/*.parquet")
        supplier = pl.scan_parquet(f"{data_directory}/supplier/*.parquet")
    else:
        customer = pl.read_parquet(f"{data_directory}/customer/*.parquet")
        lineitem = pl.read_parquet(f"{data_directory}/lineitem/*.parquet")
        nation = pl.read_parquet(f"{data_directory}/nation/*.parquet")
        orders = pl.read_parquet(f"{data_directory}/orders/*.parquet")
        part = pl.read_parquet(f"{data_directory}/part/*.parquet")
        partsupp = pl.read_parquet(f"{data_directory}/partsupp/*.parquet")
        region = pl.read_parquet(f"{data_directory}/region/*.parquet")
        supplier = pl.read_parquet(f"{data_directory}/supplier/*.parquet")

    # TODO: report issue(s) (issue(s) at higher SFs)
    def _decimal_to_float(df):
        return df.with_columns((ps.decimal().cast(pl.Float64)))

    customer = _decimal_to_float(customer)
    lineitem = _decimal_to_float(lineitem)
    nation = _decimal_to_float(nation)
    orders = _decimal_to_float(orders)
    part = _decimal_to_float(part)
    partsupp = _decimal_to_float(partsupp)
    region = _decimal_to_float(region)
    supplier = _decimal_to_float(supplier)

    # TODO: keep this or figure something out and remove
    def _drop_hive_cols(df):
        return df.drop(["sf", "n"])

    customer = _drop_hive_cols(customer)
    lineitem = _drop_hive_cols(lineitem)
    nation = _drop_hive_cols(nation)
    orders = _drop_hive_cols(orders)
    part = _drop_hive_cols(part)
    partsupp = _drop_hive_cols(partsupp)
    region = _drop_hive_cols(region)
    supplier = _drop_hive_cols(supplier)

    return customer, lineitem, nation, orders, part, partsupp, region, supplier
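As an aside, rather than dropping the hive columns after the fact, it may be possible to avoid materializing them at all. A minimal sketch, assuming the Polars version in use exposes the hive_partitioning flag on scan_parquet (not what the function above does):

# sketch: skip inferring the "sf" and "n" partition columns from the
# directory names so there is nothing to drop afterwards
customer = pl.scan_parquet(
    f"{data_directory}/customer/*.parquet", hive_partitioning=False
)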

You can then read in the tables:

sf = 20
n_partitions = 1
lazy = True

customer, lineitem, nation, orders, part, partsupp, region, supplier = (
    get_polars_tables(sf=sf, n_partitions=n_partitions, lazy=lazy)
)

Perhaps a separate bug, but I'll move forward -- at this point the LazyFrames still have the n and sf columns, even though they should have been dropped. This does not seem to be an issue in the eager API.
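A quick way to see the mismatch (a sketch; .columns on a LazyFrame resolves the schema without executing the query):

# sketch: compare the resolved lazy schema with what a collect returns
print(customer.columns)                    # "sf" and "n" still listed here
print(customer.head(1).collect().columns)  # compare against the collected frame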

Now we define q7:

from datetime import date


def q7(customer, lineitem, nation, orders, supplier, **kwargs):
    var1 = "FRANCE"
    var2 = "GERMANY"
    var3 = date(1995, 1, 1)
    var4 = date(1996, 12, 31)

    n1 = nation.filter(pl.col("n_name") == var1)
    n2 = nation.filter(pl.col("n_name") == var2)

    q1 = (
        customer.join(n1, left_on="c_nationkey", right_on="n_nationkey")
        .join(orders, left_on="c_custkey", right_on="o_custkey")
        .rename({"n_name": "cust_nation"})
        .join(lineitem, left_on="o_orderkey", right_on="l_orderkey")
        .join(supplier, left_on="l_suppkey", right_on="s_suppkey")
        .join(n2, left_on="s_nationkey", right_on="n_nationkey")
        .rename({"n_name": "supp_nation"})
    )

    q2 = (
        customer.join(n2, left_on="c_nationkey", right_on="n_nationkey")
        .join(orders, left_on="c_custkey", right_on="o_custkey")
        .rename({"n_name": "cust_nation"})
        .join(lineitem, left_on="o_orderkey", right_on="l_orderkey")
        .join(supplier, left_on="l_suppkey", right_on="s_suppkey")
        .join(n1, left_on="s_nationkey", right_on="n_nationkey")
        .rename({"n_name": "supp_nation"})
    )

    q_final = (
        pl.concat([q1, q2])
        .filter(pl.col("l_shipdate").is_between(var3, var4))
        .with_columns(
            (pl.col("l_extendedprice") * (1 - pl.col("l_discount"))).alias("volume"),
            pl.col("l_shipdate").dt.year().alias("l_year"),
        )
        .group_by("supp_nation", "cust_nation", "l_year")
        .agg(pl.sum("volume").alias("revenue"))
        .sort(by=["supp_nation", "cust_nation", "l_year"])
    )

    return q_final

And run it, calling sink_parquet on the result:

res = q7(
    customer=customer,
    lineitem=lineitem,
    nation=nation,
    orders=orders,
    part=part,
    partsupp=partsupp,
    region=region,
    supplier=supplier,
)
res.sink_parquet("temp.parquet")

At sf=10 it works fine, but at sf=20 it hangs for a very long time. It also uses 100% CPU while doing this.

As I'm writing this, it actually did finish -- while .collect().write_parquet() takes 1.5s at sf=20, the sink_parquet call takes ~9s at sf=10 and ~60s at sf=20.

I was originally testing this at sf=50 and sf=100, so I assumed it was hanging forever, particularly compared to the numbers I was seeing before I added those drop-column calls. I'll still submit this.
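For reference, the comparison can be reproduced with simple wall-clock timing along these lines (a sketch, not the exact harness behind the numbers above):

import time

# in-memory engine: materialize the result, then write it
start = time.time()
res.collect().write_parquet("temp_collect.parquet")
print(f"collect().write_parquet(): {time.time() - start:.1f}s")

# streaming engine via the parquet sink
start = time.time()
res.sink_parquet("temp_sink.parquet")
print(f"sink_parquet(): {time.time() - start:.1f}s")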

NOTE: the log output below was too long for GitHub (many repeats of "parquet file must be read..."), so I deleted a bunch of it. That seems like it could be the issue, though (reading the parquet file(s) a ton of times?).

Log output

parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
POLARS PREFETCH_SIZE: 20
RUN STREAMING PIPELINE
[parquet -> hstack -> callback -> fast_projection -> callback -> fast_projection -> callback -> fast_projection -> function -> fast_projection -> union -> hstack -> generic-group_by, parquet -> fast_projection -> callback -> fast_projection -> function -> generic_join_build, parquet -> fast_projection -> callback -> fast_projection -> generic_join_build, parquet -> generic_join_build, parquet -> generic_join_build, parquet -> generic_join_build, parquet -> hstack -> callback -> fast_projection -> callback -> fast_projection -> callback -> fast_projection -> function -> fast_projection -> union -> hstack -> generic-group_by -> sort_multiple -> parquet_sink, parquet -> fast_projection -> callback -> fast_projection -> function -> generic_join_build, parquet -> fast_projection -> callback -> fast_projection -> generic_join_build, parquet -> generic_join_build, parquet -> generic_join_build, parquet -> generic_join_build]
STREAMING CHUNK SIZE: 25000 rows
parquet file must be read, statistics not sufficient for predicate.
STREAMING CHUNK SIZE: 25000 rows
STREAMING CHUNK SIZE: 25000 rows
parquet file must be read, statistics not sufficient for predicate.
STREAMING CHUNK SIZE: 25000 rows
STREAMING CHUNK SIZE: 25000 rows
STREAMING CHUNK SIZE: 10000 rows
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
... (the same message repeated many more times; truncated)
finish streaming aggregation with local in-memory table

Issue description

Two potential issues:

  1. Columns aren't dropped for LazyFrames when they should be (they are for regular DataFrames).

  2. Potential performance issue involving dropping columns + sink_parquet.

Expected behavior

  1. Columns are dropped.
  2. No performance issue with the above (I can work around this with .collect().write_parquet, it seems).

Installed versions

--------Version info---------
Polars:               0.20.31
Index type:           UInt32
Platform:             macOS-14.5-arm64-arm-64bit
Python:               3.11.8 (main, Feb 20 2024, 20:00:15) [Clang 14.0.3 (clang-1403.0.22.14.1)]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               2024.5.0
gevent:               <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         1.6.0
numpy:                1.26.4
openpyxl:             <not installed>
pandas:               2.2.2
pyarrow:              16.1.0
pydantic:             <not installed>
pyiceberg:            <not installed>
pyxlsb:               <not installed>
sqlalchemy:           <not installed>
torch:                <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
@lostmygithubaccount
Author

This was very nasty... it took me a while to figure out what changed (adding that drop-columns step). I'm going to move forward by removing sink_parquet from my code in favor of collect().write_parquet(), as it was causing other issues too.

The columns not being dropped was particularly surprising. But if this is something y'all are addressing in the new streaming engine, or there's something obvious here, of course feel free to close.

@lostmygithubaccount
Author

And one more note: I also observed this in 0.20.30 -- I upgraded to see if it had been fixed.

@ritchie46
Member

ritchie46 commented Jun 4, 2024

sink_parquet uses the streaming engine, whereas collect().write_parquet() uses the in-memory engine. We are completely redesigning the streaming engine and will likely not improve the performance of the current one (it will be discontinued).

We don't recommend using the streaming engine at the moment (if it works for you, great), but we are not happy with it. If you use it for benchmarking, I think you should make clear that it is polars-streaming you are benchmarking. ;)
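A sketch of the two code paths being contrasted here, assuming the 0.20.x API (where collect(streaming=True) also selects the streaming engine):

# in-memory engine: materialize the full result, then write it
res.collect().write_parquet("temp.parquet")

# streaming engine, the same engine sink_parquet uses (0.20.x API)
res.collect(streaming=True).write_parquet("temp_streaming.parquet")

# streaming engine with a parquet sink (the slow path in this report)
res.sink_parquet("temp.parquet")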

On the mentioned bug, can you create an MWE that only shows the bug and can be repeated on a small dataset without any dependencies?

@lostmygithubaccount
Author

That's helpful, thanks! I'll probably avoid streaming for this but keep it flexible so we can redo it once the new engine is out (and clearly note what we're using). By the way, I'll share the benchmark in the communities before publishing for any other feedback or corrections (hopefully this week).

I'll also try to reproduce this on smaller data with a better MWE (though it won't be a high priority for me); feel free to close this out.
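A rough starting point for such an MWE might be something like the following (a hypothetical, untested sketch: write a tiny hive-partitioned dataset, scan it, drop the partition columns, and check whether they survive sink_parquet):

import os
import polars as pl

# write a tiny hive-partitioned dataset: data/sf=1/n=1/0000.parquet
os.makedirs("data/sf=1/n=1", exist_ok=True)
pl.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]}).write_parquet(
    "data/sf=1/n=1/0000.parquet"
)

# scan it (the "sf" and "n" hive columns should be inferred from the path),
# then drop them
lf = pl.scan_parquet("data/sf=1/n=1/*.parquet").drop(["sf", "n"])
print(lf.columns)  # expect ["a", "b"]; the report suggests "sf"/"n" may linger

# sink to parquet and check the written file as well
lf.sink_parquet("out.parquet")
print(pl.read_parquet("out.parquet").columns)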
