# Online Retail Data Engineering Competition – End-to-End Pipeline (Snowflake)

This notebook demonstrates a complete workflow using Snowflake for an online retail company:
- Environment setup and connection
- Data extraction from RAW layer
- Data quality checks and profiling
- Transformations and modeling into CURATED layer
- Analytical model and metric creation in MART layer
- Performance and cost considerations
- Extension: predictive analytics ideas

Assumptions:
- SQL setup has been executed via `00_Setup.sql` to create schemas and seed data.
- Snowflake credentials are available as environment variables.


In [None]:
import os
import pandas as pd
import numpy as np
import snowflake.connector
from datetime import datetime

# Helper: Snowflake connection
def get_snowflake_connection():
    """Open a Snowflake connection configured from ``SNOWFLAKE_*`` environment variables.

    Reads ``SNOWFLAKE_USER``, ``SNOWFLAKE_PASSWORD``, ``SNOWFLAKE_ACCOUNT`` and
    ``SNOWFLAKE_ROLE`` (no defaults), plus ``SNOWFLAKE_WAREHOUSE`` and
    ``SNOWFLAKE_DATABASE``, which fall back to ``WH_RETAIL_COMPETITION`` and
    ``RETAIL_COMPETITION`` when unset.

    The ``SNOWFLAKE_*`` settings found in the environment are printed as a
    debugging aid. Values of credential-like variables (password, token, key,
    secret, passphrase) are masked so that secrets never end up in notebook
    output or logs.

    Returns:
        The connection object returned by ``snowflake.connector.connect``.
    """
    # Name fragments that mark an environment variable as holding a credential.
    secret_markers = ('PASSWORD', 'PASSPHRASE', 'PRIVATE_KEY', 'SECRET', 'TOKEN')

    print(os.getenv('SNOWFLAKE_ACCOUNT'))
    for name, value in os.environ.items():
        if not name.startswith('SNOWFLAKE_'):
            continue
        if any(marker in name.upper() for marker in secret_markers):
            # Show that the variable is set without revealing its value.
            value = '********'
        print("{0}: {1}".format(name, value))

    conn = snowflake.connector.connect(
        user=os.getenv('SNOWFLAKE_USER'),
        password=os.getenv('SNOWFLAKE_PASSWORD'),
        account=os.getenv('SNOWFLAKE_ACCOUNT'),
        warehouse=os.getenv('SNOWFLAKE_WAREHOUSE', 'WH_RETAIL_COMPETITION'),
        database=os.getenv('SNOWFLAKE_DATABASE', 'RETAIL_COMPETITION'),
        role=os.getenv('SNOWFLAKE_ROLE')
    )
    return conn

# Open the shared connection and cursor used by every later cell.
conn = get_snowflake_connection()
cur = conn.cursor()

# Point the session at the competition database and its RAW schema.
for context_statement in ("USE DATABASE RETAIL_COMPETITION;", "USE SCHEMA RAW;"):
    cur.execute(context_statement)

print('Connected to Snowflake and set context.')


## Extract: Read from RAW tables
We will quickly preview `CUSTOMERS`, `PRODUCTS`, `SUPPLIERS`, `EMPLOYEES`, `ORDERS`, `ORDER_ITEMS`, `CUSTOMER_INTERACTIONS`, and `MARKETING_CAMPAIGNS`.


In [None]:
# RAW tables to sample.
tables = [
    'CUSTOMERS', 'PRODUCTS', 'SUPPLIERS', 'EMPLOYEES',
    'ORDERS', 'ORDER_ITEMS', 'CUSTOMER_INTERACTIONS', 'MARKETING_CAMPAIGNS',
]

# Pull a five-row sample of each table into its own DataFrame, keyed by table name.
preview = {}
for table_name in tables:
    cur.execute(f"SELECT * FROM RAW.{table_name} LIMIT 5")
    # The first field of each cursor description entry is the column name.
    header = [column[0] for column in cur.description]
    preview[table_name] = pd.DataFrame(cur.fetchall(), columns=header)

# Cell output: the (rows, columns) shape of every sample.
{name: frame.shape for name, frame in preview.items()}


## Data Quality Checks
We will run basic DQ checks: row counts, null checks on keys, referential integrity checks, and value ranges.


In [None]:
# Each check is a single-value COUNT(*) query that returns the number of
# offending rows, so a result of 0 means the check passed.
dq_sql = {
    # Customers without a primary key.
    'customers_not_null_id':
        "SELECT COUNT(*) CNT FROM RAW.CUSTOMERS WHERE CUSTOMER_ID IS NULL",
    # Orders whose customer does not exist.
    'orders_orphan_customers':
        "SELECT COUNT(*) CNT FROM RAW.ORDERS o LEFT JOIN RAW.CUSTOMERS c ON o.CUSTOMER_ID=c.CUSTOMER_ID WHERE c.CUSTOMER_ID IS NULL",
    # Order items whose order does not exist.
    'order_items_orphan_orders':
        "SELECT COUNT(*) CNT FROM RAW.ORDER_ITEMS oi LEFT JOIN RAW.ORDERS o ON oi.ORDER_ID=o.ORDER_ID WHERE o.ORDER_ID IS NULL",
    # Order items whose product does not exist.
    'order_items_orphan_products':
        "SELECT COUNT(*) CNT FROM RAW.ORDER_ITEMS oi LEFT JOIN RAW.PRODUCTS p ON oi.PRODUCT_ID=p.PRODUCT_ID WHERE p.PRODUCT_ID IS NULL",
    # Products with a negative unit price or cost price.
    'price_non_negative':
        "SELECT COUNT(*) CNT FROM RAW.PRODUCTS WHERE UNIT_PRICE < 0 OR COST_PRICE < 0",
}

# Run every check and keep the violation count under the check's name.
results = {}
for check_name, statement in dq_sql.items():
    cur.execute(statement)
    first_row = cur.fetchone()
    results[check_name] = first_row[0]

# Cell output: violation count per check.
results


## Transformations and Modeling (CURATED)
We will build `DIM_CUSTOMER`, `DIM_PRODUCT`, `DIM_DATE`, `FACT_SALES`, and `FACT_INTERACTIONS`.
If they were already created via `00_Setup.sql`, they are rebuilt with `CREATE OR REPLACE`, so this step is idempotent.


In [None]:
# Build the CURATED layer. Every statement is CREATE OR REPLACE, so the cell
# can be re-run safely; unqualified table names resolve to the CURATED schema.
cur.execute("USE SCHEMA CURATED;")

sqls = [
    # DIM_CUSTOMER: customer attributes selected from RAW.CUSTOMERS.
    "CREATE OR REPLACE TABLE DIM_CUSTOMER AS SELECT c.CUSTOMER_ID, c.CUSTOMER_EXTERNAL_KEY, c.FIRST_NAME, c.LAST_NAME, c.EMAIL, c.GENDER, c.BIRTH_DATE, c.CITY, c.STATE, c.COUNTRY, c.SEGMENT, c.CREATED_AT, c.UPDATED_AT FROM RAW.CUSTOMERS c;",
    # DIM_PRODUCT: product attributes selected from RAW.PRODUCTS.
    "CREATE OR REPLACE TABLE DIM_PRODUCT AS SELECT p.PRODUCT_ID, p.SKU, p.PRODUCT_NAME, p.CATEGORY, p.SUBCATEGORY, p.BRAND, p.UNIT_PRICE, p.COST_PRICE, p.ACTIVE_FLAG FROM RAW.PRODUCTS p;",
    # DIM_DATE: 400 consecutive days starting 2024-01-01. The start literal is
    # cast to DATE explicitly so DATEADD yields a DATE; a bare string literal
    # would be implicitly converted to a timestamp, making DATE_KEY a timestamp.
    "CREATE OR REPLACE TABLE DIM_DATE AS WITH seq AS (SELECT DATEADD(day, ROW_NUMBER() OVER (ORDER BY 1) - 1, '2024-01-01'::DATE) AS d FROM TABLE(GENERATOR(ROWCOUNT => 400))) SELECT d AS DATE_KEY, YEAR(d) AS YEAR, MONTH(d) AS MONTH, DAY(d) AS DAY, TO_CHAR(d,'DY') AS DOW_SHORT, TO_CHAR(d,'Mon') AS MONTH_SHORT FROM seq;",
    # FACT_SALES: one row per order item for orders in status Paid, Shipped or
    # Delivered, with revenue (after discount), cost of goods and profit.
    "CREATE OR REPLACE TABLE FACT_SALES AS SELECT oi.ORDER_ITEM_ID, o.ORDER_ID, o.ORDER_DATE::DATE AS ORDER_DATE, o.CUSTOMER_ID, oi.PRODUCT_ID, oi.QUANTITY, oi.UNIT_PRICE_AT_ORDER, oi.DISCOUNT_AMOUNT, (oi.QUANTITY * oi.UNIT_PRICE_AT_ORDER - oi.DISCOUNT_AMOUNT) AS GROSS_REVENUE, (oi.QUANTITY * p.COST_PRICE) AS COST_OF_GOODS, ((oi.QUANTITY * oi.UNIT_PRICE_AT_ORDER - oi.DISCOUNT_AMOUNT) - (oi.QUANTITY * p.COST_PRICE)) AS PROFIT, o.CHANNEL, o.ORDER_STATUS FROM RAW.ORDER_ITEMS oi JOIN RAW.ORDERS o ON o.ORDER_ID = oi.ORDER_ID JOIN RAW.PRODUCTS p ON p.PRODUCT_ID = oi.PRODUCT_ID WHERE o.ORDER_STATUS IN ('Paid','Shipped','Delivered');",
    # FACT_INTERACTIONS: customer interactions with the timestamp reduced to a date.
    "CREATE OR REPLACE TABLE FACT_INTERACTIONS AS SELECT i.INTERACTION_ID, i.CUSTOMER_ID, i.INTERACTION_TS::DATE AS INTERACTION_DATE, i.CHANNEL, i.EVENT_TYPE, i.EVENT_PROPERTIES, i.CAMPAIGN_ID FROM RAW.CUSTOMER_INTERACTIONS i;"
]

# Execute in list order: the dimensions first, then the two fact tables.
for s in sqls:
    cur.execute(s)

print('CURATED models built.')


## Analytical Views and Metrics (MART)
We will build or refresh views for:
- Revenue daily, by product, by segment
- Customer LTV
- Campaign CAC
- Retention, churn
- Satisfaction
- Order summary


In [None]:
# Build the MART reporting views. Every statement is CREATE OR REPLACE, so the
# cell can be re-run safely; unqualified view names resolve to the MART schema.
cur.execute("USE SCHEMA MART;")

mart_sqls = [
    # Revenue and profit per order date.
    "CREATE OR REPLACE VIEW V_REVENUE_DAILY AS SELECT f.ORDER_DATE AS DATE, SUM(f.GROSS_REVENUE) AS TOTAL_REVENUE, SUM(f.PROFIT) AS TOTAL_PROFIT FROM CURATED.FACT_SALES f GROUP BY 1 ORDER BY 1;",
    # Revenue, profit and units per product (category / subcategory / name).
    "CREATE OR REPLACE VIEW V_REVENUE_BY_PRODUCT AS SELECT p.CATEGORY, p.SUBCATEGORY, p.PRODUCT_NAME, SUM(f.GROSS_REVENUE) AS REVENUE, SUM(f.PROFIT) AS PROFIT, SUM(f.QUANTITY) AS UNITS FROM CURATED.FACT_SALES f JOIN CURATED.DIM_PRODUCT p USING (PRODUCT_ID) GROUP BY 1,2,3 ORDER BY REVENUE DESC;",
    # Revenue and profit per customer segment.
    "CREATE OR REPLACE VIEW V_REVENUE_BY_SEGMENT AS SELECT c.SEGMENT, SUM(f.GROSS_REVENUE) AS REVENUE, SUM(f.PROFIT) AS PROFIT FROM CURATED.FACT_SALES f JOIN CURATED.DIM_CUSTOMER c USING (CUSTOMER_ID) GROUP BY 1 ORDER BY REVENUE DESC;",
    # Realized lifetime value: total profit per customer.
    "CREATE OR REPLACE VIEW V_CUSTOMER_LTV AS SELECT c.CUSTOMER_ID, c.FIRST_NAME, c.LAST_NAME, c.SEGMENT, SUM(f.PROFIT) AS REALIZED_LTV FROM CURATED.FACT_SALES f JOIN CURATED.DIM_CUSTOMER c USING (CUSTOMER_ID) GROUP BY 1,2,3,4 ORDER BY REALIZED_LTV DESC;",
    # CAC: campaign budget divided by the distinct customers with interactions
    # tied to the campaign; NULL when a campaign has no such customers.
    "CREATE OR REPLACE VIEW V_CAMPAIGN_CAC AS SELECT mc.CAMPAIGN_ID, mc.CAMPAIGN_NAME, mc.CHANNEL, mc.BUDGET, NULLIF(COUNT(DISTINCT fi.CUSTOMER_ID),0) AS DISTINCT_CUSTOMERS, CASE WHEN COUNT(DISTINCT fi.CUSTOMER_ID) = 0 THEN NULL ELSE mc.BUDGET / COUNT(DISTINCT fi.CUSTOMER_ID) END AS CAC FROM RAW.MARKETING_CAMPAIGNS mc LEFT JOIN CURATED.FACT_INTERACTIONS fi USING (CAMPAIGN_ID) GROUP BY 1,2,3,4;",
    # Retention: share of purchasing customers with at least two distinct orders.
    "CREATE OR REPLACE VIEW V_RETENTION_RATE AS WITH orders_per_customer AS ( SELECT CUSTOMER_ID, COUNT(DISTINCT ORDER_ID) AS orders FROM CURATED.FACT_SALES GROUP BY 1 ) SELECT (SUM(CASE WHEN orders >= 2 THEN 1 ELSE 0 END) / NULLIF(COUNT(*),0)) AS RETENTION_RATE FROM orders_per_customer;",
    # Churn is defined as the complement of retention; must be created after V_RETENTION_RATE.
    "CREATE OR REPLACE VIEW V_CHURN_RATE AS SELECT 1 - (SELECT RETENTION_RATE FROM V_RETENTION_RATE) AS CHURN_RATE;",
    # Average rating from 'Rating' events. The semi-structured path value is
    # cast to STRING first because TRY_TO_NUMBER is specified for string input.
    "CREATE OR REPLACE VIEW V_CUSTOMER_SATISFACTION AS SELECT AVG(TRY_TO_NUMBER(i.EVENT_PROPERTIES:score::STRING)) AS AVG_RATING FROM CURATED.FACT_INTERACTIONS i WHERE i.EVENT_TYPE = 'Rating';",
    # Per-order totals: items, revenue and profit.
    "CREATE OR REPLACE VIEW V_ORDER_SUMMARY AS SELECT ORDER_ID, ORDER_DATE, CHANNEL, ORDER_STATUS, SUM(QUANTITY) AS TOTAL_ITEMS, SUM(GROSS_REVENUE) AS ORDER_REVENUE, SUM(PROFIT) AS ORDER_PROFIT FROM CURATED.FACT_SALES GROUP BY 1,2,3,4;"
]

# Execute in list order so dependent views are created after their sources.
for s in mart_sqls:
    cur.execute(s)

print('MART views created/refreshed.')


## Compute Business Metrics
Query the views to produce KPI tables used by the dashboard.


In [None]:
def _read_view(sql):
    """Run *sql* on the open Snowflake connection and return the rows as a DataFrame."""
    return pd.read_sql(sql, conn)


# One DataFrame per MART view.
rev_daily = _read_view("SELECT * FROM MART.V_REVENUE_DAILY ORDER BY DATE")
rev_by_prod = _read_view("SELECT * FROM MART.V_REVENUE_BY_PRODUCT ORDER BY REVENUE DESC")
rev_by_seg = _read_view("SELECT * FROM MART.V_REVENUE_BY_SEGMENT ORDER BY REVENUE DESC")
ltv = _read_view("SELECT * FROM MART.V_CUSTOMER_LTV ORDER BY REALIZED_LTV DESC")
retention = _read_view("SELECT * FROM MART.V_RETENTION_RATE")
churn = _read_view("SELECT * FROM MART.V_CHURN_RATE")
satisfaction = _read_view("SELECT * FROM MART.V_CUSTOMER_SATISFACTION")
order_summary = _read_view("SELECT * FROM MART.V_ORDER_SUMMARY ORDER BY ORDER_DATE DESC")

# Cell output: the first rows of each KPI table, with the retention and churn
# frames shown in full.
(
    rev_daily.head(),
    rev_by_prod.head(),
    rev_by_seg.head(),
    ltv.head(),
    retention,
    churn,
    satisfaction.head(),
    order_summary.head(),
)


## Performance Considerations
- Use clustering keys on large fact tables (e.g., `ORDERS(ORDER_DATE, CUSTOMER_ID)`, `ORDER_ITEMS(ORDER_ID, PRODUCT_ID)`).
- Consider materialized views for heavy aggregations.
- Use appropriate warehouse size; enable auto-suspend and auto-resume.
- Leverage `COPY INTO` and external stages for bulk loading to RAW.
- Partition work into RAW -> CURATED -> MART to isolate concerns.
