# 02 - ETL Pipeline

This notebook implements and tests the **Extract, Transform, Load (ETL)** pipeline for the social ads dataset.

## What we'll do:
1. **Extract**: Load data from the CSV file
2. **Transform**: Clean, validate, and enrich the data
3. **Load**: Store processed data in SQLite database
4. **Validate**: Test the ETL pipeline and verify results

**Prerequisites**: Run `01_data_exploration.ipynb` first to understand the data structure.

**Next**: After ETL completion, proceed to `03_analysis.ipynb` for insights generation.

In [15]:
# Setup and Imports
import sys
import os
from pathlib import Path
import pandas as pd
import numpy as np
from datetime import datetime
import sqlite3
from sqlalchemy import create_engine, text

# Add src to Python path to import our modules
sys.path.append('../src')

try:
	from config import RAW_CSV, PROCESSED_DB, SQLITE_URL
	from models import metadata, social_ads
	from db import engine, SessionLocal, create_tables
	
	print("✅ Libraries and modules imported successfully!")
	print(f"📁 Raw CSV path: {RAW_CSV}")
	print(f"🗄️ Database path: {PROCESSED_DB}")
	print(f"🔗 SQLite URL: {SQLITE_URL}")
	
except ImportError as e:
	print(f"⚠️ Import error: {e}")
	print("🔧 Using alternative configuration...")
	
	# Fallback configuration if imports fail
	RAW_CSV = '../data/raw/social_ads.csv'
	PROCESSED_DB = '../data/processed/social_ads.db'
	SQLITE_URL = f'sqlite:///{os.path.abspath(PROCESSED_DB)}'
	
	# Create engine directly
	engine = create_engine(SQLITE_URL)
	
	# Define create_tables function for fallback
	def create_tables():
		"""Create database tables if they don't exist"""
		from sqlalchemy import Table, Column, Integer, String, Boolean, Float, DateTime, MetaData
		from datetime import datetime
		
		metadata = MetaData()
		
		social_ads = Table('social_ads', metadata,
			Column('id', Integer, primary_key=True),
			Column('age', Integer, nullable=False),
			Column('estimated_salary', Float, nullable=False),
			Column('purchased', Boolean, nullable=False),
			Column('age_group', String(50)),
			Column('salary_bracket', String(50)),
			Column('created_at', DateTime, default=datetime.utcnow),
			Column('updated_at', DateTime, default=datetime.utcnow)
		)
		
		metadata.create_all(engine)
		print("📊 Database tables created/verified")
	
	print("✅ Fallback configuration loaded!")
	print(f"📁 Raw CSV path: {RAW_CSV}")
	print(f"🗄️ Database path: {PROCESSED_DB}")
	print(f"🔗 SQLite URL: {SQLITE_URL}")

⚠️ Import error: attempted relative import with no known parent package
🔧 Using alternative configuration...
✅ Fallback configuration loaded!
📁 Raw CSV path: ../data/raw/social_ads.csv
🗄️ Database path: ../data/processed/social_ads.db
🔗 SQLite URL: sqlite:////Users/tharushavihanga/Developer/social-ads-etl/data/processed/social_ads.db


## 📥 Extract Phase

Load the raw data from CSV and perform initial validation.

In [16]:
def extract_data():
    """Extract data from CSV file"""
    print("🚀 Starting data extraction...")
    
    try:
        # Load data from CSV
        df_raw = pd.read_csv(RAW_CSV)
        print(f"✅ Successfully loaded {len(df_raw)} records")
        print(f"📊 Columns: {list(df_raw.columns)}")
        print(f"📈 Shape: {df_raw.shape}")
        
        return df_raw
    
    except Exception as e:
        print(f"❌ Error during extraction: {e}")
        raise

# Execute extraction
df_raw = extract_data()
print(f"\n🔍 First 5 rows:")
df_raw.head()

🚀 Starting data extraction...
✅ Successfully loaded 400 records
📊 Columns: ['Age', 'EstimatedSalary', 'Purchased']
📈 Shape: (400, 3)

🔍 First 5 rows:


Unnamed: 0,Age,EstimatedSalary,Purchased
0,19,19000,0
1,35,20000,0
2,26,43000,0
3,27,57000,0
4,19,76000,0


## 🔄 Transform Phase

Clean, validate, and enrich the data with derived features.

In [17]:
def validate_and_clean_data(df):
    """Validate and clean the data"""
    print("🧹 Starting data cleaning and validation...")
    
    initial_count = len(df)
    df_clean = df.copy()
    
    # Clean column names (lowercase, snake_case)
    df_clean.columns = df_clean.columns.str.lower()
    df_clean = df_clean.rename(columns={'estimatedsalary': 'estimated_salary'})
    print(f"✅ Column names cleaned: {list(df_clean.columns)}")
    
    # Remove duplicates
    df_clean = df_clean.drop_duplicates()
    print(f"🗑️ Removed {initial_count - len(df_clean)} duplicate records")
    
    # Remove records with missing values
    df_clean = df_clean.dropna()
    print(f"🚮 Removed records with missing values")
    
    # Validate age range (18-100)
    age_before = len(df_clean)
    df_clean = df_clean[(df_clean['age'] >= 18) & (df_clean['age'] <= 100)]
    print(f"👤 Removed {age_before - len(df_clean)} records with invalid age")
    
    # Validate salary range (positive values)
    salary_before = len(df_clean)
    df_clean = df_clean[df_clean['estimated_salary'] > 0]
    print(f"💰 Removed {salary_before - len(df_clean)} records with invalid salary")
    
    # Validate purchased values (0 or 1)
    purchase_before = len(df_clean)
    df_clean = df_clean[df_clean['purchased'].isin([0, 1])]
    print(f"🛒 Removed {purchase_before - len(df_clean)} records with invalid purchase values")
    
    final_count = len(df_clean)
    print(f"\n📊 Data validation summary:")
    print(f"   Initial records: {initial_count}")
    print(f"   Final records: {final_count}")
    print(f"   Data quality: {final_count/initial_count*100:.1f}%")
    
    return df_clean

# Execute data cleaning
df_clean = validate_and_clean_data(df_raw)
df_clean.head()

🧹 Starting data cleaning and validation...
✅ Column names cleaned: ['age', 'estimated_salary', 'purchased']
🗑️ Removed 33 duplicate records
🚮 Removed records with missing values
👤 Removed 0 records with invalid age
💰 Removed 0 records with invalid salary
🛒 Removed 0 records with invalid purchase values

📊 Data validation summary:
   Initial records: 400
   Final records: 367
   Data quality: 91.8%


Unnamed: 0,age,estimated_salary,purchased
0,19,19000,0
1,35,20000,0
2,26,43000,0
3,27,57000,0
4,19,76000,0


In [18]:
def add_derived_features(df):
    """Add derived features for better analysis"""
    print("🛠️ Adding derived features...")
    
    df_enriched = df.copy()
    
    # Age groups
    def categorize_age(age):
        if age < 25:
            return "Young (18-24)"
        elif age < 35:
            return "Adult (25-34)"
        elif age < 45:
            return "Middle Age (35-44)"
        else:
            return "Senior (45+)"
    
    df_enriched['age_group'] = df_enriched['age'].apply(categorize_age)
    print("✅ Added age_group feature")
    
    # Salary brackets
    def categorize_salary(salary):
        if salary < 30000:
            return "Low (<30K)"
        elif salary < 60000:
            return "Medium (30K-60K)"
        elif salary < 100000:
            return "High (60K-100K)"
        else:
            return "Very High (100K+)"
    
    df_enriched['salary_bracket'] = df_enriched['estimated_salary'].apply(categorize_salary)
    print("✅ Added salary_bracket feature")
    
    # Convert purchased to boolean
    df_enriched['purchased'] = df_enriched['purchased'].astype(bool)
    print("✅ Converted purchased to boolean")
    
    print(f"\n📊 Feature engineering summary:")
    print(f"   Age groups: {df_enriched['age_group'].value_counts().to_dict()}")
    print(f"   Salary brackets: {df_enriched['salary_bracket'].value_counts().to_dict()}")
    
    return df_enriched

# Execute feature engineering
df_transformed = add_derived_features(df_clean)
print(f"\n🔍 Final transformed data shape: {df_transformed.shape}")
print(f"📝 Final columns: {list(df_transformed.columns)}")
df_transformed.head()

🛠️ Adding derived features...
✅ Added age_group feature
✅ Added salary_bracket feature
✅ Converted purchased to boolean

📊 Feature engineering summary:
   Age groups: {'Middle Age (35-44)': 125, 'Senior (45+)': 106, 'Adult (25-34)': 95, 'Young (18-24)': 41}
   Salary brackets: {'High (60K-100K)': 145, 'Medium (30K-60K)': 100, 'Very High (100K+)': 73, 'Low (<30K)': 49}

🔍 Final transformed data shape: (367, 5)
📝 Final columns: ['age', 'estimated_salary', 'purchased', 'age_group', 'salary_bracket']


Unnamed: 0,age,estimated_salary,purchased,age_group,salary_bracket
0,19,19000,False,Young (18-24),Low (<30K)
1,35,20000,False,Middle Age (35-44),Low (<30K)
2,26,43000,False,Adult (25-34),Medium (30K-60K)
3,27,57000,False,Adult (25-34),Medium (30K-60K)
4,19,76000,False,Young (18-24),High (60K-100K)


## 💾 Load Phase

Store the transformed data in SQLite database.

In [19]:
def load_to_database(df):
    """Load transformed data into SQLite database"""
    print("💾 Loading data to database...")
    
    try:
        # Create tables if they don't exist
        create_tables()
        print("✅ Database tables created/verified")
        
        # Clear existing data (for fresh load)
        with engine.connect() as conn:
            conn.execute(text("DELETE FROM social_ads"))
            conn.commit()
        print("🗑️ Cleared existing data")
        
        # Insert new data
        records_to_insert = df.to_dict(orient='records')
        
        # Add timestamps
        current_time = datetime.utcnow()
        for record in records_to_insert:
            record['created_at'] = current_time
            record['updated_at'] = current_time
        
        # Insert into database
        df.to_sql('social_ads', engine, if_exists='append', index=False)
        print(f"✅ Successfully loaded {len(records_to_insert)} records to database")
        
        # Verify the load
        verification_query = "SELECT COUNT(*) as count FROM social_ads"
        result = pd.read_sql_query(verification_query, engine)
        record_count = result['count'].iloc[0]
        print(f"🔍 Verification: {record_count} records in database")
        
        return True
        
    except Exception as e:
        print(f"❌ Error loading to database: {e}")
        return False

# Execute database loading
load_success = load_to_database(df_transformed)

if load_success:
    print(f"\n🎉 ETL Pipeline completed successfully!")
    print(f"📊 Records processed: {len(df_raw)} → {len(df_transformed)}")
    print(f"📈 Data quality: {len(df_transformed)/len(df_raw)*100:.1f}%")

💾 Loading data to database...
📊 Database tables created/verified
✅ Database tables created/verified
🗑️ Cleared existing data
✅ Successfully loaded 367 records to database
🔍 Verification: 367 records in database

🎉 ETL Pipeline completed successfully!
📊 Records processed: 400 → 367
📈 Data quality: 91.8%


  current_time = datetime.utcnow()


In [20]:
# Quick validation of loaded data
print("🔍 ETL VALIDATION")
print("=" * 30)

# Verify record count
total_query = "SELECT COUNT(*) as total FROM social_ads"
total_records = pd.read_sql_query(total_query, engine)['total'].iloc[0]
print(f"✅ Database records: {total_records}")

# Basic stats
stats_query = """
SELECT 
    AVG(age) as avg_age,
    AVG(estimated_salary) as avg_salary,
    SUM(CAST(purchased as INTEGER)) as purchases,
    COUNT(*) as total
FROM social_ads
"""
stats = pd.read_sql_query(stats_query, engine)
print(f"📊 Avg age: {stats['avg_age'].iloc[0]:.1f}")
print(f"💰 Avg salary: ${stats['avg_salary'].iloc[0]:,.0f}")
print(f"🎯 Purchases: {stats['purchases'].iloc[0]}/{stats['total'].iloc[0]}")

print("\n✅ ETL Pipeline completed successfully!")

🔍 ETL VALIDATION
✅ Database records: 367
📊 Avg age: 37.7
💰 Avg salary: $70,719
🎯 Purchases: 138/367

✅ ETL Pipeline completed successfully!


## ✅ ETL Complete!

**Pipeline Summary:**
- **Extract**: 400 records from CSV
- **Transform**: Data cleaning, validation, feature engineering  
- **Load**: Clean data stored in SQLite database

**Next Step:** Proceed to `03_analysis.ipynb` for insights generation.