In [9]:
"""
Feature Engineering Pipeline for Raw Material Delivery Prediction
TDT4173 Course Project - Hydro ASA x Append Consulting

This script builds features for predicting cumulative raw material deliveries.
It generates both submission features and training data with proper time-aware splits.

Key Features:
- Rolling window statistics (7, 14, 28, 56, 365 days)
- Same period last year comparisons
- Purchase order features (respecting temporal data leakage)
- Calendar features (month, day of week)
"""

import pandas as pd
import numpy as np
from pathlib import Path

# ============================================================================
# PATH CONFIGURATION
# ============================================================================
DATA = Path.cwd() / "data"
if not DATA.exists():
    DATA = Path.cwd().parent / "data"

RAW = DATA / "1_raw"
INTERIM = DATA / "2_interim"
PROCESSED = DATA / "3_processed"

# Create directories if they don't exist
INTERIM.mkdir(parents=True, exist_ok=True)
PROCESSED.mkdir(parents=True, exist_ok=True)

# Input files
PREDICTION_MAPPING_CSV = RAW / "prediction_mapping.csv"
MASTER_DATA_CSV = INTERIM / "master_data.csv"
PURCHASE_ORDERS_CSV = RAW / "kernel_purchase_orders.csv"

# Output files
FEATURES_SUBMISSION = PROCESSED / "features_for_submission.csv"
FEATURES_TRAINING = PROCESSED / "training_features_and_labels.csv"


# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def read_mapping(mapping_path: Path) -> pd.DataFrame:
    """
    Read and normalize the prediction mapping file.
    
    Returns DataFrame with columns: ID, rm_id, forecast_start_date, forecast_end_date
    """
    df = pd.read_csv(
        mapping_path,
        parse_dates=["forecast_start_date", "forecast_end_date"]
    )
    df = df[["ID", "rm_id", "forecast_start_date", "forecast_end_date"]].copy()
    df["rm_id"] = pd.to_numeric(df["rm_id"], errors="coerce").astype("int64")
    return df


def read_master(master_path: Path) -> pd.DataFrame:
    """
    Read and preprocess master receival data.
    
    Handles duplicates and normalizes date/weight columns.
    """
    df = pd.read_csv(master_path, low_memory=False)
    
    # Normalize dates and weights
    df["date_arrival"] = pd.to_datetime(
        df["date_arrival"], utc=True, errors="coerce"
    ).dt.tz_convert(None)
    df["net_weight"] = pd.to_numeric(
        df.get("net_weight"), errors="coerce"
    ).fillna(0.0)
    
    # Normalize ID columns
    for col in ["rm_id", "purchase_order_id", "purchase_order_item_no", "receival_item_no"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
    
    # Remove duplicates based on available keys
    recv_key = ["rm_id", "purchase_order_id", "purchase_order_item_no", "receival_item_no"]
    if not set(recv_key).issubset(df.columns):
        recv_key = ["rm_id", "date_arrival", "net_weight"]
    df = df.drop_duplicates(subset=recv_key, keep="first").copy()
    
    # Create normalized date column (day-level)
    df["rm_id"] = pd.to_numeric(df["rm_id"], errors="coerce").astype("int64")
    df["date"] = pd.to_datetime(df["date_arrival"].dt.date)
    
    return df


def complete_calendar(daily_rm: pd.DataFrame) -> pd.DataFrame:
    """
    Fill missing dates with zero weight for a single rm_id group.
    
    This ensures we have data for every day in the range, enabling exact date joins.
    """
    full = pd.DataFrame({
        "date": pd.date_range(
            daily_rm["date"].min(),
            daily_rm["date"].max(),
            freq="D"
        )
    })
    full["rm_id"] = daily_rm.name
    out = full.merge(daily_rm, on=["rm_id", "date"], how="left")
    out["net_weight"] = out["net_weight"].fillna(0.0)
    return out


def build_daily(master_df: pd.DataFrame) -> pd.DataFrame:
    """
    Build daily aggregated features for each raw material.
    
    Features include:
    - Cumulative weight
    - Rolling window statistics (7, 14, 28, 56, 365 days)
    - Days since last delivery
    - Mean kg per delivery day
    """
    master_df = master_df.copy()
    
    # Aggregate to daily level
    daily = (
        master_df.groupby(["rm_id", "date"], as_index=False)["net_weight"]
        .sum()
        .sort_values(["rm_id", "date"])
    )
    
    # Fill complete calendar for each rm_id
    daily = (
        daily.groupby("rm_id", group_keys=False)
        .apply(complete_calendar)
        .sort_values(["rm_id", "date"])
        .reset_index(drop=True)
    )
    
    # Cumulative weight
    daily["cum_kg"] = daily.groupby("rm_id")["net_weight"].cumsum()
    
    # Delivery flag (1 if delivery, 0 otherwise)
    daily["deliveries_flag"] = (daily["net_weight"] > 0).astype(int)
    
    # Rolling window features
    for window in [7, 14, 28, 56, 365]:
        daily[f"r{window}_kg"] = (
            daily.groupby("rm_id")["net_weight"]
            .rolling(window, min_periods=1)
            .sum()
            .reset_index(level=0, drop=True)
        )
        daily[f"r{window}_days"] = (
            daily.groupby("rm_id")["deliveries_flag"]
            .rolling(window, min_periods=1)
            .sum()
            .reset_index(level=0, drop=True)
        )
    
    # Mean kg per delivery day (avoid division by zero)
    def safe_div(a, b):
        b_safe = b.replace(0, np.nan)
        return (a / b_safe).fillna(0.0)
    
    daily["r28_mean_kg_per_day"] = safe_div(daily["r28_kg"], daily["r28_days"])
    daily["r56_mean_kg_per_day"] = safe_div(daily["r56_kg"], daily["r56_days"])
    
    # Days since last delivery
    tmp = daily[["rm_id", "date", "net_weight"]].copy()
    tmp["last_deliv_date"] = tmp["date"].where(tmp["net_weight"] > 0)
    tmp["last_deliv_date"] = tmp.groupby("rm_id")["last_deliv_date"].ffill()
    daily["days_since_last"] = (
        (daily["date"] - tmp["last_deliv_date"])
        .dt.days.fillna(10_000)
        .astype(int)
    )
    
    return daily


def read_orders_with_rm(
    orders_path: Path | None,
    master_df: pd.DataFrame
) -> pd.DataFrame | None:
    """
    Read purchase orders and join with rm_id from master data.
    
    Returns None if file doesn't exist.
    """
    if orders_path is None or not orders_path.exists():
        return None
    
    po = pd.read_csv(
        orders_path,
        low_memory=False,
        parse_dates=["delivery_date", "created_date_time", "modified_date_time"]
    )
    
    # Normalize ID columns
    for col in ["purchase_order_id", "purchase_order_item_no"]:
        if col in po.columns:
            po[col] = pd.to_numeric(po[col], errors="coerce").astype("Int64")
    
    po["quantity"] = pd.to_numeric(po.get("quantity"), errors="coerce").fillna(0.0)
    
    # Join with master to get rm_id
    rm_lookup = (
        master_df[["rm_id", "purchase_order_id", "purchase_order_item_no"]]
        .dropna()
        .drop_duplicates()
    )
    po = po.merge(rm_lookup, on=["purchase_order_id", "purchase_order_item_no"], how="left")
    
    # Normalize dates
    if "delivery_date" in po.columns:
        po["delivery_date"] = pd.to_datetime(
            po["delivery_date"], utc=True, errors="coerce"
        ).dt.tz_convert(None)
    if "created_date_time" in po.columns:
        po["created_date_time"] = pd.to_datetime(
            po["created_date_time"], utc=True, errors="coerce"
        ).dt.tz_convert(None)
    if "modified_date_time" in po.columns:
        po["modified_date_time"] = pd.to_datetime(
            po["modified_date_time"], utc=True, errors="coerce"
        ).dt.tz_convert(None)
    
    # Keep relevant columns
    keep_cols = [
        c for c in [
            "rm_id", "quantity", "delivery_date",
            "created_date_time", "modified_date_time",
            "purchase_order_id", "purchase_order_item_no"
        ]
        if c in po.columns
    ]
    po = po[keep_cols].dropna(subset=["rm_id", "delivery_date"]).copy()
    po["rm_id"] = pd.to_numeric(po["rm_id"], errors="coerce").astype("int64")
    
    return po


def same_period_last_year(
    df_map: pd.DataFrame,
    csum: pd.DataFrame
) -> pd.DataFrame:
    """
    Calculate total weight delivered in the same period last year.
    
    Uses groupby + manual filtering approach to avoid merge_asof sorting issues.
    For each forecast window, looks back 365-366 days and calculates the difference
    in cumulative weight.
    """
    df_map = df_map.copy()
    df_map["rm_id"] = pd.to_numeric(df_map["rm_id"], errors="coerce").astype("int64")
    
    # Prepare cumulative data
    csum_clean = csum[["rm_id", "date", "cum_kg"]].copy()
    csum_clean["rm_id"] = pd.to_numeric(
        csum_clean["rm_id"], errors="coerce"
    ).astype("int64")
    csum_clean["date"] = pd.to_datetime(csum_clean["date"], errors="coerce")
    csum_clean = csum_clean.dropna(subset=["rm_id", "date"])
    csum_clean = csum_clean.sort_values(["rm_id", "date"]).reset_index(drop=True)
    
    # Calculate dates from last year
    df_map["prev_start"] = df_map["forecast_start_date"] - pd.Timedelta(days=366)
    df_map["prev_end"] = df_map["forecast_end_date"] - pd.Timedelta(days=365)
    
    results = []
    
    # Process each rm_id separately
    for rm_id, group in df_map.groupby("rm_id"):
        rm_csum = csum_clean[csum_clean["rm_id"] == rm_id].copy()
        
        if rm_csum.empty:
            # No historical data for this rm_id
            for _, row in group.iterrows():
                results.append({
                    "ID": row["ID"],
                    "same_period_last_year_kg": 0.0
                })
            continue
        
        # For each forecast window
        for _, row in group.iterrows():
            prev_start = row["prev_start"]
            prev_end = row["prev_end"]
            
            # Find nearest date <= prev_start
            before_start = rm_csum[rm_csum["date"] <= prev_start]
            cum_a = before_start["cum_kg"].iloc[-1] if not before_start.empty else 0.0
            
            # Find nearest date <= prev_end
            before_end = rm_csum[rm_csum["date"] <= prev_end]
            cum_b = before_end["cum_kg"].iloc[-1] if not before_end.empty else 0.0
            
            # Calculate difference
            diff = max(0.0, cum_b - cum_a)
            results.append({
                "ID": row["ID"],
                "same_period_last_year_kg": diff
            })
    
    return pd.DataFrame(results)


def add_orders_features(
    df_map: pd.DataFrame,
    orders: pd.DataFrame | None
) -> pd.DataFrame:
    """
    Add purchase order features while respecting temporal data leakage.
    
    Only includes orders that were:
    1. Created before the cutoff date
    2. Not modified after the cutoff date (or never modified)
    3. Expected to deliver within the forecast window
    """
    if orders is None or orders.empty:
        df_map["orders_qty_in_window"] = 0.0
        df_map["orders_lines_in_window"] = 0
        return df_map
    
    # Join orders with mapping
    ordr = df_map[[
        "ID", "rm_id", "forecast_start_date", "forecast_end_date", "cutoff_date"
    ]].merge(orders, on="rm_id", how="left")
    
    # Filter: only orders known before cutoff
    known_mask = True
    if "created_date_time" in ordr.columns:
        known_mask = ordr["created_date_time"] <= ordr["cutoff_date"]
    if "modified_date_time" in ordr.columns:
        known_mask &= (
            ordr["modified_date_time"].isna() |
            (ordr["modified_date_time"] <= ordr["cutoff_date"])
        )
    ordr = ordr[known_mask]
    
    # Filter: delivery expected in forecast window
    in_window = (
        (ordr["delivery_date"] >= ordr["forecast_start_date"]) &
        (ordr["delivery_date"] <= ordr["forecast_end_date"])
    )
    
    # Aggregate by ID
    if {"purchase_order_id", "purchase_order_item_no"}.issubset(ordr.columns):
        grp = ordr[in_window].groupby("ID", as_index=False).agg(
            orders_qty_in_window=("quantity", "sum"),
            orders_lines_in_window=("purchase_order_id", "nunique")
        )
    else:
        grp = ordr[in_window].groupby("ID", as_index=False).agg(
            orders_qty_in_window=("quantity", "sum"),
            orders_lines_in_window=("quantity", "size")
        )
    
    # Merge back
    out = df_map.merge(grp, on="ID", how="left")
    out["orders_qty_in_window"] = out["orders_qty_in_window"].fillna(0.0)
    out["orders_lines_in_window"] = out["orders_lines_in_window"].fillna(0).astype(int)
    
    return out


def make_features_from_mapping(
    daily: pd.DataFrame,
    df_map_in: pd.DataFrame,
    orders: pd.DataFrame | None
) -> pd.DataFrame:
    """
    Build feature matrix from prediction mapping.
    
    Features include:
    - Historical rolling statistics (as of cutoff date)
    - Same period last year comparison
    - Calendar features
    - Purchase order features (no data leakage)
    """
    df_map = df_map_in.copy()
    df_map["rm_id"] = pd.to_numeric(df_map["rm_id"], errors="coerce").astype("int64")
    
    # Cutoff date is the day before forecast starts
    df_map["cutoff_date"] = df_map["forecast_start_date"] - pd.Timedelta(days=1)
    df_map["window_days"] = (
        df_map["forecast_end_date"] - df_map["forecast_start_date"]
    ).dt.days + 1
    
    # Prepare historical features (rename date -> cutoff_date for join)
    feat_date = daily.rename(columns={"date": "cutoff_date"}).copy()
    feat_date["rm_id"] = pd.to_numeric(
        feat_date["rm_id"], errors="coerce"
    ).astype("int64")
    feat_date = feat_date.sort_values(["rm_id", "cutoff_date"])
    
    hist_cols = [
        "rm_id", "cutoff_date", "cum_kg",
        "r7_kg", "r14_kg", "r28_kg", "r56_kg", "r365_kg",
        "r7_days", "r14_days", "r28_days", "r56_days", "r365_days",
        "r28_mean_kg_per_day", "r56_mean_kg_per_day", "days_since_last"
    ]
    
    # Join historical features
    X = df_map.merge(feat_date[hist_cols], on=["rm_id", "cutoff_date"], how="left")
    
    # Add same period last year
    csum = daily[["rm_id", "date", "cum_kg"]].copy()
    X = X.merge(same_period_last_year(df_map, csum), on="ID", how="left")
    
    # Calendar features
    X["start_month"] = X["forecast_start_date"].dt.month
    X["start_dow"] = X["forecast_start_date"].dt.dayofweek
    X["end_month"] = X["forecast_end_date"].dt.month
    
    # Purchase order features
    X = add_orders_features(X, orders)
    
    # Fill missing values
    numeric_cols = [
        "cum_kg", "r7_kg", "r14_kg", "r28_kg", "r56_kg", "r365_kg",
        "r7_days", "r14_days", "r28_days", "r56_days", "r365_days",
        "r28_mean_kg_per_day", "r56_mean_kg_per_day", "days_since_last",
        "same_period_last_year_kg", "orders_qty_in_window"
    ]
    for col in numeric_cols:
        if col in X.columns:
            X[col] = X[col].fillna(0.0)
    
    return X


def build_training_windows_template(
    daily: pd.DataFrame,
    mapping_like: pd.DataFrame,
    step_days: int = 7,
    min_history_days: int = 400
) -> pd.DataFrame:
    """
    Generate synthetic training windows from historical data.
    
    Creates multiple forecast windows for each rm_id by sliding through time.
    Ensures sufficient history before each window starts.
    
    Args:
        daily: Daily aggregated data
        mapping_like: Example mapping to extract window lengths
        step_days: Days between consecutive training windows
        min_history_days: Minimum days of history required before first window
    """
    # Extract unique window lengths from mapping
    win_lengths = sorted(
        (mapping_like["forecast_end_date"] - mapping_like["forecast_start_date"])
        .dt.days.add(1)
        .unique()
    )
    
    pieces = []
    
    for rm, g in daily.groupby("rm_id"):
        first_date = g["date"].min()
        last_date = g["date"].max()
        
        for wlen in win_lengths:
            # First window starts after min_history_days
            start = first_date + pd.Timedelta(days=min_history_days)
            # Last window must fit completely in data
            end_latest = last_date - pd.Timedelta(days=wlen)
            
            if start > end_latest:
                continue
            
            # Generate sliding windows
            starts = pd.date_range(start, end_latest, freq=f"{step_days}D")
            df = pd.DataFrame({
                "rm_id": rm,
                "forecast_start_date": starts,
                "forecast_end_date": starts + pd.Timedelta(days=wlen - 1)
            })
            pieces.append(df)
    
    if not pieces:
        return pd.DataFrame(
            columns=["ID", "rm_id", "forecast_start_date", "forecast_end_date"]
        )
    
    # Combine and assign IDs
    tmpl = pd.concat(pieces, ignore_index=True)
    tmpl = tmpl.sort_values(["rm_id", "forecast_start_date"]).reset_index(drop=True)
    tmpl["ID"] = np.arange(1, len(tmpl) + 1)
    
    return tmpl[["ID", "rm_id", "forecast_start_date", "forecast_end_date"]]


def label_from_daily(
    daily: pd.DataFrame,
    df_map: pd.DataFrame
) -> pd.DataFrame:
    """
    Calculate actual delivery totals for each forecast window (labels).
    
    Uses groupby approach to avoid merge_asof sorting issues.
    """
    # Prepare daily data with cumulative sums
    g = daily[["rm_id", "date", "net_weight"]].copy()
    g["rm_id"] = pd.to_numeric(g["rm_id"], errors="coerce").astype("int64")
    g["date"] = pd.to_datetime(g["date"], utc=False, errors="coerce")
    g = g.dropna(subset=["rm_id", "date"])
    g = g.sort_values(["rm_id", "date"]).reset_index(drop=True)
    g["cum"] = g.groupby("rm_id")["net_weight"].cumsum()
    
    results = []
    
    # Process each rm_id separately to avoid sorting issues
    for rm_id, group in df_map.groupby("rm_id"):
        rm_daily = g[g["rm_id"] == rm_id].copy()
        
        if rm_daily.empty:
            # No data for this rm_id
            for _, row in group.iterrows():
                results.append({
                    "ID": row["ID"],
                    "y_window_kg": 0.0
                })
            continue
        
        # For each forecast window
        for _, row in group.iterrows():
            start_date = row["forecast_start_date"]
            end_date = row["forecast_end_date"]
            
            # Find cumulative sum at start (day before)
            before_start = rm_daily[rm_daily["date"] < start_date]
            cum_a = before_start["cum"].iloc[-1] if not before_start.empty else 0.0
            
            # Find cumulative sum at end
            before_end = rm_daily[rm_daily["date"] <= end_date]
            cum_b = before_end["cum"].iloc[-1] if not before_end.empty else 0.0
            
            # Label = difference
            y_kg = max(0.0, cum_b - cum_a)
            results.append({
                "ID": row["ID"],
                "y_window_kg": y_kg
            })
    
    return pd.DataFrame(results)


# ============================================================================
# MAIN EXECUTION
# ============================================================================

if __name__ == "__main__":
    print("=" * 70)
    print("FEATURE ENGINEERING PIPELINE")
    print("TDT4173 Course Project - Hydro ASA x Append Consulting")
    print("=" * 70)
    
    # Read input data
    print("\n[1/5] Reading input data...")
    df_map_raw = read_mapping(PREDICTION_MAPPING_CSV)
    df_master = read_master(MASTER_DATA_CSV)
    
    print(f"  Prediction mapping: {len(df_map_raw)} rows")
    print(f"  Master receival data: {len(df_master)} rows")
    print(f"  Unique raw materials: {df_master['rm_id'].nunique()}")
    
    # Build daily features
    print("\n[2/5] Building daily aggregated features...")
    daily = build_daily(df_master)
    print(f"  Daily features: {len(daily)} rows")
    print(f"  Date range: {daily['date'].min()} to {daily['date'].max()}")
    
    # Read purchase orders
    print("\n[3/5] Reading purchase orders...")
    orders = read_orders_with_rm(PURCHASE_ORDERS_CSV, df_master)
    if orders is not None:
        print(f" Purchase orders: {len(orders)} rows")
    else:
        print(" No purchase order data found")
    
    # Generate submission features
    print("\n[4/5] Generating submission features...")
    X_submit = make_features_from_mapping(daily, df_map_raw, orders)
    
    out_cols = [
        "ID", "rm_id", "forecast_start_date", "forecast_end_date",
        "cutoff_date", "window_days",
        "cum_kg", "r7_kg", "r14_kg", "r28_kg", "r56_kg", "r365_kg",
        "r7_days", "r14_days", "r28_days", "r56_days", "r365_days",
        "r28_mean_kg_per_day", "r56_mean_kg_per_day", "days_since_last",
        "same_period_last_year_kg",
        "orders_qty_in_window", "orders_lines_in_window",
        "start_month", "start_dow", "end_month"
    ]
    
    X_submit[out_cols].to_csv(FEATURES_SUBMISSION, index=False)
    print(f"  ✓ Wrote {len(X_submit)} rows -> {FEATURES_SUBMISSION}")
    
    # Generate training features
    print("\n[5/5] Generating training features and labels...")
    try:
        train_template = build_training_windows_template(
            daily, df_map_raw,
            step_days=28,
            min_history_days=500
        )
        train_template = train_template.sample(n=100000, random_state=42)
        print(f" Training windows: {len(train_template)} rows")
        
        X_train = make_features_from_mapping(daily, train_template, orders)
        y_train = label_from_daily(daily, train_template)
        
        train = X_train.merge(y_train, on="ID", how="left")
        train[out_cols + ["y_window_kg"]].to_csv(FEATURES_TRAINING, index=False)
        print(f" Wrote {len(train)} rows -> {FEATURES_TRAINING}")
        
    except Exception as e:
        print(f" Training set build failed: {e}")
        print("    (This is OK if you only need submission features)")
    
    print("\n" + "=" * 70)
    print("FEATURE ENGINEERING COMPLETE")
    print("=" * 70)

FEATURE ENGINEERING PIPELINE
TDT4173 Course Project - Hydro ASA x Append Consulting

[1/5] Reading input data...
  Prediction mapping: 30450 rows
  Master receival data: 40943 rows
  Unique raw materials: 203

[2/5] Building daily aggregated features...


  .apply(complete_calendar)


  Daily features: 223606 rows
  Date range: 2004-06-15 00:00:00 to 2024-12-19 00:00:00

[3/5] Reading purchase orders...
 Purchase orders: 27425 rows

[4/5] Generating submission features...
  ✓ Wrote 30450 rows -> c:\Users\timjo\ML\tdt4173-course-project\data\3_processed\features_for_submission.csv

[5/5] Generating training features and labels...
 Training windows: 100000 rows
 Wrote 100000 rows -> c:\Users\timjo\ML\tdt4173-course-project\data\3_processed\training_features_and_labels.csv

FEATURE ENGINEERING COMPLETE
