In [1]:
!pip uninstall apache-beam -y && pip install -U pandas polars pyarrow narwhals>=0.9.5 ibis-framework

Collecting polars
  Downloading polars-0.20.16-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (15 kB)
Downloading polars-0.20.16-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (26.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.3/26.3 MB[0m [31m67.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: polars
  Attempting uninstall: polars
    Found existing installation: polars 0.20.15
    Uninstalling polars-0.20.15:
      Successfully uninstalled polars-0.20.15
Successfully installed polars-0.20.16


In [3]:
import pandas as pd
import polars as pl

pd.options.mode.copy_on_write = True
pd.options.future.infer_string = True

In [None]:
from datetime import date

def q1_pandas_native(lineitem):
    VAR1 = date(1998, 9, 2)

    sel = lineitem.l_shipdate <= VAR1
    lineitem_filtered = lineitem[sel]

    # This is lenient towards pandas as normally an optimizer should decide
    # that this could be computed before the groupby aggregation.
    # Other implementations don't enjoy this benefit.
    lineitem_filtered["disc_price"] = lineitem_filtered.l_extendedprice * (
        1 - lineitem_filtered.l_discount
    )
    lineitem_filtered["charge"] = (
        lineitem_filtered.l_extendedprice
        * (1 - lineitem_filtered.l_discount)
        * (1 + lineitem_filtered.l_tax)
    )
    gb = lineitem_filtered.groupby(["l_returnflag", "l_linestatus"], as_index=False)

    total = gb.agg(
        sum_qty=pd.NamedAgg(column="l_quantity", aggfunc="sum"),
        sum_base_price=pd.NamedAgg(column="l_extendedprice", aggfunc="sum"),
        sum_disc_price=pd.NamedAgg(column="disc_price", aggfunc="sum"),
        sum_charge=pd.NamedAgg(column="charge", aggfunc="sum"),
        avg_qty=pd.NamedAgg(column="l_quantity", aggfunc="mean"),
        avg_price=pd.NamedAgg(column="l_extendedprice", aggfunc="mean"),
        avg_disc=pd.NamedAgg(column="l_discount", aggfunc="mean"),
        count_order=pd.NamedAgg(column="l_orderkey", aggfunc="size"),
    )

    result_df = total.sort_values(["l_returnflag", "l_linestatus"])

    return result_df  # type: ignore[no-any-return]

In [4]:
from typing import Any
from datetime import datetime
import narwhals as nw

@nw.narwhalify
def q1(lineitem_ds: Any) -> Any:
    var_1 = datetime(1998, 9, 2)
    return (
        lineitem_ds.filter(nw.col("l_shipdate") <= var_1)
        .with_columns(
            disc_price=nw.col("l_extendedprice") * (1 - nw.col("l_discount")),
            charge=(
                nw.col("l_extendedprice")
                * (1.0 - nw.col("l_discount"))
                * (1.0 + nw.col("l_tax"))
            ),
        )
        .group_by(["l_returnflag", "l_linestatus"])
        .agg(
            [
                nw.sum("l_quantity").alias("sum_qty"),
                nw.sum("l_extendedprice").alias("sum_base_price"),
                nw.sum("disc_price").alias("sum_disc_price"),
                nw.col("charge").sum().alias("sum_charge"),
                nw.mean("l_quantity").alias("avg_qty"),
                nw.mean("l_extendedprice").alias("avg_price"),
                nw.mean("l_discount").alias("avg_disc"),
                nw.len().alias("count_order"),
            ],
        )
        .sort(["l_returnflag", "l_linestatus"])
    )

In [None]:
def q1_ibis(lineitem: Any, *, tool):
    var1 = datetime(1998, 9, 2)
    lineitem = lineitem.filter(lineitem["l_shipdate"] <= var1)
    lineitem = lineitem.mutate(
        disc_price=lineitem["l_extendedprice"] * (1 - lineitem["l_discount"]),
        charge=(
            lineitem["l_extendedprice"]
            * (1.0 - lineitem["l_discount"])
            * (1.0 + lineitem["l_tax"])
        ),
    )
    q_final = (
        lineitem
        .group_by(["l_returnflag", "l_linestatus"])
        .aggregate(
            sum_qty=lineitem["l_quantity"].sum(),
            sum_base_price=lineitem["l_extendedprice"].sum(),
            sum_disc_price=(lineitem['disc_price'].sum()),
            sum_charge=(lineitem['charge'].sum()),
            avg_qty=lineitem["l_quantity"].mean(),
            avg_price=lineitem["l_extendedprice"].mean(),
            avg_disc=lineitem["l_discount"].mean(),
            count_order=lambda lineitem: lineitem.count(),
        )
        .order_by(["l_returnflag", "l_linestatus"])
    )
    if tool == 'pandas':
        return q_final.to_pandas()
    if tool == 'polars':
        return q_final.to_polars()
    raise ValueError("expected pandas or polars")

In [5]:
dir_ = "/kaggle/input/tpc-h-data-parquet-s-2/"
region = dir_ + 'region.parquet'
nation = dir_ + 'nation.parquet'
customer = dir_ + 'customer.parquet'
lineitem = dir_ + 'lineitem.parquet'
orders = dir_ + 'orders.parquet'
supplier = dir_ + 'supplier.parquet'
part = dir_ + 'part.parquet'
partsupp = dir_ + 'partsupp.parquet'

In [6]:
import ibis

con_pd = ibis.pandas.connect()
con_pl = ibis.polars.connect()

IO_FUNCS = {
    'pandas': lambda x: pd.read_parquet(x, engine='pyarrow'),
    'pandas[pyarrow]': lambda x: pd.read_parquet(x, engine='pyarrow', dtype_backend='pyarrow'),
    'pandas[pyarrow][ibis]': lambda x: con_pd.read_parquet(x, engine='pyarrow', dtype_backend='pyarrow'),
    'polars[eager]': lambda x: pl.read_parquet(x),
    'polars[lazy]': lambda x: pl.scan_parquet(x),
    'polars[lazy][ibis]': lambda x: con_pl.read_parquet(x),
}

In [None]:
results = {}

## pandas, pyarrow dtypes, ibis

In [None]:
tool = 'pandas[pyarrow][ibis]'
fn = IO_FUNCS[tool]
timings = %timeit -o q1_ibis(fn(lineitem), tool='pandas')
results[tool] = timings.all_runs

24 s ± 142 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


23.841894793999984

## pandas, pyarrow dtypes, native

In [None]:
tool = 'pandas[pyarrow]'
fn = IO_FUNCS[tool]
timings = %timeit -o q1_pandas_native(lineitem=fn(lineitem))
results[tool+'[native]'] = timings.all_runs

24 s ± 142 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


23.841894793999984

## pandas via Narwhals

In [7]:
tool = 'pandas'
fn = IO_FUNCS[tool]
timings = %timeit -o q1(lineitem_ds=fn(lineitem))
results[tool] = timings.all_runs

24 s ± 142 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


23.841894793999984

## pandas, pyarrow dtypes, via Narwhals

In [8]:
tool = 'pandas[pyarrow]'
fn = IO_FUNCS[tool]
timings = %timeit -o q1(fn(lineitem))
results[tool] = timings.all_runs

20.2 s ± 5.8 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


16.42582530300001

## Polars read_parquet

In [9]:
tool = 'polars[eager]'
fn = IO_FUNCS[tool]
timings = %timeit -o q1(fn(lineitem))
results[tool] = timings.all_runs

4.67 s ± 85 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


4.574684939999997

## Polars scan_parquet

In [10]:
tool = 'polars[lazy]'
fn = IO_FUNCS[tool]
timings = %timeit -o q1(fn(lineitem)).collect()
results[tool] = timings.all_runs

595 ms ± 18.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


0.5674880569999914

## Polars scan_parquet ibis

In [None]:
tool = 'polars[lazy][ibis]'
fn = IO_FUNCS[tool]
timings = %timeit -o q1_ibis(fn(lineitem), tool='polars')
results[tool] = timings.all_runs

24 s ± 142 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


23.841894793999984

## Save

In [None]:
import json
with open('results.json', 'w') as fd:
    json.dump(results, fd)
