In [104]:
from pathlib import Path

# Resolve the project root. When this code lives in a module (presumably
# src/config.py) the root is the parent of the module's directory; notebooks
# and REPLs have no __file__, so the current working directory is used instead.
try:
    PROJECT_ROOT = Path(__file__).resolve().parents[1]
except NameError:
    PROJECT_ROOT = Path.cwd()

# Directory layout, all derived from PROJECT_ROOT.
DATA_DIR = PROJECT_ROOT.joinpath("data")
RAW_DIR, STAGING_DIR = DATA_DIR.joinpath("raw"), DATA_DIR.joinpath("staging")
WAREHOUSE_DIR = PROJECT_ROOT.joinpath("warehouse")
DB_PATH = WAREHOUSE_DIR.joinpath("security.duckdb")  # DuckDB database file

# Create the writable directories up front (no-op when they already exist).
for _dir in (RAW_DIR, STAGING_DIR, WAREHOUSE_DIR):
    _dir.mkdir(parents=True, exist_ok=True)
del _dir

In [106]:
import logging

def get_logger(name: str = "pipeline") -> logging.Logger:
    """Return the logger called ``name``, configuring it on first use.

    The first call for a given name sets the level to INFO and attaches one
    StreamHandler with a ``time | level | message`` format. Later calls find a
    handler already present and return the same logger untouched, so re-running
    a cell does not attach a second handler.
    """
    log = logging.getLogger(name)
    if log.handlers:
        # Already configured (by an earlier call or elsewhere): leave it alone.
        return log

    stream = logging.StreamHandler()
    stream.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    log.setLevel(logging.INFO)
    log.addHandler(stream)
    return log

In [108]:
import pandas as pd

def assert_not_null(df: pd.DataFrame, cols: list[str]) -> None:
    """Raise ValueError if any of ``cols`` contains a null value.

    The error message lists each offending column together with its *fraction*
    of null rows (0-1), largest fraction first.
    """
    null_share = df[cols].isna().mean()
    offenders = null_share.loc[null_share.gt(0)].sort_values(ascending=False)
    if len(offenders) > 0:
        raise ValueError(f"NOT NULL check failed:\n{offenders}")

def assert_unique(df: pd.DataFrame, cols: list[str]) -> None:
    """Raise ValueError if the combination of ``cols`` repeats across rows.

    The reported count is the number of rows that repeat an earlier row's key
    (the first occurrence of each key is not counted).
    """
    repeated_rows = df.duplicated(subset=cols)
    if repeated_rows.any():
        raise ValueError(f"UNIQUE check failed on {cols}. Duplicates: {repeated_rows.sum()}")

def assert_accepted_values(df: pd.DataFrame, col: str, allowed: set) -> None:
    """Raise ValueError if ``col`` holds a non-null value outside ``allowed``.

    Nulls are dropped before comparing, so missing values never fail this check.
    """
    seen = set(df[col].dropna().unique())
    unexpected = seen - allowed
    if not unexpected:
        return
    raise ValueError(f"Accepted values check failed for {col}. Bad: {unexpected}")

def assert_row_count(df: pd.DataFrame, min_rows: int) -> None:
    """Raise ValueError when ``df`` has fewer than ``min_rows`` rows."""
    row_total = len(df)
    if row_total < min_rows:
        raise ValueError(f"Row count check failed. Got {row_total}, expected >= {min_rows}")

In [110]:
import os
import sys
from pathlib import Path

# 1) Project root containing the `src/` package. Set the PROJECT_ROOT environment
#    variable to override; otherwise the notebook is assumed to run from the root.
#    (Avoids hard-coding a user-specific absolute path.)
PROJECT_ROOT = Path(os.environ.get("PROJECT_ROOT", Path.cwd())).resolve()
print(PROJECT_ROOT)

# 2) Forget `src` AND every already-imported `src.*` submodule. Popping only "src"
#    leaves e.g. `src.config` cached in sys.modules, so a later
#    `from src.config import ...` would silently keep using the stale module.
for _mod_name in [m for m in sys.modules if m == "src" or m.startswith("src.")]:
    del sys.modules[_mod_name]

# 3) Put the root first so it wins imports, without stacking duplicate entries
#    every time this cell is re-run.
_root_str = str(PROJECT_ROOT)
sys.path[:] = [p for p in sys.path if p != _root_str]
sys.path.insert(0, _root_str)

# 4) Re-import and confirm which directory `src` now resolves to.
import src
print("src path =", list(getattr(src, "__path__", [])))

/Users/sahra
src path = ['/Users/sahra/src']


In [112]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

from src.config import RAW_DIR
from src.utils_logging import get_logger

logger = get_logger("generate_data")


def _unique_ips(rng: np.random.Generator, n_ips: int) -> list[str]:
    """Draw ``n_ips`` distinct ``10.x.y.z`` addresses, in the order they were first drawn.

    An insertion-ordered dict is used instead of a set. A set of str iterates in
    hash order, and str hashes are randomised per process (PYTHONHASHSEED), so a
    set gives a differently ordered ip_reputation.csv in every new process despite
    a fixed seed. The RNG calls are the same as a set-based loop, so the same
    addresses are drawn.
    """
    capacity = 256 ** 3  # number of distinct 10.x.y.z addresses
    if n_ips > capacity:
        raise ValueError(f"n_ips={n_ips} exceeds the {capacity} possible 10.x.y.z addresses")
    drawn: dict[str, None] = {}
    while len(drawn) < n_ips:
        drawn[f"10.{rng.integers(0,256)}.{rng.integers(0,256)}.{rng.integers(0,256)}"] = None
    return list(drawn)


def main(n_users: int = 2000, n_events: int = 20000, seed: int = 42, n_ips: int = 5000) -> None:
    """Generate synthetic users, IP-reputation and auth-event tables and write them as CSV.

    Writes ``user_profiles.csv``, ``ip_reputation.csv`` and ``auth_events.csv`` into
    ``RAW_DIR``. All randomness comes from one ``np.random.default_rng(seed)``, so
    the same arguments reproduce the same files (for a fixed numpy/pandas version).

    Args:
        n_users: rows in the users table (user_id = 1..n_users).
        n_events: rows in the auth-events table (event_id = 1..n_events).
        seed: RNG seed.
        n_ips: distinct IPs in the reputation table (formerly hard-coded to 5000).

    Raises:
        ValueError: if a lookup join changes the event row count, or if ``n_ips``
            exceeds the available address space.
    """
    rng = np.random.default_rng(seed)

    # -------------------------
    # 1) USERS TABLE
    # -------------------------
    users = pd.DataFrame({
        "user_id": np.arange(1, n_users + 1),
        "country": rng.choice(["US", "CA", "UK", "DE", "FR", "IN"], size=n_users,
                              p=[0.45, 0.10, 0.10, 0.10, 0.10, 0.15]),
        "plan": rng.choice(["free", "pro", "enterprise"], size=n_users,
                           p=[0.60, 0.30, 0.10]),
        "account_age_days": rng.integers(1, 2500, size=n_users),
    })

    # -------------------------
    # 2) IP REPUTATION TABLE (IPs unique so joins on `ip` cannot fan out)
    # -------------------------
    ips = _unique_ips(rng, n_ips)

    ip_rep = pd.DataFrame({
        "ip": ips,
        "ip_risk": np.clip(rng.normal(0.25, 0.2, size=n_ips), 0, 1),
        "is_known_bad": (rng.random(n_ips) < 0.03).astype(int)
    })

    # -------------------------
    # 3) AUTH EVENTS TABLE
    # -------------------------
    start = datetime(2025, 1, 1)
    # minute offsets spread uniformly over 60 days
    minute_offsets = rng.integers(0, 60 * 24 * 60, size=n_events)
    timestamps = [start + timedelta(minutes=int(x)) for x in minute_offsets]

    events = pd.DataFrame({
        "event_id": np.arange(1, n_events + 1),
        "user_id": rng.integers(1, n_users + 1, size=n_events),
        "timestamp": timestamps,
        "device_type": rng.choice(["mobile", "desktop", "tablet"], size=n_events, p=[0.5, 0.4, 0.1]),
        "browser": rng.choice(["chrome", "firefox", "safari", "edge"], size=n_events, p=[0.55, 0.15, 0.2, 0.1]),
        "is_vpn": (rng.random(n_events) < 0.2).astype(int),
        "failed_logins": rng.poisson(1.2, size=n_events),
        "session_duration": rng.exponential(120, size=n_events),
        "bytes_sent": rng.lognormal(10, 1.0, size=n_events),
        "bytes_received": rng.lognormal(10.2, 1.1, size=n_events),
        "ip": rng.choice(ip_rep["ip"].values, size=n_events),
        "success": (rng.random(n_events) > 0.15).astype(int)
    })

    # -------------------------
    # 4) CREATE LABEL WITH REALISTIC SIGNAL
    #    (JOIN USERS + IP_REP WITHOUT ROW EXPLOSION)
    # -------------------------
    tmp = events.merge(users[["user_id", "account_age_days", "plan"]], on="user_id", how="left")
    tmp = tmp.merge(ip_rep[["ip", "ip_risk", "is_known_bad"]], on="ip", how="left")

    # Safety check: joins must not change row count
    if len(tmp) != len(events):
        raise ValueError(f"Join exploded rows: tmp={len(tmp)} events={len(events)}. Check join keys uniqueness.")

    # Hand-set weights: IP risk, >2 failed logins, VPN use, new accounts (<30 days),
    # free plan and known-bad IPs all push the suspicion score up.
    prob = (
        0.25 * tmp["ip_risk"].fillna(0) +
        0.25 * (tmp["failed_logins"] > 2).astype(int) +
        0.15 * tmp["is_vpn"] +
        0.15 * (tmp["account_age_days"] < 30).astype(int) +
        0.10 * (tmp["plan"] == "free").astype(int) +
        0.20 * tmp["is_known_bad"].fillna(0)
    )

    # Gaussian noise, then threshold the score at 0.55 to get the binary label.
    noise = rng.normal(0, 0.08, size=len(tmp))
    events["label"] = ((prob + noise) > 0.55).astype(int).to_numpy()

    # -------------------------
    # 5) INJECT MISSING VALUES (REALISTIC DATA ISSUES)
    # -------------------------
    for col in ["session_duration", "bytes_sent"]:
        idx = rng.choice(events.index, size=int(0.02 * n_events), replace=False)
        events.loc[idx, col] = np.nan

    idx = rng.choice(users.index, size=int(0.02 * n_users), replace=False)
    users.loc[idx, "country"] = np.nan

    # -------------------------
    # 6) WRITE RAW FILES
    # -------------------------
    RAW_DIR.mkdir(parents=True, exist_ok=True)
    users.to_csv(RAW_DIR / "user_profiles.csv", index=False)
    ip_rep.to_csv(RAW_DIR / "ip_reputation.csv", index=False)
    events.to_csv(RAW_DIR / "auth_events.csv", index=False)

    logger.info(f"Wrote raw files to: {RAW_DIR}")
    logger.info("Files: user_profiles.csv, ip_reputation.csv, auth_events.csv")


if __name__ == "__main__":
    main()

2026-01-29 21:16:53,994 | INFO | Wrote raw files to: /Users/sahra/data/raw
2026-01-29 21:16:53,995 | INFO | Files: user_profiles.csv, ip_reputation.csv, auth_events.csv


In [114]:
import pandas as pd
from src.config import RAW_DIR
from src.utils_quality import assert_row_count
from src.utils_logging import get_logger

logger = get_logger()


def extract() -> dict[str, pd.DataFrame]:
    """Load the three raw CSV files from RAW_DIR and sanity-check their sizes.

    All files are read first; only then are the minimum row counts checked
    (users >= 100, ip_rep >= 100, events >= 1000).

    Returns:
        dict with keys "users", "ip_rep", "events" mapping to the loaded frames.

    Raises:
        ValueError: from assert_row_count when a table is below its minimum.
    """
    # (result key, file name in RAW_DIR, minimum expected rows)
    sources = (
        ("users", "user_profiles.csv", 100),
        ("ip_rep", "ip_reputation.csv", 100),
        ("events", "auth_events.csv", 1000),
    )
    frames = {key: pd.read_csv(RAW_DIR / filename) for key, filename, _ in sources}
    for key, _, min_rows in sources:
        assert_row_count(frames[key], min_rows)

    logger.info("Extracted raw datasets")
    return frames


if __name__ == "__main__":
    extract()

2026-01-29 21:16:54,807 | INFO | Extracted raw datasets


In [116]:
import pandas as pd
import numpy as np
from src.config import STAGING_DIR
from src.utils_logging import get_logger
from src.utils_quality import assert_accepted_values, assert_unique

logger = get_logger()


def _drop_duplicate_keys(df: pd.DataFrame, key: str, table: str) -> pd.DataFrame:
    """Keep the first row per ``key`` value; log a warning if any rows were dropped."""
    deduped = df.drop_duplicates(subset=[key])
    n_dropped = len(df) - len(deduped)
    if n_dropped:
        logger.warning(f"{table}: dropped {n_dropped} duplicate rows on {key!r}")
    return deduped


def transform(dfs: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
    """Clean and validate the raw frames, then write them as staging parquet files.

    Args:
        dfs: mapping with keys "users", "ip_rep", "events" (as returned by extract()).
            The input frames are copied, not mutated.

    Returns:
        dict with keys "stg_users", "stg_ip_rep", "stg_events" holding the cleaned
        frames that were written to STAGING_DIR.

    Raises:
        ValueError: if device_type or browser contain values outside the accepted sets.
    """
    users = dfs["users"].copy()
    ip_rep = dfs["ip_rep"].copy()
    events = dfs["events"].copy()

    # types: unparseable timestamps become NaT (errors="coerce"). Count and report
    # them so they do not silently turn into NULL dates downstream.
    parsed_ts = pd.to_datetime(events["timestamp"], errors="coerce")
    n_unparsed = int((parsed_ts.isna() & events["timestamp"].notna()).sum())
    if n_unparsed:
        logger.warning(f"events: {n_unparsed} timestamps could not be parsed and were set to NaT")
    events["timestamp"] = parsed_ts

    # basic cleaning: first row per key wins; dropped counts are logged
    events = _drop_duplicate_keys(events, "event_id", "events")
    users = _drop_duplicate_keys(users, "user_id", "users")
    ip_rep = _drop_duplicate_keys(ip_rep, "ip", "ip_rep")

    # quality: accepted values
    assert_accepted_values(events, "device_type", {"mobile","desktop","tablet"})
    assert_accepted_values(events, "browser", {"chrome","firefox","safari","edge"})

    # missing handling (simple, interview-ready): numeric NaNs -> column median.
    # A column that is entirely NaN has a NaN median and therefore stays NaN.
    num_cols = ["session_duration","bytes_sent","bytes_received","failed_logins"]
    for c in num_cols:
        events[c] = pd.to_numeric(events[c], errors="coerce")
        events[c] = events[c].fillna(events[c].median())

    # 0/1 flags: missing is treated as 0 (no VPN, unsuccessful login, not suspicious)
    events["is_vpn"] = events["is_vpn"].fillna(0).astype(int)
    events["success"] = events["success"].fillna(0).astype(int)
    events["label"] = events["label"].fillna(0).astype(int)

    # join keys checks: guaranteed by the dedupe above, kept as a cheap guard
    # for any joins on these keys
    assert_unique(users, ["user_id"])
    assert_unique(ip_rep, ["ip"])

    # write staging (parquet keeps dtypes, unlike csv)
    STAGING_DIR.mkdir(parents=True, exist_ok=True)
    users.to_parquet(STAGING_DIR / "stg_users.parquet", index=False)
    ip_rep.to_parquet(STAGING_DIR / "stg_ip_rep.parquet", index=False)
    events.to_parquet(STAGING_DIR / "stg_events.parquet", index=False)

    logger.info(f"Wrote staging parquet to {STAGING_DIR}")
    return {"stg_users": users, "stg_ip_rep": ip_rep, "stg_events": events}


if __name__ == "__main__":
    # quick local test
    from src.etl_extract import extract
    transform(extract())

2026-01-29 21:16:55,352 | INFO | Extracted raw datasets
2026-01-29 21:16:55,414 | INFO | Wrote staging parquet to /Users/sahra/data/staging


In [118]:
# %pip (not !pip) installs into the environment of the running kernel; !pip may
# hit a different interpreter. TODO: pin the tested version, e.g. duckdb==<version>.
%pip install duckdb



In [120]:
import duckdb
from src.config import DB_PATH, STAGING_DIR
from src.utils_logging import get_logger

logger = get_logger()


def _sql_literal(value: object) -> str:
    """Render ``value`` as a single-quoted SQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def load_to_duckdb() -> None:
    """(Re)create staging.stg_users / stg_ip_rep / stg_events in DB_PATH from staging parquet.

    The three tables are replaced inside one transaction. A failure part-way (e.g.
    a missing parquet file) is rolled back, leaving the previous tables in place
    instead of a half-dropped schema. The connection is closed in ``finally`` so the
    database file is not left open in the process when loading fails.
    """
    tables = (
        ("stg_users", "stg_users.parquet"),
        ("stg_ip_rep", "stg_ip_rep.parquet"),
        ("stg_events", "stg_events.parquet"),
    )

    con = duckdb.connect(str(DB_PATH))
    try:
        for schema in ("raw", "staging", "analytics"):
            con.execute(f"CREATE SCHEMA IF NOT EXISTS {schema};")

        con.begin()
        try:
            for table, filename in tables:
                # idempotent load: replace the whole table on every run. The path is
                # escaped so a quote in the directory name cannot break the SQL.
                con.execute(
                    f"CREATE OR REPLACE TABLE staging.{table} AS "
                    f"SELECT * FROM read_parquet({_sql_literal(STAGING_DIR / filename)});"
                )
            con.commit()
        except Exception:
            con.rollback()
            raise
    finally:
        con.close()

    logger.info(f"Loaded staging tables into {DB_PATH}")


if __name__ == "__main__":
    load_to_duckdb()

2026-01-29 21:16:57,832 | INFO | Loaded staging tables into /Users/sahra/warehouse/security.duckdb


In [122]:
import duckdb
from src.config import DB_PATH
from src.utils_logging import get_logger

logger = get_logger()


def build_models() -> None:
    """(Re)build the analytics dims, fact and marts in DB_PATH from the staging tables.

    Models are built in dependency order (the marts read analytics.fact_auth_event)
    inside one transaction. A failing model rolls back, keeping the previous
    versions instead of leaving some tables dropped. The connection is always closed.
    """
    # (table name, SELECT that defines it), in dependency order
    models = (
        # one row per calendar date present in the events
        ("dim_date", """
            SELECT DISTINCT
                CAST(timestamp AS DATE) AS date,
                EXTRACT(year FROM timestamp) AS year,
                EXTRACT(month FROM timestamp) AS month,
                EXTRACT(day FROM timestamp) AS day,
                EXTRACT(dow FROM timestamp) AS day_of_week
            FROM staging.stg_events
        """),
        ("dim_user", """
            SELECT
                user_id,
                country,
                plan,
                account_age_days
            FROM staging.stg_users
        """),
        # events enriched with IP reputation; IPs missing from stg_ip_rep get NULL risk
        ("fact_auth_event", """
            SELECT
                e.event_id,
                e.user_id,
                CAST(e.timestamp AS DATE) AS date,
                e.timestamp,
                e.device_type,
                e.browser,
                e.is_vpn,
                e.failed_logins,
                e.session_duration,
                e.bytes_sent,
                e.bytes_received,
                e.ip,
                r.ip_risk,
                r.is_known_bad,
                e.success,
                e.label
            FROM staging.stg_events e
            LEFT JOIN staging.stg_ip_rep r USING (ip)
        """),
        # mart: daily KPIs
        ("mart_daily_kpis", """
            SELECT
                date,
                COUNT(*) AS total_events,
                AVG(success) AS success_rate,
                AVG(label) AS suspicious_rate,
                AVG(ip_risk) AS avg_ip_risk,
                AVG(failed_logins) AS avg_failed_logins,
                SUM(CASE WHEN is_known_bad=1 THEN 1 ELSE 0 END) AS known_bad_events
            FROM analytics.fact_auth_event
            GROUP BY 1
            ORDER BY 1
        """),
        # mart: risk segments (NULL ip_risk falls through to 'low')
        ("mart_risk_segments", """
            SELECT
                CASE
                    WHEN ip_risk >= 0.7 OR is_known_bad=1 THEN 'high'
                    WHEN ip_risk >= 0.4 THEN 'medium'
                    ELSE 'low'
                END AS risk_segment,
                COUNT(*) AS events,
                AVG(label) AS suspicious_rate,
                AVG(success) AS success_rate
            FROM analytics.fact_auth_event
            GROUP BY 1
            ORDER BY suspicious_rate DESC
        """),
    )

    con = duckdb.connect(str(DB_PATH))
    try:
        con.execute("CREATE SCHEMA IF NOT EXISTS analytics;")
        con.begin()
        try:
            for table, select_sql in models:
                con.execute(f"CREATE OR REPLACE TABLE analytics.{table} AS {select_sql};")
            con.commit()
        except Exception:
            con.rollback()
            raise
    finally:
        con.close()

    logger.info("Built analytics models: dims, fact, marts")


if __name__ == "__main__":
    build_models()

2026-01-29 21:16:59,116 | INFO | Built analytics models: dims, fact, marts


In [124]:
import duckdb
from src.config import DB_PATH
from src.utils_logging import get_logger

logger = get_logger()


def run_checks() -> None:
    """Run data-quality checks against analytics.fact_auth_event in DB_PATH.

    Checks, in order:
      * user_id and event_id are never NULL;
      * event_id is unique;
      * device_type and browser only take accepted values (NULLs are not flagged,
        because ``NULL NOT IN (...)`` evaluates to NULL, not TRUE).

    The connection is closed even when a check fails, so the database file is not
    left open in the process.

    Raises:
        ValueError: on the first failing check.
    """
    accepted_values = {
        "device_type": ("mobile", "desktop", "tablet"),
        "browser": ("chrome", "firefox", "safari", "edge"),
    }

    con = duckdb.connect(str(DB_PATH))
    try:
        # Not null checks. COUNT(*) - COUNT(col) is 0 on an empty table, whereas
        # SUM(CASE ...) returns NULL there and `None > 0` would raise TypeError.
        nn = con.execute("""
            SELECT
              COUNT(*) - COUNT(user_id) AS null_user_id,
              COUNT(*) - COUNT(event_id) AS null_event_id
            FROM analytics.fact_auth_event;
        """).fetchone()
        if nn[0] > 0 or nn[1] > 0:
            raise ValueError(f"NOT NULL failed: {nn}")

        # Unique key checks: number of event_id values occurring more than once
        dupes = con.execute("""
            SELECT COUNT(*) FROM (
                SELECT event_id, COUNT(*) c
                FROM analytics.fact_auth_event
                GROUP BY 1
                HAVING c > 1
            );
        """).fetchone()[0]
        if dupes > 0:
            raise ValueError(f"UNIQUE failed: duplicate event_id groups={dupes}")

        # Accepted values checks (same sets the transform step validates)
        for col, allowed in accepted_values.items():
            placeholders = ", ".join("?" for _ in allowed)
            n_bad = con.execute(
                f"SELECT COUNT(*) FROM analytics.fact_auth_event "
                f"WHERE {col} NOT IN ({placeholders});",
                list(allowed),
            ).fetchone()[0]
            if n_bad > 0:
                raise ValueError(f"Accepted values failed for {col}: {n_bad}")
    finally:
        con.close()

    logger.info("Analytics checks passed ✅")


if __name__ == "__main__":
    run_checks()

2026-01-29 21:17:00,438 | INFO | Analytics checks passed ✅


In [126]:
import streamlit as st
import duckdb
import pandas as pd
from src.config import DB_PATH

# Streamlit app: launch with `streamlit run`; executing this cell in the kernel
# does not serve a dashboard.
st.set_page_config(page_title="Security Login Analytics", layout="wide")
st.title("Security Login Analytics Dashboard")

# Read-only connection, closed even if a query fails. The .df() results are
# in-memory pandas frames, so nothing below needs the connection.
con = duckdb.connect(str(DB_PATH), read_only=True)
try:
    kpis = con.execute("SELECT * FROM analytics.mart_daily_kpis").df()
    seg = con.execute("SELECT * FROM analytics.mart_risk_segments").df()
finally:
    con.close()

kpis_by_date = kpis.set_index("date")

st.subheader("Daily KPIs")
# Daily event counts (hundreds per day with the generator's defaults) and 0-1 rates
# on one y-axis would flatten the rate lines against zero, so chart them separately.
st.line_chart(kpis_by_date[["total_events"]])
st.line_chart(kpis_by_date[["suspicious_rate", "success_rate"]])

st.subheader("Risk Segments")
st.dataframe(seg)

In [None]:
# To run (from the project root, so that `src` is importable): PYTHONPATH=. streamlit run src/dashboard_app.py