In [3]:
# etl_demo.ipynb
# E-commerce ETL Pipeline Demo
# This notebook tests the ETL pipeline for loading data into a Supabase PostgreSQL database
# and prepares for automation with Apache Airflow.

import sys
import os
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

# Add path to Airflow DAGs folder to access etl_utils
dags_dir = '/Users/olgacarrasco/airflow/dags'
# Notebook cells are often re-executed; only extend sys.path the first time
# so repeated runs do not pile up duplicate entries.
if dags_dir not in sys.path:
    sys.path.append(dags_dir)

# Fail early, with an actionable message, if etl_utils.py is not where the
# import below expects it; otherwise the import would raise a less helpful
# ModuleNotFoundError.
if not os.path.exists(os.path.join(dags_dir, 'etl_utils.py')):
    raise FileNotFoundError(
        f"etl_utils.py not found in {dags_dir}. "
        f"Please ensure etl_utils.py is in the Airflow DAGs folder ({dags_dir})."
    )

# Import run_etl from etl_utils.py (must come after the sys.path change above)
from etl_utils import run_etl

# Section 1: Check and Preprocess CSVs
def check_csv_paths(base_dir=None):
    """Build the paths of the three input CSVs and verify that each exists.

    Args:
        base_dir: Directory expected to contain ``customers.csv``,
            ``products.csv`` and ``orders.csv``. When omitted, the
            module-level ``dags_dir`` (the Airflow DAGs folder) is used.

    Returns:
        dict: Maps ``'customers'``, ``'products'`` and ``'orders'`` (in that
        order) to the corresponding CSV path.

    Raises:
        FileNotFoundError: If one or more files are missing. Troubleshooting
            hints are printed first, and the exception message lists every
            missing path.
    """
    if base_dir is None:
        # Resolved at call time so the default always follows the module setting.
        base_dir = dags_dir

    # CSV paths in the Airflow DAGs folder
    csv_paths = {
        table: os.path.join(base_dir, f'{table}.csv')
        for table in ('customers', 'products', 'orders')
    }

    # Check if files exist
    missing_files = [path for path in csv_paths.values() if not os.path.exists(path)]

    if missing_files:
        print("Error: The following CSV files were not found in the Airflow DAGs folder:")
        for path in missing_files:
            print(f" - {path}")
        print("\nPossible solutions:")
        print(f"1. Ensure the CSV files (customers.csv, products.csv, orders.csv) are in {base_dir}.")
        print("2. Check if the files have different names (e.g., 'Customers.csv' instead of 'customers.csv').")
        print("3. Move the CSV files to the Airflow DAGs folder:")
        print(f"   mv /path/to/customers.csv {base_dir}/customers.csv")
        raise FileNotFoundError(f"Missing CSV files: {', '.join(missing_files)}")

    return csv_paths

def inspect_and_preprocess_csvs():
    """Inspect the three input CSVs and normalise their column names on disk.

    The function loads customers/products/orders via ``check_csv_paths``,
    prints each file's columns, renames legacy column names
    (``customer_id`` -> ``id`` in customers; ``cust_id`` -> ``customer_id``
    and ``prod_id`` -> ``product_id`` in orders) and, when a rename happened,
    overwrites the source CSV in place. It then verifies that every file
    carries the columns the pipeline expects.

    Raises:
        FileNotFoundError: If a CSV is missing (re-raised after printing).
        ValueError: If a CSV lacks expected columns; the message names the
            columns that are actually missing.
        Exception: Any other failure is printed and re-raised unchanged.
    """
    try:
        # Check CSV paths
        csv_paths = check_csv_paths()

        # Load CSVs
        customers_df = pd.read_csv(csv_paths['customers'])
        products_df = pd.read_csv(csv_paths['products'])
        orders_df = pd.read_csv(csv_paths['orders'])

        # Inspect column names
        print("customers.csv columns:", customers_df.columns.tolist())
        print("orders.csv columns:", orders_df.columns.tolist())
        print("products.csv columns:", products_df.columns.tolist())

        # Rename 'customer_id' to 'id' in customers.csv to match etl_utils.py's merge
        if 'customer_id' in customers_df.columns and 'id' not in customers_df.columns:
            customers_df = customers_df.rename(columns={'customer_id': 'id'})
            customers_df.to_csv(csv_paths['customers'], index=False)
            print(f"Renamed 'customer_id' to 'id' in {csv_paths['customers']} to match etl_utils.py")

        # Check and rename columns in orders.csv if necessary.
        # 'id' is order_id (primary key), so keep it as 'id'.
        # All applicable renames are gathered first so the file is rewritten
        # at most once instead of once per renamed column.
        order_renames = {
            old: new
            for old, new in (('cust_id', 'customer_id'), ('prod_id', 'product_id'))
            if old in orders_df.columns and new not in orders_df.columns
        }
        if order_renames:
            orders_df = orders_df.rename(columns=order_renames)
            orders_df.to_csv(csv_paths['orders'], index=False)
            for old, new in order_renames.items():
                print(f"Renamed '{old}' to '{new}' in {csv_paths['orders']}")

        # Verify expected columns (customers uses 'id' to match etl_utils.py)
        expectations = (
            ('customers', customers_df, ['id', 'name', 'email', 'joined_at']),
            ('orders', orders_df, ['id', 'customer_id', 'product_id', 'quantity', 'order_date']),
            ('products', products_df, ['id', 'name', 'category', 'price']),
        )
        for key, frame, expected_cols in expectations:
            # Report only the absent columns so the message is directly actionable.
            missing_cols = [col for col in expected_cols if col not in frame.columns]
            if missing_cols:
                raise ValueError(f"{csv_paths[key]} missing expected columns: {missing_cols}")

        print("CSVs preprocessed successfully.")
    except FileNotFoundError as e:
        print(f"Error: CSV file not found - {str(e)}")
        raise
    except Exception as e:
        print(f"Error during CSV preprocessing: {str(e)}")
        raise

# Section 2: Run ETL Pipeline
def execute_etl():
    """Invoke ``run_etl`` (imported from etl_utils.py) and report the outcome.

    Prints a success line when the pipeline returns normally. On any
    exception, prints the failure reason and re-raises the same exception so
    the caller still sees the original error.
    """
    try:
        run_etl()
        print("✅ ETL pipeline executed successfully.")
    except Exception as error:
        failure_reason = str(error)
        print(f"ETL pipeline failed: {failure_reason}")
        raise

# Section 3: Validate ETL Output
def validate_data(db_url=None):
    """Print sample rows from the loaded tables as a post-ETL sanity check.

    Three read-only queries are run and their results printed: the first
    five customers, the five products with the highest total quantity
    ordered, and the first five orders.

    Args:
        db_url: SQLAlchemy connection URL. When omitted, the placeholder URL
            below is used; it must be edited to hold real Supabase
            credentials, because the placeholder port is not a number and
            SQLAlchemy rejects it with a ValueError.

    Raises:
        Exception: Any connection or query error is printed and re-raised.
    """
    if db_url is None:
        # Replace with your own Supabase credentials
        db_url = (
            "postgresql+psycopg2://please_enter_your_supabase_user:please_enter_your_supabase_password@please_enter_your_supabase_host:please_enter_your_supabase_port/please_enter_your_supabase_dbname"
        )

    # Stays None if create_engine itself fails (e.g. malformed URL), so the
    # cleanup in ``finally`` knows there is nothing to dispose.
    engine = None
    try:
        # Connect to Supabase PostgreSQL
        engine = create_engine(db_url)

        # Check customers table
        query_customers = "SELECT id, name, email, joined_at FROM customers LIMIT 5;"
        df_customers = pd.read_sql(query_customers, engine)
        print("\nCustomers Table (Top 5):")
        print(df_customers)

        # Check top 5 products by quantity sold
        query_products = """
        SELECT product_id, SUM(quantity) AS total_sold
        FROM orders
        GROUP BY product_id
        ORDER BY total_sold DESC
        LIMIT 5;
        """
        df_products = pd.read_sql(query_products, engine)
        print("\nTop 5 Products by Quantity Sold:")
        print(df_products)

        # Check orders table (explicitly select expected columns)
        query_orders = """
        SELECT id, customer_id, product_id, quantity, order_date
        FROM orders
        LIMIT 5;
        """
        df_orders = pd.read_sql(query_orders, engine)
        print("\nOrders Table (Top 5):")
        print(df_orders)
    except Exception as e:
        print(f"Validation failed: {str(e)}")
        raise
    finally:
        # Release pooled connections; otherwise every re-run of the notebook
        # cell leaves another idle pool holding connections to the database.
        if engine is not None:
            engine.dispose()

# Section 4: Define ETL Functions (for reference/testing, not used in production)
# These mirror etl_utils.py but are included for notebook testing
def transform_customers(df):
    """Return a cleaned copy of the customers DataFrame.

    Cleaning steps, in order:
      1. Strip whitespace from and lowercase ``email``.
      2. Drop rows whose email is missing or does not match a basic
         ``local@domain.tld`` pattern.
      3. Strip whitespace from and title-case ``name``.
      4. Parse ``joined_at`` as a datetime and drop rows that cannot be parsed.
      5. Drop rows with a duplicate ``id``, keeping the first occurrence.

    Args:
        df: DataFrame with ``id``, ``name``, ``email`` and ``joined_at``
            columns. It is not modified.

    Returns:
        A new DataFrame containing only the valid, de-duplicated rows.
    """
    import re

    email_pattern = re.compile(r'^[\w\.-]+@[\w\.-]+\.\w+$')

    # Work on a copy so the caller's DataFrame is left untouched.
    df = df.copy()
    df['email'] = df['email'].str.strip().str.lower()

    # Missing emails are not strings; treat them as invalid rather than
    # letting re.match raise TypeError on them.
    has_valid_email = df['email'].map(
        lambda value: isinstance(value, str) and bool(email_pattern.match(value))
    ).astype(bool)
    # Copy after filtering so the column assignments below write to an
    # independent frame instead of a slice of the previous one.
    df = df[has_valid_email].copy()

    df['name'] = df['name'].str.strip().str.title()
    df['joined_at'] = pd.to_datetime(df['joined_at'], errors='coerce')
    df = df.dropna(subset=['joined_at'])
    df = df.drop_duplicates(subset=['id'])  # Use 'id' to match etl_utils.py
    return df

def transform_products(df):
    """Return a cleaned copy of the products DataFrame.

    ``name`` and ``category`` are stripped and title-cased, ``price`` is
    converted to a number, rows with a non-numeric or negative price are
    dropped, and duplicate ``id`` rows are removed (first occurrence kept).

    Args:
        df: DataFrame with ``id``, ``name``, ``category`` and ``price``
            columns. It is not modified.

    Returns:
        A new DataFrame containing only the valid, de-duplicated rows.
    """
    # Work on a copy so the caller's DataFrame is left untouched.
    df = df.copy()
    df['name'] = df['name'].str.strip().str.title()
    df['category'] = df['category'].str.strip().str.title()
    df['price'] = pd.to_numeric(df['price'], errors='coerce')
    # Unparseable prices became NaN above; NaN >= 0 is False, so this single
    # filter removes both negative and unparseable prices.
    df = df[df['price'] >= 0]
    df = df.drop_duplicates(subset=['id'])
    return df

def transform_orders(df, customers_df, products_df):
    """Return a cleaned copy of the orders DataFrame.

    Cleaning steps, in order:
      1. Convert ``quantity`` to an integer (unparseable values become 0)
         and keep only rows with a positive quantity.
      2. Parse ``order_date`` as a datetime and drop rows that cannot be parsed.
      3. Keep only rows whose ``customer_id`` and ``product_id`` appear in
         the ``id`` column of ``customers_df`` / ``products_df``.
      4. Drop rows with a duplicate order ``id``, keeping the first occurrence.

    Args:
        df: Orders DataFrame with ``id``, ``customer_id``, ``product_id``,
            ``quantity`` and ``order_date`` columns. It is not modified.
        customers_df: DataFrame whose ``id`` column lists valid customers.
        products_df: DataFrame whose ``id`` column lists valid products.

    Returns:
        A new DataFrame containing only the valid, de-duplicated orders.
    """
    # Work on a copy so the caller's DataFrame is left untouched.
    df = df.copy()
    df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce').fillna(0).astype(int)
    # Copy after filtering so the assignment below writes to an independent
    # frame instead of a slice of the previous one.
    df = df[df['quantity'] > 0].copy()
    df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')
    df = df.dropna(subset=['order_date'])
    valid_customers = set(customers_df['id'])  # Use 'id' to match customers.csv
    valid_products = set(products_df['id'])
    df = df[df['customer_id'].isin(valid_customers) & df['product_id'].isin(valid_products)]
    df = df.drop_duplicates(subset=['id'])  # 'id' is order_id
    return df

def insert_customers(df, conn):
    """Insert every row of ``df`` into the ``customers`` table.

    Rows whose ``id`` already exists are skipped (``ON CONFLICT (id) DO
    NOTHING``). A single commit is issued after all rows have been sent.

    Args:
        df: DataFrame with ``id``, ``name``, ``email`` and ``joined_at`` columns.
        conn: Open DB-API connection; ``run_manual_etl`` passes a psycopg2
            connection.
    """
    columns = ('id', 'name', 'email', 'joined_at')
    statement = """
                INSERT INTO customers (id, name, email, joined_at)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (id) DO NOTHING;
            """
    with conn.cursor() as cursor:
        for _, record in df.iterrows():
            cursor.execute(statement, tuple(record[column] for column in columns))
    conn.commit()

def insert_products(df, conn):
    """Insert every row of ``df`` into the ``products`` table.

    Rows whose ``id`` already exists are skipped (``ON CONFLICT (id) DO
    NOTHING``). A single commit is issued after all rows have been sent.

    Args:
        df: DataFrame with ``id``, ``name``, ``category`` and ``price`` columns.
        conn: Open DB-API connection; ``run_manual_etl`` passes a psycopg2
            connection.
    """
    columns = ('id', 'name', 'category', 'price')
    statement = """
                INSERT INTO products (id, name, category, price)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (id) DO NOTHING;
            """
    with conn.cursor() as cursor:
        for _, record in df.iterrows():
            cursor.execute(statement, tuple(record[column] for column in columns))
    conn.commit()

def insert_orders(df, conn):
    """Insert every row of ``df`` into the ``orders`` table.

    Rows whose ``id`` already exists are skipped (``ON CONFLICT (id) DO
    NOTHING``). A single commit is issued after all rows have been sent.

    Args:
        df: DataFrame with ``id``, ``customer_id``, ``product_id``,
            ``quantity`` and ``order_date`` columns.
        conn: Open DB-API connection; ``run_manual_etl`` passes a psycopg2
            connection.
    """
    columns = ('id', 'customer_id', 'product_id', 'quantity', 'order_date')
    statement = """
                INSERT INTO orders (id, customer_id, product_id, quantity, order_date)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (id) DO NOTHING;
            """
    with conn.cursor() as cursor:
        for _, record in df.iterrows():
            cursor.execute(statement, tuple(record[column] for column in columns))
    conn.commit()

# Section 5: Manual ETL for Testing (Optional)
def run_manual_etl():
    """Run the notebook-local ETL: load CSVs, transform, insert into Postgres.

    This uses the transform_* and insert_* functions defined in this
    notebook instead of ``run_etl`` from etl_utils.py. The connection
    placeholders below must be replaced with real Supabase credentials
    before calling.

    Raises:
        Exception: Any failure (connection, missing CSV, transform, insert)
            is printed and re-raised. The database connection is closed in
            every case once it has been opened.
    """
    try:
        # Connect to database
        # Replace with your own Supabase credentials
        conn = psycopg2.connect(
            host="please_enter_your_supabase_host",
            port="please_enter_your_supabase_port",
            user="please_enter_your_supabase_user",
            password="please_enter_your_supabase_password",
            dbname="please_enter_your_supabase_dbname"
        )

        try:
            # Load and preprocess CSVs
            csv_paths = check_csv_paths()
            customers_df = pd.read_csv(csv_paths['customers'])
            products_df = pd.read_csv(csv_paths['products'])
            orders_df = pd.read_csv(csv_paths['orders'])

            # Apply transformations
            customers_df = transform_customers(customers_df)
            products_df = transform_products(products_df)
            orders_df = transform_orders(orders_df, customers_df, products_df)

            # Insert data (customers and products first: orders reference them)
            insert_customers(customers_df, conn)
            insert_products(products_df, conn)
            insert_orders(orders_df, conn)
        finally:
            # Close connection even when a step above fails, so a failed run
            # does not leave a session open against the database.
            conn.close()

        print("✅ Manual ETL ran successfully.")
    except Exception as e:
        print(f"Manual ETL failed: {str(e)}")
        raise

# Section 6: Airflow Setup Instructions
def print_airflow_setup():
    """Print the terminal commands for installing and starting Airflow.

    The text covers installation, database initialisation, admin user
    creation, starting the API server and scheduler, the standalone
    alternative, and a reminder to deploy and trigger the ETL DAG.
    """
    instructions = (
        "\nRun the following commands in your terminal to set up Airflow:",
        "1. Install Airflow:",
        "   pip install apache-airflow",
        "2. Initialize Airflow database:",
        "   airflow db migrate",
        "3. Create an admin user:",
        "   airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com",
        "4. Start the Airflow API server:",
        "   airflow api-server --port 8080",
        "5. Start the Airflow scheduler (in a separate terminal):",
        "   airflow scheduler",
        "Alternatively, run Airflow in standalone mode for testing:",
        "   airflow standalone",
        "\nAdd etl_dag.py to your Airflow DAGs folder and trigger the 'csv_to_postgres_etl' DAG.",
    )
    # One newline-joined print emits the same text as printing line by line.
    print("\n".join(instructions))

# Main Execution
if __name__ == "__main__":
    # Pipeline stages, run strictly in this order; an exception in any stage
    # propagates and stops the stages after it.
    pipeline_steps = (
        inspect_and_preprocess_csvs,  # Step 1: Inspect and preprocess CSVs
        execute_etl,                  # Step 2: Run ETL pipeline (uses etl_utils.py)
        validate_data,                # Step 3: Validate data
        print_airflow_setup,          # Step 4: Print Airflow setup instructions
    )
    for run_step in pipeline_steps:
        run_step()

    # Optional: Run manual ETL for testing
    # run_manual_etl()

customers.csv columns: ['id', 'name', 'email', 'joined_at']
orders.csv columns: ['id', 'customer_id', 'product_id', 'quantity', 'order_date']
products.csv columns: ['id', 'name', 'category', 'price']
CSVs preprocessed successfully.
Loading CSV files from: /Users/olgacarrasco/airflow/dags
Customers columns: ['id', 'name', 'email', 'joined_at']
Products columns: ['id', 'name', 'category', 'price']
Orders columns: ['id', 'customer_id', 'product_id', 'quantity', 'order_date']
Connecting to DB at aws-0-eu-north-1.pooler.supabase.com:6543/postgres with user postgres.qmbbrplssfblgvghpczc
✅ ETL pipeline executed successfully.
Validation failed: invalid literal for int() with base 10: 'please_enter_your_supabase_port'


ValueError: invalid literal for int() with base 10: 'please_enter_your_supabase_port'