# 02 — Feature Engineering (FE)

**Purpose:** Define a *model-agnostic*, reproducible **feature specification** (`feature_spec_v1.json`) for building
a model-ready feature matrix in `src/features/build_features.py`.

This notebook:
- **Consumes** `data/interim/loans_cleaned.parquet` (output of `src/data/preprocess.py`)
- **Consumes** EDA contracts via `data/artifacts/eda_summary.json` (index) + versioned truth artifacts
- **Exports** `data/artifacts/feature_spec_v1.json`
- Does **not** write engineered parquet outputs (those belong to the build script / pipeline)

> Source notebook content and intent: see `02_fe.md`.


In [1]:
from __future__ import annotations

import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict

import numpy as np
import pandas as pd
from src.config import ARTIFACTS_DIR, INTERIM_DATA_DIR

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)

ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)
INTERIM_DATA_DIR.mkdir(parents=True, exist_ok=True)

LOANS_CLEANED_PATH = Path(INTERIM_DATA_DIR) / "loans_cleaned.parquet"
EDA_INDEX_PATH = Path(ARTIFACTS_DIR) / "eda_summary.json"
FEATURE_SPEC_PATH = Path(ARTIFACTS_DIR) / "feature_spec_v1.json"

assert LOANS_CLEANED_PATH.exists(), f"Missing preprocess output: {LOANS_CLEANED_PATH}"
assert EDA_INDEX_PATH.exists(), f"Missing EDA index: {EDA_INDEX_PATH}"


def read_json(path: Path) -> Any:
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def write_json(path: Path, obj: Any) -> Path:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2, sort_keys=True)
    return path


def md5_bytes(b: bytes) -> str:
    return hashlib.md5(b).hexdigest()

## 1) Inputs & Data Contracts

We load:
- `data/interim/loans_cleaned.parquet`
- `data/artifacts/eda_summary.json` (**index**)
- The versioned truth artifacts referenced by that index (**truth**)

We then validate:
- target column exists and is binary
- referenced artifacts exist
- dataset columns are compatible with the EDA roles artifact (drop lists + families)


In [2]:
# --- load dataset ---
df = pd.read_parquet(LOANS_CLEANED_PATH)
print("Loaded loans_cleaned:", df.shape)
df.head(3)

Loaded loans_cleaned: (1370163, 12)


Unnamed: 0,loan_amnt,annual_inc,int_rate,term,loan_status,dti,grade,sub_grade,emp_length,home_ownership,issue_d,default
0,3600.0,55000.0,13.99,36 months,Fully Paid,5.91,C,C4,10+ years,MORTGAGE,Dec-2015,0
1,24700.0,65000.0,11.99,36 months,Fully Paid,16.06,C,C1,10+ years,MORTGAGE,Dec-2015,0
2,20000.0,63000.0,10.78,60 months,Fully Paid,10.78,B,B4,10+ years,MORTGAGE,Dec-2015,0


In [3]:
# --- load EDA index + truth artifacts ---
eda_index = read_json(EDA_INDEX_PATH)
artifact_paths = eda_index.get("artifacts", {})
if not artifact_paths:
    raise KeyError(
        "eda_summary.json must contain an 'artifacts' mapping (index -> truth artifacts)."
    )

# Resolve paths (repo-relative best-effort)
cwd = Path.cwd()
project_root = cwd if (cwd / "data").exists() else cwd.parent


def resolve_from_index(rel_or_abs: str) -> Path:
    p = Path(rel_or_abs)
    if p.is_absolute():
        return p
    return (project_root / p).resolve()


schema_raw_path = resolve_from_index(artifact_paths["schema_raw_accepted_v1"])
missingness_path = resolve_from_index(artifact_paths["missingness_profile_v1"])
target_def_path = resolve_from_index(artifact_paths["target_definition_v1"])
roles_path = resolve_from_index(artifact_paths["column_roles_v1"])

for p in [schema_raw_path, missingness_path, target_def_path, roles_path]:
    if not p.exists():
        raise FileNotFoundError(f"EDA truth artifact missing: {p}")

schema_raw = read_json(schema_raw_path)
missingness_profile = read_json(missingness_path)  # list[dict]
target_def = read_json(target_def_path)
roles = read_json(roles_path)

print("Loaded truth artifacts:")
print("-", schema_raw_path.name)
print("-", missingness_path.name)
print("-", target_def_path.name)
print("-", roles_path.name)

Loaded truth artifacts:
- schema_raw_accepted_v1.json
- missingness_profile_v1.json
- target_definition_v1.json
- column_roles_v1.json


In [4]:
# --- contract validation: target ---
TARGET = target_def["target_name"]
if TARGET not in df.columns:
    raise KeyError(f"Target '{TARGET}' not found in loans_cleaned.parquet columns.")

y = df[TARGET]
if y.isna().any():
    raise ValueError(
        "Target contains missing values; preprocess should have removed/handled these."
    )

unique_vals = sorted({int(v) for v in y.unique()})
if unique_vals != [0, 1]:
    raise ValueError(f"Target must be binary [0,1]. Found: {unique_vals}")

print("Target:", TARGET, "| positive rate:", float(y.mean()))

Target: default | positive rate: 0.21465110355483252


In [5]:
# --- contract validation: roles ---
drop_cols = set(roles.get("drop_cols_ambiguous_or_leakage", []))
id_cols = set(roles.get("id_cols", []))
time_cols = set(roles.get("time_cols", []))

num_cols = set(roles.get("numeric_cols_raw", []))
cat_cols = set(roles.get("categorical_cols_raw", []))
dt_cols = set(roles.get("datetime_string_cols_raw", []))

# Ensure target isn't mistakenly in families
if TARGET in (num_cols | cat_cols | dt_cols):
    raise ValueError("Target appears in role families; it must not be treated as a feature.")

# Feature families should be disjoint (best-effort; roles are raw heuristics)
overlap_nc = num_cols & cat_cols
overlap_nd = num_cols & dt_cols
overlap_cd = cat_cols & dt_cols
if overlap_nc or overlap_nd or overlap_cd:
    print(
        "NOTE: Role families overlap (EDA heuristics). Downstream scripts should treat roles as candidates, not gospel."
    )
    print("num∩cat:", sorted(list(overlap_nc))[:15])
    print("num∩dt:", sorted(list(overlap_nd))[:15])
    print("cat∩dt:", sorted(list(overlap_cd))[:15])

# Columns available for FE consideration
df_cols = set(df.columns)
feature_candidates = (num_cols | cat_cols | dt_cols | time_cols) & df_cols
feature_candidates -= drop_cols
feature_candidates -= id_cols
feature_candidates -= {TARGET}

print("Candidate feature columns present:", len(feature_candidates))
print("Example:", sorted(list(feature_candidates))[:20])

Candidate feature columns present: 11
Example: ['annual_inc', 'dti', 'emp_length', 'grade', 'home_ownership', 'int_rate', 'issue_d', 'loan_amnt', 'loan_status', 'sub_grade', 'term']


## 2) Build the feature spec (design-only)

This is *not* where we fit encoders or imputers. The spec captures:
- which columns are features (and how to treat them)
- what transformations are planned
- what should be fit on train-only
- deterministic derived feature naming

Downstream build script (`src/features/build_features.py`) will implement these rules.


In [6]:
# Initialize feature spec (contract document)
feature_spec: Dict[str, Any] = {
    "version": "v1",
    "source": {
        "dataset": str(LOANS_CLEANED_PATH),
        "eda_index": str(EDA_INDEX_PATH),
        "eda_truth": {
            "schema_raw_accepted_v1": str(schema_raw_path),
            "missingness_profile_v1": str(missingness_path),
            "target_definition_v1": str(target_def_path),
            "column_roles_v1": str(roles_path),
        },
    },
    "target": {
        "name": TARGET,
        "dtype": str(df[TARGET].dtype),
        "allowed_values": [0, 1],
        "missing_allowed": False,
        "positive_label": 1,
        "positive_rate_at_freeze": float(df[TARGET].mean()),
        "definition_source": "target_definition_v1.json",
    },
    "exclusions": {
        "target": TARGET,
        "drop_cols_ambiguous_or_leakage": sorted(list(drop_cols)),
        "id_cols": sorted(list(id_cols)),
        "notes": "Exclusions are EDA-driven (ambiguity/leakage conservative policy). No heuristic dropping here.",
    },
    "features": {
        "datetime": {},
        "categorical": {},
        "numerical": {},
        "engineered": {},
    },
    "defaults": {
        "categorical": {
            "missing_token": "__MISSING__",
            "rare_token": "__OTHER__",
            "unknown_token": "__OTHER__",
            "rare_threshold": {"min_count": 100, "min_fraction": 0.005},
        },
        "datetime": {
            "errors": "coerce",
            "timezone": None,
            "derive": ["year", "month", "quarter"],
            "drop_raw": True,
        },
        "numerical": {
            "skew_threshold": 1.0,
            "log": {"function": "log1p", "applies_to": "x + shift", "clip_min": 0.0},
            "preserve_original": True,
        },
    },
    "fit_rules": {
        # anything that learns from data must be fit on train only
        "target_mean_encoding": {
            "fit_on": "train_only",
            "cv_folds": 5,
            "smoothing": "auto",
            "handle_unknown": "prior",
            "handle_missing": "prior",
        },
        "imputation": {
            "fit_on": "train_only",
            "numeric": "median",
            "categorical": "most_frequent",
            "add_missing_indicators": True,
        },
    },
    "freeze": {
        "created_at": None,
        "dataset_hash_md5": None,
        "target_hash_md5": None,
        "spec_hash_md5": None,
    },
}

## 3) Datetime features

We derive `year`, `month`, `quarter` from `issue_d` (or other date-like columns present).
We store derived column names in the spec and drop raw datetime by default.


In [7]:
# datetime columns to use:
# Prefer explicit time_cols from EDA roles; fall back to dt_cols candidates
dt_present = [c for c in (set(time_cols) | set(dt_cols)) if c in df.columns and c != TARGET]
dt_present = sorted(dt_present)

derive = feature_spec["defaults"]["datetime"]["derive"]
drop_raw = bool(feature_spec["defaults"]["datetime"]["drop_raw"])
errors = feature_spec["defaults"]["datetime"]["errors"]

rows = []
for col in dt_present:
    parsed = pd.to_datetime(df[col], errors=errors)
    derived_cols = [f"{col}_{part}" for part in derive]
    feature_spec["features"]["datetime"][col] = {
        "derived_features": derived_cols,
        "derive": derive,
        "drop_raw": drop_raw,
        "missing_rate": float(parsed.isna().mean()),
        "min": None if parsed.dropna().empty else str(parsed.min()),
        "max": None if parsed.dropna().empty else str(parsed.max()),
    }
    rows.append({"col": col, "derived": derived_cols, "missing_rate": float(parsed.isna().mean())})

pd.DataFrame(rows)

  parsed = pd.to_datetime(df[col], errors=errors)


Unnamed: 0,col,derived,missing_rate
0,issue_d,"[issue_d_year, issue_d_month, issue_d_quarter]",0.0


## 4) Categorical features

We choose a strategy by cardinality tier:
- ≤ `one_hot_max` → `one_hot`
- ≤ `target_mean_max` → `target_mean`
- else → `count_frequency`

We do *not* persist learned category vocabularies in the spec; those are fit during training/build.


In [8]:
cat_present = [
    c
    for c in cat_cols
    if c in df.columns and c not in drop_cols and c not in id_cols and c != TARGET
]
cat_present = sorted(cat_present)

one_hot_max = 10
target_mean_max = 100

feature_spec["strategy_rules"] = {
    "categorical": {
        "one_hot_max": one_hot_max,
        "target_mean_max": target_mean_max,
        "fallback": "count_frequency",
    }
}


def choose_cat_strategy(k: int) -> str:
    if k <= one_hot_max:
        return "one_hot"
    if k <= target_mean_max:
        return "target_mean"
    return "count_frequency"


rows = []
for col in cat_present:
    s = df[col].astype("string")
    k = int(s.nunique(dropna=False))
    miss = float(s.isna().mean())
    strat = choose_cat_strategy(k)
    feature_spec["features"]["categorical"][col] = {
        "cardinality": k,
        "missing_rate": miss,
        "encoding_strategy": strat,
        "missing_token": feature_spec["defaults"]["categorical"]["missing_token"],
        "rare_token": feature_spec["defaults"]["categorical"]["rare_token"],
        "unknown_token": feature_spec["defaults"]["categorical"]["unknown_token"],
        "rare_threshold": feature_spec["defaults"]["categorical"]["rare_threshold"],
    }
    rows.append({"col": col, "cardinality": k, "missing_rate": miss, "strategy": strat})

pd.DataFrame(rows).sort_values(["strategy", "cardinality"], ascending=[True, False]).head(25)

Unnamed: 0,col,cardinality,missing_rate,strategy
1,grade,7,0.0,one_hot
2,home_ownership,6,0.0,one_hot
3,loan_status,5,0.0,one_hot
5,term,2,0.0,one_hot
4,sub_grade,35,0.0,target_mean
0,emp_length,12,0.05849,target_mean


## 5) Numerical features

We compute lightweight distribution diagnostics and plan log transforms for heavily skewed features.
We do not apply transforms here; we only record the plan.


In [9]:
num_present = [
    c
    for c in num_cols
    if c in df.columns and c not in drop_cols and c not in id_cols and c != TARGET
]
num_present = sorted(num_present)

skew_threshold = float(feature_spec["defaults"]["numerical"]["skew_threshold"])
rows = []
for col in num_present:
    s = pd.to_numeric(df[col], errors="coerce")
    miss = float(s.isna().mean())
    skew = float(s.skew(skipna=True)) if s.notna().sum() else float("nan")
    minv = float(np.nanmin(s.values)) if np.isfinite(np.nanmin(s.values)) else float("nan")
    plan = "none"
    shift = 0.0
    if np.isfinite(skew) and abs(skew) > skew_threshold:
        plan = "log"
        if np.isfinite(minv) and minv <= 0:
            shift = float(abs(minv) + 1e-6)

    feature_spec["features"]["numerical"][col] = {
        "missing_rate": miss,
        "skewness": skew,
        "min_value": minv,
        "planned_transformation": plan,
        "shift_for_transformation": shift,
    }
    rows.append({"col": col, "missing_rate": miss, "skew": skew, "plan": plan})

pd.DataFrame(rows).sort_values(["plan", "skew"], ascending=[True, False]).head(25)

Unnamed: 0,col,missing_rate,skew,plan
0,annual_inc,0.0,46.828059,log
3,loan_amnt,0.0,0.780098,none
2,int_rate,0.0,0.717069,none
1,dti,0.0,0.55718,none


## 6) Engineered features (v1)

Keep engineered features small and defensible. For v1 we include:
- `loan_age_months` from `issue_d` **relative to prediction time** (recorded as a derivation rule)

These are design-only definitions; the build script will implement them.


In [10]:
# Engineered feature: loan_age_months (design-only; requires a reference date at inference)
feature_spec["features"]["engineered"]["loan_age_months"] = {
    "type": "derived_numeric",
    "source": ["issue_d"],
    "derivation": {
        "method": "months_since",
        "reference_date": "prediction_time",
        "rounding": "floor",
        "clip": {"min": 0, "max": None},
    },
    "availability": "inference_time",
    "notes": "Months since issue date relative to prediction-time reference date.",
}

feature_spec["features"]["engineered"].keys()

dict_keys(['loan_age_months'])

## 7) Freeze + export `feature_spec_v1.json`

We freeze:
- created_at (UTC)
- dataset hash (content-based)
- target column hash
- spec hash

Then write to `data/artifacts/feature_spec_v1.json`.


In [11]:
# Freeze metadata
feature_spec["freeze"]["created_at"] = datetime.now(timezone.utc).isoformat()

# dataset hash: hash a stable sample of bytes (fast + deterministic enough for governance)
# We avoid hashing the entire dataset file to keep notebook runtime reasonable.
sample = df.head(2000)  # deterministic: head slice
feature_spec["freeze"]["dataset_hash_md5"] = md5_bytes(sample.to_csv(index=False).encode("utf-8"))

# target hash: depends on full y bytes (still manageable)
feature_spec["freeze"]["target_hash_md5"] = md5_bytes(df[TARGET].to_numpy().tobytes())

# spec hash: hash the spec content (with placeholder None already filled)
spec_for_hash = json.dumps(feature_spec, sort_keys=True).encode("utf-8")
feature_spec["freeze"]["spec_hash_md5"] = md5_bytes(spec_for_hash)

# Write spec
write_json(FEATURE_SPEC_PATH, feature_spec)
print("Wrote:", FEATURE_SPEC_PATH)
print("spec_hash_md5:", feature_spec["freeze"]["spec_hash_md5"])
print("target_hash_md5:", feature_spec["freeze"]["target_hash_md5"])

Wrote: /Users/mcharris/Developer/mc-harris1/credit-risk-pd/data/artifacts/feature_spec_v1.json
spec_hash_md5: 9fc4080ad600ec9cbd6861ba32bf54bf
target_hash_md5: 35a969fa8f19d8df67ba285adeb4ee6a


In [12]:
# Read-back validation
spec_check = read_json(FEATURE_SPEC_PATH)
assert spec_check["freeze"]["spec_hash_md5"] == feature_spec["freeze"]["spec_hash_md5"], (
    "Spec hash mismatch"
)
assert spec_check["target"]["name"] == TARGET, "Target name mismatch"
print("Read-back validation passed.")

Read-back validation passed.
