In [3]:
# etl_retail.py
"""
ETL Pipeline for African Retail Data Warehouse
Performs:
- Extraction: Load synthetic CSV
- Transformation: Clean, filter, derive dimensions
- Loading: Save to SQLite DB with star schema
"""

import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime
import os

# Set seed for reproducibility
np.random.seed(42)

print("=== ETL Pipeline Started ===\n")

def run_etl():
    # Assume current date is August 12, 2025 (as per exam spec)
    current_date = pd.Timestamp('2025-08-12')
    one_year_ago = current_date - pd.DateOffset(years=1)

    # --- 1. EXTRACT ---
    print("1. EXTRACT: Reading synthetic retail data...")
    if not os.path.exists('synthetic_retail_africa.csv'):
        raise FileNotFoundError("synthetic_retail_africa.csv not found. Run generate_retail_data.py first.")

    df = pd.read_csv('synthetic_retail_africa.csv', parse_dates=['InvoiceDate'])
    print(f"  → Rows after extraction: {len(df)}")

    # --- 2. TRANSFORM ---
    print("\n2. TRANSFORM: Cleaning and preparing data...")

    # Handle missing values
    initial_count = len(df)
    df.dropna(inplace=True)
    print(f"  → Dropped {initial_count - len(df)} rows due to missing values")

    # Remove invalid entries
    valid_rows = (df['Quantity'] > 0) & (df['UnitPrice'] > 0)
    df = df[valid_rows].copy()
    print(f"  → Removed {(~valid_rows).sum()} invalid rows (Quantity <= 0 or UnitPrice <= 0)")

    # Add TotalSales
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']

    # Filter for last year
    recent_mask = df['InvoiceDate'] >= one_year_ago
    recent_df = df[recent_mask].copy()
    print(f"  → Filtered to last year ({one_year_ago.date()} to {current_date.date()}): {len(recent_df)} rows")

    # Ensure 'Date' column as date type for merging
    recent_df['Date'] = recent_df['InvoiceDate'].dt.date

    # --- CREATE DIMENSIONS AND FACT ---

    # TimeDim: One row per date
    print("\n  → Building TimeDim...")
    time_dim = recent_df[['InvoiceDate']].drop_duplicates().copy()
    time_dim['Date'] = time_dim['InvoiceDate'].dt.date  # Python date
    time_dim['TimeKey'] = time_dim['InvoiceDate'].dt.strftime('%Y%m%d').astype(int)
    time_dim['Day'] = time_dim['InvoiceDate'].dt.day
    time_dim['Month'] = time_dim['InvoiceDate'].dt.month
    time_dim['Year'] = time_dim['InvoiceDate'].dt.year
    time_dim['Quarter'] = time_dim['InvoiceDate'].dt.to_period('Q').astype(str)
    # Keep only necessary columns
    time_dim = time_dim[['TimeKey', 'Date', 'Day', 'Month', 'Quarter', 'Year']].drop_duplicates().sort_values('TimeKey').reset_index(drop=True)
    print(f"  → TimeDim created with {len(time_dim)} unique dates")

    # CustomerDim: Map CustomerID to details
    print("  → Building CustomerDim...")
    customer_dim = recent_df[['CustomerID', 'Country']].drop_duplicates().reset_index(drop=True)
    # Assign names (you can use Faker here if preferred)
    customer_dim['Name'] = [f"Customer_{i}" for i in range(len(customer_dim))]
    # Map African countries to regions
    region_map = {
        'Nigeria': 'West Africa', 'Ghana': 'West Africa', 'Rwanda': 'East Africa',
        'Kenya': 'East Africa', 'Uganda': 'East Africa', 'Tanzania': 'East Africa',
        'Ethiopia': 'East Africa', 'South Africa': 'Southern Africa',
        'Morocco': 'North Africa', 'Egypt': 'North Africa'
    }
    customer_dim['Region'] = customer_dim['Country'].map(region_map)
    customer_dim.reset_index(inplace=True)
    customer_dim.rename(columns={'index': 'CustomerKey'}, inplace=True)
    print(f"  → CustomerDim created with {len(customer_dim)} customers")

    # ProductDim: Assign categories
    print("  → Building ProductDim...")
    product_dim = recent_df[['StockCode', 'Description']].drop_duplicates().reset_index(drop=True)
    categories = ['Electronics', 'Clothing', 'Home', 'Books', 'Toys']
    # Deterministic category assignment
    np.random.shuffle(categories)  # Shuffle once
    category_map = {}
    for i, code in enumerate(product_dim['StockCode'].unique()):
        category_map[code] = categories[i % len(categories)]
    product_dim['Category'] = product_dim['StockCode'].map(category_map)
    product_dim.reset_index(inplace=True)
    product_dim.rename(columns={'index': 'ProductKey'}, inplace=True)
    print(f"  → ProductDim created with {len(product_dim)} products")

    # SalesFact: Join using 'Date' (not InvoiceDate)
    print("  → Building SalesFact...")
    fact = recent_df.merge(
        customer_dim[['CustomerID', 'CustomerKey']], on='CustomerID'
    ).merge(
        product_dim[['StockCode', 'ProductKey']], on='StockCode'
    ).merge(
        time_dim[['Date', 'TimeKey']], on='Date'  # ✅ Correct: Merge on 'Date'
    )

    sales_fact = fact[['CustomerKey', 'ProductKey', 'TimeKey', 'Quantity', 'TotalSales']].copy()
    print(f"  → SalesFact created with {len(sales_fact)} fact records")

    # --- 3. LOAD ---
    print("\n3. LOAD: Writing to SQLite database...")

    db_path = 'retail_dw.db'
    conn = sqlite3.connect(db_path)

    # Drop existing tables
    cursor = conn.cursor()
    cursor.executescript("""
        DROP TABLE IF EXISTS SalesFact;
        DROP TABLE IF EXISTS CustomerDim;
        DROP TABLE IF EXISTS ProductDim;
        DROP TABLE IF EXISTS TimeDim;
    """)

    # Write tables
    time_dim.to_sql('TimeDim', conn, if_exists='replace', index=False)
    customer_dim.to_sql('CustomerDim', conn, if_exists='replace', index=False)
    product_dim.to_sql('ProductDim', conn, if_exists='replace', index=False)
    sales_fact.to_sql('SalesFact', conn, if_exists='replace', index=False)

    conn.close()
    print(f"✅ ETL completed successfully!")
    print(f"📁 Database saved as '{db_path}'")
    print(f"📊 Fact Table Rows: {len(sales_fact)}")
    print(f"🌍 Customers: {len(customer_dim)}, Products: {len(product_dim)}, Dates: {len(time_dim)}")

# Run the ETL
if __name__ == "__main__":
    run_etl()

=== ETL Pipeline Started ===

1. EXTRACT: Reading synthetic retail data...
  → Rows after extraction: 1000

2. TRANSFORM: Cleaning and preparing data...
  → Dropped 0 rows due to missing values
  → Removed 0 invalid rows (Quantity <= 0 or UnitPrice <= 0)
  → Filtered to last year (2024-08-12 to 2025-08-12): 376 rows

  → Building TimeDim...
  → TimeDim created with 233 unique dates
  → Building CustomerDim...
  → CustomerDim created with 298 customers
  → Building ProductDim...
  → ProductDim created with 376 products
  → Building SalesFact...
  → SalesFact created with 1857 fact records

3. LOAD: Writing to SQLite database...
✅ ETL completed successfully!
📁 Database saved as 'retail_dw.db'
📊 Fact Table Rows: 1857
🌍 Customers: 298, Products: 376, Dates: 233
