In [3]:
import ibis
import duckdb

SCALE_FACTOR = 1  # TPC-H scale factor handed to dbgen
DB_PATH = "tpch.duckdb"  # database file, relative to the notebook's working directory

# Raw DuckDB connection: runs dbgen and the Parquet export.
duckdb_con = duckdb.connect(DB_PATH)
# Ibis connection to the same database file: used to build the table expressions.
# A relative path keeps the notebook runnable outside the author's home directory.
con = ibis.duckdb.connect(DB_PATH)

# Generate Data

In [4]:
from pathlib import Path

# Generate TPC-H data at the configured scale factor (scale factor 1 is about 1 GB).
# The database file is persistent, so skip generation when the tables are already
# there; this keeps the cell safe to re-run.
# NOTE(review): confirm whether dbgen appends to or rejects pre-existing tables.
existing_tables = {row[0] for row in duckdb_con.sql("SHOW TABLES").fetchall()}
if "lineitem" not in existing_tables:
    duckdb_con.sql(f"CALL dbgen(sf={SCALE_FACTOR})")

# Export each table to Parquet format. COPY does not create the target
# directory, so make sure it exists first.
output_dir = Path("data")
output_dir.mkdir(parents=True, exist_ok=True)

tables = ["lineitem", "orders", "customer", "supplier", "nation", "region", "part", "partsupp"]
for table in tables:
    target = (output_dir / f"{table}.parquet").as_posix()
    duckdb_con.sql(f"COPY {table} TO '{target}' (FORMAT PARQUET)")


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [6]:
# Look up the two tables used in the join example through the Ibis connection.
part, partsupp = (con.table(table_name) for table_name in ("part", "partsupp"))


In [7]:
# Rebind both names to the result of running a plain SELECT through `Table.sql`.
part_query = "SELECT * from part"
partsupp_query = "SELECT * from partsupp"
part = part.sql(part_query)
partsupp = partsupp.sql(partsupp_query)

In [8]:
# Join each part to its partsupp rows on the part key.
on_part_key = part.p_partkey == partsupp.ps_partkey
r = part.join(partsupp, on_part_key)

In [9]:
# Materialise only the first rows of the join as a pandas frame for display.
preview = r.head()
preview.to_pandas()

Unnamed: 0,p_partkey,p_name,p_mfgr,p_brand,p_type,p_size,p_container,p_retailprice,p_comment,ps_partkey,ps_suppkey,ps_availqty,ps_supplycost,ps_comment
0,92161,coral misty blush black blanched,Manufacturer#2,Brand#25,ECONOMY BRUSHED STEEL,31,LG PACK,1153.16,subla,92161,2162,5815,284.89,ts sleep carefully according to the theodolite...
1,92161,coral misty blush black blanched,Manufacturer#2,Brand#25,ECONOMY BRUSHED STEEL,31,LG PACK,1153.16,subla,92161,4671,4415,141.93,final requests thrash after the bold pearls. ...
2,92161,coral misty blush black blanched,Manufacturer#2,Brand#25,ECONOMY BRUSHED STEEL,31,LG PACK,1153.16,subla,92161,7180,9557,982.01,". silent deposits cajole. special, bold excuse..."
3,92161,coral misty blush black blanched,Manufacturer#2,Brand#25,ECONOMY BRUSHED STEEL,31,LG PACK,1153.16,subla,92161,9689,6518,190.21,about the furiously unusual escapades should ...
4,92162,coral steel tomato red puff,Manufacturer#3,Brand#33,MEDIUM ANODIZED TIN,4,LG DRUM,1154.16,e stealthily among th,92162,2163,5906,232.07,ecial excuses. blithely silent accounts integr...


In [10]:
from pathlib import Path

# Capture the current working directory as a plain string.
current_dir = Path.cwd()
cwd = str(current_dir)

In [11]:
# Display the working directory string captured in the previous cell.
cwd

'/Users/Nok_Lam_Chan/conference/pycon2024'

In [12]:
from __future__ import annotations


import datetime

import ibis
import ibis.expr.types as ir  # provides the `ir` name used in the return annotation
from dateutil.relativedelta import relativedelta


def add_date(datestr: str, dy: int = 0, dm: int = 0, dd: int = 0) -> ir.DateScalar:
    """Shift an ISO-format date by a calendar offset and wrap it with ``ibis.date``.

    Parameters
    ----------
    datestr
        Date in ISO format, parsed with ``datetime.date.fromisoformat``.
    dy
        Years to add; negative values subtract.
    dm
        Months to add; negative values subtract.
    dd
        Days to add; negative values subtract.

    Returns
    -------
    ir.DateScalar
        The result of ``ibis.date`` applied to the shifted date's ISO string.

    Raises
    ------
    ValueError
        If ``datestr`` is not a valid ISO date string.
    """
    offset = relativedelta(years=dy, months=dm, days=dd)
    shifted = datetime.date.fromisoformat(datestr) + offset
    return ibis.date(shifted.isoformat())

## Ibis Dataframe

In [13]:

# Table expression for lineitem in the "tpch" database of the Ibis connection.
lineitem = con.table("lineitem", database="tpch")
t = lineitem

# Reusable column expressions: price after discount, then with tax applied.
discount_price = t.l_extendedprice * (1 - t.l_discount)
charge = discount_price * (1 + t.l_tax)

# Pricing summary per (return flag, line status) for rows shipped on or before
# 1998-12-01 minus 90 days.
q = (
    t.filter(t.l_shipdate <= add_date("1998-12-01", dd=-90))
    .group_by(["l_returnflag", "l_linestatus"])
    .aggregate(
        sum_qty=t.l_quantity.sum(),
        sum_base_price=t.l_extendedprice.sum(),
        sum_disc_price=discount_price.sum(),
        sum_charge=charge.sum(),
        avg_qty=t.l_quantity.mean(),
        avg_price=t.l_extendedprice.mean(),
        avg_disc=t.l_discount.mean(),
        count_order=lambda tbl: tbl.count(),
    )
    .order_by(["l_returnflag", "l_linestatus"])
)

# Show the Ibis expression tree, then the SQL it compiles to.
print(q)
print(ibis.to_sql(q))

r0 := DatabaseTable: tpch.lineitem
  l_orderkey      !int64
  l_partkey       !int64
  l_suppkey       !int64
  l_linenumber    !int64
  l_quantity      !decimal(15, 2)
  l_extendedprice !decimal(15, 2)
  l_discount      !decimal(15, 2)
  l_tax           !decimal(15, 2)
  l_returnflag    !string
  l_linestatus    !string
  l_shipdate      !date
  l_commitdate    !date
  l_receiptdate   !date
  l_shipinstruct  !string
  l_shipmode      !string
  l_comment       !string

r1 := Filter[r0]
  r0.l_shipdate <= datetime.date(1998, 9, 2)

r2 := Aggregate[r1]
  groups:
    l_returnflag: r1.l_returnflag
    l_linestatus: r1.l_linestatus
  metrics:
    sum_qty:        Sum(r1.l_quantity)
    sum_base_price: Sum(r1.l_extendedprice)
    sum_disc_price: Sum(r1.l_extendedprice * 1 - r1.l_discount)
    sum_charge:     Sum(r1.l_extendedprice * 1 - r1.l_discount * r1.l_tax + 1)
    avg_qty:        Mean(r1.l_quantity)
    avg_price:      Mean(r1.l_extendedprice)
    avg_disc:       Mean(r1.l_discount)
   

In [14]:
# Execute the Ibis query and show the first rows of the resulting pandas frame.
result_df = q.to_pandas()
result_df.head()


Unnamed: 0,l_returnflag,l_linestatus,sum_qty,sum_base_price,sum_disc_price,sum_charge,avg_qty,avg_price,avg_disc,count_order
0,A,F,37734107.0,56586554400.73,53758257134.87,55909065222.83,25.52,38273.13,0.05,1478493
1,N,F,991417.0,1487504710.38,1413082168.05,1469649223.19,25.52,38284.47,0.05,38854
2,N,O,74476040.0,111701729697.74,106118230307.61,110367043872.5,25.5,38249.12,0.05,2920374
3,R,F,37719753.0,56568041380.9,53741292684.6,55889619119.83,25.51,38250.85,0.05,1478870


## Using the `Table.sql` method

In [15]:
# Hand-written SQL over lineitem: sums, averages and a row count per
# (l_returnflag, l_linestatus) for rows shipped on or before 1998-12-01 minus
# 90 days, ordered by the two grouping columns.
sql = """
SELECT
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    SUM(l_extendedprice) AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    AVG(l_quantity) AS avg_qty,
    AVG(l_extendedprice) AS avg_price,
    AVG(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM
    lineitem
WHERE
    l_shipdate <= DATE '1998-12-01' - INTERVAL '90' DAY
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;
"""

In [16]:
# Run the raw SQL through `Table.sql` and show the first rows as a pandas frame.
sql_result_df = t.sql(sql).to_pandas()
sql_result_df.head()

Unnamed: 0,l_returnflag,l_linestatus,sum_qty,sum_base_price,sum_disc_price,sum_charge,avg_qty,avg_price,avg_disc,count_order
0,A,F,37734107.0,56586554400.73,53758257134.87,55909065222.82769,25.522006,38273.129735,0.049985,1478493
1,N,F,991417.0,1487504710.38,1413082168.0541,1469649223.194375,25.516472,38284.467761,0.050093,38854
2,N,O,74476040.0,111701729697.74,106118230307.6056,110367043872.497,25.502227,38249.117989,0.049997,2920374
3,R,F,37719753.0,56568041380.9,53741292684.604,55889619119.831924,25.505794,38250.854626,0.050009,1478870


# Now do this with Spark

In [3]:
%env JAVA_HOME"/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home"

env: JAVA_HOME"/Library/Internet=Plug-Ins/JavaAppletPlugin.plugin/Contents/Home"


In [4]:
from pyspark.sql import Column, SparkSession


def decrease(x: Column, y: Column) -> Column:
    """Return ``x * (1 - y)`` as a native Spark column expression.

    Built from Column arithmetic instead of a Python UDF, so no ``udf`` or
    ``FloatType`` import is needed and the input column types are kept as-is.
    """
    return x * (1 - y)


def increase(x: Column, y: Column) -> Column:
    """Return ``x * (1 + y)`` as a native Spark column expression."""
    return x * (1 + y)


# Reuse an active session if there is one, otherwise start a new one.
spark = SparkSession.builder.appName('TPCH Benchmark for Python').getOrCreate()

# Read the lineitem table from the Parquet file under data/.
lineitem = spark.read.parquet("data/lineitem.parquet")


24/11/04 13:44:51 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:485)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.ja

Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed
	at java.base/javax.security.auth.Subject.getSubject(Subject.java:347)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:577)
	at org.apache.spark.util.Utils$.$anonfun$getCurrentUserName$1(Utils.scala:2416)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2416)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:329)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:501)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:485)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1575)


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Same pricing summary on the Spark DataFrame: filter by ship date, aggregate
# per (l_returnflag, l_linestatus), sort by the grouping columns and print.
(
    lineitem.filter(col("l_shipdate") <= "1998-09-02")
    .groupBy(col("l_returnflag"), col("l_linestatus"))
    .agg(
        F.sum(col("l_quantity")).alias("sum_qty"),
        F.sum(col("l_extendedprice")).alias("sum_base_price"),
        F.sum(decrease(col("l_extendedprice"), col("l_discount"))).alias("sum_disc_price"),
        F.sum(increase(decrease(col("l_extendedprice"), col("l_discount")), col("l_tax"))).alias("sum_charge"),
        F.avg(col("l_quantity")).alias("avg_qty"),
        F.avg(col("l_extendedprice")).alias("avg_price"),
        F.avg(col("l_discount")).alias("avg_disc"),
        F.count(col("l_quantity")).alias("count_order"),
    )
    .sort(col("l_returnflag"), col("l_linestatus"))
    .show()
)