
## Table of Contents
**1. TPCH Data Set Exploration** \
**2. Data Modeling** \
**3. Ingestion** \
**4. Transform** \
**5. Queries**

In [0]:
# List the contents of the partsupp directory of the TPC-H sample data set;
# as the cell's only expression, the returned listing is the cell output.
dbutils.fs.ls('dbfs:/databricks-datasets/tpch/data-001/partsupp')

In [0]:
from pyspark.sql import functions as F

## 1. TPCH Data Set Exploration

In [0]:
# Peek at the beginning of the raw lineitem file to inspect its layout
# (delimiter, column order) before choosing how to parse it.
dbutils.fs.head('dbfs:/databricks-datasets/tpch/data-001/lineitem/lineitem.tbl')

In [0]:
# Root folder of the TPC-H sample data; the trailing slash is part of the value.
dataDir = "dbfs:/databricks-datasets/tpch/data-001/"

# Table names driving the extract/load loops: lowercase names match the source
# folders and files, capitalized names are used for the target tables.
tbl_list = "customer orders part partsupp region nation supplier lineitem".split()
tbl_list_cap = list(map(str.capitalize, tbl_list))

In [0]:
# Raw, schema-less DataFrames keyed by lowercase table name.
Df = {}

# For every table: read the pipe-delimited .tbl file, register it as the temp
# view "<table>tbl", then print its name and row count and preview five rows.
for tbl in tbl_list:
    raw_df = spark.read.csv(f'{dataDir}{tbl}/{tbl}.tbl', sep='|')
    raw_df.createOrReplaceTempView(f"{tbl}tbl")
    Df[tbl] = raw_df
    print(f'{tbl} Table')
    row_count = raw_df.count()
    print(f'Number of rows: {row_count}')
    display(raw_df.limit(5))

In [0]:
# Show the extended metadata of every target table.
# The catalog, schema and tables are only created in the "Data Modeling"
# section further down, so on a fresh top-to-bottom run they do not exist yet.
# Rather than aborting the run, a missing table is reported and skipped;
# re-run this cell after the DDL cells to see the descriptions.
from pyspark.sql.utils import AnalysisException

for tbl in tbl_list_cap:
    table_name = f'tpch_etl_catalog.tpch_schema.{tbl}'
    try:
        explain = spark.sql(f'DESC EXTENDED {table_name}')
        display(explain)
    except AnalysisException as err:
        # Only analysis errors (e.g. table/schema/catalog not found) are
        # handled here; anything else still surfaces.
        print(f'Skipping {table_name}: cannot be described yet ({err})')

## 2. Data Modeling

In [0]:
%sql
-- Set up the catalog, schema, and volumes used by the rest of the notebook.
-- Every statement uses IF NOT EXISTS, so re-running this cell is harmless.

-- Create the catalog and make it the current one.
CREATE CATALOG IF NOT EXISTS TPCH_ETL_catalog;
USE CATALOG TPCH_ETL_catalog;

-- Create the schema inside that catalog and make it the current one, so the
-- unqualified table names used later resolve here.
CREATE SCHEMA IF NOT EXISTS TPCH_schema;
USE SCHEMA TPCH_schema;

-- Volumes for files: "input" and "output" (both read/written in the Transform section).
CREATE VOLUME IF NOT EXISTS input;
CREATE VOLUME IF NOT EXISTS output;

### ER diagram 

In [0]:
# Embed the ER diagram image (900x600) in the cell output through an HTML <img> tag.
# NOTE(review): the src is an absolute path inside one user's workspace folder,
# so the image presumably only resolves for that user/workspace -- confirm, and
# consider a location that is shared with the notebook.
displayHTML("""
<img src="/Workspace/Users/tss749@g.harvard.edu/TPCH-ETL-pipeline-databricks/ER_model (1).png" width="900" height="600">
""")

Databricks does not enforce primary key (PK) or foreign key (FK) constraints, but it allows us to declare them in the table schema as informational constraints that document the model and guide programmers.

### DDL

In [0]:
%sql

-- Region Table
-- Top of the geography hierarchy: Nation.region_key refers to region_key.
-- Columns match the "Region" entry of schema_map used at ingestion.
CREATE OR REPLACE TABLE Region (
  region_key BIGINT NOT NULL PRIMARY KEY,  -- informational primary key
  name STRING,
  comment STRING
)
USING DELTA;

-- Nation Table
-- Referenced by Customer.nation_key and Supplier.nation_key.
-- Columns match the "Nation" entry of schema_map used at ingestion.
CREATE OR REPLACE TABLE Nation (
  nation_key BIGINT NOT NULL PRIMARY KEY,  -- informational primary key
  name STRING,
  region_key BIGINT NOT NULL,  -- region of the nation; the FK to Region is added after the DDL
  comment STRING
)
USING DELTA;

-- Supplier Table
-- Referenced by Lineitem.supp_key and Partsupp.supp_key.
-- Columns match the "Supplier" entry of schema_map used at ingestion.
CREATE OR REPLACE TABLE Supplier (
  supp_key BIGINT NOT NULL PRIMARY KEY,  -- informational primary key
  name STRING,
  address STRING,
  nation_key BIGINT NOT NULL,  -- the FK to Nation is added after the DDL
  phone STRING,
  acct_bal DOUBLE,
  comment STRING
)
USING DELTA;

-- Customer Table
-- Referenced by Orders.cust_key.
-- Columns match the "Customer" entry of schema_map used at ingestion.
CREATE OR REPLACE TABLE Customer (
  cust_key BIGINT NOT NULL PRIMARY KEY,  -- informational primary key
  name STRING,
  address STRING,
  nation_key BIGINT NOT NULL,  -- the FK to Nation is added after the DDL
  phone STRING,
  acct_bal DOUBLE,
  mkt_segment STRING,  -- restricted to a fixed list by a CHECK constraint added later
  comment STRING
)
USING DELTA;

-- Part Table
-- Referenced by Lineitem.part_key and Partsupp.part_key.
-- Columns match the "Part" entry of schema_map used at ingestion.
CREATE OR REPLACE TABLE Part (
  part_key BIGINT NOT NULL PRIMARY KEY,  -- informational primary key
  name STRING,
  mfgr STRING,
  brand STRING,
  type STRING,
  size BIGINT,  -- must be > 0 (CHECK constraint added later)
  container STRING,
  retail_price DOUBLE,
  comment STRING
)
USING DELTA;

-- Orders Table
-- Referenced by Lineitem.order_key.
-- Columns match the "Orders" entry of schema_map used at ingestion.
CREATE OR REPLACE TABLE Orders (
  -- Declared NOT NULL like every other primary-key column in this model.
  order_key BIGINT NOT NULL PRIMARY KEY,
  cust_key BIGINT NOT NULL,  -- the FK to Customer is added after the DDL
  order_status STRING,
  total_price DOUBLE,
  order_date DATE,  -- read as a string at ingestion, converted with to_date before loading
  order_priority STRING,
  clerk STRING,
  ship_priority STRING,
  comment STRING
)
USING DELTA;

-- Lineitem Table
-- One row per order line. In the TPC-H model a line is identified by the
-- pair (order_key, line_number), so that pair is declared as the
-- (informational) composite primary key and line_number is NOT NULL.
-- Columns match the "Lineitem" entry of schema_map used at ingestion.
CREATE OR REPLACE TABLE Lineitem (
  order_key BIGINT NOT NULL,  -- the FK to Orders is added after the DDL
  part_key BIGINT NOT NULL,   -- the FK to Part is added after the DDL
  supp_key BIGINT NOT NULL,   -- the FK to Supplier is added after the DDL
  line_number BIGINT NOT NULL,
  quantity BIGINT,
  extended_price DOUBLE,
  discount DOUBLE,
  tax DOUBLE,
  return_flag STRING,
  line_status BOOLEAN,  -- source code "O" is loaded as true, "F" as false
  ship_date DATE,
  commit_date DATE,
  receipt_date DATE,
  ships_instruct STRING,
  ship_mode STRING,
  comment STRING,
  CONSTRAINT lineitem_pk PRIMARY KEY (order_key, line_number)
)
USING DELTA;

-- Partsupp Table
-- Association between parts and suppliers. In the TPC-H model a row is
-- identified by the pair (part_key, supp_key), so that pair is declared as
-- the (informational) composite primary key.
-- Columns match the "Partsupp" entry of schema_map used at ingestion.
CREATE OR REPLACE TABLE Partsupp (
  part_key BIGINT NOT NULL,  -- the FK to Part is added after the DDL
  supp_key BIGINT NOT NULL,  -- the FK to Supplier is added after the DDL
  avail_qty BIGINT,
  supply_cost DOUBLE,
  comment STRING,
  CONSTRAINT partsupp_pk PRIMARY KEY (part_key, supp_key)
)
USING DELTA;

In [0]:
%sql 
-- Informational foreign keys (not enforced; they document the relationships
-- of the model). Each one points at the primary key of its parent table.

-- Customer / Supplier -> Nation
ALTER TABLE Customer ADD CONSTRAINT customer_nation_key FOREIGN KEY (nation_key) REFERENCES Nation(nation_key);
ALTER TABLE Supplier ADD CONSTRAINT supplier_nation_key FOREIGN KEY (nation_key) REFERENCES Nation(nation_key);

-- Orders -> Customer
ALTER TABLE Orders ADD CONSTRAINT orders_cust_key FOREIGN KEY (cust_key) REFERENCES Customer(cust_key);

-- Lineitem -> Orders / Part / Supplier
ALTER TABLE Lineitem ADD CONSTRAINT lineitem_order_key FOREIGN KEY (order_key) REFERENCES Orders(order_key);
ALTER TABLE Lineitem ADD CONSTRAINT lineitem_part_key FOREIGN KEY (part_key) REFERENCES Part(part_key);
ALTER TABLE Lineitem ADD CONSTRAINT lineitem_supp_key FOREIGN KEY (supp_key) REFERENCES Supplier(supp_key);

-- Partsupp -> Part / Supplier
ALTER TABLE Partsupp ADD CONSTRAINT partsupp_part_key FOREIGN KEY (part_key) REFERENCES Part(part_key);
ALTER TABLE Partsupp ADD CONSTRAINT partsupp_supp_key FOREIGN KEY (supp_key) REFERENCES Supplier(supp_key);

-- Nation -> Region
ALTER TABLE Nation ADD CONSTRAINT nation_region_key FOREIGN KEY (region_key) REFERENCES Region(region_key);

-- Region is the top of the hierarchy and has no parent table, so it gets no
-- foreign key (a key referencing its own primary key would carry no meaning).

In [0]:
%sql
-- Value checks on Lineitem: positive quantity, discount within [0, 1],
-- non-negative tax and extended price, and a closed list of return flags.
ALTER TABLE Lineitem ADD CONSTRAINT chk_lineitem_qty CHECK (quantity > 0);
ALTER TABLE Lineitem ADD CONSTRAINT chk_lineitem_discount CHECK (discount BETWEEN 0 AND 1);
ALTER TABLE Lineitem ADD CONSTRAINT chk_lineitem_tax CHECK (tax >= 0);
ALTER TABLE Lineitem ADD CONSTRAINT chk_lineitem_extended_price CHECK (extended_price >= 0);
ALTER TABLE Lineitem ADD CONSTRAINT chk_lineitem_return_flag CHECK (return_flag IN ('R', 'A', 'N'));

-- Customer market segment must be one of the five listed values.
ALTER TABLE Customer ADD CONSTRAINT chk_customer_mkt_segment CHECK (mkt_segment IN ('AUTOMOBILE', 'BUILDING', 'FURNITURE', 'MACHINERY', 'HOUSEHOLD'));

-- Part size must be positive.
ALTER TABLE Part ADD CONSTRAINT chk_part_size CHECK (size > 0);

## 3. Ingestion

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType

# Explicit read schemas for the eight TPC-H tables, keyed by capitalized table
# name. Every field is nullable. Date columns and line_status are declared as
# strings here; they are converted to their final types after ingestion.
def _all_nullable(*fields):
    """Return a StructType with one nullable StructField per (name, data_type) pair."""
    return StructType([StructField(name, data_type, True) for name, data_type in fields])


_LONG, _STRING, _DOUBLE = LongType(), StringType(), DoubleType()

schema_map = {
    "Region": _all_nullable(
        ("region_key", _LONG),
        ("name", _STRING),
        ("comment", _STRING),
    ),
    "Nation": _all_nullable(
        ("nation_key", _LONG),
        ("name", _STRING),
        ("region_key", _LONG),
        ("comment", _STRING),
    ),
    "Supplier": _all_nullable(
        ("supp_key", _LONG),
        ("name", _STRING),
        ("address", _STRING),
        ("nation_key", _LONG),
        ("phone", _STRING),
        ("acct_bal", _DOUBLE),
        ("comment", _STRING),
    ),
    "Customer": _all_nullable(
        ("cust_key", _LONG),
        ("name", _STRING),
        ("address", _STRING),
        ("nation_key", _LONG),
        ("phone", _STRING),
        ("acct_bal", _DOUBLE),
        ("mkt_segment", _STRING),
        ("comment", _STRING),
    ),
    "Part": _all_nullable(
        ("part_key", _LONG),
        ("name", _STRING),
        ("mfgr", _STRING),
        ("brand", _STRING),
        ("type", _STRING),
        ("size", _LONG),
        ("container", _STRING),
        ("retail_price", _DOUBLE),
        ("comment", _STRING),
    ),
    "Orders": _all_nullable(
        ("order_key", _LONG),
        ("cust_key", _LONG),
        ("order_status", _STRING),
        ("total_price", _DOUBLE),
        ("order_date", _STRING),
        ("order_priority", _STRING),
        ("clerk", _STRING),
        ("ship_priority", _STRING),
        ("comment", _STRING),
    ),
    "Lineitem": _all_nullable(
        ("order_key", _LONG),
        ("part_key", _LONG),
        ("supp_key", _LONG),
        ("line_number", _LONG),
        ("quantity", _LONG),
        ("extended_price", _DOUBLE),
        ("discount", _DOUBLE),
        ("tax", _DOUBLE),
        ("return_flag", _STRING),
        ("line_status", _STRING),
        ("ship_date", _STRING),
        ("commit_date", _STRING),
        ("receipt_date", _STRING),
        ("ships_instruct", _STRING),
        ("ship_mode", _STRING),
        ("comment", _STRING),
    ),
    "Partsupp": _all_nullable(
        ("part_key", _LONG),
        ("supp_key", _LONG),
        ("avail_qty", _LONG),
        ("supply_cost", _DOUBLE),
        ("comment", _STRING),
    ),
}


In [0]:
# Typed DataFrames keyed by capitalized table name (replaces the raw
# exploration DataFrames held under the same variable).
Df = {}

# Ingest every table with its explicit schema from schema_map.
for tbl in tbl_list_cap:
    tbl_file = tbl.lower()
    # Normalise the separator so the path has exactly one "/" whether or not
    # dataDir carries a trailing slash.
    Df[tbl] = spark.read.csv(f'{dataDir.rstrip("/")}/{tbl_file}/{tbl_file}.tbl',
                             schema=schema_map[tbl], sep='|')
    # No column is dropped here. The empty trailing column seen during
    # exploration only appears when the file is read WITHOUT a schema; with an
    # explicit schema the DataFrame has exactly the schema's columns, so
    # dropping columns[-1] would remove the real `comment` column.

In [0]:
# Convert Lineitem.line_status from its one-letter code to a boolean:
# "O" -> True, "F" -> False, anything else -> NULL.
# NOTE: this cell rewrites Df in place. Re-running it without re-running the
# ingestion cell would compare an already-converted column against "O"/"F",
# so re-run ingestion first.
Df['Lineitem'] = Df['Lineitem'].withColumn(
    "line_status",
    F.when(F.col("line_status") == "O", True)
     .when(F.col("line_status") == "F", False)
     .otherwise(None),
)

# Parse the date strings (yyyy-MM-dd) into DATE values for order_date,
# ship_date, commit_date and receipt_date.
Df["Orders"] = Df["Orders"].withColumn("order_date", F.to_date(F.col("order_date"), "yyyy-MM-dd"))
for date_col in ("ship_date", "commit_date", "receipt_date"):
    Df["Lineitem"] = Df["Lineitem"].withColumn(date_col, F.to_date(F.col(date_col), "yyyy-MM-dd"))

In [0]:
# Load the data to appropriate table 
for tbl in tbl_list_cap:
  Df[tbl].write.format("delta").mode("overwrite").saveAsTable(tbl)
  print(f"Table {tbl} written to disk")

## 4. Transform

In [0]:
# Base path of the volumes created in the catalog/schema setup cell
# (the "input" and "output" volumes live directly underneath it).
# NOTE(review): the value ends with "/" and the cells below append "/input/..."
# and "/output/...", so the resulting paths contain "//" -- presumably
# tolerated by the file APIs, but confirm or drop the trailing slash.
userDir = '/Volumes/tpch_etl_catalog/tpch_schema/'

CountryCode.org is a website that provides comprehensive country-level data, including international telephone dialing codes, ISO 3166 country codes (both 2-letter and 3-letter), population, area, GDP, and more. The data set has been imported into the `input` volume.


In [0]:
# Copy the country-code CSV from the input volume to the output volume; the
# following cell reads it from the output location.
dbutils.fs.cp(f"{userDir}/input/data/countrycode.csv", f"{userDir}/output/data/countrycode.csv", recurse=True)

In [0]:
# Read the country-code CSV (header row, inferred column types).
countrycode_path = f"{userDir}/output/data/countrycode.csv"
csv_reader = spark.read.option('inferSchema',True).option('header',True)
df = csv_reader.csv(countrycode_path)

# Keep only what the nation mapping needs: the upper-cased country name
# (as `nation`) and the ISO3 code (as `iso`).
nation_name = F.upper(F.col('Country Name')).alias('nation')
iso_code = F.col('ISO3').alias('iso')
nationsmapper_df = df.select(nation_name, iso_code)

# Persist the mapping as the Delta table "Nationsmapper", then show it.
mapper_writer = nationsmapper_df.write.format('delta').mode("overwrite")
mapper_writer.saveAsTable("Nationsmapper")
display(nationsmapper_df)

In [0]:
# Attach the ISO code to every nation. The LEFT JOIN keeps nations that have
# no matching name in Nationsmapper (their iso is NULL).
nation_iso_query = """
        SELECT n.name, nm.iso, n.nation_key, n.region_key
        FROM nation n
        LEFT JOIN Nationsmapper AS nm
        ON n.name = nm.nation
"""
nationmapped_df = spark.sql(nation_iso_query)

display(nationmapped_df)

## 5. Queries