## Imports

In [2]:
import ibis
from ibis import _
ibis.options.interactive = True
import ibis.selectors as s
import os

## Functions to run the TPC-H queries

In [3]:
def tpc_h01(con, *, delta_days=90):
    """Build the TPC-H Q1 (Pricing Summary Report) expression.

    This query reports the amount of business that was billed, shipped, and returned.

    The Pricing Summary Report Query provides a summary pricing report for all lineitems
    shipped as of a given date. The date is within 60 - 120 days of the greatest ship date
    contained in the database. The query lists totals for extended price, discounted extended
    price, discounted extended price plus tax, average quantity, average extended
    price, and average discount. These aggregates are grouped by RETURNFLAG and LINESTATUS,
    and listed in ascending order of RETURNFLAG and LINESTATUS. A count of the number of
    lineitems in each group is included.

    Parameters
    ----------
    con
        Ibis backend connection; must expose a ``lineitem`` table.
    delta_days : int, keyword-only, default 90
        Number of days subtracted from 1998-12-01 to obtain the ship-date
        cutoff. The description above puts the intended range at 60 - 120;
        the value is not validated here.

    Returns
    -------
    An ibis table expression with one row per (l_returnflag, l_linestatus)
    pair. The function only builds the expression; callers run it with
    ``.execute()``.
    """
    lineitem = con.table("lineitem")

    # Only lineitems shipped on or before this date are summarised.
    ship_cutoff = ibis.date("1998-12-01") - ibis.interval(days=delta_days)

    # Discounted price is used by two aggregates, so it is spelled out once.
    disc_price = _.l_extendedprice * (1 - _.l_discount)

    q01 = (
        lineitem
        .filter(_.l_shipdate <= ship_cutoff)
        .aggregate(
            by=[_.l_returnflag, _.l_linestatus],
            sum_qty=_.l_quantity.sum(),
            sum_base_price=_.l_extendedprice.sum(),
            sum_disc_price=disc_price.sum(),
            sum_charge=(disc_price * (1 + _.l_tax)).sum(),
            avg_qty=_.l_quantity.mean(),
            avg_price=_.l_extendedprice.mean(),
            avg_disc=_.l_discount.mean(),
            count_order=_.count(),
        )
        .order_by([_.l_returnflag, _.l_linestatus])
    )
    return q01


def tpc_h10(con, *, start_date="1993-10-01", top_n=20):
    """Build the TPC-H Q10 (Returned Item Reporting) expression.

    The query identifies customers who might be having problems with the parts that are
    shipped to them.

    The Returned Item Reporting Query finds the top 20 customers, in terms of their effect
    on lost revenue for a given quarter, who have returned parts. The query considers only
    parts that were ordered in the specified quarter. The query lists the customer's name,
    address, nation, phone number, account balance, comment information and revenue lost.
    The customers are listed in descending order of lost revenue. Revenue lost is defined
    as sum(l_extendedprice*(1-l_discount)) for all qualifying lineitems.

    Parameters
    ----------
    con
        Ibis backend connection; must expose ``lineitem``, ``orders``,
        ``customer`` and ``nation`` tables.
    start_date : keyword-only, default "1993-10-01"
        First day of the three-month order window; passed to ``ibis.date``.
    top_n : int, keyword-only, default 20
        Number of customers kept after sorting by lost revenue.

    Returns
    -------
    An ibis table expression limited to ``top_n`` rows. The function only
    builds the expression; callers run it with ``.execute()``.
    """
    lineitem = con.table("lineitem")
    orders = con.table("orders")
    customer = con.table("customer")
    nation = con.table("nation")

    # Half-open window [quarter_start, quarter_start + 3 months).
    quarter_start = ibis.date(start_date)
    quarter_end = quarter_start + ibis.interval(months=3)

    q = (
        customer
        .join(orders, orders.o_custkey == customer.c_custkey)
        .join(lineitem, lineitem.l_orderkey == orders.o_orderkey)
        .join(nation, customer.c_nationkey == nation.n_nationkey)
    )

    # Keep returned lineitems ("R") whose order falls inside the quarter.
    q = q.filter(
        [
            (q.o_orderdate >= quarter_start) & (q.o_orderdate < quarter_end),
            q.l_returnflag == "R",
        ]
    )

    gq = q.group_by(
        [
            q.c_custkey,
            q.c_name,
            q.c_acctbal,
            q.c_phone,
            q.n_name,
            q.c_address,
            q.c_comment,
        ]
    )
    # Lost revenue per customer: discounted extended price summed over the group.
    q = gq.aggregate(revenue=(q.l_extendedprice * (1 - q.l_discount)).sum())

    q = q.order_by(ibis.desc(q.revenue))
    return q.limit(top_n)


## Postgres

In [4]:
# Postgres credentials are read from the environment; each is None when unset.
pg_user = os.getenv("PG_USER")
pg_pwd = os.getenv("PG_PWD")

# Connection to the local 'tpc-h-10gb' database used by the Postgres runs below.
con_pg = ibis.postgres.connect(
    database='tpc-h-10gb',
    host='localhost',
    user=pg_user,
    password=pg_pwd,
)

### Example TPC-H query results

In [5]:
tpc_h01(con_pg)

In [18]:
tpc_h10(con_pg)

### TPC-H queries benchmarking on Postgres

In [6]:
%%timeit -r10
tpc_h01(con_pg).execute()

27 s ± 9.39 s per loop (mean ± std. dev. of 10 runs, 1 loop each)


In [25]:
%%timeit -r10
tpc_h10(con_pg).execute()

5.69 s ± 162 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)


## DuckDB

In [20]:
# DuckDB connection created with default settings.
con_ibis = ibis.duckdb.connect()

# Register each Parquet file under the table name the tpc_h* functions look up.
con_ibis.register(
    "/home/francois/datasets/tpc-h-10GB/parquet/lineitem.parquet",
    table_name="lineitem",
)
con_ibis.register(
    "/home/francois/datasets/tpc-h-10GB/parquet/orders.parquet",
    table_name="orders",
)
con_ibis.register(
    "/home/francois/datasets/tpc-h-10GB/parquet/customer.parquet",
    table_name="customer",
)
con_ibis.register(
    "/home/francois/datasets/tpc-h-10GB/parquet/nation.parquet",
    table_name="nation",
)

# Last expression of the cell: shows the registered table names.
con_ibis.list_tables()

['nation', 'customer', 'orders', 'lineitem']

### TPC-H queries benchmarking on DuckDB

In [23]:
%%timeit -r 10
tpc_h01(con_ibis).execute()

3.48 s ± 69.7 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)


In [26]:
%%timeit -r 10
tpc_h10(con_ibis).execute()

3.99 s ± 250 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)


## Spark

In [1]:
from pyspark.sql import SparkSession

# Get (or create) the Spark session for the benchmark, with 25g of driver memory.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "25g")
    .appName("tpc-h-10GB")
    .getOrCreate()
)

In [5]:
# Ibis PySpark backend connection built on the `spark` session.
con_spark = ibis.pyspark.connect(spark)

In [33]:
# Register each Parquet file under the table name the tpc_h* functions look up.
con_spark.register(
    "/home/francois/datasets/tpc-h-10GB/parquet/lineitem.parquet",
    table_name="lineitem",
)
con_spark.register(
    "/home/francois/datasets/tpc-h-10GB/parquet/orders.parquet",
    table_name="orders",
)
con_spark.register(
    "/home/francois/datasets/tpc-h-10GB/parquet/customer.parquet",
    table_name="customer",
)
con_spark.register(
    "/home/francois/datasets/tpc-h-10GB/parquet/nation.parquet",
    table_name="nation",
)

### TPC-H queries benchmarking on PySpark

In [21]:
%%timeit -r 10
tpc_h01(con_spark).execute()



7.46 s ± 125 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)



                                                                                

In [35]:
%%timeit -r 10
tpc_h10(con_spark).execute()



7.87 s ± 688 ms per loop (mean ± std. dev. of 10 runs, 1 loop each)



                                                                                