In [78]:
#Setup
import pandas as pd
from db import get_engine

engine = get_engine()

#Session setup statements, executed in this exact order on one connection:
#enable foreign-key enforcement, then attach raw.db and relational.db under
#the schema names `raw` and `relational` used by the rest of the notebook
#NOTE(review): ATTACH is scoped to the connection that issued it; later cells
#reach `raw.` through `engine`, so this presumably relies on the pool handing
#back the same connection (or on get_engine() re-attaching) - confirm in db.py
setup_statements = (
    "PRAGMA foreign_keys = ON;",
    "ATTACH DATABASE 'raw.db' AS raw;",
    "ATTACH DATABASE 'relational.db' AS relational;",
)

with engine.begin() as conn:
    for statement in setup_statements:
        conn.exec_driver_sql(statement)

In [79]:
#Validating that all required raw tables exist before performing data transformations
#A failure here indicates that the raw-table load (00_load_raw) was not run
tables_needed = [
    'crm_customers',
    'crm_products',
    'crm_sales',
    'erp_customers',
    'erp_locations',
    'erp_product_categories'
]

#Names of every table currently present in the attached raw database
tables_loaded = pd.read_sql("""
SELECT name
FROM raw.sqlite_master
WHERE type = 'table'
;""", engine)['name'].tolist()

missing = set(tables_needed) - set(tables_loaded)

if missing:
    #Sorted so the report reads the same on every run; iterating the set
    #directly yields an order that can change between kernel sessions
    missing_names = sorted(missing)
    print('Missing Tables:')
    for m in missing_names:
        print(m)
    #The table names are repeated in the exception so the cause is still
    #visible when only the traceback is captured (e.g. automated runs)
    raise RuntimeError(
        f"Run 00_load_raw.ipynb first (missing raw tables: {', '.join(missing_names)})")

### Loading Transformed Data into Staging Views

##### stg_crm_customers

In [80]:
#Rebuilds the staging view raw.stg_crm_customers on top of raw.crm_customers
#  - cst_id is cast to INTEGER; first and last names are trimmed
#  - marital status codes M/S become 'Married'/'Single', anything else NULL
#  - gender codes M/F become 'Male'/'Female', anything else NULL
#  - cst_create_date is passed through DATE(); dates later than DATE('now') become NULL
#  - rows where cst_id, first name and last name are all NULL are dropped
#  - one row is kept per cst_id: the one ranked first by cst_create_date DESC
#NOTE(review): rows with a NULL cst_id presumably land in a single partition,
#so only one of them would survive the row_num = 1 filter - confirm this is intended
drop_stg_crm_customers = """
    DROP VIEW IF EXISTS raw.stg_crm_customers;"""

create_stg_crm_customers = """
    CREATE VIEW raw.stg_crm_customers AS
    WITH cleaned_crm_customers AS (
        SELECT 
            CAST(cst_id AS INTEGER) AS cst_id, 
            cst_key, 
            TRIM(cst_firstname) AS cst_firstname, 
            TRIM(cst_lastname) AS cst_lastname,
            CASE
                WHEN UPPER(TRIM(cst_marital_status)) = 'M' THEN 'Married'
                WHEN UPPER(TRIM(cst_marital_status)) = 'S' THEN 'Single'
                ELSE NULL
                END AS cst_marital_status,
            CASE
                WHEN UPPER(TRIM(cst_gndr)) = 'M' THEN 'Male'
                WHEN UPPER(TRIM(cst_gndr)) = 'F' THEN 'Female'
                ELSE NULL
                END AS cst_gndr,
            CASE 
                WHEN DATE(cst_create_date) > DATE('now')
                THEN NULL
                ELSE DATE(cst_create_date)
                END AS cst_create_date
        FROM raw.crm_customers
        WHERE NOT (cst_id IS NULL
        AND cst_firstname IS NULL
        AND cst_lastname IS NULL)
        ),
    ranked_crm_customers AS (
    SELECT *, 
        ROW_NUMBER() OVER (
            PARTITION BY cst_id
            ORDER BY cst_create_date DESC)
        AS row_num
    FROM cleaned_crm_customers
    )
    SELECT
        cst_id,
        cst_key, 
        cst_firstname,
        cst_lastname,
        cst_marital_status,
        cst_gndr,
        cst_create_date
    FROM ranked_crm_customers
    WHERE row_num = 1;"""

#Drop first, then create, inside a single engine.begin() block
with engine.begin() as conn:
    for statement in (drop_stg_crm_customers, create_stg_crm_customers):
        conn.exec_driver_sql(statement)

##### stg_crm_products

In [81]:
#Rebuilds the staging view raw.stg_crm_products on top of raw.crm_products
#  - prd_id and prd_cost are cast to INTEGER; prd_nm is trimmed
#  - the trimmed prd_key is split: characters 1-5 become cat_id, the remainder
#    from character 7 onward becomes the new prd_key
#  - prd_line codes M/R/S/T become 'Mountain'/'Road'/'Other'/'Touring', anything else NULL
#  - prd_end_dt is the day before the next prd_start_dt for the same prd_key,
#    set only when that next start is later than the current one; otherwise NULL
#NOTE(review): `prd_key` inside the window presumably resolves to the raw source
#column rather than the shortened alias - confirm against SQLite name resolution
drop_stg_crm_products = """
    DROP VIEW IF EXISTS raw.stg_crm_products;"""

create_stg_crm_products = """
    CREATE VIEW raw.stg_crm_products AS
    SELECT 
        CAST(prd_id AS INTEGER) AS prd_id,
        SUBSTRING(TRIM(prd_key), 1, 5) AS cat_id,
        SUBSTRING(TRIM(prd_key), 7, LENGTH(prd_key)) AS prd_key,
        TRIM(prd_nm) as prd_nm,
        CAST(prd_cost AS INTEGER) AS prd_cost,
        CASE
            WHEN UPPER(TRIM(prd_line)) = 'M' THEN 'Mountain'
            WHEN UPPER(TRIM(prd_line)) = 'R' THEN 'Road'
            WHEN UPPER(TRIM(prd_line)) = 'S' THEN 'Other'
            WHEN UPPER(TRIM(prd_line)) = 'T' THEN 'Touring'
            ELSE NULL
            END AS prd_line,
        DATE(prd_start_dt) as prd_start_dt,
        CASE
            WHEN LEAD(prd_start_dt) OVER (
                PARTITION BY prd_key
                ORDER BY prd_start_dt) > prd_start_dt
            THEN DATE(LEAD(prd_start_dt) OVER (
                PARTITION BY prd_key
                ORDER BY prd_start_dt),
                '-1 day')
            ELSE NULL
            END AS prd_end_dt
    FROM raw.crm_products;"""

#Drop first, then create, inside a single engine.begin() block
with engine.begin() as conn:
    for statement in (drop_stg_crm_products, create_stg_crm_products):
        conn.exec_driver_sql(statement)

##### stg_crm_sales

In [82]:
#Rebuilds the staging view raw.stg_crm_sales on top of raw.crm_sales
#  - sls_ord_key is sls_ord_num and sls_prd_key joined with '-'
#  - sls_cust_id and sls_quantity are cast to INTEGER
#  - order/ship/due dates: 8-character values are re-assembled as YYYY-MM-DD
#    and passed through DATE(); values of any other length become NULL
#  - sls_sales is replaced by ABS(sls_price) * sls_quantity when it is NULL,
#    not positive, or does not equal that product
#  - sls_price is replaced by sls_sales / sls_quantity when it is NULL or not positive
#NOTE(review): the sls_price fallback divides the raw sls_sales, not the
#corrected value computed just above - confirm this is intended
drop_stg_crm_sales = """
    DROP VIEW IF EXISTS raw.stg_crm_sales;"""

create_stg_crm_sales = """
    CREATE VIEW raw.stg_crm_sales AS
    SELECT
        CAST(sls_ord_num AS TEXT) || '-' || CAST(sls_prd_key AS TEXT) AS sls_ord_key,
        sls_ord_num,
        sls_prd_key,
        CAST(sls_cust_id AS INTEGER) AS sls_cust_id,
        CASE 
            WHEN LENGTH(sls_order_dt) = 8
            THEN DATE(
                SUBSTRING(sls_order_dt, 1, 4) || '-' || 
                SUBSTRING(sls_order_dt, 5, 2) || '-' || 
                SUBSTRING(sls_order_dt, 7, 2))
            ELSE NULL
            END AS sls_order_dt,
        CASE 
            WHEN LENGTH(sls_ship_dt) = 8
            THEN DATE(
                SUBSTRING(sls_ship_dt, 1, 4) || '-' || 
                SUBSTRING(sls_ship_dt, 5, 2) || '-' || 
                SUBSTRING(sls_ship_dt, 7, 2))
            ELSE NULL
            END AS sls_ship_dt,
        CASE 
            WHEN LENGTH(sls_due_dt) = 8
            THEN DATE(
                SUBSTRING(sls_due_dt, 1, 4) || '-' || 
                SUBSTRING(sls_due_dt, 5, 2) || '-' || 
                SUBSTRING(sls_due_dt, 7, 2))
            ELSE NULL
            END AS sls_due_dt,
        CASE
            WHEN sls_sales IS NULL OR sls_sales <= 0 OR sls_sales != (ABS(sls_price) * sls_quantity)
            THEN CAST((ABS(sls_price) * sls_quantity) AS INTEGER)
            ELSE CAST(sls_sales AS INTEGER)
            END AS sls_sales,
        CAST (sls_quantity AS INTEGER) AS sls_quantity, 
        CASE
            WHEN sls_price IS NULL OR sls_price <= 0
            THEN CAST((sls_sales / sls_quantity) AS INTEGER)
            ELSE CAST(sls_price AS INTEGER)
            END AS sls_price
    FROM raw.crm_sales;"""

#Drop first, then create, inside a single engine.begin() block
with engine.begin() as conn:
    for statement in (drop_stg_crm_sales, create_stg_crm_sales):
        conn.exec_driver_sql(statement)

##### stg_erp_customers

In [83]:
#Rebuilds the staging view raw.stg_erp_customers on top of raw.erp_customers
#  - CID values matching 'NAS%' lose their first three characters
#  - BDATE: when the 4-character year prefix compares greater than '2000' the
#    year is replaced with 1900 (month and day kept); otherwise DATE(BDATE)
#  - GEN values MALE/M and FEMALE/F (case-insensitive, trimmed) become
#    'Male'/'Female', anything else NULL
#NOTE(review): rewriting every birth year above 2000 to 1900 looks unusual -
#confirm whether NULL or a century correction was intended
drop_stg_erp_customers = """
    DROP VIEW IF EXISTS raw.stg_erp_customers;"""

create_stg_erp_customers = """
    CREATE VIEW raw.stg_erp_customers AS
    SELECT
        CASE
            WHEN CID LIKE 'NAS%'
            THEN SUBSTRING(CID, 4, LENGTH(CID))
            ELSE CID
            END AS CID,
        CASE
            WHEN SUBSTRING(BDATE, 1, 4) > '2000'
            THEN DATE('1900' || SUBSTRING(BDATE, 5, 6))
            ELSE DATE(BDATE)
            END AS BDATE,
        CASE
            WHEN UPPER(TRIM(GEN)) = 'MALE' OR UPPER(TRIM(GEN)) = 'M'
            THEN 'Male'
            WHEN UPPER(TRIM(GEN)) = 'FEMALE' OR UPPER(TRIM(GEN)) = 'F'
            THEN 'Female'
            ELSE NULL
            END AS GEN
    FROM raw.erp_customers
    ;"""

#Drop first, then create, inside a single engine.begin() block
with engine.begin() as conn:
    for statement in (drop_stg_erp_customers, create_stg_erp_customers):
        conn.exec_driver_sql(statement)

##### stg_erp_locations

In [84]:
#Rebuilds the staging view raw.stg_erp_locations on top of raw.erp_locations
#  - hyphens are removed from CID
#  - CNTRY is compared after TRIM + UPPER and mapped to a display name:
#    AUSTRALIA, US/USA/UNITED STATES, DE/GERMANY, CANADA, UNITED KINGDOM, FRANCE
#  - any value outside that list becomes NULL
drop_stg_erp_locations = """
    DROP VIEW IF EXISTS raw.stg_erp_locations;"""

create_stg_erp_locations = """
    CREATE VIEW raw.stg_erp_locations AS
    SELECT
        REPLACE(CID, '-', '') AS CID,
        CASE
            WHEN UPPER(TRIM(CNTRY)) = 'AUSTRALIA'
            THEN 'Australia'
            WHEN UPPER(TRIM(CNTRY)) = 'US' OR UPPER(TRIM(CNTRY)) = 'USA' OR UPPER(TRIM(CNTRY)) = 'UNITED STATES'
            THEN 'United States'
            WHEN UPPER(TRIM(CNTRY)) = 'DE' OR UPPER(TRIM(CNTRY)) = 'GERMANY'
            THEN 'Germany'
            WHEN UPPER(TRIM(CNTRY)) = 'CANADA'
            THEN 'Canada'
            WHEN UPPER(TRIM(CNTRY)) = 'UNITED KINGDOM'
            THEN 'United Kingdom'
            WHEN UPPER(TRIM(CNTRY)) = 'FRANCE'
            THEN 'France'
            ELSE NULL
            END AS CNTRY
    FROM raw.erp_locations
    ;"""

#Drop first, then create, inside a single engine.begin() block
with engine.begin() as conn:
    for statement in (drop_stg_erp_locations, create_stg_erp_locations):
        conn.exec_driver_sql(statement)

##### stg_erp_product_categories

In [85]:
#Rebuilds the staging view raw.stg_erp_product_categories on top of
#raw.erp_product_categories
#  - underscores in ID are replaced with hyphens
#  - CAT, SUBCAT and MAINTENANCE are passed through unchanged
drop_stg_erp_product_categories = """
    DROP VIEW IF EXISTS raw.stg_erp_product_categories"""

create_stg_erp_product_categories = """
    CREATE VIEW raw.stg_erp_product_categories AS
    SELECT
        REPLACE(ID, '_', '-') AS ID,
        CAT,
        SUBCAT,
        MAINTENANCE
    FROM raw.erp_product_categories
    ;"""

#Drop first, then create, inside a single engine.begin() block
with engine.begin() as conn:
    for statement in (drop_stg_erp_product_categories, create_stg_erp_product_categories):
        conn.exec_driver_sql(statement)

### Sanity Check

In [86]:
#Ensuring every staging view defined in this notebook now exists in raw.db
expected_views = [
    'stg_crm_customers',
    'stg_crm_products',
    'stg_crm_sales',
    'stg_erp_customers',
    'stg_erp_locations',
    'stg_erp_product_categories'
]

#Names of every view currently present in the attached raw database
actual_views = pd.read_sql("""
SELECT name
FROM raw.sqlite_master
WHERE type = 'view';
""", engine)['name'].to_list()

missing = set(expected_views) - set(actual_views)

if missing:
    print('Missing Views:')
    #Sorted so the report reads the same on every run; iterating the set
    #directly yields an order that can change between kernel sessions
    for view in sorted(missing):
        print(view)
    print('\nRun cells in order\n')
else:
    print('All views created as expected!\n')

All views created as expected!

