# Start analysis

## Import

In [1]:
# [Step 0.1] Minimal imports, project constants, and logger (English-only comments)

from pathlib import Path
import sqlite3
import csv
import datetime
import sys

# ---- Project constants (relative paths only) ----
# The notebook may be started from the project root or from a folder one level
# below it: whichever of the two contains "data/" is used as the root.
if Path.cwd().joinpath("data").exists():
    PROJECT_ROOT = Path.cwd()
else:
    PROJECT_ROOT = Path.cwd().parent

# Well-known sub-directories of the project.
DATA_DIR = PROJECT_ROOT.joinpath("data")
SQL_DIR = PROJECT_ROOT.joinpath("sql")
EXPORT_DIR = DATA_DIR.joinpath("exports")

# File names of the raw input and of the SQLite database built from it.
RAW_CSV_NAME = "Retail_supply_chain - Retails Order Full Dataset.csv"
DB_NAME = "Retail_supply_chain.db"

# Full locations of both files inside the data directory.
RAW_CSV_PATH = DATA_DIR.joinpath(RAW_CSV_NAME)
DB_PATH = DATA_DIR.joinpath(DB_NAME)

# ---- Logging (concise, no absolute paths) ----
def log_ok(msg: str):
    """Print ``msg`` on a single line, prefixed with ``OK: ``.

    The function prints the text unchanged; keeping absolute paths out of the
    message is the caller's responsibility.
    """
    line = "OK: {}".format(msg)
    print(line)

def rel(p: Path) -> str:
    """Return ``p`` relative to ``PROJECT_ROOT`` as a POSIX-style string.

    If the relative path cannot be computed (for example because ``p`` is not
    located under the project root), only the final path component is
    returned instead.
    """
    try:
        inside_project = p.relative_to(PROJECT_ROOT)
    except Exception:
        # Fall back to the bare name rather than exposing the full path.
        return p.name
    return inside_project.as_posix()

# ---- Sanity (directories only; actual file checks in next step) ----
# Create the export folder (and any missing parents); no error if it exists.
EXPORT_DIR.mkdir(parents=True, exist_ok=True)

# Report the resolved layout. Labels are left-padded to a common width so the
# "=" signs line up in the output.
log_ok("project_root = .")
for _label, _directory in (
    ("data dir", DATA_DIR),
    ("sql dir", SQL_DIR),
    ("exports dir", EXPORT_DIR),
):
    log_ok(f"{_label:<12} = {rel(_directory)}")



OK: project_root = .
OK: data dir     = data
OK: sql dir      = sql
OK: exports dir  = data/exports


## Locate paths

In [2]:
# [Step 0.2] Paths & checks (ensure raw CSV exists; set rebuild/export policies)

# ---- Policies (idempotent build) ----
# RECREATE_ALL:      drop & create all tables on each run
# OVERWRITE_EXPORTS: overwrite CSV exports on each run
RECREATE_ALL = True
OVERWRITE_EXPORTS = True

# ---- Existence checks ----
if not DATA_DIR.exists():
    raise FileNotFoundError("Missing 'data/' directory at project root.")

# Locate raw CSV: the exact file name is preferred; otherwise search the data
# directory recursively and take the first match in sorted order.
if not RAW_CSV_PATH.exists():
    candidates = sorted(DATA_DIR.rglob("Retail_supply_chain*Full*Dataset*.csv"))
    if candidates:
        RAW_CSV_PATH = candidates[0]
    else:
        raise FileNotFoundError(
            "Raw CSV not found. Place the file in 'data/' (e.g., "
            "'Retail_supply_chain - Retails Order Full Dataset.csv')."
        )

# The database file itself may not exist yet (it is created later); only its
# parent directory is ensured here.
DB_PARENT = DB_PATH.parent
DB_PARENT.mkdir(parents=True, exist_ok=True)

# ---- Concise log (relative paths only) ----
for _message in (
    f"raw csv      = {rel(RAW_CSV_PATH)}",
    f"database     = {rel(DB_PATH)} (will be created if missing)",
    f"recreate_all = {RECREATE_ALL}, overwrite_exports = {OVERWRITE_EXPORTS}",
):
    log_ok(_message)


OK: raw csv      = data/Retail_supply_chain - Retails Order Full Dataset.csv
OK: database     = data/Retail_supply_chain.db (will be created if missing)
OK: recreate_all = True, overwrite_exports = True


## Helpers for parsing data types

In [3]:
# [Step 1.1] Helpers for robust parsing (dates / floats / ints)

def to_iso_date(s: str, formats=("%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%d-%m-%Y", "%m-%d-%Y")):
    """Convert a date string to ISO 8601 (``YYYY-MM-DD``) text.

    Parameters
    ----------
    s : str
        Raw value. ``None`` and blank/whitespace-only values yield ``None``.
        Non-string values are converted with ``str()`` first.
    formats : sequence of str, optional
        ``strptime`` patterns tried in order; the first one that matches wins.
        The default order is day-first, so an ambiguous value such as
        ``"03/04/2015"`` is read as 3 April 2015. Pass e.g.
        ``formats=("%m/%d/%Y",)`` for month-first sources.

    Returns
    -------
    str or None
        The ISO date, or the stripped input unchanged when no pattern matches
        (so unparseable values remain visible downstream instead of vanishing).
    """
    if s is None:
        return None
    text = str(s).strip()
    if not text:
        return None
    for fmt in formats:
        try:
            return datetime.datetime.strptime(text, fmt).date().isoformat()
        except ValueError:
            # Pattern did not match this value; try the next one.
            continue
    return text  # keep as-is if no known pattern matched

def parse_float(s):
    """Parse a loosely formatted number into a ``float``.

    Handles surrounding whitespace, the symbols ``€``, ``$`` and ``%``,
    regular and non-breaking spaces, and both decimal conventions:

    * ``"1.234,56"`` (comma is the last separator) -> ``1234.56``
    * ``"1,234.56"`` (dot is the last separator)   -> ``1234.56``
    * ``"1,234,567"`` (several commas, no dot)     -> ``1234567.0``
    * ``"12,5"`` (single comma, no dot)            -> ``12.5``

    A ``%`` sign is only removed; the value is not divided by 100.

    Returns
    -------
    float or None
        ``None`` for ``None``, blank input, or text that is not a number.
    """
    if s is None:
        return None
    text = str(s).strip()
    if not text:
        return None
    for symbol in ("€", "$", "%", "\u00A0", " "):
        text = text.replace(symbol, "")

    has_comma = "," in text
    has_dot = "." in text
    if has_comma and has_dot:
        # Whichever separator appears last is the decimal mark; the other one
        # is a thousands separator and is dropped.
        if text.rfind(",") > text.rfind("."):
            text = text.replace(".", "").replace(",", ".")
        else:
            text = text.replace(",", "")
    elif text.count(",") > 1:
        # Several commas and no dot can only be thousands separators.
        text = text.replace(",", "")
    elif has_comma:
        # A single comma without a dot is treated as a decimal comma.
        text = text.replace(",", ".")

    try:
        return float(text)
    except ValueError:
        return None

def parse_int(s):
    """Parse a loosely formatted number and round it to an ``int``.

    The value is first parsed with ``parse_float``; the result is rounded with
    the built-in ``round`` and converted with ``int``. Returns ``None`` when
    the input cannot be parsed or the conversion to ``int`` fails.
    """
    number = parse_float(s)
    if number is not None:
        try:
            return int(round(number))
        except Exception:
            # Conversion failed; fall through to the "no value" result.
            pass
    return None

# Confirm that this cell ran to the end, i.e. the parsing helpers are defined.
log_ok("parsers ready")


OK: parsers ready


## Create the SQL schema and import data into the database

In [4]:
# [Step 1.2] Ingest raw CSV -> stg_orders (drop&create), then load rows
#
# The load is idempotent: rows are (re)loaded only when the table is rebuilt
# (RECREATE_ALL is True, or stg_orders does not exist yet). With
# RECREATE_ALL = False and an existing table, the table is left untouched
# instead of receiving a second copy of every row.

schema_sql = """
DROP TABLE IF EXISTS stg_orders;
CREATE TABLE stg_orders (
    row_id               INTEGER,
    order_id             TEXT,
    order_date           TEXT,   -- ISO 8601 YYYY-MM-DD
    ship_date            TEXT,   -- ISO 8601 YYYY-MM-DD
    ship_mode            TEXT,
    customer_id          TEXT,
    customer_name        TEXT,
    segment              TEXT,
    country              TEXT,
    city                 TEXT,
    state                TEXT,
    postal_code          INTEGER,
    region               TEXT,
    retail_sales_people  TEXT,
    product_id           TEXT,
    category             TEXT,
    sub_category         TEXT,
    product_name         TEXT,
    returned             TEXT,
    sales                REAL,
    quantity             INTEGER,
    discount             REAL,
    profit               REAL
);
CREATE INDEX IF NOT EXISTS idx_stg_orders_order_date ON stg_orders(order_date);
CREATE INDEX IF NOT EXISTS idx_stg_orders_region_state ON stg_orders(region, state);
CREATE INDEX IF NOT EXISTS idx_stg_orders_category ON stg_orders(category, sub_category);
"""

insert_sql = """
    INSERT INTO stg_orders (
      row_id, order_id, order_date, ship_date, ship_mode, customer_id, customer_name,
      segment, country, city, state, postal_code, region, retail_sales_people,
      product_id, category, sub_category, product_name, returned, sales, quantity, discount, profit
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""


def _clean_text(value):
    """Strip surrounding whitespace; map missing or blank values to None."""
    return (value or "").strip() or None


def _stg_row(r):
    """Convert one CSV record (dict keyed by header) to a stg_orders row tuple.

    The tuple order matches the column list in ``insert_sql``.
    """
    return (
        parse_int(r.get("Row ID")),
        _clean_text(r.get("Order ID")),
        to_iso_date(r.get("Order Date")),
        to_iso_date(r.get("Ship Date")),
        _clean_text(r.get("Ship Mode")),
        _clean_text(r.get("Customer ID")),
        _clean_text(r.get("Customer Name")),
        _clean_text(r.get("Segment")),
        _clean_text(r.get("Country")),
        _clean_text(r.get("City")),
        _clean_text(r.get("State")),
        parse_int(r.get("Postal Code")),
        _clean_text(r.get("Region")),
        _clean_text(r.get("Retail Sales People")),
        _clean_text(r.get("Product ID")),
        _clean_text(r.get("Category")),
        _clean_text(r.get("Sub-Category")),
        _clean_text(r.get("Product Name")),
        _clean_text(r.get("Returned")),
        parse_float(r.get("Sales")),
        parse_int(r.get("Quantity")),
        parse_float(r.get("Discount")),
        parse_float(r.get("Profit")),
    )


rows = []

# sqlite3's "with connection:" block only commits or rolls back; it does not
# close the connection. Close it explicitly so no handle to the DB file leaks.
conn = sqlite3.connect(DB_PATH.as_posix())
try:
    table_exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'stg_orders';"
    ).fetchone() is not None

    if RECREATE_ALL or not table_exists:
        conn.executescript(schema_sql)
        log_ok("stg_orders schema created")

        # Load CSV ("utf-8-sig" drops a leading BOM if the file has one).
        with open(RAW_CSV_PATH, "r", encoding="utf-8-sig", newline="") as f:
            rows = [_stg_row(r) for r in csv.DictReader(f)]

        with conn:  # one transaction: commit on success, roll back on error
            conn.executemany(insert_sql, rows)
    else:
        log_ok("stg_orders already present and RECREATE_ALL is False -> load skipped")

    # Quick QA
    cnt = conn.execute("SELECT COUNT(*) FROM stg_orders;").fetchone()[0]
    null_rowid = conn.execute("SELECT COUNT(*) FROM stg_orders WHERE row_id IS NULL;").fetchone()[0]
finally:
    conn.close()

print(f"stg_orders rows: {cnt} (expected ~9994)")
print(f"row_id NULLs: {null_rowid}")
assert cnt >= 9000, "Unexpected low row count in stg_orders."
assert null_rowid == 0, "row_id should not be NULL."

log_ok("ingest complete")


OK: stg_orders schema created
stg_orders rows: 9994 (expected ~9994)
row_id NULLs: 0
OK: ingest complete


## Clean the table and prepare it for export

In [5]:
# [Step 2.1] Build Rsc_cleaned (dedup + normalization) and run concise QA
#
# Pipeline: stg_orders -> stg_orders_dedup (one row per row_id)
#           -> clean_orders (normalizing view) -> Rsc_cleaned (materialized).

clean_sql = """
-- Deduplicate by row_id (latest order_date, fallback rowid)
DROP TABLE IF EXISTS stg_orders_dedup;
CREATE TABLE stg_orders_dedup AS
WITH ranked AS (
  SELECT
    s.*,
    ROW_NUMBER() OVER (
      PARTITION BY s.row_id
      ORDER BY s.order_date DESC, s.rowid ASC
    ) AS rn
  FROM stg_orders s
)
SELECT
  row_id, order_id, order_date, ship_date, ship_mode, customer_id, customer_name,
  segment, country, city, state, postal_code, region, retail_sales_people,
  product_id, category, sub_category, product_name, returned, sales, quantity, discount, profit
FROM ranked
WHERE rn = 1;

CREATE INDEX IF NOT EXISTS idx_dedup_order_date ON stg_orders_dedup(order_date);
CREATE INDEX IF NOT EXISTS idx_dedup_region_state ON stg_orders_dedup(region, state);

-- Normalization view (trim/case, YES/NO for returned, numeric guards)
DROP VIEW IF EXISTS clean_orders;
CREATE VIEW clean_orders AS
WITH base AS (
  SELECT
    CAST(row_id AS INTEGER)                  AS row_id,
    TRIM(order_id)                           AS order_id,
    TRIM(order_date)                         AS order_date,   -- ISO text
    TRIM(ship_date)                          AS ship_date,    -- ISO text
    TRIM(ship_mode)                          AS ship_mode,
    TRIM(customer_id)                        AS customer_id,
    TRIM(segment)                            AS segment,
    UPPER(TRIM(country))                     AS country,
    TRIM(city)                               AS city,
    TRIM(state)                              AS state,
    CAST(postal_code AS INTEGER)             AS postal_code,
    UPPER(TRIM(region))                      AS region,
    TRIM(product_id)                         AS product_id,
    UPPER(TRIM(category))                    AS category,
    UPPER(TRIM(sub_category))                AS sub_category,
    TRIM(product_name)                       AS product_name,
    UPPER(TRIM(returned))                    AS returned_raw,
    CAST(sales    AS REAL)                   AS sales,
    CASE WHEN CAST(quantity AS REAL) < 0 THEN 0 ELSE CAST(quantity AS INTEGER) END AS quantity,
    CASE
      WHEN discount IS NULL THEN NULL
      WHEN discount > 1.0 AND discount <= 100.0 THEN ROUND(discount/100.0, 4)
      WHEN discount < 0.0 THEN 0.0
      ELSE CAST(discount AS REAL)
    END                                      AS discount,
    CAST(profit   AS REAL)                   AS profit
  FROM stg_orders_dedup
),
normalized AS (
  SELECT
    row_id, order_id, order_date, ship_date, ship_mode, customer_id, segment, country,
    city, state, postal_code, region, product_id, category, sub_category, product_name,
    CASE
      WHEN returned_raw IS NULL OR returned_raw = '' THEN 'NO'
      WHEN returned_raw IN ('Y','YES','TRUE','T','1','RETURNED') THEN 'YES'
      WHEN returned_raw IN ('N','NO','FALSE','F','0','NOT RETURNED','NONE') THEN 'NO'
      WHEN returned_raw LIKE 'NOT%' THEN 'NO'
      ELSE returned_raw
    END AS returned,
    sales, quantity, discount, profit
  FROM base
)
SELECT
  row_id, order_id, order_date, ship_date, ship_mode, customer_id, segment, country,
  city, state, postal_code, region, product_id, category, sub_category, product_name,
  returned, sales, quantity, discount, profit
FROM normalized;

-- Materialize for BI
DROP TABLE IF EXISTS Rsc_cleaned;
CREATE TABLE Rsc_cleaned AS
SELECT * FROM clean_orders;

-- Indexes
CREATE UNIQUE INDEX IF NOT EXISTS ux_rsc_row_id ON Rsc_cleaned(row_id);
CREATE INDEX IF NOT EXISTS idx_rsc_order_date ON Rsc_cleaned(order_date);
CREATE INDEX IF NOT EXISTS idx_rsc_region_state ON Rsc_cleaned(region, state);
CREATE INDEX IF NOT EXISTS idx_rsc_category_sub ON Rsc_cleaned(category, sub_category);
"""

# sqlite3's "with connection:" block does not close the connection, so the
# connection is closed explicitly once the build and the QA queries are done.
conn = sqlite3.connect(DB_PATH.as_posix())
try:
    conn.executescript(clean_sql)

    # --- QA (concise, no absolute paths) ---
    cur = conn.cursor()
    # Row counts at each stage of the pipeline.
    counts = cur.execute("""
        SELECT
          (SELECT COUNT(*) FROM stg_orders)       AS stg_raw,
          (SELECT COUNT(*) FROM stg_orders_dedup) AS stg_dedup,
          (SELECT COUNT(*) FROM Rsc_cleaned)      AS rsc_cnt
    """).fetchone()
    # row_id must identify a row uniquely after deduplication.
    uniq = cur.execute("""
        SELECT COUNT(*) AS total, COUNT(DISTINCT row_id) AS distinct_row_id
        FROM Rsc_cleaned
    """).fetchone()
    # Distribution of the normalized "returned" flag.
    returned = cur.execute("""
        SELECT returned, COUNT(*) FROM Rsc_cleaned
        GROUP BY returned ORDER BY 2 DESC
    """).fetchall()
    # Rows whose discount lies outside [0, 1] or whose quantity is negative.
    sanity = cur.execute("""
        SELECT
          SUM(CASE WHEN discount < 0 OR discount > 1 THEN 1 ELSE 0 END) AS bad_discount,
          SUM(CASE WHEN quantity < 0 THEN 1 ELSE 0 END) AS bad_qty
        FROM Rsc_cleaned
    """).fetchone()
    date_rng = cur.execute("SELECT MIN(order_date), MAX(order_date) FROM Rsc_cleaned").fetchone()
finally:
    conn.close()

print("counts (stg_raw, stg_dedup, rsc):", counts)
print("row_id uniqueness (total, distinct):", uniq)
print("returned distribution:", returned)
print("numeric sanity (bad_discount, bad_qty):", sanity)
print("order_date range (min, max):", date_rng)

# Enforce the invariants instead of only printing them. SUM() over an empty
# table yields NULL (None), hence the "or 0".
assert counts[1] == counts[2], "Rsc_cleaned must have one row per deduplicated staging row."
assert uniq[0] == uniq[1], "row_id must be unique in Rsc_cleaned."
assert (sanity[0] or 0) == 0, "discount must lie within [0, 1] after cleaning."
assert (sanity[1] or 0) == 0, "quantity must not be negative after cleaning."

log_ok("cleaning complete -> Rsc_cleaned ready")


counts (stg_raw, stg_dedup, rsc): (9994, 9994, 9994)
row_id uniqueness (total, distinct): (9994, 9994)
returned distribution: [('NO', 9194), ('YES', 800)]
numeric sanity (bad_discount, bad_qty): (0, 0)
order_date range (min, max): ('2014-01-02', '2017-12-30')
OK: cleaning complete -> Rsc_cleaned ready


## Create viz tables for Tableau (sources of the `.csv` exports)

In [7]:
# Build the first set of viz_* tables (CREATE TABLE ... AS must precede WITH).
#
# This cell sits before the modeling cell that creates dim_date / fct_sales,
# so on a fresh database (Restart Kernel -> Run All) those objects do not
# exist yet. The tables are therefore built directly from Rsc_cleaned.
# This is equivalent: fct_sales is an unfiltered projection of Rsc_cleaned
# (order_date AS date_key), and dim_date holds exactly its distinct non-NULL
# order dates with year/month derived via SUBSTR.
#
# viz_top_category_by_region and viz_returns_by_cat_subcat are rebuilt (and
# exported) again in Step 4.2.

# Reuse the database location resolved in Step 0.1; the old name is kept as
# an alias.
db_path = DB_PATH

viz_sql = """
-- Safety drops
DROP TABLE IF EXISTS viz_sales_by_region_month;
DROP TABLE IF EXISTS viz_top_category_by_region;
DROP TABLE IF EXISTS viz_returns_by_cat_subcat;

-- 1) Monthly sales by region (for interactive map + trend)
CREATE TABLE viz_sales_by_region_month AS
SELECT
  r.region,
  r.state,
  SUBSTR(r.order_date,1,4)                                    AS year,
  SUBSTR(r.order_date,6,2)                                    AS month,
  SUBSTR(r.order_date,1,4) || '-' || SUBSTR(r.order_date,6,2) AS year_month,
  SUM(r.sales)      AS sales,
  SUM(r.profit)     AS profit,
  SUM(r.quantity)   AS quantity,
  AVG(r.discount)   AS avg_discount
FROM Rsc_cleaned r
WHERE r.order_date IS NOT NULL
GROUP BY
  r.region,
  r.state,
  SUBSTR(r.order_date,1,4),
  SUBSTR(r.order_date,6,2),
  SUBSTR(r.order_date,1,4) || '-' || SUBSTR(r.order_date,6,2);

CREATE INDEX IF NOT EXISTS idx_viz1_region_state_month ON viz_sales_by_region_month(region, state, year_month);

-- 2) Top-selling category by region (by total sales)
CREATE TABLE viz_top_category_by_region AS
WITH cat_region AS (
  SELECT
    r.region,
    r.category,
    SUM(r.sales) AS sales
  FROM Rsc_cleaned r
  GROUP BY r.region, r.category
),
ranked AS (
  SELECT
    cr.*,
    -- category breaks ties so the winner is deterministic
    ROW_NUMBER() OVER (PARTITION BY cr.region ORDER BY cr.sales DESC, cr.category ASC) AS rn
  FROM cat_region cr
)
SELECT region, category, sales
FROM ranked
WHERE rn = 1;

CREATE INDEX IF NOT EXISTS idx_viz2_region ON viz_top_category_by_region(region);

-- 3) Returns by category/sub-category (count + rate)
CREATE TABLE viz_returns_by_cat_subcat AS
WITH base AS (
  SELECT
    UPPER(TRIM(category))      AS category,
    UPPER(TRIM(sub_category))  AS sub_category,
    COUNT(*)                   AS total_orders,
    SUM(CASE WHEN returned = 'YES' THEN 1 ELSE 0 END) AS returned_orders
  FROM Rsc_cleaned
  GROUP BY UPPER(TRIM(category)), UPPER(TRIM(sub_category))
)
SELECT
  category,
  sub_category,
  returned_orders,
  total_orders,
  CASE WHEN total_orders = 0 THEN 0.0
       ELSE ROUND(1.0 * returned_orders / total_orders, 4) END AS return_rate
FROM base;

CREATE INDEX IF NOT EXISTS idx_viz3_cat_sub ON viz_returns_by_cat_subcat(category, sub_category);
"""

# sqlite3's "with connection:" block does not close the connection; close it
# explicitly after the build and the QA queries.
conn = sqlite3.connect(db_path.as_posix())
try:
    conn.executescript(viz_sql)
    log_ok("viz tables created (viz_sales_by_region_month, viz_top_category_by_region, viz_returns_by_cat_subcat).")

    # Concise QA
    cur = conn.cursor()
    # Row count of each viz table.
    sizes = cur.execute("""
        SELECT
          (SELECT COUNT(*) FROM viz_sales_by_region_month),
          (SELECT COUNT(*) FROM viz_top_category_by_region),
          (SELECT COUNT(*) FROM viz_returns_by_cat_subcat)
    """).fetchone()
    # Exactly one top category is expected per region.
    top_per_region = cur.execute("""
        SELECT region, COUNT(*) AS rows_per_region
        FROM viz_top_category_by_region
        GROUP BY region
    """).fetchall()
    # Number of rows whose return_rate falls outside [0, 1].
    return_rates_ok = cur.execute("""
        SELECT
          SUM(CASE WHEN return_rate < 0 OR return_rate > 1 THEN 1 ELSE 0 END)
        FROM viz_returns_by_cat_subcat
    """).fetchone()[0]
finally:
    conn.close()

print("Row counts (viz1, viz2, viz3):", sizes)
print("Rows per region in viz_top_category_by_region:", top_per_region)
print("Return_rate out-of-range rows (should be 0):", return_rates_ok)



OK: viz tables created (viz_sales_by_region_month, viz_top_category_by_region, viz_returns_by_cat_subcat).
Row counts (viz1, viz2, viz3): (1301, 4, 17)
Rows per region in viz_top_category_by_region: [('CENTRAL', 1), ('EAST', 1), ('SOUTH', 1), ('WEST', 1)]
Return_rate out-of-range rows (should be 0): 0


## Create dimensions for the state/month sales viz

In [None]:
# [Step 3.1] Modeling — create dim_date, dim_geo, dim_product (VIEWs) and fct_sales (TABLE) + QA
#
# All objects are derived from Rsc_cleaned. The dimensions are views (always in
# sync with Rsc_cleaned); fct_sales is a materialized, indexed copy.

model_sql = """
-- Safety drops
DROP VIEW IF EXISTS dim_date;
DROP VIEW IF EXISTS dim_geo;
DROP VIEW IF EXISTS dim_product;
DROP TABLE IF EXISTS fct_sales;

-- Date dimension (month-level helpers)
CREATE VIEW dim_date AS
WITH base AS (
  SELECT DISTINCT order_date
  FROM Rsc_cleaned
  WHERE order_date IS NOT NULL
)
SELECT
  order_date                                        AS date_key,         -- TEXT YYYY-MM-DD
  SUBSTR(order_date,1,4)                            AS year,
  SUBSTR(order_date,6,2)                            AS month,
  SUBSTR(order_date,9,2)                            AS day,
  (SUBSTR(order_date,1,4)||'-'||SUBSTR(order_date,6,2)) AS year_month,
  CASE SUBSTR(order_date,6,2)
    WHEN '01' THEN 'Q1' WHEN '02' THEN 'Q1' WHEN '03' THEN 'Q1'
    WHEN '04' THEN 'Q2' WHEN '05' THEN 'Q2' WHEN '06' THEN 'Q2'
    WHEN '07' THEN 'Q3' WHEN '08' THEN 'Q3' WHEN '09' THEN 'Q3'
    ELSE 'Q4'
  END AS quarter
FROM base;

-- Geography dimension
CREATE VIEW dim_geo AS
SELECT DISTINCT
  UPPER(TRIM(region)) AS region,
  TRIM(state)         AS state,
  TRIM(city)          AS city
FROM Rsc_cleaned
WHERE region IS NOT NULL AND state IS NOT NULL;

-- Product dimension
CREATE VIEW dim_product AS
SELECT DISTINCT
  TRIM(product_id)         AS product_id,
  UPPER(TRIM(category))    AS category,
  UPPER(TRIM(sub_category)) AS sub_category,
  TRIM(product_name)       AS product_name
FROM Rsc_cleaned
WHERE product_id IS NOT NULL;

-- Fact table
CREATE TABLE fct_sales AS
SELECT
  r.row_id         AS row_id,        -- PK
  r.order_id       AS order_id,
  r.order_date     AS date_key,      -- join to dim_date.date_key
  r.region         AS region,        -- join to dim_geo.region
  r.state          AS state,         -- join to dim_geo.state
  r.city           AS city,
  r.product_id     AS product_id,    -- join to dim_product.product_id
  r.category       AS category,
  r.sub_category   AS sub_category,
  r.ship_mode      AS ship_mode,
  r.returned       AS returned,
  r.sales          AS sales,
  r.quantity       AS quantity,
  r.discount       AS discount,
  r.profit         AS profit
FROM Rsc_cleaned r;

-- Indexes for BI
CREATE UNIQUE INDEX IF NOT EXISTS ux_fct_row_id ON fct_sales(row_id);
CREATE INDEX IF NOT EXISTS idx_fct_date     ON fct_sales(date_key);
CREATE INDEX IF NOT EXISTS idx_fct_region   ON fct_sales(region, state);
CREATE INDEX IF NOT EXISTS idx_fct_product  ON fct_sales(product_id);
CREATE INDEX IF NOT EXISTS idx_fct_cat_sub  ON fct_sales(category, sub_category);
"""

# sqlite3's "with connection:" block does not close the connection; close it
# explicitly after the build and the QA queries.
conn = sqlite3.connect(DB_PATH.as_posix())
try:
    conn.executescript(model_sql)

    # QA (concise)
    cur = conn.cursor()
    # The fact table must contain exactly the rows of Rsc_cleaned.
    cnts = cur.execute("""
        SELECT
          (SELECT COUNT(*) FROM Rsc_cleaned),
          (SELECT COUNT(*) FROM fct_sales)
    """).fetchone()
    # Cardinality of each geography level.
    geo = cur.execute("""
        SELECT
          (SELECT COUNT(DISTINCT region) FROM dim_geo),
          (SELECT COUNT(DISTINCT state)  FROM dim_geo),
          (SELECT COUNT(DISTINCT city)   FROM dim_geo)
    """).fetchone()
    # Cardinality of each product level.
    prod = cur.execute("""
        SELECT
          (SELECT COUNT(DISTINCT product_id) FROM dim_product),
          (SELECT COUNT(DISTINCT category)   FROM dim_product),
          (SELECT COUNT(DISTINCT sub_category) FROM dim_product)
    """).fetchone()
    # NULLs in the columns used to join the fact table to the dimensions.
    nulls = cur.execute("""
        SELECT
          SUM(CASE WHEN date_key  IS NULL THEN 1 ELSE 0 END),
          SUM(CASE WHEN region    IS NULL THEN 1 ELSE 0 END),
          SUM(CASE WHEN state     IS NULL THEN 1 ELSE 0 END),
          SUM(CASE WHEN product_id IS NULL THEN 1 ELSE 0 END)
        FROM fct_sales
    """).fetchone()
    dminmax = cur.execute("SELECT MIN(date_key), MAX(date_key) FROM dim_date").fetchone()
finally:
    conn.close()

print("counts (Rsc_cleaned, fct_sales):", cnts)
print("dim_geo distinct (region, state, city):", geo)
print("dim_product distinct (product_id, category, subcat):", prod)
print("fct_sales NULLs (date_key, region, state, product_id):", nulls)
print("dim_date range (min, max):", dminmax)

# fct_sales is an unfiltered projection of Rsc_cleaned, so the counts must agree.
assert cnts[0] == cnts[1], "fct_sales must have exactly one row per Rsc_cleaned row."

log_ok("modeling complete")


counts (Rsc_cleaned, fct_sales): (9994, 9994)
dim_geo distinct (region, state, city): (4, 49, 531)
dim_product distinct (product_id, category, subcat): (1862, 3, 17)
fct_sales NULLs (date_key, region, state, product_id): (0, 0, 0, 0)
dim_date range (min, max): ('2014-01-02', '2017-12-30')
OK: modeling complete


## Scaffold: recreate the state/month viz with the new dimensions

In [None]:
# [Step 4.1] Build monthly scaffold for map (state × year × month) and export CSV (relative-safe)
#
# The scaffold is the cross product of every (region, state) found in fct_sales
# with every year-month found in dim_date; combinations without sales get
# zero-valued measures, so every state has a row for every month.

# 1) Create / refresh the scaffolded table
scaffold_sql = """
DROP TABLE IF EXISTS viz_state_month_sales;

CREATE TABLE viz_state_month_sales AS
WITH months AS (
  SELECT DISTINCT
    SUBSTR(date_key,1,4) AS year,
    SUBSTR(date_key,6,2) AS month,
    SUBSTR(date_key,1,4) || '-' || SUBSTR(date_key,6,2) AS year_month
  FROM dim_date
),
states AS (
  SELECT DISTINCT region, state
  FROM fct_sales
  WHERE region IS NOT NULL AND state IS NOT NULL
),
scaffold AS (
  SELECT s.region, s.state, m.year, m.month, m.year_month
  FROM states s
  CROSS JOIN months m
),
facts AS (
  SELECT
    fs.region,
    fs.state,
    SUBSTR(fs.date_key,1,4) AS year,
    SUBSTR(fs.date_key,6,2) AS month,
    SUBSTR(fs.date_key,1,4) || '-' || SUBSTR(fs.date_key,6,2) AS year_month,
    SUM(fs.sales)    AS sales,
    SUM(fs.profit)   AS profit,
    SUM(fs.quantity) AS quantity,
    AVG(fs.discount) AS avg_discount
  FROM fct_sales fs
  GROUP BY fs.region, fs.state, SUBSTR(fs.date_key,1,4), SUBSTR(fs.date_key,6,2)
)
SELECT
  sc.region,
  sc.state,
  sc.year,
  sc.month,
  sc.year_month,
  COALESCE(f.sales, 0.0)      AS sales,
  COALESCE(f.profit, 0.0)     AS profit,
  COALESCE(f.quantity, 0)     AS quantity,
  COALESCE(f.avg_discount, 0) AS avg_discount
FROM scaffold sc
LEFT JOIN facts f
  ON f.region = sc.region
 AND f.state  = sc.state
 AND f.year   = sc.year
 AND f.month  = sc.month
;

CREATE INDEX IF NOT EXISTS idx_vsms_region_state_month
  ON viz_state_month_sales(region, state, year_month);
"""

# 2) Quick QA: totals must match fct_sales over full year/month range
years = ('2014','2015','2016','2017')
months = tuple(range(1,13))

# Build the IN (...) lists once, outside the SQL templates.
year_list = ",".join(f"'{y}'" for y in years)
month_list = ",".join(str(m) for m in months)

q_tot_viz = f"""
SELECT COALESCE(SUM(sales),0.0)
FROM viz_state_month_sales
WHERE year IN ({year_list})
  AND CAST(month AS INTEGER) IN ({month_list});
"""
q_tot_fact = f"""
SELECT COALESCE(SUM(sales),0.0)
FROM fct_sales
WHERE SUBSTR(date_key,1,4) IN ({year_list});
"""

# 3) Export CSV for Tableau Public
out_path = EXPORT_DIR / "viz_state_month_sales.csv"
# One column list drives both the CSV header and the SELECT, so the two can
# never get out of step.
cols = ["region", "state", "year", "month", "year_month", "sales", "profit", "quantity", "avg_discount"]
select_export = 'SELECT {} FROM "viz_state_month_sales";'.format(
    ", ".join('"{}"'.format(c) for c in cols)
)

# sqlite3's "with connection:" block does not close the connection; close it
# explicitly after the build, the QA and the export.
conn = sqlite3.connect(DB_PATH.as_posix())
try:
    conn.executescript(scaffold_sql)

    tot_viz = conn.execute(q_tot_viz).fetchone()[0]
    tot_fact = conn.execute(q_tot_fact).fetchone()[0]
    print("totals (viz_state_month_sales, fct_sales):", (tot_viz, tot_fact))

    # The two sums add the same values in a different order, so they may differ
    # by floating-point rounding; compare with a relative tolerance.
    _a, _b = (tot_viz or 0), (tot_fact or 0)
    assert abs(_a - _b) <= 1e-9 * max(1.0, abs(_a), abs(_b)), "Totals mismatch: scaffold not aligned."

    # Honor the export policy set in Step 0.2.
    if out_path.exists() and not OVERWRITE_EXPORTS:
        print("export skipped (file exists, OVERWRITE_EXPORTS = False):", rel(out_path))
    else:
        with open(out_path, "w", encoding="utf-8", newline="", buffering=1_048_576) as f:
            w = csv.writer(f)
            w.writerow(cols)
            for row in conn.execute(select_export):
                w.writerow(row)
        print("export:", rel(out_path))
finally:
    conn.close()

log_ok("scaffold ready and exported")


totals (viz_state_month_sales, fct_sales): (2297201.07, 2297201.07)
export: data/exports/viz_state_month_sales.csv
OK: scaffold ready and exported


## Recreate the top-category-by-region and returns viz tables

In [12]:
# [Step 4.2 ] Recreate viz tables + export CSVs (SQLite-compliant)
#
# Rebuilds the two category-level viz tables from fct_sales, checks them, and
# writes each one to a CSV file in the export directory.

viz_sql = """
-- (A) Top-selling category by region
DROP TABLE IF EXISTS viz_top_category_by_region;
CREATE TABLE viz_top_category_by_region AS
WITH cat_region AS (
  SELECT region, category, SUM(sales) AS sales
  FROM fct_sales
  GROUP BY region, category
),
ranked AS (
  SELECT
    region, category, sales,
    ROW_NUMBER() OVER (PARTITION BY region ORDER BY sales DESC) AS rn
  FROM cat_region
)
SELECT region, category, sales
FROM ranked
WHERE rn = 1;

CREATE INDEX IF NOT EXISTS idx_viz2_region ON viz_top_category_by_region(region);

-- (B) Returns by category/sub-category (count + rate)
DROP TABLE IF EXISTS viz_returns_by_cat_subcat;
CREATE TABLE viz_returns_by_cat_subcat AS
WITH base AS (
  SELECT
    UPPER(TRIM(category))     AS category,
    UPPER(TRIM(sub_category)) AS sub_category,
    COUNT(*)                  AS total_orders,
    SUM(CASE WHEN returned = 'YES' THEN 1 ELSE 0 END) AS returned_orders
  FROM fct_sales
  GROUP BY UPPER(TRIM(category)), UPPER(TRIM(sub_category))
)
SELECT
  category,
  sub_category,
  returned_orders,
  total_orders,
  CASE WHEN total_orders = 0 THEN 0.0
       ELSE ROUND(1.0 * returned_orders / total_orders, 4) END AS return_rate
FROM base;

CREATE INDEX IF NOT EXISTS idx_viz3_cat_sub ON viz_returns_by_cat_subcat(category, sub_category);
"""

# --- Export targets: table name -> CSV path ---
paths = {
    "viz_top_category_by_region": EXPORT_DIR / "viz_top_category_by_region.csv",
    "viz_returns_by_cat_subcat": EXPORT_DIR / "viz_returns_by_cat_subcat.csv",
}

# sqlite3's "with connection:" block does not close the connection; close it
# explicitly after the build, the QA and the exports.
conn = sqlite3.connect(DB_PATH.as_posix())
try:
    conn.executescript(viz_sql)

    # --- QA (concise) ---
    rows_per_region = conn.execute("""
        SELECT region, COUNT(*) AS cnt
        FROM viz_top_category_by_region
        GROUP BY region
        ORDER BY region
    """).fetchall()
    bad_rr = conn.execute("""
        SELECT COUNT(*) FROM viz_returns_by_cat_subcat
        WHERE return_rate < 0 OR return_rate > 1
    """).fetchone()[0]

    print("rows per region (top category):", rows_per_region)
    print("return_rate out-of-range rows:", bad_rr)
    assert all(cnt == 1 for _, cnt in rows_per_region), "Top-category must be 1 row per region."
    assert bad_rr == 0, "Return rate must be within [0,1]."

    # --- Export both CSVs (streaming, relative-safe prints) ---
    for tbl, path in paths.items():
        # Honor the export policy set in Step 0.2.
        if path.exists() and not OVERWRITE_EXPORTS:
            print("export skipped (file exists, OVERWRITE_EXPORTS = False):", rel(path))
            continue

        # Column names in table order (index 1 of each PRAGMA table_info row).
        cols = [row[1] for row in conn.execute(f'PRAGMA table_info("{tbl}");')]
        # Quote the identifiers outside the f-string: a backslash inside an
        # f-string expression is a SyntaxError before Python 3.12.
        quoted_cols = ", ".join('"{}"'.format(c) for c in cols)
        sel = f'SELECT {quoted_cols} FROM "{tbl}";'

        with open(path, "w", encoding="utf-8", newline="", buffering=1_048_576) as f:
            w = csv.writer(f)
            w.writerow(cols)
            for row in conn.execute(sel):
                w.writerow(row)
        print("export:", rel(path))
finally:
    conn.close()

log_ok("viz tables ready and exported")



rows per region (top category): [('CENTRAL', 1), ('EAST', 1), ('SOUTH', 1), ('WEST', 1)]
return_rate out-of-range rows: 0
export: data/exports/viz_top_category_by_region.csv
export: data/exports/viz_returns_by_cat_subcat.csv
OK: viz tables ready and exported
