# Task 2: ETL Process Implementation
DSA 2040 - Data Warehousing and Data Mining Practical Exam

In [1]:
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime, timedelta
import logging
import random
from faker import Faker

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("ETL Process Implementation for Retail Data Warehouse")
print("=" * 60)

ETL Process Implementation for Retail Data Warehouse


## Step 1: Data Generation (Extract Phase)
Since we're generating synthetic data, we'll create realistic retail transaction data

In [2]:
def generate_synthetic_retail_data(num_rows=1000):
    """
    Generate synthetic retail data similar to Online Retail dataset structure
    Columns: InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, customer_id, Country
    """
    fake = Faker()
    
    # Define product categories and items
    categories = {
        'Electronics': ['Smartphone', 'Laptop', 'Tablet', 'Headphones', 'Speaker', 'Charger'],
        'Clothing': ['T-Shirt', 'Jeans', 'Dress', 'Jacket', 'Shoes', 'Hat'],
        'Home': ['Chair', 'Table', 'Lamp', 'Cushion', 'Mug', 'Plate'],
        'Books': ['Novel', 'Cookbook', 'Biography', 'Manual', 'Dictionary', 'Magazine'],
        'Sports': ['Ball', 'Racket', 'Weights', 'Mat', 'Bottle', 'Towel']
    }
    
    countries = ['UK', 'France', 'Germany', 'Netherlands', 'Spain', 'Italy', 'USA', 'Canada', 'Australia']
    
    # Generate data
    data = []
    invoice_counter = 536365  # Starting invoice number
    customer_ids = list(range(12346, 12446))  # 100 unique customers
    
    # Create date range for last 2 years
    end_date = datetime(2025, 8, 12)
    start_date = end_date - timedelta(days=730)
    
    for i in range(num_rows):
        # Select category and product
        category = random.choice(list(categories.keys()))
        product = random.choice(categories[category])
        
        # Generate invoice (group some items in same invoice)
        if i % random.randint(1, 5) == 0:
            invoice_counter += 1
        
        # Generate data row
        row = {
            'InvoiceNo': f'{invoice_counter}',
            'StockCode': f'{category[:2].upper()}{random.randint(10000, 99999)}',
            'Description': f'{product} {category}',
            'Quantity': random.randint(1, 50),
            'InvoiceDate': fake.date_time_between(start_date=start_date, end_date=end_date),
            'UnitPrice': round(random.uniform(1.0, 100.0), 2),
            'customer_id': random.choice(customer_ids),
            'Country': random.choice(countries),
            'Category': category  # Adding category for easier analysis
        }
        data.append(row)
    
    # Create DataFrame
    df = pd.DataFrame(data)
    
    # Add some missing values and outliers to make it realistic
    # Missing customer_id for some rows
    missing_indices = np.random.choice(df.index, size=int(0.05 * len(df)), replace=False)
    df.loc[missing_indices, 'customer_id'] = np.nan
    
    # Add some negative quantities (returns)
    return_indices = np.random.choice(df.index, size=int(0.03 * len(df)), replace=False)
    df.loc[return_indices, 'Quantity'] = -df.loc[return_indices, 'Quantity']
    
    # Add some zero/negative prices
    bad_price_indices = np.random.choice(df.index, size=int(0.02 * len(df)), replace=False)
    df.loc[bad_price_indices, 'UnitPrice'] = 0
    
    return df

# Generate synthetic data
print("Step 1: Extracting Data...")
logger.info("Starting data generation")

# Generate the dataset
retail_data = generate_synthetic_retail_data(1000)

# Save to CSV for reference
retail_data.to_csv('synthetic_retail_data.csv', index=False)

print(f"✓ Generated {len(retail_data)} rows of synthetic retail data")
print(f"✓ Data saved to 'synthetic_retail_data.csv'")
logger.info(f"Generated {len(retail_data)} rows of data")

# Display basic info about extracted data
print("\nExtracted Data Summary:")
print(f"Shape: {retail_data.shape}")
print(f"Columns: {list(retail_data.columns)}")
print("\nFirst 5 rows:")
print(retail_data.head())

print(f"\nData types:")
print(retail_data.dtypes)

2025-08-14 13:43:23,170 - INFO - Starting data generation


Step 1: Extracting Data...


2025-08-14 13:43:23,534 - INFO - Generated 1000 rows of data


✓ Generated 1000 rows of synthetic retail data
✓ Data saved to 'synthetic_retail_data.csv'

Extracted Data Summary:
Shape: (1000, 9)
Columns: ['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'customer_id', 'Country', 'Category']

First 5 rows:
  InvoiceNo StockCode          Description  Quantity         InvoiceDate  \
0    536366   CL31454      Jacket Clothing        39 2023-09-12 14:31:01   
1    536366   EL85228  Speaker Electronics         4 2025-03-22 08:31:35   
2    536366   BO71516         Manual Books         4 2024-03-21 06:30:37   
3    536366   SP51195         Towel Sports        33 2024-08-10 02:12:58   
4    536367   SP50955       Weights Sports        48 2023-10-05 02:09:26   

   UnitPrice  customer_id  Country     Category  
0      63.90     12396.0   France     Clothing  
1      35.85     12417.0    Spain  Electronics  
2      67.56     12388.0  Germany        Books  
3      95.23     12351.0    Spain       Sports  
4      95.88     123

## Step 2: Transform Phase

In [3]:
def transform_retail_data(df):
    """
    Transform the retail data for data warehouse loading
    """
    print("\nStep 2: Transforming Data...")
    logger.info("Starting data transformation")
    
    # Create a copy for transformation
    transformed_df = df.copy()
    initial_rows = len(transformed_df)
    
    print(f"Initial rows: {initial_rows}")
    
    # Handle missing values
    print("- Handling missing values...")
    missing_before = transformed_df.isnull().sum().sum()
    
    # Drop rows with missing customer_id (for this warehouse, we need customer info)
    transformed_df = transformed_df.dropna(subset=['customer_id'])
    transformed_df['customer_id'] = transformed_df['customer_id'].astype(int)
    
    missing_after = transformed_df.isnull().sum().sum()
    print(f"  Missing values before: {missing_before}, after: {missing_after}")
    
    # Convert data types
    print("- Converting data types...")
    transformed_df['InvoiceDate'] = pd.to_datetime(transformed_df['InvoiceDate'])
    transformed_df['UnitPrice'] = pd.to_numeric(transformed_df['UnitPrice'], errors='coerce')
    transformed_df['Quantity'] = pd.to_numeric(transformed_df['Quantity'], errors='coerce')
    
    # Handle outliers - Remove negative quantities and zero/negative prices
    print("- Removing outliers...")
    rows_before_outliers = len(transformed_df)
    
    # Remove returns (negative quantities) and invalid prices
    transformed_df = transformed_df[
        (transformed_df['Quantity'] > 0) & 
        (transformed_df['UnitPrice'] > 0)
    ]
    
    rows_after_outliers = len(transformed_df)
    outliers_removed = rows_before_outliers - rows_after_outliers
    print(f"  Removed {outliers_removed} rows with outliers")
    
    # Calculate TotalSales
    print("- Calculating TotalSales...")
    transformed_df['TotalSales'] = transformed_df['Quantity'] * transformed_df['UnitPrice']
    
    # Filter for last year's data (from August 12, 2024)
    print("- Filtering for last year's data...")
    cutoff_date = datetime(2024, 8, 12)
    rows_before_filter = len(transformed_df)
    
    transformed_df = transformed_df[transformed_df['InvoiceDate'] >= cutoff_date]
    
    rows_after_filter = len(transformed_df)
    filtered_rows = rows_before_filter - rows_after_filter
    print(f"  Filtered out {filtered_rows} rows older than August 12, 2024")
    
    # Create time dimensions
    print("- Creating time dimensions...")
    transformed_df['Year'] = transformed_df['InvoiceDate'].dt.year
    transformed_df['Month'] = transformed_df['InvoiceDate'].dt.month
    transformed_df['Quarter'] = transformed_df['InvoiceDate'].dt.quarter
    transformed_df['Date'] = transformed_df['InvoiceDate'].dt.date
    
    print(f"Final transformed rows: {len(transformed_df)}")
    logger.info(f"Transformation complete. Rows: {initial_rows} -> {len(transformed_df)}")
    
    return transformed_df

# Apply transformations
transformed_data = transform_retail_data(retail_data)

# Create customer dimension data
def create_customer_dimension(df):
    """Create customer dimension table"""
    print("- Creating Customer Dimension...")
    
    customer_dim = df.groupby('customer_id').agg({
        'Country': 'first',
        'TotalSales': 'sum',
        'InvoiceNo': 'nunique',
        'Quantity': 'sum'
    }).reset_index()
    
    customer_dim.columns = ['customer_id', 'Country', 'TotalPurchases', 'TotalInvoices', 'TotalQuantity']
    customer_dim['CustomerSegment'] = pd.cut(customer_dim['TotalPurchases'], 
                                           bins=3, 
                                           labels=['Low', 'Medium', 'High'])
    
    print(f"  Created customer dimension with {len(customer_dim)} customers")
    return customer_dim

# Create time dimension data
def create_time_dimension(df):
    """Create time dimension table"""
    print("- Creating Time Dimension...")
    
    time_dim = df[['Date', 'Year', 'Month', 'Quarter']].drop_duplicates().reset_index(drop=True)
    time_dim['TimeID'] = range(1, len(time_dim) + 1)
    time_dim['MonthName'] = pd.to_datetime(time_dim['Date']).dt.month_name()
    time_dim['Weekday'] = pd.to_datetime(time_dim['Date']).dt.day_name()
    
    print(f"  Created time dimension with {len(time_dim)} time periods")
    return time_dim

# Create product dimension data
def create_product_dimension(df):
    """Create product dimension table"""
    print("- Creating Product Dimension...")
    
    product_dim = df[['StockCode', 'Description', 'Category', 'UnitPrice']].drop_duplicates().reset_index(drop=True)
    product_dim['ProductID'] = range(1, len(product_dim) + 1)
    product_dim['AvgPrice'] = product_dim.groupby('Category')['UnitPrice'].transform('mean')
    
    print(f"  Created product dimension with {len(product_dim)} products")
    return product_dim

# Create dimension tables
customer_dim = create_customer_dimension(transformed_data)
time_dim = create_time_dimension(transformed_data)
product_dim = create_product_dimension(transformed_data)

print("\nTransformation Summary:")
print(f"✓ Fact table rows: {len(transformed_data)}")
print(f"✓ Customer dimension: {len(customer_dim)} records")
print(f"✓ Time dimension: {len(time_dim)} records")
print(f"✓ Product dimension: {len(product_dim)} records")

2025-08-14 13:44:56,350 - INFO - Starting data transformation
2025-08-14 13:44:56,379 - INFO - Transformation complete. Rows: 1000 -> 467



Step 2: Transforming Data...
Initial rows: 1000
- Handling missing values...
  Missing values before: 50, after: 0
- Converting data types...
- Removing outliers...
  Removed 47 rows with outliers
- Calculating TotalSales...
- Filtering for last year's data...
  Filtered out 436 rows older than August 12, 2024
- Creating time dimensions...
Final transformed rows: 467
- Creating Customer Dimension...
  Created customer dimension with 99 customers
- Creating Time Dimension...
  Created time dimension with 261 time periods
- Creating Product Dimension...
  Created product dimension with 467 products

Transformation Summary:
✓ Fact table rows: 467
✓ Customer dimension: 99 records
✓ Time dimension: 261 records
✓ Product dimension: 467 records


## Step 3: Load Phase

In [4]:
def create_database_schema(db_path):
    """Create the data warehouse schema in SQLite"""
    print(f"\nStep 3: Loading Data to Database ({db_path})...")
    logger.info("Creating database schema")
    
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # Create Customer Dimension Table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS CustomerDim (
        customer_id INTEGER PRIMARY KEY,
        Country TEXT NOT NULL,
        TotalPurchases REAL,
        TotalInvoices INTEGER,
        TotalQuantity INTEGER,
        CustomerSegment TEXT
    )
    ''')
    
    # Create Time Dimension Table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS TimeDim (
        TimeID INTEGER PRIMARY KEY,
        Date DATE NOT NULL,
        Year INTEGER,
        Month INTEGER,
        Quarter INTEGER,
        MonthName TEXT,
        Weekday TEXT
    )
    ''')
    
    # Create Product Dimension Table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS ProductDim (
        ProductID INTEGER PRIMARY KEY,
        StockCode TEXT,
        Description TEXT,
        Category TEXT,
        UnitPrice REAL,
        AvgPrice REAL
    )
    ''')
    
    # Create Sales Fact Table
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS SalesFact (
        SalesID INTEGER PRIMARY KEY AUTOINCREMENT,
        InvoiceNo TEXT,
        customer_id INTEGER,
        ProductID INTEGER,
        TimeID INTEGER,
        Quantity INTEGER,
        UnitPrice REAL,
        TotalSales REAL,
        FOREIGN KEY (customer_id) REFERENCES CustomerDim(customer_id),
        FOREIGN KEY (ProductID) REFERENCES ProductDim(ProductID),
        FOREIGN KEY (TimeID) REFERENCES TimeDim(TimeID)
    )
    ''')
    
    conn.commit()
    print("✓ Database schema created successfully")
    return conn

def load_dimension_tables(conn, customer_dim, time_dim, product_dim):
    """Load data into dimension tables"""
    print("- Loading dimension tables...")
    
    # Load Customer Dimension
    customer_dim.to_sql('CustomerDim', conn, if_exists='replace', index=False)
    customer_count = conn.execute("SELECT COUNT(*) FROM CustomerDim").fetchone()[0]
    print(f"  ✓ Loaded {customer_count} customers")
    
    # Load Time Dimension
    time_dim.to_sql('TimeDim', conn, if_exists='replace', index=False)
    time_count = conn.execute("SELECT COUNT(*) FROM TimeDim").fetchone()[0]
    print(f"  ✓ Loaded {time_count} time records")
    
    # Load Product Dimension
    product_dim.to_sql('ProductDim', conn, if_exists='replace', index=False)
    product_count = conn.execute("SELECT COUNT(*) FROM ProductDim").fetchone()[0]
    print(f"  ✓ Loaded {product_count} products")
    
    return customer_count, time_count, product_count

def create_sales_fact(transformed_data, time_dim, product_dim):
    """Create sales fact table with proper foreign keys"""
    print("- Creating sales fact table...")
    
    # Merge with time dimension to get TimeID
    fact_table = transformed_data.merge(
        time_dim[['Date', 'TimeID']], 
        on='Date', 
        how='left'
    )
    
    # Merge with product dimension to get ProductID
    fact_table = fact_table.merge(
        product_dim[['StockCode', 'ProductID']], 
        on='StockCode', 
        how='left'
    )
    
    # Select only the columns needed for fact table
    sales_fact = fact_table[[
        'InvoiceNo', 'customer_id', 'ProductID', 'TimeID',
        'Quantity', 'UnitPrice', 'TotalSales'
    ]].copy()
    
    print(f"  ✓ Created sales fact with {len(sales_fact)} records")
    return sales_fact

def load_fact_table(conn, sales_fact):
    """Load sales fact table"""
    print("- Loading sales fact table...")
    
    sales_fact.to_sql('SalesFact', conn, if_exists='replace', index=False)
    fact_count = conn.execute("SELECT COUNT(*) FROM SalesFact").fetchone()[0]
    print(f"  ✓ Loaded {fact_count} sales records")
    
    return fact_count

# Execute the loading process
db_path = 'retail_dw.db'
conn = create_database_schema(db_path)

# Load dimensions
customer_count, time_count, product_count = load_dimension_tables(conn, customer_dim, time_dim, product_dim)

# Create and load fact table
sales_fact = create_sales_fact(transformed_data, time_dim, product_dim)
fact_count = load_fact_table(conn, sales_fact)

2025-08-14 13:45:35,931 - INFO - Creating database schema



Step 3: Loading Data to Database (retail_dw.db)...
✓ Database schema created successfully
- Loading dimension tables...
  ✓ Loaded 99 customers
  ✓ Loaded 261 time records
  ✓ Loaded 467 products
- Creating sales fact table...
  ✓ Created sales fact with 467 records
- Loading sales fact table...
  ✓ Loaded 467 sales records


## Step 4: ETL Function and Logging

In [5]:
def full_etl_process(num_rows=1000, db_path='retail_dw.db'):
    """
    Complete ETL process function with comprehensive logging
    """
    print(f"\n{'='*60}")
    print("COMPLETE ETL PROCESS EXECUTION")
    print(f"{'='*60}")
    
    start_time = datetime.now()
    logger.info("Starting full ETL process")
    
    try:
        # EXTRACT
        print("\n🔄 EXTRACT PHASE")
        logger.info("Extract phase started")
        raw_data = generate_synthetic_retail_data(num_rows)
        extract_rows = len(raw_data)
        print(f"✅ Extracted {extract_rows} rows")
        logger.info(f"Extract completed: {extract_rows} rows")
        
        # TRANSFORM
        print("\n🔄 TRANSFORM PHASE")
        logger.info("Transform phase started")
        transformed_data = transform_retail_data(raw_data)
        transform_rows = len(transformed_data)
        
        # Create dimensions
        customer_dim = create_customer_dimension(transformed_data)
        time_dim = create_time_dimension(transformed_data)
        product_dim = create_product_dimension(transformed_data)
        
        print(f"✅ Transformed to {transform_rows} fact rows")
        logger.info(f"Transform completed: {transform_rows} fact rows")
        
        # LOAD
        print("\n🔄 LOAD PHASE")
        logger.info("Load phase started")
        conn = create_database_schema(db_path)
        
        # Load dimensions
        customer_count, time_count, product_count = load_dimension_tables(conn, customer_dim, time_dim, product_dim)
        
        # Load fact
        sales_fact = create_sales_fact(transformed_data, time_dim, product_dim)
        fact_count = load_fact_table(conn, sales_fact)
        
        print(f"✅ Loaded {fact_count} sales records")
        logger.info(f"Load completed: {fact_count} sales records")
        
        # Summary
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        
        print(f"\n{'='*60}")
        print("ETL PROCESS SUMMARY")
        print(f"{'='*60}")
        print(f"⏱️  Duration: {duration:.2f} seconds")
        print(f"📥 Extracted: {extract_rows:,} rows")
        print(f"🔧 Transformed: {transform_rows:,} rows")
        print(f"📤 Loaded:")
        print(f"   - Sales Facts: {fact_count:,}")
        print(f"   - Customers: {customer_count:,}")
        print(f"   - Time Periods: {time_count:,}")
        print(f"   - Products: {product_count:,}")
        print(f"💾 Database: {db_path}")
        print(f"✅ ETL Process Completed Successfully!")
        
        logger.info(f"ETL process completed successfully in {duration:.2f} seconds")
        
        conn.close()
        
        return {
            'success': True,
            'duration': duration,
            'extracted_rows': extract_rows,
            'transformed_rows': transform_rows,
            'loaded_facts': fact_count,
            'loaded_customers': customer_count,
            'loaded_time_periods': time_count,
            'loaded_products': product_count
        }
        
    except Exception as e:
        logger.error(f"ETL process failed: {str(e)}")
        print(f"❌ ETL Process Failed: {str(e)}")
        return {'success': False, 'error': str(e)}

# Execute the complete ETL process
etl_result = full_etl_process(1000, 'retail_dw.db')

# ## Data Quality Verification

def verify_data_quality(db_path='retail_dw.db'):
    """Verify the quality of loaded data"""
    print(f"\n{'='*60}")
    print("DATA QUALITY VERIFICATION")
    print(f"{'='*60}")
    
    conn = sqlite3.connect(db_path)
    
    # Check table counts
    print("\n📊 Table Record Counts:")
    tables = ['CustomerDim', 'TimeDim', 'ProductDim', 'SalesFact']
    for table in tables:
        count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
        print(f"   {table}: {count:,} records")
    
    # Check for referential integrity
    print("\n🔗 Referential Integrity Checks:")
    
    # Check for orphaned records in fact table
    orphaned_customers = conn.execute('''
        SELECT COUNT(*) FROM SalesFact s 
        LEFT JOIN CustomerDim c ON s.customer_id = c.customer_id 
        WHERE c.customer_id IS NULL
    ''').fetchone()[0]
    
    orphaned_products = conn.execute('''
        SELECT COUNT(*) FROM SalesFact s 
        LEFT JOIN ProductDim p ON s.ProductID = p.ProductID 
        WHERE p.ProductID IS NULL
    ''').fetchone()[0]
    
    orphaned_time = conn.execute('''
        SELECT COUNT(*) FROM SalesFact s 
        LEFT JOIN TimeDim t ON s.TimeID = t.TimeID 
        WHERE t.TimeID IS NULL
    ''').fetchone()[0]
    
    print(f"   Orphaned Customer Records: {orphaned_customers}")
    print(f"   Orphaned Product Records: {orphaned_products}")
    print(f"   Orphaned Time Records: {orphaned_time}")
    
    # Check data ranges
    print("\n📈 Data Quality Metrics:")
    
    # Sales amount range
    sales_stats = conn.execute('''
        SELECT MIN(TotalSales), MAX(TotalSales), AVG(TotalSales) FROM SalesFact
    ''').fetchone()
    print(f"   Sales Range: ${sales_stats[0]:.2f} - ${sales_stats[1]:.2f} (Avg: ${sales_stats[2]:.2f})")
    
    # Date range
    date_range = conn.execute('''
        SELECT MIN(Date), MAX(Date) FROM TimeDim
    ''').fetchone()
    print(f"   Date Range: {date_range[0]} to {date_range[1]}")
    
    # Country distribution
    print("\n🌍 Top 5 Countries by Sales:")
    top_countries = conn.execute('''
        SELECT c.Country, SUM(s.TotalSales) as TotalSales, COUNT(*) as TransactionCount
        FROM SalesFact s
        JOIN CustomerDim c ON s.customer_id = c.customer_id
        GROUP BY c.Country
        ORDER BY TotalSales DESC
        LIMIT 5
    ''').fetchall()
    
    for country, sales, count in top_countries:
        print(f"   {country}: ${sales:,.2f} ({count:,} transactions)")
    
    conn.close()
    
    print(f"\n✅ Data Quality Verification Complete!")

# Run data quality verification
verify_data_quality('retail_dw.db')

print(f"\n{'='*60}")
print("TASK 2 COMPLETED SUCCESSFULLY!")
print(f"{'='*60}")
print("📁 Files Created:")
print("   - synthetic_retail_data.csv (source data)")
print("   - retail_dw.db (data warehouse)")
print("   - Complete ETL process implemented")
print("   - Data quality verified")
print(f"{'='*60}")

2025-08-14 13:46:22,783 - INFO - Starting full ETL process
2025-08-14 13:46:22,787 - INFO - Extract phase started
2025-08-14 13:46:22,856 - INFO - Extract completed: 1000 rows
2025-08-14 13:46:22,856 - INFO - Transform phase started
2025-08-14 13:46:22,856 - INFO - Starting data transformation
2025-08-14 13:46:22,887 - INFO - Transformation complete. Rows: 1000 -> 447
2025-08-14 13:46:22,915 - INFO - Transform completed: 447 fact rows
2025-08-14 13:46:22,919 - INFO - Load phase started
2025-08-14 13:46:22,922 - INFO - Creating database schema



COMPLETE ETL PROCESS EXECUTION

🔄 EXTRACT PHASE
✅ Extracted 1000 rows

🔄 TRANSFORM PHASE

Step 2: Transforming Data...
Initial rows: 1000
- Handling missing values...
  Missing values before: 50, after: 0
- Converting data types...
- Removing outliers...
  Removed 45 rows with outliers
- Calculating TotalSales...
- Filtering for last year's data...
  Filtered out 458 rows older than August 12, 2024
- Creating time dimensions...
Final transformed rows: 447
- Creating Customer Dimension...
  Created customer dimension with 98 customers
- Creating Time Dimension...
  Created time dimension with 259 time periods
- Creating Product Dimension...
  Created product dimension with 447 products
✅ Transformed to 447 fact rows

🔄 LOAD PHASE

Step 3: Loading Data to Database (retail_dw.db)...
✓ Database schema created successfully
- Loading dimension tables...
  ✓ Loaded 98 customers


2025-08-14 13:46:23,056 - INFO - Load completed: 447 sales records
2025-08-14 13:46:23,058 - INFO - ETL process completed successfully in 0.28 seconds


  ✓ Loaded 259 time records
  ✓ Loaded 447 products
- Creating sales fact table...
  ✓ Created sales fact with 447 records
- Loading sales fact table...
  ✓ Loaded 447 sales records
✅ Loaded 447 sales records

ETL PROCESS SUMMARY
⏱️  Duration: 0.28 seconds
📥 Extracted: 1,000 rows
🔧 Transformed: 447 rows
📤 Loaded:
   - Sales Facts: 447
   - Customers: 98
   - Time Periods: 259
   - Products: 447
💾 Database: retail_dw.db
✅ ETL Process Completed Successfully!

DATA QUALITY VERIFICATION

📊 Table Record Counts:
   CustomerDim: 98 records
   TimeDim: 259 records
   ProductDim: 447 records
   SalesFact: 447 records

🔗 Referential Integrity Checks:
   Orphaned Customer Records: 0
   Orphaned Product Records: 0
   Orphaned Time Records: 0

📈 Data Quality Metrics:
   Sales Range: $16.00 - $4951.50 (Avg: $1244.93)
   Date Range: 2024-08-12 to 2025-08-11

🌍 Top 5 Countries by Sales:
   UK: $106,403.90 (81 transactions)
   France: $92,112.19 (83 transactions)
   Italy: $71,126.60 (54 transactions)
