# ETL

## Modules

In [1]:
import sqlite3

import pandas as pd

from datetime import datetime, timedelta

## Configuration

In [2]:
db_file = 'retail_dw.db'

## Data Extraction

### DataFrame Logging

This logging offers a modularised way to print standardised logs.

In [6]:
def row_logger(df, stage_name):
    """
    Log the number of rows in a dataframe
    """
    print(f"Stage: {stage_name} | Rows: {len(df)}")

### Data Loading

The dataset from the UCI ML website is the an Excel file that they claim has no null values. Thus, the dataset is imported using pandas `read_excel()` function.

In [7]:
def load_xlsx_data(file):
    """
    Load an Excel file into a Pandas Dataframe
    """
    df = pd.read_excel(file)

    return df

### Data Cleaning

This involves:

- Checking for null values
- Checking for duplicates
- Droping duplicates
- Removing outliers
- Datatype Conversion

In [8]:
def check_null_values(df):
    """
    Check if null values exist in a Pandas Dataframe
    """
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    
    if null_counts.empty:
        print("No null values found in the dataframe.")
        return False
    else:
        print("Columns with null values:")
        for col, count in null_counts.items():
            print(f"{col}: {count}")
        return True

In [9]:
def check_duplicates(df):
    """
    Check if duplicates exist in a Pandas Dataframe
    """
    duplicates = df.duplicated()
    
    if duplicates.any():
        print("Duplicates found!")
        return True
    else:
        print("No duplicates found.")
        return False

In [10]:
def drop_duplicates(df):
    """
    Drop duplicates in a Pandas DataFrame if they exist
    """
    row_logger(df, "Before dropping duplicates")
    
    df_cleaned = df.drop_duplicates()
    
    row_logger(df_cleaned, "After dropping duplicates")
    
    return df_cleaned

In [11]:
def remove_outliers(df):
    """
    Remove numeric outliers: < 0 or <= 0
    """
    row_logger(df, "Before removing outliers")
    
    outliers = df.copy()
    
    outliers = outliers[(outliers["Quantity"] >= 0) & (outliers["UnitPrice"] > 0)]

    row_logger(outliers, "After removing outliers")

    return outliers

In [12]:
def datatype_conversion(df):
    """
    Convert the columns in a Pandas Dataframe to the suitable datatype
    """
    row_logger(df, "Before datatype conversion")
    
    converted = df.copy()

    print(f"These are the columns to convert:\n")
    print(converted.columns.to_list())

    converted["InvoiceNo"] = converted["InvoiceNo"].astype(str)
    converted["StockCode"] = converted["StockCode"].astype(str)
    converted["Description"] = converted["Description"].astype(str)
    converted["Quantity"] = pd.to_numeric(converted["Quantity"], errors="coerce").astype("Int64")  # allows NaN
    converted["InvoiceDate"] = pd.to_datetime(converted["InvoiceDate"], errors="coerce")
    converted["UnitPrice"] = pd.to_numeric(converted["UnitPrice"], errors="coerce").astype(float)
    converted["CustomerID"] = converted["CustomerID"].astype(str)
    converted["Country"] = converted["Country"].astype(str)

    print(converted.dtypes)

    row_logger(converted, "After datatype conversion")

    return converted

### Data Enrichment

This involves:

- Creating a `TotalSales` column
- Filtering for data from the last year of recorded data
- Aggregating customer data into a summary

In [13]:
def create_total_sales(df):
    """
    Compute the total sales for each invoice transaction
    """
    row_logger(df, "Before creating TotalSales column")
    
    total_sales = df.copy()
    total_sales['TotalSales'] = total_sales['Quantity'] * total_sales['UnitPrice']

    row_logger(total_sales, "After creating TotalSales column")

    return total_sales

In [14]:
def filter_last_year(df):
    """
    Filter the Pandas Dataframe to retain only transactions
    from the last 365 days relative to the last date in the
    Pandas Dataframe
    """
    row_logger(df, "Before filtering to the last year sales")
    
    latest_date = df['InvoiceDate'].max()
    one_year_ago = latest_date - timedelta(days=365)
    
    filtered = df[df["InvoiceDate"] >= one_year_ago]

    row_logger(filtered, "After filtering to the last year sales")
    
    return filtered

In [15]:
def aggregate_customer_summary(df):
    """
    Aggregate customer data in order to obtain
    customer summary metrics
    """
    row_logger(df, "Before aggregating customer data")
    
    if 'CustomerID' not in df or 'TotalSales' not in df or 'Quantity' not in df or 'InvoiceDate' not in df:
        raise ValueError("The DataFrame must contain 'CustomerID', 'TotalSales', 'Quantity', and 'InvoiceDate' columns.")

    customer_summary = df.groupby('CustomerID').agg(
        total_sales=('TotalSales', 'sum'),
        total_quantity=('Quantity', 'sum'),
        avg_sales_per_transaction=('TotalSales', 'mean'),
        num_transactions=('InvoiceNo', 'nunique'),  # Count distinct transactions
        first_purchase_date=('InvoiceDate', 'min'),
        last_purchase_date=('InvoiceDate', 'max')
    ).reset_index()

    row_logger(customer_summary, "After aggregating customer data")
    
    return customer_summary

### Data Loading

#### Creating Database Connection

In [16]:
def create_connection():
    """
    Creates a connection to the SQLite database
    """
    return sqlite3.connect(db_file)

#### Creating Database Tables

The star schema tables are modelled after the star schema below:

![](../design/star_schema_design.png)

In [17]:
def create_customer_dimension_table():
    """
    Creates the Customer Dimension table in SQLite
    """
    conn = create_connection()
    cursor = conn.cursor()
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS customer_dimension (
        CustomerID TEXT PRIMARY KEY,
        Country TEXT
    )
    ''')
    
    conn.commit()
    conn.close()

In [18]:
def create_sales_fact_table():
    """
    Creates the Sales Fact table in SQLite
    """
    conn = create_connection()
    cursor = conn.cursor()
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS sales_fact (
        InvoiceNo TEXT PRIMARY KEY,
        CustomerID TEXT,
        StockCode TEXT,               
        Quantity INTEGER,
        UnitPrice REAL,
        TotalSales REAL,
        InvoiceDate TEXT,
        FOREIGN KEY (CustomerID) REFERENCES customer_dimension(CustomerID),
        FOREIGN KEY (StockCode) REFERENCES product_dimension(StockCode)  -- Link to product dimension
    )
    ''')
    
    conn.commit()
    conn.close()

In [19]:
def create_date_dimension_table():
    """
    Creates the Date Dimension table in SQLite
    """
    conn = create_connection()
    cursor = conn.cursor()
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS date_dimension (
        Date TEXT PRIMARY KEY,
        Year INTEGER,
        Month INTEGER,
        Day INTEGER,
        Weekday INTEGER,
        Quarter INTEGER
    )
    ''')
    
    conn.commit()
    conn.close()

In [20]:
def create_product_dimension_table():
    """
    Creates the Product Dimension table in SQLite
    """
    conn = create_connection()
    cursor = conn.cursor()
    
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS product_dimension (
        StockCode TEXT PRIMARY KEY,
        Description TEXT
    )
    ''')
    
    conn.commit()
    conn.close()

#### Inserting Data into the Database

Using the full dataframe dimensionality reduction is done to map the data onto each dimension and fact table.

In [21]:
def insert_customer_dimension_data(df):
    """
    Inserts data into the Customer Dimension table
    """
    conn = create_connection()
    cursor = conn.cursor()
    
    # Insert the customer dimension data
    customer_dim = df[['CustomerID', 'Country']].drop_duplicates()

    row_logger(customer_dim, "Inserting customer dimension data")
    
    for _, row in customer_dim.iterrows():
        cursor.execute('''
        INSERT OR REPLACE INTO customer_dimension (CustomerID, Country)
        VALUES (?, ?)
        ''', (row['CustomerID'], row['Country']))
    
    conn.commit()
    conn.close()

In [22]:
def insert_sales_fact_data(df):
    """
    Inserts data into the Sales Fact table
    """
    conn = create_connection()
    cursor = conn.cursor()
    
    # Insert sales fact data
    df['TotalSales'] = df['Quantity'] * df['UnitPrice']  # Calculate TotalSales
    
    # We assume the 'StockCode' column is in the dataframe and needs to be inserted.
    sales_fact = df[['InvoiceNo', 'CustomerID', 'StockCode', 'Quantity', 'UnitPrice', 'TotalSales', 'InvoiceDate']]

    row_logger(sales_fact, "Inserting sales fact data")
    
    for _, row in sales_fact.iterrows():
        cursor.execute('''
        INSERT OR REPLACE INTO sales_fact (InvoiceNo, CustomerID, StockCode, Quantity, UnitPrice, TotalSales, InvoiceDate)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            row['InvoiceNo'], 
            row['CustomerID'], 
            row['StockCode'],  # Insert StockCode
            row['Quantity'], 
            row['UnitPrice'], 
            row['TotalSales'], 
            row['InvoiceDate'].strftime('%Y-%m-%d')  # Ensure the date is formatted as 'YYYY-MM-DD'
        ))
    
    conn.commit()
    conn.close()

In [23]:
def insert_date_dimension_data(df):
    """
    Inserts data into the Date Dimension table
    """
    conn = create_connection()
    cursor = conn.cursor()
    
    # Extract date parts and create date dimension
    date_dim = pd.DataFrame()
    date_dim['Date'] = df['InvoiceDate'].dt.date
    date_dim['Year'] = df['InvoiceDate'].dt.year
    date_dim['Month'] = df['InvoiceDate'].dt.month
    date_dim['Day'] = df['InvoiceDate'].dt.day
    date_dim['Weekday'] = df['InvoiceDate'].dt.weekday
    date_dim['Quarter'] = df['InvoiceDate'].dt.quarter
    
    date_dim = date_dim.drop_duplicates()

    row_logger(date_dim, "Inserting date dimension data")

    for _, row in date_dim.iterrows():
        cursor.execute('''
        INSERT OR REPLACE INTO date_dimension (Date, Year, Month, Day, Weekday, Quarter)
        VALUES (?, ?, ?, ?, ?, ?)
        ''', (row['Date'], row['Year'], row['Month'], row['Day'], row['Weekday'], row['Quarter']))
    
    conn.commit()
    conn.close()

In [24]:
def insert_product_dimension_data(df):
    """
    Inserts data into the Product Dimension table
    """
    conn = create_connection()
    cursor = conn.cursor()
    
    # Insert product dimension data
    product_dim = df[['StockCode', 'Description']].drop_duplicates()

    row_logger(product_dim, "Inserting product dimension data")
    
    for _, row in product_dim.iterrows():
        cursor.execute('''
        INSERT OR REPLACE INTO product_dimension (StockCode, Description)
        VALUES (?, ?)
        ''', (row['StockCode'], row['Description']))
    
    conn.commit()
    conn.close()

#### Retrieving Data from a Database Table

In [25]:
def fetch_first_five_rows(table_name):
    """
    Fetches the first five rows from any table
    """
    conn = create_connection()
    query = f'SELECT * FROM {table_name} LIMIT 5'
    result = pd.read_sql_query(query, conn)
    conn.close()
    return result

#### Loading and Validation

In [26]:
def load_and_verify_data(df):
    """
    Creates the star schema tables
    Inserts data into the star schema tables
    Fetches and displays the first five rows for every one of the tables
    """
    create_customer_dimension_table()
    create_sales_fact_table()
    create_date_dimension_table()
    create_product_dimension_table()

    insert_customer_dimension_data(df)
    insert_sales_fact_data(df)
    insert_date_dimension_data(df)
    insert_product_dimension_data(df)

    print("First 5 rows of Customer Dimension Table:")
    print(fetch_first_five_rows('customer_dimension'))
    
    print("\nFirst 5 rows of Sales Fact Table:")
    print(fetch_first_five_rows('sales_fact'))
    
    print("\nFirst 5 rows of Date Dimension Table:")
    print(fetch_first_five_rows('date_dimension'))
    
    print("\nFirst 5 rows of Product Dimension Table:")
    print(fetch_first_five_rows('product_dimension'))


## ETL Pipeline

In [27]:
def etl_pipeline():
    """
    Extract
    """
    # Load the Excel file into the raw dataframe
    raw_df = load_xlsx_data("../../data/raw/Online Retail.xlsx")

    # Display the first five rows of the raw dataframea
    raw_df.head(5)

    """
    Transform
    """

    # Perform datatype conversion for all the columns in the dataframe
    df_converted = datatype_conversion(raw_df)

    # Original Intent: Filter sales data to the last year assuming that the day is August 12 2025
    # This is done immediately after datatype conversion since
    # 1. Only this data will be saved to the database
    # 2. It will minimise computational complexity
    # 3. The last year is the time range of interest
    # The functionality of the filter_last_year() function has been modified
    # It filters for the last year starting at the last InvoiceDate
    # This is done because there is no data from August 12 2024 to August 12 2025
    # Actual Intent: Filter sales data for the last year starting at the last InvoiceDate
    df_filtered = filter_last_year(df_converted)

    # Check for missing values to cross validate the UCI ML statistics
    # There should be no missing values
    check_null_values(df_filtered)
    
    # Handle duplicates by dropping them if they exist
    if check_duplicates(df_filtered):
        df_no_duplicates = drop_duplicates(df_filtered)
    else:
        df_no_duplicates = df_filtered

    # Handle outliers in the Quantity and UnitPrice columns
    df_no_outliers = remove_outliers(df_no_duplicates)

    # Compute the TotalSales Column
    df_revenue = create_total_sales(df_no_outliers)

    # Group data by the CustomerID
    df_customer = aggregate_customer_summary(df_revenue)

    # Display the first five rows of the customer aggregated summary dataframe
    df_customer.head(5)


    """
    Load
    """

    # Load and verify the loading to database
    load_and_verify_data(df_revenue)

In [28]:
etl_pipeline()

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:23]  Stage: → **Before datatyp…

These are the columns to convert:

['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country']
InvoiceNo              object
StockCode              object
Description            object
Quantity                Int64
InvoiceDate    datetime64[ns]
UnitPrice             float64
CustomerID             object
Country                object
dtype: object


HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:24]  Stage: → **After datatype…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:24]  Stage: → **Before filteri…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:24]  Stage: → **After filterin…

No null values found in the dataframe.
Duplicates found!


HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:24]  Stage: → **Before droppin…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **After dropping…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **Before removin…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **After removing…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **Before creatin…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **After creating…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **Before aggrega…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **After aggregat…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **Inserting cust…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:25]  Stage: → **Inserting sale…

HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:49]  Stage: → **Inserting date…

  cursor.execute('''


HTML(value="<div style='font-family: monospace; color: #333;'>[2025-08-13 21:32:49]  Stage: → **Inserting prod…

First 5 rows of Customer Dimension Table:
  CustomerID         Country
0    14479.0  United Kingdom
1    16065.0  United Kingdom
2    17430.0  United Kingdom
3    16520.0  United Kingdom
4    15945.0  United Kingdom

First 5 rows of Sales Fact Table:
  InvoiceNo CustomerID StockCode  Quantity  UnitPrice  TotalSales InvoiceDate
0    538032    14479.0     22696         6       1.95       11.70  2010-12-09
1    538035    16065.0     22175         5       2.95       14.75  2010-12-09
2    538037    17430.0     71477        12       3.25       39.00  2010-12-09
3    538040    16520.0     20685         1       7.95        7.95  2010-12-09
4    538044        nan     22812         1       1.95        1.95  2010-12-09

First 5 rows of Date Dimension Table:
         Date  Year  Month  Day  Weekday  Quarter
0  2010-12-09  2010     12    9        3        4
1  2010-12-10  2010     12   10        4        4
2  2010-12-12  2010     12   12        6        4
3  2010-12-13  2010     12   13        0  