# Init SparkSession

In [None]:
import os
from datetime import datetime
from pyspark import SparkContext, HiveContext
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as sf

In [None]:
# Build (or reuse) the SparkSession for this benchmark. The current timestamp
# is embedded in the application name.
spark = (
    SparkSession.builder
    .appName("tpch-benchmark-{}".format(datetime.today()))
    .master("spark://spark-master:7077")
    .getOrCreate()
)

# SQLContext's first parameter is a SparkContext, not a SparkSession: pass the
# session's own context, and the session itself as the second argument so the
# SQLContext is bound to this session.
sqlContext = SQLContext(spark.sparkContext, spark)

# Load data

In [None]:
def convert_dates(df_data):
    """Convert the date-like string columns of ``df_data`` into date columns.

    A column is treated as a date column when its name contains ``"date"``
    (case-insensitive). Each such column is parsed with the ``dd.MM.yy``
    pattern and then moved back by exactly 100 calendar years.

    NOTE(review): the 100-year shift presumably compensates for two-digit
    years being parsed into the 2000s while the data is from the 1900s --
    confirm against the Spark version in use.

    Args:
        df_data: DataFrame whose date columns hold ``dd.MM.yy`` strings.

    Returns:
        A DataFrame with the same columns, the date columns replaced by
        their parsed and shifted values.
    """
    for col in df_data.columns:
        if "date" in col.lower():
            parsed = sf.to_date(col, "dd.MM.yy")
            # Shift by calendar months, not by 365 * 100 days: a century
            # contains 24-25 leap days, so a fixed day count would leave
            # every date roughly 25 days too late.
            df_data = df_data.withColumn(col, sf.add_months(parsed, -12 * 100))
    return df_data

In [None]:
%%time
base_path = "s3a://warehouse/tpch_data"


def _read_tpch_table(file_name):
    """Load one pipe-delimited file under ``base_path`` as a DataFrame.

    The file is read with a header row and schema inference enabled, and the
    result is passed through ``convert_dates``.

    Args:
        file_name: Name of the file relative to ``base_path``.

    Returns:
        The loaded DataFrame with its date columns converted.
    """
    # The location is a URI, so join with "/" explicitly; os.path.join would
    # use the local OS path separator instead.
    location = "{}/{}".format(base_path, file_name)
    reader = (
        spark.read.option("delimiter", "|")
        .option("header", True)
        .option("inferSchema", True)
    )
    return convert_dates(reader.csv(location))


df_lineitem = _read_tpch_table("h_lineitem.dsv")
df_customer = _read_tpch_table("h_customer.dsv")
df_order = _read_tpch_table("h_order.dsv")
df_supplier = _read_tpch_table("h_supplier.dsv")
df_nation = _read_tpch_table("h_nation.dsv")

# Benchmark

## 1. Group by, Order by

```sql
select
	l_returnflag,
	l_linestatus,
	sum(l_quantity) as sum_qty,
	sum(l_extendedprice) as sum_base_price,
	sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
	sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
	avg(l_quantity) as avg_qty,
	avg(l_extendedprice) as avg_price,
	avg(l_discount) as avg_disc,
	count(*) as count_order
from
	lineitem
group by
	l_returnflag,
	l_linestatus
order by
	l_returnflag,
	l_linestatus;
```    

In [None]:
%%time
# Query 1: per (l_returnflag, l_linestatus) group, compute the sums, averages
# and row count listed in the reference SQL, ordered by the grouping keys.
disc_price = sf.col("l_extendedprice") * (sf.lit(1) - sf.col("l_discount"))

df_analytics = (
    df_lineitem.groupBy("l_returnflag", "l_linestatus")
    .agg(
        sf.sum("l_quantity").alias("sum_qty"),
        sf.sum("l_extendedprice").alias("sum_base_price"),
        sf.sum(disc_price).alias("sum_disc_price"),
        sf.sum(disc_price * (sf.lit(1) + sf.col("l_tax"))).alias("sum_charge"),
        sf.avg("l_quantity").alias("avg_qty"),
        # avg_price and avg_disc are averages in the reference SQL; they were
        # previously computed with sum().
        sf.avg("l_extendedprice").alias("avg_price"),
        sf.avg("l_discount").alias("avg_disc"),
        sf.count("*").alias("count_order"),
    )
    .orderBy(sf.col("l_returnflag"), sf.col("l_linestatus"))
)

# Last expression of the cell: renders the first 20 result rows as a pandas
# DataFrame.
df_analytics.limit(20).toPandas()

## 2. Join, Group By, Order By

```sql
select
	l_orderkey,
	sum(l_extendedprice * (1 - l_discount)) as revenue,
	o_orderdate,
	o_shippriority
from
	customer,
	orders,
	lineitem
where
	c_mktsegment = 'AUTOMOBILE'
	and c_custkey = o_custkey
	and l_orderkey = o_orderkey
group by
	l_orderkey,
	o_orderdate,
	o_shippriority
order by
	revenue desc,
	o_orderdate
```    

In [None]:
%%time
# Query 2: revenue per (order, order date, ship priority) for customers in
# the AUTOMOBILE market segment, highest revenue first.
customer_orders = df_customer.join(
    df_order, df_customer["c_custkey"] == df_order["o_custkey"]
)
order_lines = customer_orders.join(
    df_lineitem, df_lineitem["l_orderkey"] == df_order["o_orderkey"]
)
automobile_lines = order_lines.filter("c_mktsegment = 'AUTOMOBILE'")

# Discounted line price, summed per group below.
discounted_price = sf.col("l_extendedprice") * (sf.lit(1) - sf.col("l_discount"))

df_analytics = (
    automobile_lines
    .groupBy("l_orderkey", "o_orderdate", "o_shippriority")
    .agg(sf.sum(discounted_price).alias("revenue"))
    .orderBy(sf.desc("revenue"), sf.col("o_orderdate"))
)

df_analytics.show()

## 3. Subqueries, Join, Group by, Order by

```sql
select
	supp_nation,
	cust_nation,
	l_year,
	sum(volume) as revenue
from
	(
		select
			n1.n_name as supp_nation,
			n2.n_name as cust_nation,
			extract(year from l_shipdate) as l_year,
			l_extendedprice * (1 - l_discount) as volume
		from
			supplier,
			lineitem,
			orders,
			customer,
			nation n1,
			nation n2
		where
			s_suppkey = l_suppkey
			and o_orderkey = l_orderkey
			and c_custkey = o_custkey
			and s_nationkey = n1.n_nationkey
			and c_nationkey = n2.n_nationkey
			and l_shipdate between date '1995-01-01' and date '1996-12-31'
	) as shipping
group by
	supp_nation,
	cust_nation,
	l_year
order by
	supp_nation,
	cust_nation,
	l_year;
```    

In [None]:
%%time
# Query 3: revenue by (supplier nation, customer nation, ship year) for line
# items shipped between 1995-01-01 and 1996-12-31.

# Join chain: supplier -> lineitem -> order -> customer -> nation (supplier side).
supplier_lines = df_supplier.join(
    df_lineitem, df_supplier["s_suppkey"] == df_lineitem["l_suppkey"]
)
with_orders = supplier_lines.join(
    df_order, df_order["o_orderkey"] == df_lineitem["l_orderkey"]
)
with_customers = with_orders.join(
    df_customer, df_customer["c_custkey"] == df_order["o_custkey"]
)
with_supp_nation = with_customers.join(
    df_nation, df_supplier["s_nationkey"] == df_nation["n_nationkey"]
)

# Name the supplier-side nation before df_nation is joined a second time.
# NOTE(review): l_shipdate is not part of this projection; the filter that
# follows relies on Spark resolving it from the underlying plan -- confirm.
shipped = with_supp_nation.selectExpr(
    "n_name AS supp_nation",
    "EXTRACT(YEAR FROM l_shipdate) AS l_year",
    "l_extendedprice * (1 - l_discount) AS volume",
    "c_nationkey",
).filter("l_shipdate between date '1995-01-01' and date '1996-12-31'")

# Second nation join supplies the customer-side nation name.
shipping = shipped.join(
    df_nation, df_customer["c_nationkey"] == df_nation["n_nationkey"]
).selectExpr(
    "supp_nation",
    "n_name AS cust_nation",
    "l_year",
    "volume",
)

df_analytics = (
    shipping
    .groupBy("supp_nation", "cust_nation", "l_year")
    .agg(sf.sum("volume").alias("revenue"))
    .orderBy("supp_nation", "cust_nation", "l_year")
)

df_analytics.show()

In [None]:
%%time
# Query 3 again, with the supplier and nation DataFrames wrapped in
# sf.broadcast() to hint broadcast joins for them.

# Join chain: supplier (broadcast) -> lineitem -> order -> customer
# -> nation (broadcast, supplier side).
bc_supplier_lines = sf.broadcast(df_supplier).join(
    df_lineitem, df_supplier["s_suppkey"] == df_lineitem["l_suppkey"]
)
bc_with_orders = bc_supplier_lines.join(
    df_order, df_order["o_orderkey"] == df_lineitem["l_orderkey"]
)
bc_with_customers = bc_with_orders.join(
    df_customer, df_customer["c_custkey"] == df_order["o_custkey"]
)
bc_with_supp_nation = bc_with_customers.join(
    sf.broadcast(df_nation), df_supplier["s_nationkey"] == df_nation["n_nationkey"]
)

# Name the supplier-side nation before df_nation is joined a second time.
# NOTE(review): l_shipdate is not part of this projection; the filter that
# follows relies on Spark resolving it from the underlying plan -- confirm.
bc_shipped = bc_with_supp_nation.selectExpr(
    "n_name AS supp_nation",
    "EXTRACT(YEAR FROM l_shipdate) AS l_year",
    "l_extendedprice * (1 - l_discount) AS volume",
    "c_nationkey",
).filter("l_shipdate between date '1995-01-01' and date '1996-12-31'")

# Second (broadcast) nation join supplies the customer-side nation name.
bc_shipping = bc_shipped.join(
    sf.broadcast(df_nation), df_customer["c_nationkey"] == df_nation["n_nationkey"]
).selectExpr(
    "supp_nation",
    "n_name AS cust_nation",
    "l_year",
    "volume",
)

df_analytics = (
    bc_shipping
    .groupBy("supp_nation", "cust_nation", "l_year")
    .agg(sf.sum("volume").alias("revenue"))
    .orderBy("supp_nation", "cust_nation", "l_year")
)

df_analytics.show()