In [1]:
import os
import re
import json
import math
import time
import uuid
import random
import string
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Dict, Any, Optional, Tuple

import pandas as pd


@dataclass(frozen=True)
class Paths:
    base_dir: str = "."
    src_db: str = "b2b_source.sqlite"
    dwh_db: str = "b2b_dwh.sqlite"
    marketing_xlsx: str = "marketing_leads.xlsx"
    weblog_file: str = "weblogs_combined.log"

paths = Paths()

RANDOM_SEED = 42
random.seed(RANDOM_SEED)

N_COMPANIES = 220          # includes suppliers and buyers
N_SUPPLIERS = 55
N_PRODUCTS = 650
N_END_CUSTOMERS = 60_000
N_ORDERS = 140_000
AVG_ITEMS_PER_ORDER = 2.4  # will randomize
MAX_ITEMS_PER_ORDER = 6

LOOKBACK_DAYS = 520  # generate > last year so reports make sense

INCR_NEW_ORDERS = 8_000
INCR_NEW_LOG_LINES = 25_000

COUNTRIES = ["AR", "BR", "CL", "CO", "MX", "US", "ES"]
COUNTRY_WEIGHTS = [0.36, 0.12, 0.08, 0.07, 0.12, 0.15, 0.10]

DEVICE_UA_POOL = [
    # Desktop
    ("desktop", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"),
    ("desktop", "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_2_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.3 Safari/605.1.15"),
    ("desktop", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36"),
    # Mobile
    ("mobile", "Mozilla/5.0 (iPhone; CPU iPhone OS 16_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1"),
    ("mobile", "Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36"),
    # Tablet
    ("tablet", "Mozilla/5.0 (iPad; CPU OS 16_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148 Safari/604.1"),
    # Bots
    ("bot", "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"),
]


In [2]:
def etl_print(msg: str) -> None:
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{ts}] {msg}")

def utc_now() -> datetime:
    return datetime.now(timezone.utc)

def dt_to_iso(dt: datetime) -> str:
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc).isoformat()

def iso_to_dt(s: str) -> datetime:
    # SQLite stores ISO strings; parse back
    return datetime.fromisoformat(s.replace("Z", "+00:00"))

def chunks(seq: List[Any], size: int) -> Iterable[List[Any]]:
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

def rand_str(n: int) -> str:
    return "".join(random.choices(string.ascii_uppercase + string.digits, k=n))

def weighted_choice(items: List[Any], weights: List[float]) -> Any:
    return random.choices(items, weights=weights, k=1)[0]

def connect(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("PRAGMA synchronous=NORMAL;")
    conn.execute("PRAGMA foreign_keys=ON;")
    return conn

def exec_script(conn: sqlite3.Connection, sql: str) -> None:
    conn.executescript(sql)
    conn.commit()


In [3]:
SOURCE_SCHEMA_SQL = """
DROP TABLE IF EXISTS companies;
DROP TABLE IF EXISTS suppliers;
DROP TABLE IF EXISTS end_customers;
DROP TABLE IF EXISTS products;
DROP TABLE IF EXISTS supplier_products;
DROP TABLE IF EXISTS company_catalog;
DROP TABLE IF EXISTS orders;
DROP TABLE IF EXISTS order_items;
DROP TABLE IF EXISTS ip_geo_map;
DROP TABLE IF EXISTS etl_source_audit;

CREATE TABLE companies (
    company_id      INTEGER PRIMARY KEY,
    cuit            TEXT NOT NULL UNIQUE,
    name            TEXT NOT NULL,
    country_code    TEXT NOT NULL,
    created_at_utc  TEXT NOT NULL
);

CREATE TABLE suppliers (
    supplier_id     INTEGER PRIMARY KEY,
    company_id      INTEGER NOT NULL UNIQUE,
    qualification_tier TEXT NOT NULL,
    FOREIGN KEY(company_id) REFERENCES companies(company_id)
);

CREATE TABLE end_customers (
    end_customer_id INTEGER PRIMARY KEY,
    document_number TEXT NOT NULL UNIQUE,
    full_name       TEXT NOT NULL,
    date_of_birth   TEXT NOT NULL,
    created_at_utc  TEXT NOT NULL
);

CREATE TABLE products (
    product_id      INTEGER PRIMARY KEY,
    sku             TEXT NOT NULL UNIQUE,
    name            TEXT NOT NULL,
    category        TEXT NOT NULL,
    created_at_utc  TEXT NOT NULL
);

CREATE TABLE supplier_products (
    supplier_product_id INTEGER PRIMARY KEY,
    supplier_id     INTEGER NOT NULL,
    product_id      INTEGER NOT NULL,
    default_price   REAL NOT NULL,
    active_flag     INTEGER NOT NULL,
    created_at_utc  TEXT NOT NULL,
    UNIQUE(supplier_id, product_id),
    FOREIGN KEY(supplier_id) REFERENCES suppliers(supplier_id),
    FOREIGN KEY(product_id) REFERENCES products(product_id)
);

-- each buyer company defines its own catalog price for products coming from suppliers
CREATE TABLE company_catalog (
    catalog_id      INTEGER PRIMARY KEY,
    buyer_company_id INTEGER NOT NULL,
    product_id      INTEGER NOT NULL,
    supplier_id     INTEGER NOT NULL,
    catalog_price   REAL NOT NULL,
    effective_from_utc TEXT NOT NULL,
    effective_to_utc   TEXT,
    active_flag     INTEGER NOT NULL,
    FOREIGN KEY(buyer_company_id) REFERENCES companies(company_id),
    FOREIGN KEY(product_id) REFERENCES products(product_id),
    FOREIGN KEY(supplier_id) REFERENCES suppliers(supplier_id)
);

CREATE TABLE orders (
    order_id        INTEGER PRIMARY KEY,
    buyer_company_id INTEGER NOT NULL,
    end_customer_id INTEGER NOT NULL,
    order_ts_utc    TEXT NOT NULL,
    status          TEXT NOT NULL,
    currency        TEXT NOT NULL,
    created_at_utc  TEXT NOT NULL,
    FOREIGN KEY(buyer_company_id) REFERENCES companies(company_id),
    FOREIGN KEY(end_customer_id) REFERENCES end_customers(end_customer_id)
);

CREATE TABLE order_items (
    order_item_id   INTEGER PRIMARY KEY,
    order_id        INTEGER NOT NULL,
    product_id      INTEGER NOT NULL,
    supplier_id     INTEGER NOT NULL,
    qty             INTEGER NOT NULL,
    unit_price      REAL NOT NULL,
    line_total      REAL NOT NULL,
    created_at_utc  TEXT NOT NULL,
    FOREIGN KEY(order_id) REFERENCES orders(order_id),
    FOREIGN KEY(product_id) REFERENCES products(product_id),
    FOREIGN KEY(supplier_id) REFERENCES suppliers(supplier_id)
);

-- for deterministic "geo" without external API
CREATE TABLE ip_geo_map (
    ip              TEXT PRIMARY KEY,
    country_code    TEXT NOT NULL,
    city            TEXT NOT NULL
);

-- optional: a tiny audit table in source to mimic “last successful load”
CREATE TABLE etl_source_audit (
    id              INTEGER PRIMARY KEY,
    entity_name     TEXT NOT NULL,
    last_change_id  INTEGER,
    last_change_ts_utc TEXT,
    updated_at_utc  TEXT NOT NULL
);
"""

src_conn = connect(paths.src_db)
exec_script(src_conn, SOURCE_SCHEMA_SQL)
src_conn.close()

print("Created source DB:", paths.src_db)


Created source DB: b2b_source.sqlite


In [4]:
def build_companies(n_companies: int, n_suppliers: int) -> Tuple[pd.DataFrame, pd.DataFrame]:
    now = utc_now()
    companies = []
    for i in range(1, n_companies + 1):
        country = weighted_choice(COUNTRIES, COUNTRY_WEIGHTS)
        cuit = f"{random.randint(20, 33)}-{random.randint(10_000_000, 99_999_999)}-{random.randint(0,9)}"
        name = f"{random.choice(['Aurum','Delta','Nimbus','Vertex','Orion','Pampa','Andes','Pacifica'])} {random.choice(['Trading','Supply','Wholesale','Industrial','Market'])} {rand_str(3)}"
        created = now - timedelta(days=random.randint(30, LOOKBACK_DAYS))
        companies.append((i, cuit, name, country, dt_to_iso(created)))

    companies_df = pd.DataFrame(companies, columns=["company_id","cuit","name","country_code","created_at_utc"])

    supplier_company_ids = set(random.sample(list(companies_df["company_id"]), k=n_suppliers))
    suppliers = []
    tiers = ["bronze", "silver", "gold"]
    for sid, cid in enumerate(sorted(supplier_company_ids), start=1):
        suppliers.append((sid, cid, weighted_choice(tiers, [0.25, 0.45, 0.30])))

    suppliers_df = pd.DataFrame(suppliers, columns=["supplier_id","company_id","qualification_tier"])
    return companies_df, suppliers_df

def build_products(n_products: int) -> pd.DataFrame:
    now = utc_now()
    categories = ["office", "safety", "packaging", "electronics", "tools", "cleaning", "food-service", "lab"]
    items = []
    for pid in range(1, n_products + 1):
        cat = random.choice(categories)
        sku = f"{cat[:3].upper()}-{random.randint(100000, 999999)}"
        name = f"{random.choice(['Pro','Ultra','Eco','Prime','Max'])} {random.choice(['Kit','Bundle','Pack','Unit','Case'])} {random.choice(['A','B','C','X','Z'])}-{random.randint(1,99)}"
        created = now - timedelta(days=random.randint(15, LOOKBACK_DAYS))
        items.append((pid, sku, name, cat, dt_to_iso(created)))
    return pd.DataFrame(items, columns=["product_id","sku","name","category","created_at_utc"])

def build_supplier_products(suppliers_df: pd.DataFrame, products_df: pd.DataFrame) -> pd.DataFrame:
    now = utc_now()
    rows = []
    sp_id = 1
    for _, s in suppliers_df.iterrows():
        # each supplier carries a subset
        carry = random.randint(int(N_PRODUCTS * 0.25), int(N_PRODUCTS * 0.55))
        product_ids = random.sample(list(products_df["product_id"]), k=carry)
        for pid in product_ids:
            base = round(random.uniform(3.0, 900.0), 2)
            active = 1 if random.random() > 0.03 else 0
            created = now - timedelta(days=random.randint(1, LOOKBACK_DAYS))
            rows.append((sp_id, int(s["supplier_id"]), int(pid), base, active, dt_to_iso(created)))
            sp_id += 1
    return pd.DataFrame(rows, columns=[
        "supplier_product_id","supplier_id","product_id","default_price","active_flag","created_at_utc"
    ])

def build_company_catalog(companies_df: pd.DataFrame, suppliers_df: pd.DataFrame, supplier_products_df: pd.DataFrame) -> pd.DataFrame:
    now = utc_now()
    supplier_ids = set(suppliers_df["supplier_id"].tolist())
    buyer_company_ids = [cid for cid in companies_df["company_id"].tolist() if cid not in set(suppliers_df["company_id"].tolist())]

    rows = []
    catalog_id = 1

    # make a lookup of active supplier_products by supplier
    active_sp = supplier_products_df[supplier_products_df["active_flag"] == 1].copy()
    by_supplier = {}
    for sid, grp in active_sp.groupby("supplier_id"):
        by_supplier[int(sid)] = grp[["product_id","default_price"]].values.tolist()

    for buyer_cid in random.sample(buyer_company_ids, k=int(len(buyer_company_ids) * 0.85)):
        # each buyer picks several suppliers
        chosen_suppliers = random.sample(list(supplier_ids), k=random.randint(8, min(18, len(supplier_ids))))
        for sid in chosen_suppliers:
            if sid not in by_supplier:
                continue
            # each supplier contributes a subset to the buyer's catalog
            sp_rows = by_supplier[sid]
            take = random.randint(40, 120)
            for pid, default_price in random.sample(sp_rows, k=min(take, len(sp_rows))):
                # negotiate: +/- up to 15%
                factor = random.uniform(0.85, 1.15)
                catalog_price = round(float(default_price) * factor, 2)
                eff_from = now - timedelta(days=random.randint(1, LOOKBACK_DAYS))
                rows.append((catalog_id, int(buyer_cid), int(pid), int(sid), catalog_price, dt_to_iso(eff_from), None, 1))
                catalog_id += 1

    return pd.DataFrame(rows, columns=[
        "catalog_id","buyer_company_id","product_id","supplier_id","catalog_price",
        "effective_from_utc","effective_to_utc","active_flag"
    ])

# header - build end_customers (stable constraints, controlled bad DOB only)

def build_end_customers(n_customers: int) -> pd.DataFrame:
    now = utc_now()
    first = ["Juan","Sofia","Mateo","Valentina","Martin","Camila","Bruno","Lucia","Diego","Ana","Tomas","Renata"]
    last = ["Gomez","Perez","Lopez","Diaz","Fernandez","Silva","Alvarez","Romero","Torres","Ruiz","Sosa","Vega"]

    rows = []
    for i in range(1, n_customers + 1):
        dob = datetime(1958, 1, 1, tzinfo=timezone.utc) + timedelta(days=random.randint(0, 60 * 365))
        name = f"{random.choice(first)} {random.choice(last)} {random.choice(last)}"

        # Always unique + NOT NULL for SQLite constraints
        doc = f"DOC-{i:08d}"

        created = now - timedelta(days=random.randint(1, LOOKBACK_DAYS))
        rows.append((i, doc, name, dob.date().isoformat(), dt_to_iso(created)))

    df = pd.DataFrame(rows, columns=["end_customer_id","document_number","full_name","date_of_birth","created_at_utc"])

    # Inject a small number of invalid DOB strings (won't break DB constraints)
    bad_idx = random.sample(range(len(df)), k=35)
    df.loc[bad_idx[:20], "date_of_birth"] = "1900-02-30"   # invalid date string
    df.loc[bad_idx[20:], "date_of_birth"] = "not-a-date"   # another invalid pattern

    return df


def build_orders_and_items(
    companies_df: pd.DataFrame,
    suppliers_df: pd.DataFrame,
    catalog_df: pd.DataFrame,
    end_customers_df: pd.DataFrame,
    n_orders: int
) -> Tuple[pd.DataFrame, pd.DataFrame]:

    now = utc_now()

    supplier_company_ids = set(suppliers_df["company_id"].tolist())
    buyer_company_ids = [cid for cid in companies_df["company_id"].tolist() if cid not in supplier_company_ids]

    # catalog lookup for buyer -> list of (product_id, supplier_id, price)
    cat_active = catalog_df[catalog_df["active_flag"] == 1].copy()
    cat_map: Dict[int, List[Tuple[int,int,float]]] = {}
    for buyer_id, grp in cat_active.groupby("buyer_company_id"):
        cat_map[int(buyer_id)] = [(int(r.product_id), int(r.supplier_id), float(r.catalog_price)) for r in grp.itertuples(index=False)]

    statuses = ["created", "confirmed", "shipped", "delivered", "cancelled"]
    status_w = [0.08, 0.22, 0.18, 0.47, 0.05]

    orders = []
    items = []
    order_item_id = 1

    customer_ids = end_customers_df["end_customer_id"].tolist()

    for order_id in range(1, n_orders + 1):
        buyer = random.choice(buyer_company_ids)
        if buyer not in cat_map or len(cat_map[buyer]) < 20:
            buyer = random.choice([b for b in buyer_company_ids if b in cat_map])

        cust_id = random.choice(customer_ids)
        order_ts = now - timedelta(days=random.randint(0, LOOKBACK_DAYS), seconds=random.randint(0, 86400))
        status = weighted_choice(statuses, status_w)
        currency = "USD" if random.random() < 0.18 else "ARS"

        created = order_ts - timedelta(minutes=random.randint(1, 120))
        orders.append((order_id, int(buyer), int(cust_id), dt_to_iso(order_ts), status, currency, dt_to_iso(created)))

        k = min(MAX_ITEMS_PER_ORDER, max(1, int(random.gauss(mu=AVG_ITEMS_PER_ORDER, sigma=1.1))))
        picks = random.sample(cat_map[buyer], k=k)
        for (pid, sid, unit_price) in picks:
            qty = random.randint(1, 12)
            # occasional dirty row: negative qty or zero price
            if random.random() < 0.0015:
                qty = -qty
            if random.random() < 0.0015:
                unit_price = 0.0

            line_total = round(float(unit_price) * qty, 2)
            items.append((order_item_id, order_id, pid, sid, qty, float(unit_price), float(line_total), dt_to_iso(order_ts)))
            order_item_id += 1

    orders_df = pd.DataFrame(orders, columns=[
        "order_id","buyer_company_id","end_customer_id","order_ts_utc","status","currency","created_at_utc"
    ])
    items_df = pd.DataFrame(items, columns=[
        "order_item_id","order_id","product_id","supplier_id","qty","unit_price","line_total","created_at_utc"
    ])
    return orders_df, items_df

def build_ip_geo_map(companies_df: pd.DataFrame) -> pd.DataFrame:
    # Create IPs per company-country; weblog generator will reuse these
    cities = {
        "AR": ["Buenos Aires", "Cordoba", "Rosario"],
        "BR": ["Sao Paulo", "Rio", "Curitiba"],
        "CL": ["Santiago", "Valparaiso"],
        "CO": ["Bogota", "Medellin"],
        "MX": ["CDMX", "Guadalajara"],
        "US": ["Miami", "Dallas", "NYC"],
        "ES": ["Madrid", "Barcelona"],
    }

    rows = []
    used = set()

    def gen_ip() -> str:
        # keep it within private-ish looking ranges but still readable
        return f"{random.randint(11, 219)}.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(1,254)}"

    # one or more IP per company
    for c in companies_df.itertuples(index=False):
        for _ in range(random.randint(1, 4)):
            ip = gen_ip()
            while ip in used:
                ip = gen_ip()
            used.add(ip)
            rows.append((ip, c.country_code, random.choice(cities[c.country_code])))

    df = pd.DataFrame(rows, columns=["ip","country_code","city"])
    return df


def load_df(conn: sqlite3.Connection, df: pd.DataFrame, table: str, chunksize: int = 5_000) -> None:
    # method=None uses executemany under the hood and avoids SQLite parameter limits from multi-row INSERT
    df.to_sql(
        table,
        conn,
        if_exists="append",
        index=False,
        chunksize=chunksize,
        method=None
    )



# Build datasets
companies_df, suppliers_df = build_companies(N_COMPANIES, N_SUPPLIERS)
products_df = build_products(N_PRODUCTS)
supplier_products_df = build_supplier_products(suppliers_df, products_df)
catalog_df = build_company_catalog(companies_df, suppliers_df, supplier_products_df)
end_customers_df = build_end_customers(N_END_CUSTOMERS)
orders_df, order_items_df = build_orders_and_items(companies_df, suppliers_df, catalog_df, end_customers_df, N_ORDERS)
ip_geo_df = build_ip_geo_map(companies_df)

# Load into SQLite
src_conn = connect(paths.src_db)
src_conn.execute("BEGIN;")
load_df(src_conn, companies_df, "companies")
load_df(src_conn, suppliers_df, "suppliers")
load_df(src_conn, products_df, "products")
load_df(src_conn, supplier_products_df, "supplier_products")
load_df(src_conn, catalog_df, "company_catalog")
load_df(src_conn, end_customers_df, "end_customers")
load_df(src_conn, orders_df, "orders")
load_df(src_conn, order_items_df, "order_items")
load_df(src_conn, ip_geo_df, "ip_geo_map")
src_conn.commit()
src_conn.close()

print("Loaded source data.")
print("Companies:", len(companies_df), "Suppliers:", len(suppliers_df), "Products:", len(products_df))
print("Customers:", len(end_customers_df), "Orders:", len(orders_df), "Order items:", len(order_items_df))


Loaded source data.
Companies: 220 Suppliers: 55 Products: 650
Customers: 60000 Orders: 140000 Order items: 282405


In [5]:
def build_marketing_leads(n_rows: int = 18_000) -> pd.DataFrame:
    now = utc_now()
    lead_sources = ["linkedin", "webinar", "partner", "cold-email", "referral", "conference"]
    stages = ["new", "contacted", "qualified", "won", "lost"]
    industries = ["manufacturing", "retail", "healthcare", "logistics", "construction", "hospitality", "energy"]

    rows = []
    for i in range(1, n_rows + 1):
        country = weighted_choice(COUNTRIES, COUNTRY_WEIGHTS)
        created = now - timedelta(days=random.randint(0, 365), hours=random.randint(0, 23))
        company_name = f"{random.choice(['North','South','East','West','Global','Central'])} {random.choice(['Edge','Bridge','Core','Line','Peak'])} {random.choice(['LLC','SA','Inc','SRL'])} {rand_str(3)}"
        email = f"lead_{i}_{rand_str(4).lower()}@example.com"
        stage = weighted_choice(stages, [0.46, 0.22, 0.18, 0.06, 0.08])
        score = max(0, min(100, int(random.gauss(58, 18))))

        rows.append((i, company_name, email, country, random.choice(industries), random.choice(lead_sources), stage, score, dt_to_iso(created)))

    df = pd.DataFrame(rows, columns=[
        "lead_id","company_name","email","country_code","industry","source","stage","lead_score","created_at_utc"
    ])

    # inject a few bad rows
    bad_idx = random.sample(range(len(df)), k=45)
    df.loc[bad_idx[:15], "email"] = "not-an-email"
    df.loc[bad_idx[15:30], "lead_score"] = 900
    df.loc[bad_idx[30:], "company_name"] = ""

    return df


leads_df = build_marketing_leads()
with pd.ExcelWriter(paths.marketing_xlsx, engine="openpyxl") as writer:
    leads_df.to_excel(writer, index=False, sheet_name="leads")

print("Created marketing spreadsheet:", paths.marketing_xlsx, "rows:", len(leads_df))


Created marketing spreadsheet: marketing_leads.xlsx rows: 18000


In [6]:
DWH_SCHEMA_SQL = """
DROP TABLE IF EXISTS etl_runs;
DROP TABLE IF EXISTS etl_checkpoints;
DROP TABLE IF EXISTS etl_row_errors;

DROP TABLE IF EXISTS dim_date;
DROP TABLE IF EXISTS dim_company;
DROP TABLE IF EXISTS dim_end_customer;
DROP TABLE IF EXISTS dim_product;
DROP TABLE IF EXISTS dim_device;
DROP TABLE IF EXISTS dim_marketing_lead;

DROP TABLE IF EXISTS fact_sales;
DROP TABLE IF EXISTS fact_web_events;

CREATE TABLE etl_runs (
    run_id          TEXT PRIMARY KEY,
    job_name        TEXT NOT NULL,
    run_type        TEXT NOT NULL,          -- initial | incremental
    status          TEXT NOT NULL,          -- running | succeeded | failed
    started_at_utc  TEXT NOT NULL,
    finished_at_utc TEXT,
    last_step       TEXT,
    error_message   TEXT
);

CREATE TABLE etl_checkpoints (
    job_name        TEXT NOT NULL,
    entity_name     TEXT NOT NULL,          -- orders | weblogs | leads
    last_id         INTEGER,
    last_ts_utc     TEXT,
    updated_at_utc  TEXT NOT NULL,
    PRIMARY KEY(job_name, entity_name)
);

CREATE TABLE etl_row_errors (
    error_id        TEXT PRIMARY KEY,
    run_id          TEXT NOT NULL,
    source_name     TEXT NOT NULL,          -- b2b_db | weblogs | marketing_xlsx
    entity_name     TEXT NOT NULL,
    record_ref      TEXT,
    error_type      TEXT NOT NULL,
    error_message   TEXT NOT NULL,
    raw_payload     TEXT,
    created_at_utc  TEXT NOT NULL
);

-- Date dimension for reporting
CREATE TABLE dim_date (
    date_key        INTEGER PRIMARY KEY,   -- YYYYMMDD
    date_iso        TEXT NOT NULL UNIQUE,  -- YYYY-MM-DD
    year            INTEGER NOT NULL,
    month           INTEGER NOT NULL,
    day             INTEGER NOT NULL,
    month_start_iso TEXT NOT NULL
);

CREATE TABLE dim_company (
    company_key     INTEGER PRIMARY KEY,
    company_id      INTEGER NOT NULL UNIQUE,
    cuit            TEXT NOT NULL,
    name            TEXT NOT NULL,
    country_code    TEXT NOT NULL
);

CREATE TABLE dim_end_customer (
    end_customer_key INTEGER PRIMARY KEY,
    end_customer_id  INTEGER NOT NULL UNIQUE,
    document_number  TEXT,
    full_name        TEXT,
    date_of_birth    TEXT,
    is_valid         INTEGER NOT NULL
);

CREATE TABLE dim_product (
    product_key     INTEGER PRIMARY KEY,
    product_id      INTEGER NOT NULL UNIQUE,
    sku             TEXT NOT NULL,
    name            TEXT NOT NULL,
    category        TEXT NOT NULL
);

CREATE TABLE dim_device (
    device_key      INTEGER PRIMARY KEY,
    device_family   TEXT NOT NULL UNIQUE   -- desktop|mobile|tablet|bot|other
);

CREATE TABLE dim_marketing_lead (
    lead_key        INTEGER PRIMARY KEY,
    lead_id         INTEGER NOT NULL UNIQUE,
    company_name    TEXT,
    email           TEXT,
    country_code    TEXT,
    industry        TEXT,
    source          TEXT,
    stage           TEXT,
    lead_score      INTEGER,
    is_valid        INTEGER NOT NULL
);

-- Facts
CREATE TABLE fact_sales (
    sales_id        TEXT PRIMARY KEY,
    order_id        INTEGER NOT NULL,
    order_item_id   INTEGER NOT NULL UNIQUE,
    order_ts_utc    TEXT NOT NULL,

    date_key        INTEGER NOT NULL,
    buyer_company_key INTEGER NOT NULL,
    end_customer_key  INTEGER NOT NULL,
    product_key     INTEGER NOT NULL,
    supplier_company_key INTEGER NOT NULL,

    qty             INTEGER NOT NULL,
    unit_price      REAL NOT NULL,
    line_total      REAL NOT NULL,
    currency        TEXT NOT NULL,
    order_status    TEXT NOT NULL,

    FOREIGN KEY(date_key) REFERENCES dim_date(date_key),
    FOREIGN KEY(buyer_company_key) REFERENCES dim_company(company_key),
    FOREIGN KEY(end_customer_key) REFERENCES dim_end_customer(end_customer_key),
    FOREIGN KEY(product_key) REFERENCES dim_product(product_key),
    FOREIGN KEY(supplier_company_key) REFERENCES dim_company(company_key)
);

CREATE TABLE fact_web_events (
    web_event_id    TEXT PRIMARY KEY,
    event_ts_utc    TEXT NOT NULL,
    date_key        INTEGER NOT NULL,

    ip              TEXT NOT NULL,
    country_code    TEXT,
    username        TEXT,
    user_agent      TEXT,
    device_key      INTEGER NOT NULL,

    http_method     TEXT,
    path            TEXT,
    status_code     INTEGER,

    FOREIGN KEY(date_key) REFERENCES dim_date(date_key),
    FOREIGN KEY(device_key) REFERENCES dim_device(device_key)
);

-- Indexes for report speed
CREATE INDEX idx_fact_sales_date ON fact_sales(date_key);
CREATE INDEX idx_fact_sales_product ON fact_sales(product_key);
CREATE INDEX idx_fact_sales_buyer ON fact_sales(buyer_company_key);

CREATE INDEX idx_fact_web_date ON fact_web_events(date_key);
CREATE INDEX idx_fact_web_country ON fact_web_events(country_code);
CREATE INDEX idx_fact_web_device ON fact_web_events(device_key);
"""

dwh_conn = connect(paths.dwh_db)
exec_script(dwh_conn, DWH_SCHEMA_SQL)
dwh_conn.close()

print("Created DWH DB:", paths.dwh_db)


Created DWH DB: b2b_dwh.sqlite


In [7]:
class ETLJob:
    def __init__(self, job_name: str, src_db: str, dwh_db: str):
        self.job_name = job_name
        self.src_db = src_db
        self.dwh_db = dwh_db

    def _start_run(self, run_type: str) -> str:
        run_id = str(uuid.uuid4())
        conn = connect(self.dwh_db)
        conn.execute(
            "INSERT INTO etl_runs(run_id, job_name, run_type, status, started_at_utc) VALUES (?,?,?,?,?)",
            (run_id, self.job_name, run_type, "running", dt_to_iso(utc_now()))
        )
        conn.commit()
        conn.close()
        return run_id

    def _set_run_step(self, run_id: str, step: str) -> None:
        conn = connect(self.dwh_db)
        conn.execute("UPDATE etl_runs SET last_step=? WHERE run_id=?", (step, run_id))
        conn.commit()
        conn.close()

    def _finish_run(self, run_id: str, status: str, error_message: Optional[str] = None) -> None:
        conn = connect(self.dwh_db)
        conn.execute(
            "UPDATE etl_runs SET status=?, finished_at_utc=?, error_message=? WHERE run_id=?",
            (status, dt_to_iso(utc_now()), error_message, run_id)
        )
        conn.commit()
        conn.close()

    def _get_checkpoint(self, entity_name: str) -> Dict[str, Any]:
        conn = connect(self.dwh_db)
        row = conn.execute(
            "SELECT last_id, last_ts_utc FROM etl_checkpoints WHERE job_name=? AND entity_name=?",
            (self.job_name, entity_name)
        ).fetchone()
        conn.close()
        if not row:
            return {"last_id": None, "last_ts_utc": None}
        return {"last_id": row[0], "last_ts_utc": row[1]}

    def _set_checkpoint(self, entity_name: str, last_id: Optional[int], last_ts_utc: Optional[str]) -> None:
        conn = connect(self.dwh_db)
        conn.execute(
            """
            INSERT INTO etl_checkpoints(job_name, entity_name, last_id, last_ts_utc, updated_at_utc)
            VALUES (?,?,?,?,?)
            ON CONFLICT(job_name, entity_name)
            DO UPDATE SET last_id=excluded.last_id, last_ts_utc=excluded.last_ts_utc, updated_at_utc=excluded.updated_at_utc
            """,
            (self.job_name, entity_name, last_id, last_ts_utc, dt_to_iso(utc_now()))
        )
        conn.commit()
        conn.close()

    def _log_row_error(
        self,
        run_id: str,
        source_name: str,
        entity_name: str,
        record_ref: str,
        error_type: str,
        error_message: str,
        raw_payload: Optional[Dict[str, Any]] = None
    ) -> None:
        conn = connect(self.dwh_db)
        conn.execute(
            """
            INSERT INTO etl_row_errors(error_id, run_id, source_name, entity_name, record_ref, error_type, error_message, raw_payload, created_at_utc)
            VALUES (?,?,?,?,?,?,?,?,?)
            """,
            (
                str(uuid.uuid4()),
                run_id,
                source_name,
                entity_name,
                record_ref,
                error_type,
                error_message[:500],
                json.dumps(raw_payload, ensure_ascii=False) if raw_payload is not None else None,
                dt_to_iso(utc_now()),
            )
        )
        conn.commit()
        conn.close()


In [8]:
DEVICE_PATTERNS = [
    ("bot", re.compile(r"bot|spider|crawl", re.I)),
    ("mobile", re.compile(r"iphone|android.+mobile|mobile safari|windows phone", re.I)),
    ("tablet", re.compile(r"ipad|android(?!.*mobile)", re.I)),
    ("desktop", re.compile(r"windows nt|macintosh|x11; linux", re.I)),
]

def parse_device_family(user_agent: str) -> str:
    if not user_agent:
        return "other"
    for fam, pat in DEVICE_PATTERNS:
        if pat.search(user_agent):
            return fam
    return "other"

def date_key_from_iso(ts_utc: str) -> int:
    d = iso_to_dt(ts_utc).date()
    return int(d.strftime("%Y%m%d"))

def ensure_dim_date(dwh_conn: sqlite3.Connection, min_date: datetime, max_date: datetime) -> None:
    # Build date dimension range once
    min_d = min_date.date()
    max_d = max_date.date()
    rows = []
    cur = datetime(min_d.year, min_d.month, min_d.day, tzinfo=timezone.utc)
    end = datetime(max_d.year, max_d.month, max_d.day, tzinfo=timezone.utc)

    while cur.date() <= end.date():
        dk = int(cur.strftime("%Y%m%d"))
        date_iso = cur.date().isoformat()
        month_start = cur.replace(day=1).date().isoformat()
        rows.append((dk, date_iso, cur.year, cur.month, cur.day, month_start))
        cur += timedelta(days=1)

    dwh_conn.execute("BEGIN;")
    dwh_conn.executemany(
        """
        INSERT OR IGNORE INTO dim_date(date_key, date_iso, year, month, day, month_start_iso)
        VALUES (?,?,?,?,?,?)
        """,
        rows
    )
    dwh_conn.commit()


In [9]:
def load_dimensions(run_id: str, src_db: str, dwh_db: str, job_name: str) -> None:
    src = connect(src_db)
    dwh = connect(dwh_db)

    # companies
    comp = pd.read_sql_query("SELECT company_id, cuit, name, country_code FROM companies", src)
    dwh.execute("BEGIN;")
    dwh.executemany(
        """
        INSERT OR IGNORE INTO dim_company(company_id, cuit, name, country_code)
        VALUES (?,?,?,?)
        """,
        comp[["company_id","cuit","name","country_code"]].itertuples(index=False, name=None)
    )
    dwh.commit()

    # products
    prod = pd.read_sql_query("SELECT product_id, sku, name, category FROM products", src)
    dwh.execute("BEGIN;")
    dwh.executemany(
        """
        INSERT OR IGNORE INTO dim_product(product_id, sku, name, category)
        VALUES (?,?,?,?)
        """,
        prod[["product_id","sku","name","category"]].itertuples(index=False, name=None)
    )
    dwh.commit()

    # device families (static)
    device_fams = [("desktop",), ("mobile",), ("tablet",), ("bot",), ("other",)]
    dwh.execute("BEGIN;")
    dwh.executemany("INSERT OR IGNORE INTO dim_device(device_family) VALUES (?)", device_fams)
    dwh.commit()

    # customers with validation
    cust = pd.read_sql_query(
        "SELECT end_customer_id, document_number, full_name, date_of_birth FROM end_customers",
        src
    )

    def validate_customer(r: pd.Series) -> Tuple[int, Optional[str]]:
        # doc is guaranteed valid in source now
        if not r["full_name"] or str(r["full_name"]).strip() == "":
            return 0, "missing_full_name"
        try:
            datetime.fromisoformat(str(r["date_of_birth"]))
        except Exception:
            return 0, "invalid_date_of_birth"
        return 1, None


    is_valid = []
    for _, r in cust.iterrows():
        ok, reason = validate_customer(r)
        is_valid.append(ok)
        if ok == 0:
            # log but still load a row (marked invalid) so facts can still join
            ETLJob(job_name, src_db, dwh_db)._log_row_error(
                run_id=run_id,
                source_name="b2b_db",
                entity_name="end_customers",
                record_ref=str(r["end_customer_id"]),
                error_type="validation_error",
                error_message=reason or "invalid_row",
                raw_payload=r.to_dict(),
            )

    cust["is_valid"] = is_valid

    dwh.execute("BEGIN;")
    dwh.executemany(
        """
        INSERT OR IGNORE INTO dim_end_customer(end_customer_id, document_number, full_name, date_of_birth, is_valid)
        VALUES (?,?,?,?,?)
        """,
        cust[["end_customer_id","document_number","full_name","date_of_birth","is_valid"]].itertuples(index=False, name=None)
    )
    dwh.commit()

    # date dimension range: derive from orders min/max
    mm = src.execute("SELECT MIN(order_ts_utc), MAX(order_ts_utc) FROM orders").fetchone()
    if mm and mm[0] and mm[1]:
        min_dt = iso_to_dt(mm[0])
        max_dt = iso_to_dt(mm[1])
        ensure_dim_date(dwh, min_dt - timedelta(days=2), max_dt + timedelta(days=2))

    src.close()
    dwh.close()


In [10]:
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def load_marketing_leads(run_id: str, dwh_db: str, xlsx_path: str, job: ETLJob) -> None:
    dwh = connect(dwh_db)

    df = pd.read_excel(xlsx_path, sheet_name="leads")

    def validate_lead(r: pd.Series) -> Tuple[int, Optional[str]]:
        if not r.get("company_name") or str(r["company_name"]).strip() == "":
            return 0, "missing_company_name"
        if not r.get("email") or not EMAIL_RE.match(str(r["email"]).strip()):
            return 0, "invalid_email"
        score = r.get("lead_score")
        if score is None or not (0 <= int(score) <= 100):
            return 0, "lead_score_out_of_range"
        return 1, None

    is_valid = []
    for _, r in df.iterrows():
        ok, reason = validate_lead(r)
        is_valid.append(ok)
        if ok == 0:
            job._log_row_error(
                run_id=run_id,
                source_name="marketing_xlsx",
                entity_name="leads",
                record_ref=str(r.get("lead_id")),
                error_type="validation_error",
                error_message=reason or "invalid_row",
                raw_payload={k: (None if (isinstance(v, float) and math.isnan(v)) else v) for k, v in r.to_dict().items()},
            )

    df["is_valid"] = is_valid

    # Upsert-like: ignore existing, since lead_id is stable for this project
    dwh.execute("BEGIN;")
    dwh.executemany(
        """
        INSERT OR IGNORE INTO dim_marketing_lead(
            lead_id, company_name, email, country_code, industry, source, stage, lead_score, is_valid
        ) VALUES (?,?,?,?,?,?,?,?,?)
        """,
        df[[
            "lead_id","company_name","email","country_code","industry","source","stage","lead_score","is_valid"
        ]].itertuples(index=False, name=None)
    )
    dwh.commit()
    dwh.close()


In [11]:
def load_sales_facts(run_id: str, src_db: str, dwh_db: str, job: ETLJob, run_type: str) -> None:
    src = connect(src_db)
    dwh = connect(dwh_db)

    cp = job._get_checkpoint("orders")
    last_order_id = cp["last_id"] if run_type == "incremental" else None

    if last_order_id is not None:
        etl_print(f"Sales facts incremental load: starting after order_id={last_order_id}")
    else:
        etl_print("Sales facts initial load: processing all orders")

    where = ""
    params: Tuple[Any, ...] = ()
    if last_order_id is not None:
        where = "WHERE o.order_id > ?"
        params = (int(last_order_id),)

    q = f"""
    SELECT
        o.order_id, o.buyer_company_id, o.end_customer_id, o.order_ts_utc, o.status, o.currency,
        oi.order_item_id, oi.product_id, oi.supplier_id, oi.qty, oi.unit_price, oi.line_total
    FROM orders o
    JOIN order_items oi ON oi.order_id = o.order_id
    {where}
    ORDER BY o.order_id, oi.order_item_id
    """

    cur = src.execute(q, params)

    max_seen_order_id = last_order_id
    batch = []
    BATCH_SIZE = 20_000
    inserted_ok = 0
    skipped_bad = 0

    def flush(rows: List[Tuple[Any, ...]]) -> None:
        if not rows:
            return
        dwh.execute("BEGIN;")
        dwh.executemany(
            """
            INSERT OR IGNORE INTO fact_sales(
                sales_id, order_id, order_item_id, order_ts_utc,
                date_key, buyer_company_key, end_customer_key, product_key, supplier_company_key,
                qty, unit_price, line_total, currency, order_status
            ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """,
            rows
        )
        dwh.commit()

    etl_print("Reading orders + items from source DB")

    while True:
        r = cur.fetchone()
        if r is None:
            break

        (
            order_id, buyer_company_id, end_customer_id, order_ts_utc, status, currency,
            order_item_id, product_id, supplier_id, qty, unit_price, line_total
        ) = r

        # Validate row
        if qty is None or int(qty) <= 0:
            skipped_bad += 1
            job._log_row_error(
                run_id, "b2b_db", "order_items", str(order_item_id),
                "validation_error", "qty_must_be_positive",
                raw_payload={"order_item_id": order_item_id, "order_id": order_id, "qty": qty}
            )
            continue

        if unit_price is None or float(unit_price) <= 0:
            skipped_bad += 1
            job._log_row_error(
                run_id, "b2b_db", "order_items", str(order_item_id),
                "validation_error", "unit_price_must_be_positive",
                raw_payload={"order_item_id": order_item_id, "order_id": order_id, "unit_price": unit_price}
            )
            continue

        dk = date_key_from_iso(order_ts_utc)

        # supplier_id -> supplier company_id
        supplier_company_id = src.execute(
            "SELECT company_id FROM suppliers WHERE supplier_id=?",
            (int(supplier_id),)
        ).fetchone()

        if not supplier_company_id:
            skipped_bad += 1
            job._log_row_error(
                run_id, "b2b_db", "order_items", str(order_item_id),
                "lookup_error", "supplier_not_found",
                raw_payload={"supplier_id": supplier_id}
            )
            continue

        supplier_company_key = int(supplier_company_id[0])

        sales_id = str(uuid.uuid4())
        batch.append((
            sales_id,
            int(order_id),
            int(order_item_id),
            order_ts_utc,
            int(dk),
            int(buyer_company_id),
            int(end_customer_id),
            int(product_id),
            int(supplier_company_key),
            int(qty),
            float(unit_price),
            float(line_total),
            str(currency),
            str(status),
        ))

        max_seen_order_id = int(order_id)

        if len(batch) >= BATCH_SIZE:
            flush(batch)
            inserted_ok += len(batch)
            etl_print(f"Inserted {inserted_ok} sales rows so far | last_order_id={max_seen_order_id} | skipped_bad={skipped_bad}")
            batch = []
            job._set_checkpoint("orders", max_seen_order_id, None)

    flush(batch)
    inserted_ok += len(batch)

    if max_seen_order_id is not None:
        job._set_checkpoint("orders", int(max_seen_order_id), None)

    etl_print(f"Sales facts load complete | inserted_ok={inserted_ok} | skipped_bad={skipped_bad} | last_order_id={max_seen_order_id}")

    src.close()
    dwh.close()


In [17]:
import re
from datetime import datetime, timezone

LOG_RE = re.compile(
    r'^(?P<ip>\S+)\s+\S+\s+(?P<user>\S+)\s+\[(?P<ts>[^\]]+)\]\s+'
    r'"(?P<req>[^"]*)"\s+(?P<status>\d{3})\s+\S+\s+"[^"]*"\s+"(?P<ua>[^"]*)"'
)

def parse_apache_ts(ts: str) -> datetime:
    # Example: 10/Oct/2000:13:55:36 +0000
    return datetime.strptime(ts, "%d/%b/%Y:%H:%M:%S %z").astimezone(timezone.utc)


In [12]:
def load_weblogs(run_id: str, src_db: str, dwh_db: str, log_path: str, job: ETLJob, run_type: str) -> None:
    dwh = connect(dwh_db)
    src = connect(src_db)

    cp = job._get_checkpoint("weblogs")
    last_line = int(cp["last_id"] or 0) if run_type == "incremental" else 0

    etl_print(f"Weblogs load started | file={log_path}")
    if run_type == "incremental":
        etl_print(f"Weblogs incremental resume | last_line={last_line}")

    device_key_map = {r[0]: r[1] for r in dwh.execute("SELECT device_family, device_key FROM dim_device").fetchall()}
    ip_geo = {r[0]: r[1] for r in src.execute("SELECT ip, country_code FROM ip_geo_map").fetchall()}

    BATCH = 25_000
    rows = []
    processed_ok = 0
    skipped_bad = 0
    current_line = 0

    def flush() -> None:
        nonlocal rows
        if not rows:
            return
        dwh.execute("BEGIN;")
        dwh.executemany(
            """
            INSERT OR IGNORE INTO fact_web_events(
                web_event_id, event_ts_utc, date_key, ip, country_code, username, user_agent, device_key,
                http_method, path, status_code
            ) VALUES (?,?,?,?,?,?,?,?,?,?,?)
            """,
            rows
        )
        dwh.commit()
        rows = []

    with open(log_path, "r", encoding="utf-8") as f:
        for line in f:
            current_line += 1
            if current_line <= last_line:
                continue

            m = LOG_RE.match(line.strip())
            if not m:
                skipped_bad += 1
                job._log_row_error(
                    run_id, "weblogs", "log_lines", str(current_line),
                    "parse_error", "unmatched_log_format",
                    raw_payload={"line": line.strip()[:800]}
                )
                continue

            ip = m.group("ip")
            user = m.group("user")
            ua = m.group("ua") or ""
            status = int(m.group("status"))
            req = (m.group("req") or "").split()

            try:
                ts_utc = parse_apache_ts(m.group("ts"))
            except Exception as e:
                skipped_bad += 1
                job._log_row_error(
                    run_id, "weblogs", "log_lines", str(current_line),
                    "parse_error", f"bad_timestamp: {e}",
                    raw_payload={"ts": m.group("ts"), "line": line.strip()[:800]}
                )
                continue

            method = req[0] if len(req) >= 2 else None
            path = req[1] if len(req) >= 2 else None

            fam = parse_device_family(ua)
            device_key = device_key_map.get(fam, device_key_map["other"])
            country = ip_geo.get(ip)
            dk = int(ts_utc.strftime("%Y%m%d"))

            rows.append((
                str(uuid.uuid4()),
                dt_to_iso(ts_utc),
                dk,
                ip,
                country,
                None if user == "-" else user,
                ua,
                device_key,
                method,
                path,
                status
            ))

            processed_ok += 1

            if processed_ok % BATCH == 0:
                flush()
                etl_print(f"Weblogs progress | processed_ok={processed_ok} | skipped_bad={skipped_bad} | current_line={current_line}")
                job._set_checkpoint("weblogs", current_line, None)

    flush()
    job._set_checkpoint("weblogs", current_line, None)

    etl_print(f"Weblogs load complete | processed_ok={processed_ok} | skipped_bad={skipped_bad} | last_line={current_line}")

    src.close()
    dwh.close()


In [13]:
def run_etl(job: ETLJob, run_type: str = "initial") -> str:
    etl_print(f"Starting ETL job '{job.job_name}' | run_type={run_type}")

    run_id = job._start_run(run_type=run_type)
    etl_print(f"Run ID: {run_id}")

    try:
        etl_print("Step 1/4 - Loading dimensions")
        job._set_run_step(run_id, "dimensions")
        load_dimensions(run_id, job.src_db, job.dwh_db, job.job_name)
        etl_print("Dimensions loaded successfully")

        etl_print("Step 2/4 - Loading marketing leads")
        job._set_run_step(run_id, "marketing_leads")
        load_marketing_leads(run_id, job.dwh_db, paths.marketing_xlsx, job)
        etl_print("Marketing leads loaded")

        etl_print("Step 3/4 - Loading sales facts")
        job._set_run_step(run_id, "sales_facts")
        load_sales_facts(run_id, job.src_db, job.dwh_db, job, run_type)
        etl_print("Sales facts loaded")

        etl_print("Step 4/4 - Loading weblog events")
        job._set_run_step(run_id, "weblogs")
        load_weblogs(run_id, job.src_db, job.dwh_db, paths.weblog_file, job, run_type)
        etl_print("Weblog events loaded")

        job._finish_run(run_id, "succeeded")
        etl_print("ETL job completed successfully")

        return run_id

    except Exception as e:
        etl_print(f"ETL job failed: {e}")
        job._finish_run(run_id, "failed", error_message=str(e))
        raise


In [15]:
job = ETLJob(
    job_name="b2b_etl",
    src_db=paths.src_db,
    dwh_db=paths.dwh_db
)

print("ETL job initialized")


ETL job initialized


In [19]:
run_id = run_etl(job, run_type="initial")
print("Initial ETL completed. run_id:", run_id)


[2026-02-07 06:59:46] Starting ETL job 'b2b_etl' | run_type=initial
[2026-02-07 06:59:46] Run ID: f7edcb33-9cbb-4d6b-bcf8-ebb7ef9d9e99
[2026-02-07 06:59:46] Step 1/4 - Loading dimensions
[2026-02-07 06:59:51] Dimensions loaded successfully
[2026-02-07 06:59:51] Step 2/4 - Loading marketing leads
[2026-02-07 06:59:59] Marketing leads loaded
[2026-02-07 06:59:59] Step 3/4 - Loading sales facts
[2026-02-07 07:00:00] Sales facts initial load: processing all orders
[2026-02-07 07:00:00] Reading orders + items from source DB
[2026-02-07 07:00:02] Inserted 20000 sales rows so far | last_order_id=9889 | skipped_bad=57
[2026-02-07 07:00:04] Inserted 40000 sales rows so far | last_order_id=19880 | skipped_bad=117
[2026-02-07 07:00:06] Inserted 60000 sales rows so far | last_order_id=29821 | skipped_bad=176
[2026-02-07 07:00:08] Inserted 80000 sales rows so far | last_order_id=39725 | skipped_bad=233
[2026-02-07 07:00:10] Inserted 100000 sales rows so far | last_order_id=49632 | skipped_bad=298
[

In [20]:
dwh = connect(paths.dwh_db)

cp_df = pd.read_sql_query(
    "SELECT entity_name, last_id, last_ts_utc, updated_at_utc FROM etl_checkpoints WHERE job_name='b2b_etl' ORDER BY entity_name",
    dwh
)
etl_print("Current checkpoints:")
display(cp_df)

err_df = pd.read_sql_query(
    """
    SELECT source_name, entity_name, error_type, COUNT(*) AS n
    FROM etl_row_errors
    GROUP BY 1,2,3
    ORDER BY n DESC
    """,
    dwh
)
etl_print("Row error counts:")
display(err_df)

dwh.close()


[2026-02-07 07:01:17] Current checkpoints:


Unnamed: 0,entity_name,last_id,last_ts_utc,updated_at_utc
0,orders,140000,,2026-02-07T01:30:29.418681+00:00
1,weblogs,265000,,2026-02-07T01:31:11.001777+00:00


[2026-02-07 07:01:17] Row error counts:


Unnamed: 0,source_name,entity_name,error_type,n
0,b2b_db,order_items,validation_error,1720
1,b2b_db,end_customers,validation_error,70
2,marketing_xlsx,leads,validation_error,60


In [21]:
dwh = connect(paths.dwh_db)

# Report 1: Top 5 devices used by B2B clients :contentReference[oaicite:4]{index=4}
report_1 = pd.read_sql_query(
    """
    SELECT dd.device_family, COUNT(*) AS events
    FROM fact_web_events f
    JOIN dim_device dd ON dd.device_key = f.device_key
    WHERE f.username IS NOT NULL
    GROUP BY dd.device_family
    ORDER BY events DESC
    LIMIT 5
    """,
    dwh
)
print("Report 1 - Top devices")
display(report_1)

# Country with most user logins (by events with username)
top_country = dwh.execute(
    """
    SELECT country_code
    FROM fact_web_events
    WHERE username IS NOT NULL AND country_code IS NOT NULL
    GROUP BY country_code
    ORDER BY COUNT(*) DESC
    LIMIT 1
    """
).fetchone()
top_country = top_country[0] if top_country else None
print("Country with most user logins:", top_country)

# Report 2: Most popular products in that country :contentReference[oaicite:5]{index=5}
# Interpretation: products most purchased by buyer companies in that country (ties back to B2B sales facts).
report_2 = pd.read_sql_query(
    """
    SELECT
        dp.sku,
        dp.name,
        dp.category,
        SUM(fs.qty) AS total_units,
        ROUND(SUM(fs.line_total), 2) AS total_value
    FROM fact_sales fs
    JOIN dim_product dp ON dp.product_key = fs.product_key
    JOIN dim_company bc ON bc.company_key = fs.buyer_company_key
    WHERE bc.country_code = ?
    GROUP BY dp.sku, dp.name, dp.category
    ORDER BY total_units DESC
    LIMIT 15
    """,
    dwh,
    params=(top_country,)
)
print("Report 2 - Popular products in the top-login country")
display(report_2)

# Report 3: Monthly sales for last year :contentReference[oaicite:6]{index=6}
# We'll anchor "last year" to the max order date in the warehouse, so it stays stable for a demo.
max_order_ts = dwh.execute("SELECT MAX(order_ts_utc) FROM fact_sales").fetchone()[0]
max_dt = iso_to_dt(max_order_ts) if max_order_ts else utc_now()
start_dt = (max_dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) - timedelta(days=365)).date().isoformat()

report_3 = pd.read_sql_query(
    """
    SELECT
        substr(dd.month_start_iso, 1, 7) AS year_month,
        COUNT(DISTINCT fs.order_id) AS orders,
        SUM(fs.qty) AS units,
        ROUND(SUM(fs.line_total), 2) AS gross_sales
    FROM fact_sales fs
    JOIN dim_date dd ON dd.date_key = fs.date_key
    WHERE dd.date_iso >= ?
    GROUP BY substr(dd.month_start_iso, 1, 7)
    ORDER BY year_month
    """,
    dwh,
    params=(start_dt,)
)
print("Report 3 - Monthly sales (last 12 months)")
display(report_3)

dwh.close()


Report 1 - Top devices


Unnamed: 0,device_family,events
0,desktop,77293
1,mobile,51803
2,bot,25802
3,tablet,25654


Country with most user logins: AR
Report 2 - Popular products in the top-login country


Unnamed: 0,sku,name,category,total_units,total_value
0,PAC-670027,Eco Pack X-36,packaging,1616,727148.45
1,SAF-858711,Pro Pack B-59,safety,1596,582838.37
2,SAF-302897,Eco Unit X-43,safety,1526,578947.28
3,FOO-256466,Eco Unit B-97,food-service,1519,813406.58
4,PAC-226124,Prime Bundle B-28,packaging,1499,702513.05
5,TOO-713099,Pro Unit B-2,tools,1498,558803.11
6,FOO-869295,Ultra Kit Z-47,food-service,1488,665825.35
7,CLE-692960,Ultra Bundle B-23,cleaning,1485,799509.11
8,OFF-593602,Pro Kit X-7,office,1480,620958.72
9,FOO-147936,Max Bundle B-3,food-service,1477,566804.51


Report 3 - Monthly sales (last 12 months)


Unnamed: 0,year_month,orders,units,gross_sales
0,2025-02,7512,97843,44108036.85
1,2025-03,8220,106556,47509129.05
2,2025-04,8096,106049,47462581.27
3,2025-05,8486,111804,49759751.46
4,2025-06,8103,105807,47239160.31
5,2025-07,8351,109636,49023272.39
6,2025-08,8198,108032,47823754.17
7,2025-09,8169,107364,48035123.81
8,2025-10,8215,108083,48479139.11
9,2025-11,8201,106383,47275836.74


In [22]:
def simulate_incremental_sales(src_db: str, n_new_orders: int) -> None:
    src = connect(src_db)

    # get last IDs
    last_order_id = src.execute("SELECT MAX(order_id) FROM orders").fetchone()[0] or 0
    last_item_id = src.execute("SELECT MAX(order_item_id) FROM order_items").fetchone()[0] or 0

    companies = pd.read_sql_query("SELECT company_id FROM companies", src)
    suppliers = pd.read_sql_query("SELECT supplier_id, company_id FROM suppliers", src)
    customers = pd.read_sql_query("SELECT end_customer_id FROM end_customers", src)
    catalog = pd.read_sql_query("SELECT buyer_company_id, product_id, supplier_id, catalog_price FROM company_catalog WHERE active_flag=1", src)

    supplier_company_ids = set(suppliers["company_id"].tolist())
    buyer_company_ids = [cid for cid in companies["company_id"].tolist() if cid not in supplier_company_ids]

    cat_map: Dict[int, List[Tuple[int,int,float]]] = {}
    for buyer_id, grp in catalog.groupby("buyer_company_id"):
        cat_map[int(buyer_id)] = [(int(r.product_id), int(r.supplier_id), float(r.catalog_price)) for r in grp.itertuples(index=False)]

    now = utc_now()
    statuses = ["created", "confirmed", "shipped", "delivered", "cancelled"]
    status_w = [0.06, 0.22, 0.18, 0.50, 0.04]

    new_orders = []
    new_items = []
    order_item_id = last_item_id + 1

    for i in range(1, n_new_orders + 1):
        order_id = last_order_id + i
        buyer = random.choice([b for b in buyer_company_ids if b in cat_map])
        cust = int(random.choice(customers["end_customer_id"].tolist()))
        order_ts = now - timedelta(days=random.randint(0, 25), seconds=random.randint(0, 86400))
        created = order_ts - timedelta(minutes=random.randint(1, 60))
        status = weighted_choice(statuses, status_w)
        currency = "USD" if random.random() < 0.18 else "ARS"

        new_orders.append((order_id, buyer, cust, dt_to_iso(order_ts), status, currency, dt_to_iso(created)))

        k = random.randint(1, 5)
        picks = random.sample(cat_map[buyer], k=k)
        for (pid, sid, unit_price) in picks:
            qty = random.randint(1, 10)
            line_total = round(unit_price * qty, 2)
            new_items.append((order_item_id, order_id, pid, sid, qty, unit_price, line_total, dt_to_iso(order_ts)))
            order_item_id += 1

    src.execute("BEGIN;")
    src.executemany(
        "INSERT INTO orders(order_id, buyer_company_id, end_customer_id, order_ts_utc, status, currency, created_at_utc) VALUES (?,?,?,?,?,?,?)",
        new_orders
    )
    src.executemany(
        "INSERT INTO order_items(order_item_id, order_id, product_id, supplier_id, qty, unit_price, line_total, created_at_utc) VALUES (?,?,?,?,?,?,?,?)",
        new_items
    )
    src.commit()
    src.close()


simulate_incremental_sales(paths.src_db, INCR_NEW_ORDERS)
print("Inserted incremental orders/items into source:", INCR_NEW_ORDERS)

# Run incremental ETL
run_id_incr = run_etl(job, run_type="incremental")
print("Incremental ETL completed. run_id:", run_id_incr)


Inserted incremental orders/items into source: 8000
[2026-02-07 07:01:57] Starting ETL job 'b2b_etl' | run_type=incremental
[2026-02-07 07:01:57] Run ID: 1979bdff-7714-4ded-aca0-f44a40fe6113
[2026-02-07 07:01:57] Step 1/4 - Loading dimensions
[2026-02-07 07:02:03] Dimensions loaded successfully
[2026-02-07 07:02:03] Step 2/4 - Loading marketing leads
[2026-02-07 07:02:11] Marketing leads loaded
[2026-02-07 07:02:11] Step 3/4 - Loading sales facts
[2026-02-07 07:02:11] Sales facts incremental load: starting after order_id=140000
[2026-02-07 07:02:11] Reading orders + items from source DB
[2026-02-07 07:02:15] Inserted 20000 sales rows so far | last_order_id=146651 | skipped_bad=0
[2026-02-07 07:02:16] Sales facts load complete | inserted_ok=24094 | skipped_bad=0 | last_order_id=148000
[2026-02-07 07:02:16] Sales facts loaded
[2026-02-07 07:02:16] Step 4/4 - Loading weblog events
[2026-02-07 07:02:16] Weblogs load started | file=weblogs_combined.log
[2026-02-07 07:02:16] Weblogs incremen

In [23]:
dwh = connect(paths.dwh_db)

runs = pd.read_sql_query(
    "SELECT run_id, run_type, status, started_at_utc, finished_at_utc, last_step, error_message FROM etl_runs ORDER BY started_at_utc DESC LIMIT 10",
    dwh
)
print("Latest ETL runs")
display(runs)

cps = pd.read_sql_query(
    "SELECT job_name, entity_name, last_id, last_ts_utc, updated_at_utc FROM etl_checkpoints ORDER BY entity_name",
    dwh
)
print("Checkpoints")
display(cps)

errs = pd.read_sql_query(
    "SELECT source_name, entity_name, error_type, COUNT(*) AS n FROM etl_row_errors GROUP BY 1,2,3 ORDER BY n DESC",
    dwh
)
print("Row errors summary (expected: some validation errors we injected)")
display(errs)

dwh.close()


Latest ETL runs


Unnamed: 0,run_id,run_type,status,started_at_utc,finished_at_utc,last_step,error_message
0,1979bdff-7714-4ded-aca0-f44a40fe6113,incremental,succeeded,2026-02-07T01:31:57.590393+00:00,2026-02-07T01:32:16.653201+00:00,weblogs,
1,f7edcb33-9cbb-4d6b-bcf8-ebb7ef9d9e99,initial,succeeded,2026-02-07T01:29:46.336726+00:00,2026-02-07T01:31:11.016275+00:00,weblogs,
2,8d2dc456-072c-4267-bbf2-21f3fa01cae7,initial,failed,2026-02-07T01:26:46.098501+00:00,2026-02-07T01:28:02.171624+00:00,weblogs,name 'LOG_RE' is not defined


Checkpoints


Unnamed: 0,job_name,entity_name,last_id,last_ts_utc,updated_at_utc
0,b2b_etl,orders,148000,,2026-02-07T01:32:16.248119+00:00
1,b2b_etl,weblogs,265000,,2026-02-07T01:32:16.624120+00:00


Row errors summary (expected: some validation errors we injected)


Unnamed: 0,source_name,entity_name,error_type,n
0,b2b_db,order_items,validation_error,1720
1,b2b_db,end_customers,validation_error,105
2,marketing_xlsx,leads,validation_error,90
