# In [0]:
import sys
import logging
from pathlib import Path
from datetime import datetime

def find_repo_root(start: Path) -> Path:
    """Locate the repository root by walking upward from ``start``.

    A directory qualifies as the root when it contains both a ``logs`` and a
    ``data`` entry, or when it contains a ``RUN.md`` entry. The search covers
    the resolved ``start`` itself plus at most 24 of its ancestors.

    Args:
        start: Path to begin searching from; it is resolved before the walk.

    Returns:
        The first (deepest) directory that carries the root markers.

    Raises:
        FileNotFoundError: If no directory in the search window qualifies.
    """
    origin = start.resolve()
    # The resolved path followed by its ancestors, capped at 25 candidates.
    for candidate in [origin, *origin.parents][:25]:
        has_work_dirs = (candidate / "logs").exists() and (candidate / "data").exists()
        if has_work_dirs or (candidate / "RUN.md").exists():
            return candidate
    raise FileNotFoundError("Repo root not found. Make sure the notebook is created inside the repo.")

# --- Run setup: locate the repo, create working directories, configure logging ---
REPO_ROOT = find_repo_root(Path.cwd())
DATA_DIR = REPO_ROOT / "data"
LOGS_DIR = REPO_ROOT / "logs"
DATA_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)

# One log file per run. The timestamp has minute resolution and the file is
# opened with mode="w", so a second run within the same minute overwrites it.
run_ts = datetime.now().strftime("%Y%m%d_%H%M")
log_file = LOGS_DIR / f"run_{run_ts}.log"

logger = logging.getLogger("lab_2_4")
logger.setLevel(logging.INFO)
# Detach AND close handlers left over from an earlier execution of this cell;
# clearing the handler list alone would leave the previous log file open.
for _stale_handler in list(logger.handlers):
    logger.removeHandler(_stale_handler)
    _stale_handler.close()
# Keep records out of the root logger so each message is emitted only by the
# two handlers attached below.
logger.propagate = False

fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")

# Console handler: writes to stdout.
ch = logging.StreamHandler(stream=sys.stdout)
ch.setLevel(logging.INFO)
ch.setFormatter(fmt)

# File handler: fresh file for this run.
fh = logging.FileHandler(log_file, mode="w", encoding="utf-8")
fh.setLevel(logging.INFO)
fh.setFormatter(fmt)

logger.addHandler(ch)
logger.addHandler(fh)

logger.info("=== START RUN ===")
logger.info(f"Repo root: {REPO_ROOT}")
logger.info(f"Data dir : {DATA_DIR}")
logger.info(f"Logs dir : {LOGS_DIR}")
logger.info(f"Log file : {log_file}")
logger.info(f"Python   : {sys.version.split()[0]}")




import os, random
import numpy as np

# Seed NumPy's global RNG and the stdlib `random` module, and export
# PYTHONHASHSEED into this process's environment.
# NOTE(review): PYTHONHASHSEED is normally read at interpreter startup, so
# setting it here presumably only affects child processes — confirm.
np.random.seed(0)
random.seed(0)
os.environ.update({"PYTHONHASHSEED": "0"})

logger.info("Reproducibility seeds set: PYTHONHASHSEED=0, random=0, numpy=0")




import json, glob, hashlib
from pathlib import Path

def sha256_file(path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hex-encoded SHA-256 digest of the file at ``path``.

    The file is opened in binary mode and consumed in pieces of
    ``chunk_size`` bytes, so the whole content is never held in memory.

    Args:
        path: File to hash.
        chunk_size: Number of bytes requested per read (default 1 MiB).

    Returns:
        The digest as a lowercase hexadecimal string.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel keeps calling read() until it returns b"".
        for block in iter(lambda: stream.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()

# Discover the input CSVs in data/. The directory part of the pattern is
# escaped so that glob metacharacters in the repo path ("[", "]", "*", "?")
# are matched literally instead of silently yielding no files.
csv_pattern = str(Path(glob.escape(str(DATA_DIR))) / "*.csv")
csvs = sorted(Path(p) for p in glob.glob(csv_pattern))
if len(csvs) < 2:
    raise FileNotFoundError(f"Expected 2 CSVs in {DATA_DIR}, found {len(csvs)}. Add them and rerun.")

# Record a SHA-256 digest and byte size per input file, keyed by file name.
hashes = {}
for p in csvs:
    digest = sha256_file(p)
    size = p.stat().st_size  # stat once; reused for the manifest and the log line
    hashes[p.name] = {"sha256": digest, "bytes": size}
    logger.info(f"Input hash | {p.name} | sha256={digest} | bytes={size}")

# Persist the manifest at the repo root.
hash_path = REPO_ROOT / "data_hashes.json"
hash_path.write_text(json.dumps(hashes, indent=2), encoding="utf-8")
logger.info(f"Wrote hashes to: {hash_path}")




import pandas as pd

# Preferred input filenames; edit these if your files are named differently.
# When either file is missing, the first two CSVs discovered in data/ are
# used instead (menu first, orders second).
menu_path = DATA_DIR / "menu_items.csv"
orders_path = DATA_DIR / "order_details.csv"

if not (menu_path.exists() and orders_path.exists()):
    logger.info("Canonical filenames not found; using first two CSVs found in data/.")
    menu_path, orders_path = csvs[:2]

logger.info(f"Loading menu file  : {menu_path.name}")
logger.info(f"Loading orders file: {orders_path.name}")

# Read both inputs with pandas' default CSV settings.
menu_items = pd.read_csv(menu_path)
order_details = pd.read_csv(orders_path)

# Log the shape and column names of each frame.
logger.info(f"Loaded menu_items    shape={menu_items.shape}")
logger.info(f"Loaded order_details shape={order_details.shape}")
logger.info(f"menu_items columns    = {list(menu_items.columns)}")
logger.info(f"order_details columns = {list(order_details.columns)}")




import numpy as np

def trim_strings(df: pd.DataFrame) -> pd.DataFrame:
    """Strip surrounding whitespace from every object-dtype column of ``df``.

    Non-null values are converted to ``str`` and stripped. Missing values
    (``None`` / ``NaN``) are left untouched, so they are not turned into the
    literal strings ``"None"`` / ``"nan"``.

    Args:
        df: Frame to clean. It is modified in place.

    Returns:
        The same ``df`` object, for call chaining.
    """
    for col in df.select_dtypes(include=["object"]).columns:
        values = df[col]
        stripped = values.astype(str).str.strip()
        # Keep the original entry where it is null; use the stripped text elsewhere.
        df[col] = values.where(values.isna(), stripped)
    return df

# Normalise whitespace in text columns before any type coercion.
menu_items = trim_strings(menu_items)
order_details = trim_strings(order_details)

# Coerce types. Values that cannot be parsed become missing (NaN / <NA>)
# instead of raising.
if "price" in menu_items.columns:
    menu_items["price"] = pd.to_numeric(menu_items["price"], errors="coerce")

if "menu_item_id" in menu_items.columns:
    # Nullable Int64 keeps integer ids while still allowing missing values.
    menu_items["menu_item_id"] = pd.to_numeric(menu_items["menu_item_id"], errors="coerce").astype("Int64")

for c in ["order_id", "item_id"]:
    if c in order_details.columns:
        order_details[c] = pd.to_numeric(order_details[c], errors="coerce").astype("Int64")

if "quantity" in order_details.columns:
    # Missing or unparseable quantities count as 0.
    order_details["quantity"] = pd.to_numeric(order_details["quantity"], errors="coerce").fillna(0).astype(int)
else:
    order_details["quantity"] = 1  # fallback if quantity isn't provided

# Datetime parsing (your dataset: "1/1/23 11:38:36 AM")
has_order_date = "order_date" in order_details.columns
has_order_time = "order_time" in order_details.columns

if has_order_date and has_order_time:
    dt_str = (
        order_details["order_date"].astype(str).str.strip()
        + " "
        + order_details["order_time"].astype(str).str.strip()
    )

    # Rows that do not match the format become NaT rather than raising.
    order_details["_order_dt"] = pd.to_datetime(
        dt_str,
        format="%m/%d/%y %I:%M:%S %p",
        errors="coerce"
    )

    logger.info(
        f"Parsed datetime from order_date+order_time -> _order_dt "
        f"(non-null: {order_details['_order_dt'].notna().sum()}, null: {order_details['_order_dt'].isna().sum()})"
    )
elif has_order_date:
    logger.info("order_date/order_time not both present; falling back to date-only parsing.")
    order_details["_order_dt"] = pd.to_datetime(order_details["order_date"], format="%m/%d/%y", errors="coerce")
else:
    # No date column at all: indexing order_date here would raise KeyError,
    # so provide an all-NaT column and let later steps see "no timestamps".
    logger.info("order_date column not present; _order_dt set to NaT for every row.")
    order_details["_order_dt"] = pd.Series(pd.NaT, index=order_details.index, dtype="datetime64[ns]")



# Join order lines to menu metadata; both key columns must be present.
if not ("menu_item_id" in menu_items.columns and "item_id" in order_details.columns):
    raise KeyError("Join keys missing. Need menu_items.menu_item_id and order_details.item_id (or adjust code to match your columns).")

# Left join: every order line is kept, with or without a matching menu row.
tidy = pd.merge(
    order_details,
    menu_items,
    how="left",
    left_on="item_id",
    right_on="menu_item_id",
    suffixes=("_order", "_menu"),
)

# Narrow to the columns of interest, skipping any that are absent.
cols_wanted = ["order_id", "_order_dt", "item_id", "menu_item_id", "item_name", "category", "price", "quantity"]
tidy = tidy[[name for name in cols_wanted if name in tidy.columns]].copy()

# Revenue per line = price * quantity, with missing values treated as 0.
if "price" not in tidy.columns:
    tidy["revenue"] = 0.0
else:
    tidy["revenue"] = tidy["price"].fillna(0).mul(tidy["quantity"].fillna(0))

logger.info(f"Tidy table shape={tidy.shape}")

# Metric 1: the five items with the highest summed quantity.
if "item_name" in tidy.columns:
    top5_items = (
        tidy.groupby("item_name")["quantity"]
        .sum()
        .sort_values(ascending=False)
        .head(5)
        .reset_index()
        .rename(columns={"quantity": "total_quantity"})
    )
else:
    top5_items = pd.DataFrame()

# Metric 2: summed revenue per category, largest first.
if "category" in tidy.columns:
    revenue_by_category = (
        tidy.groupby("category")["revenue"]
        .sum()
        .sort_values(ascending=False)
        .reset_index()
        .rename(columns={"revenue": "total_revenue"})
    )
else:
    revenue_by_category = pd.DataFrame()

# Metric 3: the hour of day with the most order rows (only rows with a parsed timestamp).
busiest_hour = pd.DataFrame()
if tidy["_order_dt"].notna().any():
    busiest_hour = (
        tidy.dropna(subset=["_order_dt"])
        .assign(hour=lambda frame: frame["_order_dt"].dt.hour)
        .groupby("hour")["order_id"]
        .count()
        .sort_values(ascending=False)
        .head(1)
        .reset_index()
        .rename(columns={"order_id": "order_row_count"})
    )

logger.info(f"Top5 items rows         : {len(top5_items)}")
logger.info(f"Revenue by category rows : {len(revenue_by_category)}")
logger.info(f"Busiest hour rows        : {len(busiest_hour)}")

print(top5_items.head().to_string(index=False))
print(revenue_by_category.head().to_string(index=False))

if busiest_hour is None or len(busiest_hour) == 0:
    logger.info("busiest_hour is empty (no valid _order_dt values parsed).")
else:
    print(busiest_hour.to_string(index=False))





from pathlib import Path
from datetime import datetime

# --- Export: flatten the metric tables into one long-format CSV ---
OUT_DIR = REPO_ROOT / "etl_output"
OUT_DIR.mkdir(parents=True, exist_ok=True)

# One (metric, key, value) row per metric entry. to_dict("records") yields an
# empty list for an empty frame, so absent metrics simply contribute no rows.
rows = [
    {"metric": "top_item_quantity", "key": str(rec["item_name"]), "value": float(rec["total_quantity"])}
    for rec in top5_items.to_dict("records")
]
rows.extend(
    {"metric": "revenue_by_category", "key": str(rec["category"]), "value": float(rec["total_revenue"])}
    for rec in revenue_by_category.to_dict("records")
)

if len(busiest_hour) > 0:
    r = busiest_hour.iloc[0]
    rows.append({"metric": "busiest_hour", "key": str(int(r["hour"])), "value": float(r["order_row_count"])})

# Explicit columns keep the schema stable even when `rows` is empty; without
# them the column casts below would raise KeyError on an empty frame.
metrics_df = pd.DataFrame(rows, columns=["metric", "key", "value"])
metrics_df["metric"] = metrics_df["metric"].astype("string")
metrics_df["key"] = metrics_df["key"].astype("string")
metrics_df["value"] = pd.to_numeric(metrics_df["value"], errors="coerce").astype(float)

# Minute-resolution timestamp: a second export in the same minute overwrites the file.
out_ts = datetime.now().strftime("%Y%m%d_%H%M")
out_path = OUT_DIR / f"metrics_{out_ts}.csv"
metrics_df.to_csv(out_path, index=False)

logger.info(f"Saved metrics CSV to: {out_path}")
print(metrics_df.to_string(index=False))


# Final sanity checks. Explicit raises are used instead of `assert` statements
# so the checks stay active under `python -O`; AssertionError is kept as the
# exception type.
required_cols = {"order_id", "item_id", "quantity", "revenue"}
missing = required_cols - set(tidy.columns)
if missing:
    raise AssertionError(f"Missing required columns in tidy: {missing}")

if len(tidy) == 0:
    raise AssertionError("Tidy table is empty — check inputs/join.")
if len(metrics_df) == 0:
    raise AssertionError("Metrics output is empty — check calculations.")

logger.info("All asserts passed.")
logger.info("=== END RUN ===")
