In [3]:
import os
import logging
import sqlite3
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

try:
    from faker import Faker
    HAS_FAKER = True
except ImportError:
    HAS_FAKER = False

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)

CURRENT_DATE = datetime(2025, 8, 12)


# 0. Synthetic data generation
# -------------------------------------------------------------------
def generate_synthetic_data(n_rows: int = 1000) -> pd.DataFrame:
    """
    Generate synthetic retail invoice-like data.
    Columns: InvoiceNo, InvoiceDate, CustomerID, Country, ProductID,
             Quantity, UnitPrice.
    """
    logging.info("Generating synthetic data with %d rows...", n_rows)

    if HAS_FAKER:
        fake = Faker()
        Faker.seed(42)
    np.random.seed(42)

    # Basic ranges
    n_customers = 100
    n_products = 200
    countries = [
        "Kenya", "Uganda", "Tanzania", "South Africa", "Nigeria",
        "USA", "UK", "Germany", "India", "Brazil"
    ]

    data = {}

 # InvoiceNo: simple integer or string
    data["InvoiceNo"] = [f"INV-{100000 + i}" for i in range(n_rows)]

    # InvoiceDate: random dates over last 2 years from CURRENT_DATE
    days_back = 365 * 2
    random_days = np.random.randint(0, days_back, size=n_rows)
    data["InvoiceDate"] = [
        (CURRENT_DATE - timedelta(days=int(d))).strftime("%Y-%m-%d")
        for d in random_days
    ]

    # CustomerID: pick from 1..100
    data["CustomerID"] = np.random.randint(1, n_customers + 1, size=n_rows)

    # Country: random from list
    data["Country"] = np.random.choice(countries, size=n_rows)

 # ProductID: pick from 1..200
    data["ProductID"] = np.random.randint(1, n_products + 1, size=n_rows)

    # Quantity: 1..50 (some possible bad values for outlier test)
    data["Quantity"] = np.random.randint(1, 51, size=n_rows)

    # UnitPrice: 1..100 as float with 2 decimals
    data["UnitPrice"] = np.round(
        np.random.uniform(1, 100, size=n_rows), 2
    )
     # Introduce a few bad rows (Quantity < 0 or UnitPrice <= 0) for testing
    bad_indices = np.random.choice(n_rows, size=10, replace=False)
    for idx in bad_indices[:5]:
        data["Quantity"][idx] = -abs(data["Quantity"][idx])
    for idx in bad_indices[5:]:
        data["UnitPrice"][idx] = 0.0

    df = pd.DataFrame(data)
    logging.info("Synthetic data generated: %d rows.", len(df))
    return df


In [4]:
# 1. Extract

def extract_data(csv_path: str | None = None) -> pd.DataFrame:
    """
    Extract data into a pandas DataFrame.
    If csv_path is provided and exists, read from CSV,
    otherwise generate synthetic data.
    """
    if csv_path and os.path.exists(csv_path):
        logging.info("Reading data from CSV: %s", csv_path)
        df = pd.read_csv(csv_path)
    else:
        logging.info("CSV not provided or not found. Using synthetic data.")
        df = generate_synthetic_data(1000)

    logging.info("Extracted rows: %d", len(df))

    # Handle missing values (simple example: drop rows with all NaNs)
    df = df.dropna(how="all")
    logging.info("After dropping fully empty rows: %d", len(df))

    # Convert InvoiceDate to datetime
    if "InvoiceDate" in df.columns:
         df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce")
        # Drop rows with invalid dates
    df = df.dropna(subset=["InvoiceDate"])

    logging.info("After date conversion: %d rows", len(df))
    return df


In [5]:
# 2. Transform

def transform_data(df: pd.DataFrame):
    """
    Transform the extracted DataFrame:
      - Handle outliers
      - Compute TotalSales
      - Filter last-year sales (from CURRENT_DATE)
      - Create CustomerDim-like summary
      - Create TimeDim-like DataFrame
    Returns:
      fact_df, customer_dim_df, time_dim_df
    """
    logging.info("Starting transformation on %d rows", len(df))

    # Handle outliers: remove Quantity < 0 or UnitPrice <= 0
    before_outlier = len(df)
    df = df[(df["Quantity"] >= 0) & (df["UnitPrice"] > 0)]
    after_outlier = len(df)
    logging.info(
        "Removed %d rows as outliers (Quantity < 0 or UnitPrice <= 0).",
        before_outlier - after_outlier
    )
 # TotalSales = Quantity * UnitPrice
    df["TotalSales"] = df["Quantity"] * df["UnitPrice"]

    # Filter for sales in the last year (from CURRENT_DATE)
    one_year_ago = CURRENT_DATE - timedelta(days=365)
    mask_last_year = df["InvoiceDate"] >= one_year_ago
    df_last_year = df[mask_last_year].copy()
    logging.info(
        "Filtered to last year (%s to %s): %d rows",
        one_year_ago.date(),
        CURRENT_DATE.date(),
        len(df_last_year)
    )
 # Customer summary: group by CustomerID
    customer_dim_df = (
        df_last_year
        .groupby(["CustomerID", "Country"], as_index=False)
        .agg(
            TotalPurchases=("TotalSales", "sum"),
            TotalQuantity=("Quantity", "sum"),
            OrderCount=("InvoiceNo", "nunique")
        )
    )
    logging.info(
        "CustomerDim rows created: %d unique customers",
        len(customer_dim_df)
    )

 # Time dimension from unique InvoiceDate values
    time_dim_df = (
        df_last_year[["InvoiceDate"]]
        .drop_duplicates()
        .copy()
    )
    time_dim_df["TimeID"] = range(1, len(time_dim_df) + 1)
    time_dim_df["Date"] = time_dim_df["InvoiceDate"].dt.date
    time_dim_df["Day"] = time_dim_df["InvoiceDate"].dt.day
    time_dim_df["Month"] = time_dim_df["InvoiceDate"].dt.month
    time_dim_df["Year"] = time_dim_df["InvoiceDate"].dt.year
    time_dim_df["Quarter"] = time_dim_df["InvoiceDate"].dt.quarter
    logging.info("TimeDim rows created: %d unique dates", len(time_dim_df))

  # Create a mapping from date to TimeID for fact table
    date_to_timeid = dict(
        zip(time_dim_df["Date"], time_dim_df["TimeID"])
    )

    # Fact table: map InvoiceDate -> TimeID
    fact_df = df_last_year.copy()
    fact_df["Date"] = fact_df["InvoiceDate"].dt.date
    fact_df["TimeID"] = fact_df["Date"].map(date_to_timeid)
# Rename columns to be more warehouse-like
    fact_df = fact_df.rename(
        columns={
            "CustomerID": "customer_id",
            "ProductID": "product_id",
            "Quantity": "quantity",
            "UnitPrice": "unit_price",
            "TotalSales": "total_sales",
            "InvoiceNo": "invoice_no",
            "Country": "country"
        }
    )

    logging.info("Fact table rows: %d", len(fact_df))
 # Prepare CustomerDim structure
    customer_dim_df = customer_dim_df.rename(
        columns={
            "CustomerID": "customer_id",
            "Country": "country",
            "TotalPurchases": "total_purchases",
            "TotalQuantity": "total_quantity",
            "OrderCount": "order_count"
        }
    )

    # Prepare TimeDim structure
    time_dim_df = time_dim_df[
        ["TimeID", "Date", "Day", "Month", "Quarter", "Year"]
    ].rename(
        columns={
            "TimeID": "time_id",
            "Date": "date",
            "Day": "day",
            "Month": "month",
            "Quarter": "quarter",
            "Year": "year"
        }
    )
    return fact_df, customer_dim_df, time_dim_df

In [6]:
# 3. Load

def load_to_sqlite(
    fact_df: pd.DataFrame,
    customer_dim_df: pd.DataFrame,
    time_dim_df: pd.DataFrame,
    db_path: str = "retail_dw.db"
):
    """
    Load DataFrames into SQLite:
      - SalesFact
      - CustomerDim
      - TimeDim
    Overwrites tables if they exist.
    """
    logging.info("Loading data into SQLite DB: %s", db_path)

    conn = sqlite3.connect(db_path)
    try:
        # Write dimension tables first
        customer_dim_df.to_sql(
            "CustomerDim",
            conn,
            if_exists="replace",
            index=False
        )
        time_dim_df.to_sql(
            "TimeDim",
            conn,
            if_exists="replace",
            index=False
        )
         # Then fact table
        fact_df.to_sql(
            "SalesFact",
            conn,
            if_exists="replace",
            index=False
        )

        logging.info("Loaded CustomerDim rows: %d", len(customer_dim_df))
        logging.info("Loaded TimeDim rows: %d", len(time_dim_df))
        logging.info("Loaded SalesFact rows: %d", len(fact_df))

    except Exception as e:
        logging.error("Error while loading to SQLite: %s", e)
        raise
    finally:
        conn.close()
        logging.info("SQLite connection closed.")


In [7]:
# 4. Full ETL function

def run_etl(csv_path: str | None = None, db_path: str = "retail_dw.db"):
    """
    Run the full ETL:
      1. Extract from CSV or generate synthetic
      2. Transform (clean, outliers, TotalSales, dimensions)
      3. Load into SQLite
    Logs row counts at each stage.
    """
    logging.info("=== ETL STARTED ===")

    # Extract
    df_raw = extract_data(csv_path=csv_path)
    logging.info("Raw row count after extract: %d", len(df_raw))

    # Transform
    fact_df, customer_dim_df, time_dim_df = transform_data(df_raw)
    logging.info(
        "After transform - Fact: %d, CustomerDim: %d, TimeDim: %d",
        len(fact_df), len(customer_dim_df), len(time_dim_df)
    )
 # Load
    load_to_sqlite(
        fact_df=fact_df,
        customer_dim_df=customer_dim_df,
        time_dim_df=time_dim_df,
        db_path=db_path
    )

    logging.info("=== ETL COMPLETED SUCCESSFULLY ===")


In [8]:
# Script entry point
# -------------------------------------------------------------------
if __name__ == "__main__":
    # If you want to use a real CSV, provide path here:
    # run_etl(csv_path="your_file.csv")
    run_etl()


2025-12-12 15:15:52,893 [INFO] === ETL STARTED ===
2025-12-12 15:15:52,900 [INFO] CSV not provided or not found. Using synthetic data.
2025-12-12 15:15:52,904 [INFO] Generating synthetic data with 1000 rows...
2025-12-12 15:15:52,967 [INFO] Synthetic data generated: 1000 rows.
2025-12-12 15:15:52,973 [INFO] Extracted rows: 1000
2025-12-12 15:15:52,979 [INFO] After dropping fully empty rows: 1000
2025-12-12 15:15:53,006 [INFO] After date conversion: 1000 rows
2025-12-12 15:15:53,006 [INFO] Raw row count after extract: 1000
2025-12-12 15:15:53,012 [INFO] Starting transformation on 1000 rows
2025-12-12 15:15:53,020 [INFO] Removed 10 rows as outliers (Quantity < 0 or UnitPrice <= 0).
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["TotalSales"] = df["Quantity"] * df["UnitPr

In [9]:
import sqlite3
import pandas as pd
import numpy as np

# Connect to SQLite
conn = sqlite3.connect("retail_dw.db")

# Generate synthetic product data
np.random.seed(42)

num_products = 200
categories = ["Electronics", "Clothing", "Home", "Sports", "Beauty"]

product_df = pd.DataFrame({
    "product_id": range(1, num_products + 1),
    "product_name": [f"Product_{i}" for i in range(1, num_products + 1)],
    "category": np.random.choice(categories, size=num_products)
})

# Load into SQLite
product_df.to_sql("dim_product", conn, if_exists="replace", index=False)

conn.close()

product_df.head()

Unnamed: 0,product_id,product_name,category
0,1,Product_1,Sports
1,2,Product_2,Beauty
2,3,Product_3,Home
3,4,Product_4,Beauty
4,5,Product_5,Beauty
