# Data pipelines involve extracting, transforming, and loading data for analytical use cases

In [None]:
# Run the repository's setup script before anything else in this notebook.
# NOTE(review): presumably this generates the tpch.db file used below — confirm in ../setup.py.
! python ../setup.py

In [None]:
import duckdb
import pandas as pd

# Load the SQL magic extension so the %sql / %%sql magics used below are available.
%load_ext sql
# Open the tpch.db DuckDB file (path is relative to the notebook's working directory).
conn = duckdb.connect("tpch.db")
# Hand the open connection to the SQL magic under the alias "duckdb".
%sql conn --alias duckdb

In [None]:
%%sql
-- List the tables visible through the connection registered with the SQL magic.
show tables;

## Let's create our data pipeline with Python

In [None]:
import duckdb

db_file_name = 'tpch.db'

# Load the TPC-H tables from DuckDB into pandas DataFrames.
# The context manager (the same pattern the pipeline functions in this notebook use)
# closes the connection even if one of the queries fails, so no handle is leaked.
# A dedicated name is used so that `conn`, the connection registered with the
# %sql magic in the first cell, is not rebound and left pointing at a closed handle.
with duckdb.connect(db_file_name) as load_con:
    # .df() materialises each result while the connection is still open.
    customer_df = load_con.sql("SELECT * FROM customer").df()
    orders_df = load_con.sql("SELECT * FROM orders").df()
    lineitem_df = load_con.sql("SELECT * FROM lineitem").df()
    nation_df = load_con.sql("SELECT * FROM nation").df()
    region_df = load_con.sql("SELECT * FROM region").df()
    supplier_df = load_con.sql("SELECT * FROM supplier").df()
    part_df = load_con.sql("SELECT * FROM part").df()
    partsupp_df = load_con.sql("SELECT * FROM partsupp").df()

In [None]:
# Create bronze tables
def create_bronze_tables(db_file_name):
    """Rebuild the ``bronze`` schema from the raw TPC-H tables.

    The schema is dropped with CASCADE and recreated, then ``customer``,
    ``nation``, ``region``, ``orders`` and ``lineitem`` are copied into it with
    the single-letter column prefixes (``c_``, ``n_``, ``r_``, ``o_``, ``l_``)
    removed. Note that ``c_custkey`` is renamed to ``customer_key`` rather
    than ``custkey``.

    Args:
        db_file_name: Path of the DuckDB database file to open.
    """
    bronze_statements = (
        # Start from an empty schema on every run.
        """
        DROP SCHEMA IF EXISTS bronze CASCADE;
        """,
        """
        CREATE SCHEMA IF NOT EXISTS bronze;
        """,
        # customer
        """
        DROP TABLE IF EXISTS bronze.customer;
        """,
        """
        CREATE TABLE bronze.customer AS 
        SELECT 
            c_custkey AS customer_key,
            c_name AS name,
            c_address AS address,
            c_nationkey AS nationkey,
            c_phone AS phone,
            c_acctbal AS acctbal,
            c_mktsegment AS mktsegment,
            c_comment AS comment
        FROM customer;
        """,
        # nation
        """
        DROP TABLE IF EXISTS bronze.nation;
        """,
        """
        CREATE TABLE bronze.nation AS 
        SELECT 
            n_nationkey AS nationkey,
            n_name AS name,
            n_regionkey AS regionkey,
            n_comment AS comment
        FROM nation;
        """,
        # region
        """
        DROP TABLE IF EXISTS bronze.region;
        """,
        """
        CREATE TABLE bronze.region AS 
        SELECT 
            r_regionkey AS regionkey,
            r_name AS name,
            r_comment AS comment
        FROM region;
        """,
        # orders
        """
        DROP TABLE IF EXISTS bronze.orders;
        """,
        """
        CREATE TABLE bronze.orders AS 
        SELECT 
            o_orderkey AS orderkey,
            o_custkey AS custkey,
            o_orderstatus AS orderstatus,
            o_totalprice AS totalprice,
            o_orderdate AS orderdate,
            o_orderpriority AS orderpriority,
            o_clerk AS clerk,
            o_shippriority AS shippriority,
            o_comment AS comment
        FROM orders;
        """,
        # lineitem
        """
        DROP TABLE IF EXISTS bronze.lineitem;
        """,
        """
        CREATE TABLE bronze.lineitem AS 
        SELECT 
            l_orderkey AS orderkey,
            l_partkey AS partkey,
            l_suppkey AS suppkey,
            l_linenumber AS linenumber,
            l_quantity AS quantity,
            l_extendedprice AS extendedprice,
            l_discount AS discount,
            l_tax AS tax,
            l_returnflag AS returnflag,
            l_linestatus AS linestatus,
            l_shipdate AS shipdate,
            l_commitdate AS commitdate,
            l_receiptdate AS receiptdate,
            l_shipinstruct AS shipinstruct,
            l_shipmode AS shipmode,
            l_comment AS comment
        FROM lineitem;
        """,
    )

    # Statements are executed one at a time, in the order listed above.
    with duckdb.connect(db_file_name) as con:
        for statement in bronze_statements:
            con.sql(statement)

In [None]:
# Create silver tables
def create_silver_tables(db_file_name):
    """Build the silver dimension and fact tables from the bronze schema.

    Three tables are (re)created under unqualified names, each dropped first
    if it already exists:

    * ``dim_customer`` - ``bronze.customer`` LEFT JOINed to ``bronze.nation``
      and ``bronze.region`` so every customer row carries its nation and
      region name/comment.
    * ``fct_orders`` - a column-for-column copy of ``bronze.orders``.
    * ``fct_lineitem`` - a column-for-column copy of ``bronze.lineitem``.

    Args:
        db_file_name: Path of the DuckDB database file to open.
    """
    silver_statements = (
        # dim_customer
        """
        DROP TABLE IF EXISTS dim_customer;
        """,
        """
        CREATE TABLE dim_customer AS
        SELECT 
            c.customer_key,
            c.name AS customer_name,
            c.address,
            c.phone,
            c.acctbal,
            c.mktsegment,
            n.name AS nation_name,
            n.comment AS nation_comment,
            r.name AS region_name,
            r.comment AS region_comment
        FROM bronze.customer AS c
        LEFT JOIN bronze.nation AS n ON c.nationkey = n.nationkey
        LEFT JOIN bronze.region AS r ON n.regionkey = r.regionkey;
        """,
        # fct_orders
        """
        DROP TABLE IF EXISTS fct_orders;
        """,
        """
        CREATE TABLE fct_orders AS
        SELECT 
            o.orderkey,
            o.custkey,
            o.orderstatus,
            o.totalprice,
            o.orderdate,
            o.orderpriority,
            o.clerk,
            o.shippriority,
            o.comment
        FROM bronze.orders AS o;
        """,
        # fct_lineitem
        """
        DROP TABLE IF EXISTS fct_lineitem;
        """,
        """
        CREATE TABLE fct_lineitem AS
        SELECT 
            l.orderkey,
            l.partkey,
            l.suppkey,
            l.linenumber,
            l.quantity,
            l.extendedprice,
            l.discount,
            l.tax,
            l.returnflag,
            l.linestatus,
            l.shipdate,
            l.commitdate,
            l.receiptdate,
            l.shipinstruct,
            l.shipmode,
            l.comment
        FROM bronze.lineitem AS l;
        """,
    )

    with duckdb.connect(db_file_name) as con:
        for statement in silver_statements:
            con.sql(statement)
        # Explicit commit once all silver tables have been created.
        con.commit()

In [None]:
# Create gold tables
def create_obts(db_file_name):
    """Build the wide ("one big table") gold tables from the silver tables.

    * ``wide_orders`` - every ``fct_orders`` row with all ``dim_customer``
      columns attached via a LEFT JOIN on ``custkey = customer_key``.
    * ``wide_lineitem`` - a full copy of ``fct_lineitem``.

    Each table is dropped first if it already exists.

    Args:
        db_file_name: Path of the DuckDB database file to open.
    """
    obt_statements = (
        # wide_orders
        """
        DROP TABLE IF EXISTS wide_orders;
        """,
        """
        CREATE TABLE wide_orders AS
        SELECT o.*,
            c.*
        FROM fct_orders o
        LEFT JOIN dim_customer c 
        ON o.custkey = c.customer_key;
        """,
        # wide_lineitem
        """
        DROP TABLE IF EXISTS wide_lineitem;
        """,
        """
        CREATE TABLE wide_lineitem AS
        SELECT * FROM fct_lineitem;
        """,
    )

    with duckdb.connect(db_file_name) as con:
        for statement in obt_statements:
            con.sql(statement)


def create_preagg_tables(db_file_name):
    """Build the pre-aggregated gold tables from the wide tables.

    Two tables are (re)created, each dropped first if it already exists:

    * ``order_lineitem_metrics`` - one row per ``orderkey`` in
      ``wide_lineitem`` with its line-item count (``num_lineitems``).
    * ``customer_outreach_metrics`` - one row per
      ``(customer_key, customer_name)`` in ``wide_orders`` with the
      min / max / average order value and the average number of line items
      per order.

    Args:
        db_file_name: Path of the DuckDB database file to open.
    """
    with duckdb.connect(db_file_name) as con:
        con.sql("""
        DROP TABLE IF EXISTS order_lineitem_metrics;
        """)

        con.sql("""
        CREATE TABLE order_lineitem_metrics AS
        SELECT
            orderkey AS order_key,
            COUNT(linenumber) AS num_lineitems
        FROM wide_lineitem
        GROUP BY orderkey;
        """)

        con.sql("""
        DROP TABLE IF EXISTS customer_outreach_metrics;
        """)

        # Join the per-order counts that were just materialised in
        # order_lineitem_metrics rather than re-aggregating wide_lineitem in
        # an identical subquery: the result is the same, the aggregation
        # logic lives in one place, and wide_lineitem is scanned only once.
        con.sql("""
        CREATE TABLE customer_outreach_metrics AS
        SELECT
            o.customer_key,
            o.customer_name,
            MIN(o.totalprice) AS min_order_value,
            MAX(o.totalprice) AS max_order_value,
            AVG(o.totalprice) AS avg_order_value,
            AVG(m.num_lineitems) AS avg_num_items_per_order
        FROM wide_orders AS o
        LEFT JOIN order_lineitem_metrics AS m ON o.orderkey = m.order_key
        GROUP BY o.customer_key, o.customer_name;
        """)

def create_gold_tables(db_file_name):
    """Build the gold layer: the wide tables first, then the pre-aggregates.

    Args:
        db_file_name: Path of the DuckDB database file, forwarded unchanged to
            each build step.
    """
    # Order matters: the pre-aggregated tables read from the wide tables.
    for build_step in (create_obts, create_preagg_tables):
        build_step(db_file_name)

In [None]:
def run_pipeline(db_file_name):
    """Run the bronze, silver and gold stages in order, announcing each one.

    Args:
        db_file_name: Path of the DuckDB database file, forwarded unchanged to
            every stage.
    """
    stages = (
        ("==========CREATING BRONZE TABLES===================", create_bronze_tables),
        ("==========CREATING SILVER TABLES===================", create_silver_tables),
        ("==========CREATING GOLD TABLES===================", create_gold_tables),
    )
    for banner, build_stage in stages:
        print(banner)
        build_stage(db_file_name)

In [None]:
# Run the data pipeline end to end (bronze -> silver -> gold) against the
# DuckDB file in the notebook's working directory.
db_file_name = './tpch.db'
run_pipeline(db_file_name)

In [None]:
# Preview customer_outreach_metrics (display capped at 5 rows) to confirm the
# pipeline produced the final gold table.
with duckdb.connect(db_file_name) as con:
    con.table("customer_outreach_metrics").show(max_rows=5)

Cons of the hand-written Python approach:

* A lot of code to write
* Bugs and maintenance
* Adding new tables is tough
* Table functions need to be split out for modularity

Since we are just running SQL code, there is an easier way to do this: data build tool (dbt).

## dbt (data build tool) enables one to build data pipelines with SWE best practices

### dbt is a CLI

- One SQL select query (with any transformations you want) per file
- File name = model name (a model can be a SQL table, view, materialized view, etc.)
- Data quality checks
- define documentation with yml file
- automatic data lineage diagram with a static webpage
- support for SCD2 creation

### dbt is popular because it lets you build data pipelines with SQL scripts

In [None]:
# Confirm the dbt CLI is installed and print its version.
! dbt --version

In [None]:
# Scaffold a new dbt project in a folder named tpch_warehouse.
! dbt init tpch_warehouse

Create a `profiles.yml` in the `tpch_warehouse` folder with the following content (note: this file has already been created for you)

```yml
---
config:
  send_anonymous_usage_stats: false
tpch_warehouse:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: ./dbt.duckdb
    prod:
      type: duckdb
      path: ./dbt-prod.duckdb
```

In [6]:
# Run from inside tpch_warehouse, where this project's profiles.yml and dbt_project.yml live.
! cd tpch_warehouse && dbt debug # Command to check connection to our warehouse

[0m17:17:49  Running with dbt=1.8.8

User config should be moved from the 'config' key in profiles.yml to the 'flags' key in dbt_project.yml.
[0m17:17:49  dbt version: 1.8.8
[0m17:17:49  python version: 3.12.4
[0m17:17:49  python path: /home/josephkevinmachado/code/de_101/env/bin/python
[0m17:17:49  os info: Linux-6.9.3-76060903-generic-x86_64-with-glibc2.35
[0m17:17:49  Using profiles dir at /home/josephkevinmachado/code/de_101/4-Data-Pipeline/tpch_warehouse
[0m17:17:49  Using profiles.yml file at /home/josephkevinmachado/code/de_101/4-Data-Pipeline/tpch_warehouse/profiles.yml
[0m17:17:49  Using dbt_project.yml file at /home/josephkevinmachado/code/de_101/4-Data-Pipeline/tpch_warehouse/dbt_project.yml
[0m17:17:49  adapter type: duckdb
[0m17:17:49  adapter version: 1.9.0
[0m17:17:49  Configuration:
[0m17:17:49    profiles.yml file [[32mOK found and valid[0m]
[0m17:17:49    dbt_project.yml file [[32mOK found and valid[0m]
[0m17:17:49  Required dependencies:
[0m17:17:4

In [7]:
# Build the project's models against the default target (dev in profiles.yml).
! cd tpch_warehouse/ && dbt run

[0m17:19:21  Running with dbt=1.8.8

User config should be moved from the 'config' key in profiles.yml to the 'flags' key in dbt_project.yml.
[0m17:19:21  Registered adapter: duckdb=1.9.0
[0m17:19:21  Unable to do partial parsing because saved manifest not found. Starting full parse.
[0m17:19:23  Found 4 models, 4 data tests, 416 macros
[0m17:19:23  
[0m17:19:23  Concurrency: 1 threads (target='dev')
[0m17:19:23  
[0m17:19:23  1 of 4 START sql table model main.my_first_dbt_model ........................... [RUN]
[0m17:19:23  1 of 4 OK created sql table model main.my_first_dbt_model ...................... [[32mOK[0m in 0.13s]
[0m17:19:23  2 of 4 START sql table model main.my_first_dbt_model-checkpoint ................ [RUN]
[0m17:19:23  2 of 4 OK created sql table model main.my_first_dbt_model-checkpoint ........... [[32mOK[0m in 0.05s]
[0m17:19:23  3 of 4 START sql view model main.my_second_dbt_model ........................... [RUN]
[0m17:19:23  3 of 4 OK created sql 

### dbt has a preferred step-by-step way to transform data

### Connection credentials are stored in profiles.yml

### Configuration settings are stored in dbt_project.yml

### Ensure the data is correct with tests


### Create SCD2 easily with dbt snapshot