In [None]:
!pip uninstall apache-beam -y && pip install -U pandas polars pyarrow narwhals ibis-framework

In [None]:
import pandas as pd
import polars as pl

pd.options.mode.copy_on_write = True
pd.options.future.infer_string = True

In [None]:
from datetime import date
from datetime import datetime
from typing import Any

import narwhals as nw


def _q7_pandas_leg(
    customer_ds, orders_ds, line_item_ds, supplier_ds, cust_nations, supp_nations
):
    """Join one direction of the Q7 nation pair with plain pandas merges.

    Keeps customers whose nation row is in ``cust_nations`` buying line items
    from suppliers whose nation row is in ``supp_nations``. The two nation
    names are exposed as ``cust_nation`` and ``supp_nation``.
    """
    return (
        customer_ds.merge(cust_nations, left_on="c_nationkey", right_on="n_nationkey")
        .merge(orders_ds, left_on="c_custkey", right_on="o_custkey")
        # Rename before the second nation merge so the two n_name columns
        # cannot collide.
        .rename(columns={"n_name": "cust_nation"})
        .merge(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
        .merge(supplier_ds, left_on="l_suppkey", right_on="s_suppkey")
        .merge(supp_nations, left_on="s_nationkey", right_on="n_nationkey")
        .rename(columns={"n_name": "supp_nation"})
    )


def q7_pandas_native(
    nation_ds,
    customer_ds,
    line_item_ds,
    orders_ds,
    supplier_ds,
    *,
    nation1: str = "FRANCE",
    nation2: str = "GERMANY",
    start: Any = date(1995, 1, 1),
    end: Any = date(1996, 12, 31),
) -> pd.DataFrame:
    """TPC-H Q7 (volume shipping) written directly against the pandas API.

    Sums ``l_extendedprice * (1 - l_discount)`` per supplier nation, customer
    nation and ship year. It covers line items shipped between ``nation1`` and
    ``nation2`` in either direction, with ``start <= l_shipdate <= end``.

    Parameters
    ----------
    nation_ds, customer_ds, line_item_ds, orders_ds, supplier_ds
        pandas DataFrames with the TPC-H columns referenced below.
    nation1, nation2
        The nation pair (matched against ``n_name``).
    start, end
        Inclusive ship-date bounds. They must be comparable with the
        ``l_shipdate`` column. The ``datetime.date`` defaults are what the
        pyarrow-backed run in this notebook uses (presumably date32 columns);
        for datetime64 columns pass ``pd.Timestamp`` values instead.

    Returns
    -------
    pandas.DataFrame
        Columns ``supp_nation``, ``cust_nation``, ``l_year``, ``revenue``,
        sorted by the first three.
    """
    n1 = nation_ds[nation_ds["n_name"] == nation1]
    n2 = nation_ds[nation_ds["n_name"] == nation2]

    # Both directions: customers in n1 buying from n2, and vice versa.
    legs = (
        _q7_pandas_leg(customer_ds, orders_ds, line_item_ds, supplier_ds, n1, n2),
        _q7_pandas_leg(customer_ds, orders_ds, line_item_ds, supplier_ds, n2, n1),
    )
    # ignore_index avoids duplicate row labels from stacking the two legs.
    total = pd.concat(legs, ignore_index=True)

    ship = total["l_shipdate"]
    total = total[(ship >= start) & (ship <= end)]
    # assign() returns a new frame, so this does not depend on the global
    # copy-on-write option to avoid writing into a filtered slice.
    total = total.assign(
        volume=total["l_extendedprice"] * (1.0 - total["l_discount"]),
        l_year=total["l_shipdate"].dt.year,
    )

    keys = ["supp_nation", "cust_nation", "l_year"]
    return (
        total.groupby(keys, as_index=False)
        .agg(revenue=pd.NamedAgg(column="volume", aggfunc="sum"))
        .sort_values(by=keys)
    )

In [None]:
def _q7_narwhals_leg(
    customer_ds, orders_ds, line_item_ds, supplier_ds, cust_nations, supp_nations
):
    """Join one direction of the Q7 nation pair with Narwhals.

    Keeps customers whose nation row is in ``cust_nations`` buying line items
    from suppliers whose nation row is in ``supp_nations``. The two nation
    names are exposed as ``cust_nation`` and ``supp_nation``.
    """
    return (
        customer_ds.join(cust_nations, left_on="c_nationkey", right_on="n_nationkey")
        .join(orders_ds, left_on="c_custkey", right_on="o_custkey")
        # Rename before the second nation join so the two n_name columns
        # cannot collide.
        .rename({"n_name": "cust_nation"})
        .join(line_item_ds, left_on="o_orderkey", right_on="l_orderkey")
        .join(supplier_ds, left_on="l_suppkey", right_on="s_suppkey")
        .join(supp_nations, left_on="s_nationkey", right_on="n_nationkey")
        .rename({"n_name": "supp_nation"})
    )


def q7(
    nation_ds,
    customer_ds,
    line_item_ds,
    orders_ds,
    supplier_ds,
    *,
    nation1: str = "FRANCE",
    nation2: str = "GERMANY",
    start: Any = datetime(1995, 1, 1),
    end: Any = datetime(1996, 12, 31),
) -> Any:
    """Backend-agnostic TPC-H Q7 (volume shipping) via Narwhals.

    Sums ``l_extendedprice * (1 - l_discount)`` per supplier nation, customer
    nation and ship year. It covers line items shipped between ``nation1`` and
    ``nation2`` in either direction, with ``l_shipdate`` in ``[start, end]``.

    Parameters
    ----------
    nation_ds, customer_ds, line_item_ds, orders_ds, supplier_ds
        Native frames Narwhals accepts (this notebook passes pandas frames
        and polars eager/lazy frames).
    nation1, nation2
        The nation pair (matched against ``n_name``).
    start, end
        Inclusive ship-date bounds.

    Returns
    -------
    The native result: columns ``supp_nation``, ``cust_nation``, ``l_year``,
    ``revenue``, sorted by the first three. A polars LazyFrame input yields
    a LazyFrame, and the caller must ``.collect()`` it.
    """
    nation_ds = nw.from_native(nation_ds)
    customer_ds = nw.from_native(customer_ds)
    line_item_ds = nw.from_native(line_item_ds)
    orders_ds = nw.from_native(orders_ds)
    supplier_ds = nw.from_native(supplier_ds)

    n1 = nation_ds.filter(nw.col("n_name") == nation1)
    n2 = nation_ds.filter(nw.col("n_name") == nation2)

    # Both directions: customers in n1 buying from n2, and vice versa.
    legs = [
        _q7_narwhals_leg(customer_ds, orders_ds, line_item_ds, supplier_ds, n1, n2),
        _q7_narwhals_leg(customer_ds, orders_ds, line_item_ds, supplier_ds, n2, n1),
    ]

    keys = ["supp_nation", "cust_nation", "l_year"]
    result = (
        nw.concat(legs)
        .filter(nw.col("l_shipdate").is_between(start, end))
        .with_columns(
            (nw.col("l_extendedprice") * (1 - nw.col("l_discount"))).alias("volume"),
            nw.col("l_shipdate").dt.year().alias("l_year"),
        )
        .group_by(*keys)
        .agg(nw.sum("volume").alias("revenue"))
        .sort(by=keys)
    )
    return nw.to_native(result)

In [None]:
import ibis


def _q7_ibis_leg(customer, orders, lineitem, supplier, cust_nations, supp_nations):
    """Build one direction of the Q7 nation pair as an ibis expression.

    Keeps customers whose nation row is in ``cust_nations`` buying line items
    from suppliers whose nation row is in ``supp_nations``. The two nation
    names are exposed as ``cust_nation`` and ``supp_nation``.
    """
    return (
        customer.join(cust_nations, customer["c_nationkey"] == cust_nations["n_nationkey"])
        .join(orders, customer["c_custkey"] == orders["o_custkey"])
        # ibis' rename mapping is {new_name: old_name}.
        .rename({"cust_nation": "n_name"})
        .join(lineitem, orders["o_orderkey"] == lineitem["l_orderkey"])
        .join(supplier, lineitem["l_suppkey"] == supplier["s_suppkey"])
        .join(supp_nations, supplier["s_nationkey"] == supp_nations["n_nationkey"])
        .rename({"supp_nation": "n_name"})
    )


def q7_ibis(
    nation: Any,
    customer: Any,
    lineitem: Any,
    orders: Any,
    supplier: Any,
    *,
    tool: str,
    nation1: str = "FRANCE",
    nation2: str = "GERMANY",
    start: Any = datetime(1995, 1, 1),
    end: Any = datetime(1996, 12, 31),
) -> Any:
    """TPC-H Q7 (volume shipping) expressed with ibis.

    Parameters
    ----------
    nation, customer, lineitem, orders, supplier
        ibis tables with the TPC-H columns referenced below.
    tool
        ``"pandas"`` to execute with ``to_pandas()``, or ``"polars"`` to
        execute with ``to_polars()``.
    nation1, nation2
        The nation pair (matched against ``n_name``).
    start, end
        Inclusive ship-date bounds.

    Returns
    -------
    A pandas or polars DataFrame with columns ``supp_nation``,
    ``cust_nation``, ``l_year``, ``revenue``, sorted by the first three.

    Raises
    ------
    ValueError
        If ``tool`` is neither ``"pandas"`` nor ``"polars"``.
    """
    # Fail fast, before building the query plan.
    if tool not in ("pandas", "polars"):
        raise ValueError("expected pandas or polars")

    n1 = nation.filter(nation["n_name"] == nation1)
    n2 = nation.filter(nation["n_name"] == nation2)

    # Both directions: customers in n1 buying from n2, and vice versa.
    q1 = _q7_ibis_leg(customer, orders, lineitem, supplier, n1, n2)
    q2 = _q7_ibis_leg(customer, orders, lineitem, supplier, n2, n1)

    q_final = (
        # UNION ALL: identical rows from the two legs must both count toward revenue.
        q1.union(q2, distinct=False)
        .filter((ibis._["l_shipdate"] >= start) & (ibis._["l_shipdate"] <= end))
        .mutate(
            volume=(ibis._["l_extendedprice"] * (1 - ibis._["l_discount"])),
            l_year=ibis._["l_shipdate"].year(),
        )
        .group_by("supp_nation", "cust_nation", "l_year")
        .agg(revenue=ibis._["volume"].sum())
        .order_by("supp_nation", "cust_nation", "l_year")
    )

    return q_final.to_pandas() if tool == "pandas" else q_final.to_polars()

In [None]:
# Location of the TPC-H parquet files; each table is <dir_><table>.parquet.
dir_ = "/kaggle/input/tpc-h-data-parquet-s-2/"
(
    region,
    nation,
    customer,
    lineitem,
    orders,
    supplier,
    part,
    partsupp,
) = (
    f"{dir_}{table_name}.parquet"
    for table_name in (
        "region",
        "nation",
        "customer",
        "lineitem",
        "orders",
        "supplier",
        "part",
        "partsupp",
    )
)

In [None]:
from pathlib import Path

con_pd = ibis.pandas.connect()
con_pl = ibis.polars.connect()


def _ibis_table_name(path):
    """Return the file stem of ``path`` (``.../nation.parquet`` -> ``nation``).

    Used as an explicit ``table_name`` for the ibis readers below. Without
    one, an ibis backend's ``read_parquet`` registers each call under a
    freshly generated name. Every ``%timeit`` loop would then add another
    table to the connection (for the pandas backend, another in-memory frame).
    A fixed name per file presumably replaces the earlier registration
    instead. NOTE(review): confirm on the installed ibis version.
    """
    return Path(path).stem


# One reader per benchmark configuration. Each maps a parquet path to what
# the query functions consume: a pandas/polars frame, a polars LazyFrame,
# or an ibis table.
IO_FUNCS = {
    "pandas": lambda x: pd.read_parquet(x, engine="pyarrow"),
    "pandas[pyarrow]": lambda x: pd.read_parquet(
        x, engine="pyarrow", dtype_backend="pyarrow"
    ),
    "pandas[pyarrow][ibis]": lambda x: con_pd.read_parquet(
        x, table_name=_ibis_table_name(x), engine="pyarrow", dtype_backend="pyarrow"
    ),
    "polars[eager]": lambda x: pl.read_parquet(x),
    "polars[lazy]": lambda x: pl.scan_parquet(x),
    "polars[lazy][ibis]": lambda x: con_pl.read_parquet(
        x, table_name=_ibis_table_name(x)
    ),
}

In [None]:
# Benchmark label -> list of raw %timeit run times, filled in by the cells below.
results = dict()

## pandas, pyarrow dtypes, via ibis

In [None]:
# Time q7_ibis on the ibis pandas backend with pyarrow dtypes.
# The five fn(...) reads are inside the %timeit statement, so parquet IO is
# part of every measured run.
tool = "pandas[pyarrow][ibis]"
fn = IO_FUNCS[tool]
timings = %timeit -o -q q7_ibis(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier), tool='pandas')
# NOTE(review): all_runs are per-run totals over timings.loops executions, so
# they are not per-query times when %timeit picks loops > 1 (timings.timings
# is the per-loop variant). Confirm this is what consumers of results.json expect.
results[tool] = timings.all_runs

## polars, lazy, via ibis

In [None]:
# Time q7_ibis on the ibis polars backend. tool='polars' makes q7_ibis
# execute the plan via to_polars(), so the timed statement runs the full query.
# The fn(...) reads are inside the %timeit statement.
tool = "polars[lazy][ibis]"
fn = IO_FUNCS[tool]
timings = %timeit -o -q q7_ibis(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier), tool='polars')
# NOTE(review): all_runs are per-run totals over timings.loops executions, so
# they are not per-query times when %timeit picks loops > 1 (timings.timings
# is the per-loop variant). Confirm this is what consumers of results.json expect.
results[tool] = timings.all_runs

## pandas, pyarrow dtypes, native

In [None]:
# Time the hand-written pandas implementation on pyarrow-dtype frames.
# The "[native]" suffix keeps this result separate from the Narwhals run
# that uses the same "pandas[pyarrow]" reader.
tool = "pandas[pyarrow]"
fn = IO_FUNCS[tool]
timings = %timeit -o -q q7_pandas_native(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))
# NOTE(review): all_runs are per-run totals over timings.loops executions, so
# they are not per-query times when %timeit picks loops > 1 (timings.timings
# is the per-loop variant). Confirm this is what consumers of results.json expect.
results[tool + "[native]"] = timings.all_runs

## pandas, default dtypes, via Narwhals

In [None]:
# Time the Narwhals implementation on pandas frames read with default dtypes.
# The fn(...) reads are inside the %timeit statement, so IO is timed too.
tool = "pandas"
fn = IO_FUNCS[tool]
timings = %timeit -o -q q7(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))
# NOTE(review): all_runs are per-run totals over timings.loops executions, so
# they are not per-query times when %timeit picks loops > 1 (timings.timings
# is the per-loop variant). Confirm this is what consumers of results.json expect.
results[tool] = timings.all_runs

## pandas, pyarrow dtypes, via Narwhals

In [None]:
# Time the Narwhals implementation on pandas frames with pyarrow dtypes.
# The fn(...) reads are inside the %timeit statement, so IO is timed too.
tool = "pandas[pyarrow]"
fn = IO_FUNCS[tool]
timings = %timeit -o -q q7(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))
# NOTE(review): all_runs are per-run totals over timings.loops executions, so
# they are not per-query times when %timeit picks loops > 1 (timings.timings
# is the per-loop variant). Confirm this is what consumers of results.json expect.
results[tool] = timings.all_runs

## Polars, eager (`read_parquet`), via Narwhals

In [None]:
# Time the Narwhals implementation on eagerly read polars DataFrames.
# The fn(...) reads are inside the %timeit statement, so IO is timed too.
tool = "polars[eager]"
fn = IO_FUNCS[tool]
timings = %timeit -o -q q7(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier))
# NOTE(review): all_runs are per-run totals over timings.loops executions, so
# they are not per-query times when %timeit picks loops > 1 (timings.timings
# is the per-loop variant). Confirm this is what consumers of results.json expect.
results[tool] = timings.all_runs

## Polars, lazy (`scan_parquet`), via Narwhals

In [None]:
# Time the Narwhals implementation on polars LazyFrames from scan_parquet.
# q7 returns a LazyFrame here, so .collect() is inside the timed statement to
# make sure the query actually executes during measurement.
tool = "polars[lazy]"
fn = IO_FUNCS[tool]
timings = %timeit -o -q q7(fn(nation), fn(customer), fn(lineitem), fn(orders), fn(supplier)).collect()
# NOTE(review): all_runs are per-run totals over timings.loops executions, so
# they are not per-query times when %timeit picks loops > 1 (timings.timings
# is the per-loop variant). Confirm this is what consumers of results.json expect.
results[tool] = timings.all_runs

## Save

In [None]:
import json

# Persist the benchmark label -> run-times mapping as JSON.
with open("results.json", "w") as results_file:
    results_file.write(json.dumps(results))