---
title: "Advanced Quality Analytics"
format: html
---

# ✅ Advanced Quality Analytics & Predictive Defect Prevention
## Portfolio Project 7 — Causal Inference, Survival Analysis, XGBoost-Style Boosting, Shapley Root-Cause, and Statistical Process Capability Forecasting

---

### What This Notebook Covers (Beyond Basics)
| Topic | Technique |
|---|---|
| Causal inference | Propensity-score matching + treatment-effect estimation |
| Survival analysis | Kaplan-Meier curves for time-to-defect |
| Advanced boosting | Hand-rolled histogram gradient boosting (XGBoost-style) |
| Shapley root-cause | Additive feature attribution for defect probability |
| Capability forecasting | Predicting future Cpk drift using time-series regression |
| Cost-benefit analysis | Expected cost of defect vs inspection — optimal policy |

### Dataset  
**Simulated injection-moulding quality log** (10 process vars, 5 quality dims, binary defect)  
Structure mirrors Kaggle Steel Defect / UCI Concrete datasets  

---


In [None]:
# ─── 1. Imports ─────────────────────────────────────────────
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import seaborn as sns
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import (roc_curve, auc, classification_report,
                             confusion_matrix, precision_recall_curve)
import warnings
warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-whitegrid')
print('✓ All imports loaded.')

In [None]:
# ─── 2. Synthetic injection-moulding quality data ─────────
def gen_quality_data(n=8000, seed=2025):
    rng = np.random.default_rng(seed)

    # ─── Process inputs (10 variables) ───
    mold_temp = rng.normal(220, 8, n)
    inject_pres = rng.normal(150, 10, n)
    hold_time = rng.normal(3.0, 0.3, n)
    cool_time = rng.normal(12.0, 1.5, n)
    screw_rpm = rng.normal(200, 15, n)
    material_mfr = rng.normal(180, 5, n)   # material melt flow rate
    humidity = rng.uniform(20, 70, n)
    operator_id = rng.integers(0, 8, n)    # 8 operators
    shift = rng.choice([0, 1, 2], n)   # 0=day, 1=eve, 2=night
    machine_age = rng.uniform(0, 10, n)    # years

    # ─── Quality measurements (5 dimensions) ───
    # Wall thickness is centred on its 2.0 nominal (the value the defect logit
    # and the Cpk spec limits refer to), so process inputs enter as deviations.
    wall_thick = (2.0 + 0.003 * (mold_temp - 220)
                  - 0.002 * (inject_pres - 150) + rng.normal(0, 0.04, n))
    surface_fin = (1.5 - 0.005 * screw_rpm + 0.01 * material_mfr
                   + rng.normal(0, 0.08, n))
    tensile = 45 + 0.2 * material_mfr - 0.1 * humidity + rng.normal(0, 2.0, n)
    shrinkage = (0.8 + 0.002 * mold_temp - 0.001 * cool_time
                 + rng.normal(0, 0.05, n))
    flash_score = (0.1 + 0.003 * inject_pres - 0.005 * hold_time
                   + rng.normal(0, 0.03, n))

    # ─── Defect probability (logistic) ───
    logit = (
        -4.0
        + 0.04 * (mold_temp - 220)
        + 0.03 * (inject_pres - 150)
        - 0.08 * hold_time
        - 0.06 * cool_time
        + 0.02 * humidity
        + 0.15 * machine_age
        + 0.3 * (shift == 2).astype(float)   # night shift effect
        + 0.25 * np.abs(wall_thick - 2.0) * 20
        + 0.2 * flash_score * 10
    )
    p_defect = 1 / (1 + np.exp(-logit))
    defect = (rng.uniform(0, 1, n) < p_defect).astype(int)

    # Cumulative production timestamp in minutes since start: the running sum
    # of exponential inter-arrival gaps (mean 40 min). The survival section
    # builds its own within-batch clock and does not use this column.
    inter_arrival = rng.exponential(40, n)
    time_to_event = np.cumsum(inter_arrival)   # monotonically increasing

    df = pd.DataFrame({
        'Mold_Temp': mold_temp.round(1), 'Inject_Pres': inject_pres.round(1),
        'Hold_Time': hold_time.round(2), 'Cool_Time': cool_time.round(2),
        'Screw_RPM': screw_rpm.round(1), 'Material_MFR': material_mfr.round(1),
        'Humidity': humidity.round(1), 'Operator_ID': operator_id,
        'Shift': shift, 'Machine_Age': machine_age.round(2),
        'Wall_Thickness': wall_thick.round(4), 'Surface_Finish': surface_fin.round(4),
        'Tensile_Strength': tensile.round(2), 'Shrinkage': shrinkage.round(4),
        'Flash_Score': flash_score.round(4),
        'Defect': defect,
        'Time_min': time_to_event.round(1)
    })
    return df


df = gen_quality_data()
PROCESS_VARS = ['Mold_Temp', 'Inject_Pres', 'Hold_Time', 'Cool_Time', 'Screw_RPM',
                'Material_MFR', 'Humidity', 'Operator_ID', 'Shift', 'Machine_Age']
QUALITY_VARS = ['Wall_Thickness', 'Surface_Finish',
                'Tensile_Strength', 'Shrinkage', 'Flash_Score']
ALL_FEATS = PROCESS_VARS + QUALITY_VARS

print(f'Shape: {df.shape}  |  Defect rate: {df["Defect"].mean()*100:.1f}%')
df.describe().round(2)
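
The defect label above is drawn from a logistic model, so each coefficient acts on the log-odds scale. As a minimal sketch (the baseline log-odds of -2.0 is an arbitrary illustrative value, not one taken from the simulation), the following shows how the +0.3 night-shift term translates into an odds ratio and a change in probability:

```python
import math

def sigmoid(z):
    """Map log-odds z to a probability."""
    return 1.0 / (1.0 + math.exp(-z))

def odds(p):
    """Convert a probability to odds."""
    return p / (1.0 - p)

BASE_LOGIT = -2.0      # illustrative baseline log-odds (assumption)
NIGHT_EFFECT = 0.3     # night-shift coefficient used in the simulated logit

p_day = sigmoid(BASE_LOGIT)
p_night = sigmoid(BASE_LOGIT + NIGHT_EFFECT)
odds_ratio = odds(p_night) / odds(p_day)   # equals exp(NIGHT_EFFECT)

print(f"P(defect | baseline) = {p_day:.4f}")
print(f"P(defect | night)    = {p_night:.4f}")
print(f"Odds ratio           = {odds_ratio:.4f}")
```

The odds ratio is the same at every baseline, while the change in probability depends on where the baseline sits.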

In [None]:
# 1. Survival Analysis — Kaplan-Meier Time-to-Defect

# Kaplan-Meier estimates the **survival function** S(t) = P(no defect up to time t) without assuming a parametric distribution. Curves are stratified by shift and by machine age (under vs. at least 5 years).

# ─── 3. Kaplan-Meier estimator (manual) ──────────────────
def kaplan_meier(times, events):
    """
    Compute the Kaplan-Meier survival curve.
    times:  array of event/censoring times
    events: 1 = event (defect), 0 = censored
    Returns (t_km, S_km): sorted unique event times (prefixed with 0) and
    the survival probability just after each of them.
    """
    # Sort by time
    order = np.argsort(times)
    t_sort = times[order]
    e_sort = events[order]

    unique_times = np.unique(t_sort[e_sort == 1])  # event times only
    n_at_risk = len(times)
    S = 1.0
    t_km, S_km = [0], [1.0]

    ptr = 0   # pointer into sorted arrays
    for t in unique_times:
        # Drop observations that left the risk set strictly before t
        while ptr < len(t_sort) and t_sort[ptr] < t:
            n_at_risk -= 1
            ptr += 1
        # n_at_risk now counts everyone with time >= t. Observations censored
        # exactly at t are, by convention, still at risk at t.
        d = 0        # events at exactly t
        n_tied = 0   # all observations (events + censored) at exactly t
        while ptr < len(t_sort) and t_sort[ptr] == t:
            d += e_sort[ptr]
            n_tied += 1
            ptr += 1

        if n_at_risk > 0:
            S *= (1 - d / n_at_risk)
        t_km.append(t)
        S_km.append(S)
        n_at_risk -= n_tied   # everyone observed at t leaves the risk set

    return np.array(t_km), np.array(S_km)


# For survival analysis: treat each defect as an event and every
# non-defective part as a censored observation.
# Time is a production clock that restarts every BATCH parts, so the curves
# show how defects accumulate over the course of a batch.
BATCH = 200
rng_surv = np.random.default_rng(7)   # seeded so the curves are reproducible
survival_rows = []
for batch_start in range(0, len(df), BATCH):
    batch = df.iloc[batch_start:batch_start+BATCH]
    cum_time = 0.0
    for _, row in batch.iterrows():
        cum_time += rng_surv.exponential(2.0)  # ~2 min between parts
        survival_rows.append({
            'Time': cum_time,
            'Event': int(row['Defect']),
            'Shift': int(row['Shift']),
            'Machine_Age_Cat': 'Young' if row['Machine_Age'] < 5 else 'Old'
        })

surv_df = pd.DataFrame(survival_rows)
print(
    f'Survival data: {len(surv_df)} observations, {surv_df["Event"].sum()} defect events')
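
A hand-checkable example helps confirm the product-limit arithmetic. The sketch below is a plain-Python re-implementation (the helper `km_curve` and the five toy observations are illustrative, not part of the notebook). It includes one observation censored at the same time as an event, a case where the at-risk count is easy to get wrong:

```python
def km_curve(times, events):
    """Kaplan-Meier estimate as a list of (event_time, survival) pairs."""
    n_at_risk = len(times)
    survival = 1.0
    curve = []
    for t in sorted(set(times)):
        deaths = sum(1 for ti, ei in zip(times, events) if ti == t and ei == 1)
        leaving = sum(1 for ti in times if ti == t)   # events + censored at t
        if deaths > 0:
            survival *= 1.0 - deaths / n_at_risk
            curve.append((t, survival))
        n_at_risk -= leaving
    return curve

toy_times = [1, 2, 2, 3, 4]
toy_events = [1, 1, 0, 0, 1]   # 1 = defect, 0 = censored
toy_curve = km_curve(toy_times, toy_events)
for t, s in toy_curve:
    print(f"t={t}: S(t)={s:.2f}")
```

At t=2 four units are still at risk (the tied censored unit counts), so S drops from 0.8 to 0.8 × 3/4 = 0.6. The last remaining unit fails at t=4, which takes S to 0.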

In [None]:
# ─── 4. KM curves stratified by shift and machine age ──
fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# By shift
shift_names = {0: 'Day', 1: 'Evening', 2: 'Night'}
colors_shift = {0: '#4c72b0', 1: '#55a868', 2: '#c44e52'}
for shift_id in [0, 1, 2]:
    mask = surv_df['Shift'] == shift_id
    t_km, S_km = kaplan_meier(surv_df.loc[mask, 'Time'].values,
                              surv_df.loc[mask, 'Event'].values)
    axes[0].step(t_km, S_km, where='post', lw=2, color=colors_shift[shift_id],
                 label=f'{shift_names[shift_id]} (n={mask.sum()})')

axes[0].set_title('Kaplan-Meier by Shift', fontsize=13)
axes[0].set_xlabel('Time (min)')
axes[0].set_ylabel('Survival Probability S(t)')
axes[0].legend()
axes[0].set_ylim(0, 1.05)

# By machine age
for age_cat, color in [('Young', '#4c72b0'), ('Old', '#c44e52')]:
    mask = surv_df['Machine_Age_Cat'] == age_cat
    t_km, S_km = kaplan_meier(surv_df.loc[mask, 'Time'].values,
                              surv_df.loc[mask, 'Event'].values)
    axes[1].step(t_km, S_km, where='post', lw=2, color=color,
                 label=f'{age_cat} machine (n={mask.sum()})')

axes[1].set_title('Kaplan-Meier by Machine Age', fontsize=13)
axes[1].set_xlabel('Time (min)')
axes[1].set_ylabel('Survival Probability S(t)')
axes[1].legend()
axes[1].set_ylim(0, 1.05)

plt.suptitle('Survival Analysis — Time to Defect', fontsize=14, y=1.03)
plt.tight_layout()
plt.show()

In [None]:
# 2. Causal Inference — Propensity-Score Matching

# We want to estimate the **causal effect** of night-shift operation on defect rate, controlling for observed covariates. Propensity-score matching pairs each night-shift unit with a similar non-night (day or evening) unit.

# ─── 5. Propensity-score matching ───────────────────────────
# Treatment: Shift == 2 (night) vs Shift != 2 (control)
df['is_night'] = (df['Shift'] == 2).astype(int)

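# Covariates: all process + quality vars except Shift.
# Quality measurements are taken after production, so on real data they could
# be affected by the treatment; here the simulation does not let shift
# influence them.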
CONFOUNDERS = [v for v in ALL_FEATS if v != 'Shift']
scaler_ps = StandardScaler()
X_conf = scaler_ps.fit_transform(df[CONFOUNDERS])

# Logistic regression for propensity scores
ps_model = LogisticRegression(max_iter=1000, C=1.0, random_state=0)
ps_model.fit(X_conf, df['is_night'])
propensity = ps_model.predict_proba(X_conf)[:, 1]

df['propensity'] = propensity

print(
    f'Propensity score range: [{propensity.min():.3f}, {propensity.max():.3f}]')
print(
    f'Night-shift samples: {df["is_night"].sum()}  |  Control: {(1-df["is_night"]).sum()}')

# Matching: greedy 1:1 nearest-neighbour on the propensity score, without
# replacement, processing treated units in ascending propensity order
treated_idx = df.index[df['is_night'] == 1].to_numpy()
control_idx = df.index[df['is_night'] == 0].to_numpy()
control_ps = propensity[control_idx]
available = np.ones(len(control_idx), dtype=bool)   # controls not yet used
CALIPER = 0.05

matched_pairs = []
treated_sorted = sorted(treated_idx, key=lambda i: propensity[i])

for t_idx in treated_sorted:
    if not available.any():
        break   # every control has been used
    # Distance to every control; used controls are masked out
    diff = np.where(available, np.abs(control_ps - propensity[t_idx]), np.inf)
    best_pos = int(np.argmin(diff))   # first closest unused control
    if diff[best_pos] < CALIPER:
        matched_pairs.append((t_idx, control_idx[best_pos]))
        available[best_pos] = False

print(f'Matched pairs (caliper=0.05): {len(matched_pairs)}')

# Estimate ATT (Average Treatment Effect on Treated)
defect_treated = np.mean([df.loc[t, 'Defect'] for t, c in matched_pairs])
defect_control = np.mean([df.loc[c, 'Defect'] for t, c in matched_pairs])
ATT = defect_treated - defect_control

print(f'\nCausal Effect of Night Shift on Defect Rate:')
print(f'  Matched treated defect rate:  {defect_treated*100:.2f}%')
print(f'  Matched control defect rate:  {defect_control*100:.2f}%')
print(f'  ATT (causal effect):          {ATT*100:+.2f} percentage points')
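
Matching is usually followed by a balance check on the covariates. One common diagnostic is the standardized mean difference (SMD), with |SMD| < 0.1 often used as a rule of thumb for acceptable balance. A minimal sketch, using a hypothetical helper and toy numbers rather than the notebook's data:

```python
import math
import statistics

def standardized_mean_difference(treated, control):
    """SMD = (mean_t - mean_c) / sqrt((var_t + var_c) / 2), sample variances."""
    mean_t, mean_c = statistics.fmean(treated), statistics.fmean(control)
    pooled_sd = math.sqrt(
        (statistics.variance(treated) + statistics.variance(control)) / 2
    )
    if pooled_sd == 0:
        # No spread in either group: balanced only if the means coincide
        return 0.0 if mean_t == mean_c else math.copysign(math.inf, mean_t - mean_c)
    return (mean_t - mean_c) / pooled_sd

# Toy covariate values (illustrative only)
before_t, before_c = [1.0, 2.0, 3.0], [2.0, 3.0, 4.0]
after_t, after_c = [1.0, 2.0, 3.0], [1.1, 2.0, 2.9]

smd_before = standardized_mean_difference(before_t, before_c)
smd_after = standardized_mean_difference(after_t, after_c)
print(f"SMD before matching: {smd_before:+.3f}")
print(f"SMD after matching:  {smd_after:+.3f}")
```

In the notebook, the same function could be applied to each column in `CONFOUNDERS`, once on the full treated and control groups and once on the rows listed in `matched_pairs`.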

In [None]:
# ─── 6. Propensity + causal effect visualisation ──────────
fig, axes = plt.subplots(1, 3, figsize=(17, 5))

# Propensity distribution before matching
axes[0].hist(propensity[df['is_night'] == 1], bins=40, alpha=0.6, color='#c44e52',
             density=True, label='Night shift')
axes[0].hist(propensity[df['is_night'] == 0], bins=40, alpha=0.6, color='#4c72b0',
             density=True, label='Control')
axes[0].set_title('Propensity Score Distribution (Before Matching)')
axes[0].set_xlabel('Propensity Score')
axes[0].legend()

# After matching
matched_t_ps = [propensity[t] for t, c in matched_pairs]
matched_c_ps = [propensity[c] for t, c in matched_pairs]
axes[1].hist(matched_t_ps, bins=30, alpha=0.6, color='#c44e52',
             density=True, label='Night (matched)')
axes[1].hist(matched_c_ps, bins=30, alpha=0.6, color='#4c72b0',
             density=True, label='Control (matched)')
axes[1].set_title('Propensity Score Distribution (After Matching)')
axes[1].set_xlabel('Propensity Score')
axes[1].legend()

# Treatment effect bar chart
axes[2].bar(['Control\n(matched)', 'Night Shift\n(matched)', 'Causal Effect\n(ATT)'],
            [defect_control*100, defect_treated*100, ATT*100],
            color=['#4c72b0', '#c44e52', '#8172b2'], edgecolor='white', width=0.5)
axes[2].axhline(0, color='black', lw=0.5)
axes[2].set_title('Causal Effect of Night Shift')
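axes[2].set_ylabel('Defect Rate (%) / Difference (pp)')
for i, v in enumerate([defect_control*100, defect_treated*100, ATT*100]):
    # Rates are shown unsigned; the ATT is a signed difference in percentage points
    label = f'{v:+.1f} pp' if i == 2 else f'{v:.1f}%'
    axes[2].text(i, v + 0.3, label,
                 ha='center', fontsize=10, fontweight='bold')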

plt.tight_layout()
plt.show()

In [None]:
# 3. Hand-Rolled Histogram Gradient Boosting (XGBoost-Style)

# ─── 7. Histogram Gradient Boosting (simplified XGBoost) ──
class HistGBTree:
    """Single decision stump; candidate thresholds are precomputed quantile bin edges."""

    def __init__(self, n_bins=32):
        self.n_bins = n_bins
        self.split_feat = None
        self.split_thresh = None
        self.left_val = 0
        self.right_val = 0

    def fit(self, X, grad, hess, bin_edges):
        """Find the best split to minimise the second-order objective."""
        best_gain = -np.inf
        n_feat = X.shape[1]

        for feat in range(n_feat):
            edges = bin_edges[feat]
            for b in range(1, len(edges)-1):
                thresh = edges[b]
                left_mask = X[:, feat] <= thresh
                right_mask = ~left_mask
                if left_mask.sum() < 5 or right_mask.sum() < 5:
                    continue

                G_L = grad[left_mask].sum()
                H_L = hess[left_mask].sum()
                G_R = grad[right_mask].sum()
                H_R = hess[right_mask].sum()

                # Gain = 0.5 * (G_L²/H_L + G_R²/H_R - (G_L+G_R)²/(H_L+H_R))
                gain = 0.5 * (G_L**2/(H_L+1e-6) + G_R**2/(H_R+1e-6)
                              - (G_L+G_R)**2/(H_L+H_R+1e-6))

                if gain > best_gain:
                    best_gain = gain
                    self.split_feat = feat
                    self.split_thresh = thresh
                    self.left_val = -G_L / (H_L + 1e-6)
                    self.right_val = -G_R / (H_R + 1e-6)

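    def predict(self, X):
        if self.split_feat is None:
            # No split satisfied the minimum-leaf-size rule: contribute nothing
            return np.zeros(X.shape[0])
        mask = X[:, self.split_feat] <= self.split_thresh
        preds = np.where(mask, self.left_val, self.right_val)
        return preds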


class HandRolledGBM:
    """Binary classification via histogram gradient boosting."""

    def __init__(self, n_trees=200, learning_rate=0.1, n_bins=16):
        self.n_trees = n_trees
        self.lr = learning_rate
        self.n_bins = n_bins
        self.trees = []
        self.bin_edges = None

    def _sigmoid(self, x):
        return 1 / (1 + np.exp(-np.clip(x, -30, 30)))

    def fit(self, X, y):
        # Precompute bin edges per feature
        self.bin_edges = []
        for feat in range(X.shape[1]):
            edges = np.quantile(X[:, feat], np.linspace(0, 1, self.n_bins+1))
            self.bin_edges.append(edges)

        F = np.zeros(len(y))   # cumulative prediction (log-odds)
        for t in range(self.n_trees):
            p = self._sigmoid(F)
            grad = p - y              # gradient of log-loss
            hess = p * (1 - p)        # hessian

            tree = HistGBTree(self.n_bins)
            tree.fit(X, grad, hess, self.bin_edges)
            self.trees.append(tree)
            F += self.lr * tree.predict(X)
        return self

    def predict_proba(self, X):
        F = np.zeros(len(X))
        for tree in self.trees:
            F += self.lr * tree.predict(X)
        return self._sigmoid(F)

    def predict(self, X):
        return (self.predict_proba(X) >= 0.5).astype(int)


# Train
X_feat = df[ALL_FEATS].values
y_feat = df['Defect'].values
scaler_gb = StandardScaler()
X_s = scaler_gb.fit_transform(X_feat)

X_train, X_test, y_train, y_test = train_test_split(X_s, y_feat, test_size=0.25,
                                                    stratify=y_feat, random_state=42)

print('Training Hand-Rolled GBM (200 trees) …')
hgbm = HandRolledGBM(n_trees=200, learning_rate=0.08, n_bins=16)
hgbm.fit(X_train, y_train)

# Evaluate
hgbm_proba = hgbm.predict_proba(X_test)
hgbm_pred = hgbm.predict(X_test)
fpr, tpr, _ = roc_curve(y_test, hgbm_proba)
auc_val = auc(fpr, tpr)
print(f'Hand-Rolled GBM AUC: {auc_val:.3f}')
print(classification_report(y_test, hgbm_pred, target_names=['OK', 'Defect']))
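
The split search above scores each candidate with the second-order gain and sets each leaf to a Newton step. A minimal worked example on four samples makes the arithmetic concrete. The `lam` argument is an assumption added for illustration: it plays the role of an L2 regulariser on leaf weights, where the notebook's code uses a fixed `1e-6` in the denominator instead.

```python
def leaf_weight(G, H, lam=0.0):
    """Newton step for a leaf: w* = -G / (H + lam)."""
    return -G / (H + lam)

def split_gain(G_L, H_L, G_R, H_R, lam=0.0):
    """Second-order gain of splitting a node into left/right children."""
    def score(G, H):
        return G * G / (H + lam)
    return 0.5 * (score(G_L, H_L) + score(G_R, H_R)
                  - score(G_L + G_R, H_L + H_R))

# Four samples at F = 0 (so p = 0.5): grad = p - y, hess = p * (1 - p)
labels_left, labels_right = [0, 0], [1, 1]
p = 0.5
G_L = sum(p - y for y in labels_left)      # 1.0
H_L = len(labels_left) * p * (1 - p)       # 0.5
G_R = sum(p - y for y in labels_right)     # -1.0
H_R = len(labels_right) * p * (1 - p)      # 0.5

gain = split_gain(G_L, H_L, G_R, H_R)
w_left, w_right = leaf_weight(G_L, H_L), leaf_weight(G_R, H_R)
print(f"gain={gain}, w_left={w_left}, w_right={w_right}")
```

A split that separates the classes perfectly gives a gain of 2.0 here, with leaf values of -2 and +2: the log-odds move down for the all-OK leaf and up for the all-defect leaf, before the learning rate shrinks the step.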

In [None]:
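# ─── 8. Compare with sklearn GradientBoostingClassifier ───
# Note: max_depth=2 lets sklearn capture pairwise interactions, whereas the
# hand-rolled model boosts depth-1 stumps, so the comparison is indicative only.
sk_gbr = GradientBoostingClassifier(n_estimators=200, max_depth=2,
                                    learning_rate=0.08, random_state=0)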
sk_gbr.fit(X_train, y_train)
sk_proba = sk_gbr.predict_proba(X_test)[:, 1]
fpr_sk, tpr_sk, _ = roc_curve(y_test, sk_proba)
auc_sk = auc(fpr_sk, tpr_sk)

fig, ax = plt.subplots(figsize=(8, 6))
ax.plot(fpr, tpr, lw=2, color='steelblue',
        label=f'Hand-Rolled GBM (AUC={auc_val:.3f})')
ax.plot(fpr_sk, tpr_sk, lw=2, color='#c44e52',
        label=f'Sklearn GBC (AUC={auc_sk:.3f})')
ax.plot([0, 1], [0, 1], 'k--', lw=0.8)
ax.set_xlabel('FPR')
ax.set_ylabel('TPR')
ax.set_title('ROC Comparison — Hand-Rolled vs Sklearn GBM')
ax.legend(loc='lower right')
plt.tight_layout()
plt.show()
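
The AUC values compared above have a direct probabilistic reading: AUC is the probability that a randomly chosen defective part receives a higher score than a randomly chosen good part. A small sketch that computes it by brute-force pair counting (the helper name and the four toy scores are illustrative):

```python
def auc_by_pairs(labels, scores):
    """AUC as P(score_pos > score_neg), counting ties as one half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative")
    wins = sum((sp > sn) + 0.5 * (sp == sn) for sp in pos for sn in neg)
    return wins / (len(pos) * len(neg))

toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
toy_auc = auc_by_pairs(toy_labels, toy_scores)
print(toy_auc)
```

Three of the four positive-negative pairs are ranked correctly, so the toy AUC is 0.75. Pair counting is quadratic in sample size, so it is suited to checks like this one rather than to full test sets.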

In [None]:
# 4. Shapley Root-Cause Attribution

# ─── 9. Shapley values via marginal contribution sampling ─
def shapley_values(model_fn, X_sample, X_background, n_perms=100, seed=0):
    """
    Estimate Shapley values for one sample by permutation sampling.
    model_fn: callable X → probability
    X_sample: single sample (1, d)
    X_background: background dataset for marginalisation
    Returns an array of d attributions. They sum to f(x) minus the mean
    prediction over the sampled background rows.
    """
    rng = np.random.default_rng(seed)
    d = X_sample.shape[1]
    shap = np.zeros(d)

    for _ in range(n_perms):
        # Random permutation of features
        perm = rng.permutation(d)
        # Start from a random background row, then switch features to the
        # sample's values one at a time in permutation order
        x_cur = X_background[rng.integers(0, len(X_background))].copy()
        pred_before = model_fn(x_cur.reshape(1, -1))[0]

        for feat in perm:
            x_cur[feat] = X_sample[0, feat]
            pred_after = model_fn(x_cur.reshape(1, -1))[0]
            # Marginal contribution of feat given the features already switched
            shap[feat] += (pred_after - pred_before)
            pred_before = pred_after

    shap /= n_perms
    return shap


# Compute Shapley for the first 10 defective samples in the test set
defect_idx = np.where(y_test == 1)[0][:10]
all_shap = []

print('Computing Shapley values for 10 defective samples …')
for i, idx in enumerate(defect_idx):
    sv = shapley_values(
        lambda X: sk_gbr.predict_proba(X)[:, 1],
        X_test[idx:idx+1],
        X_train[:500],   # subsample background for speed
        n_perms=80, seed=i
    )
    all_shap.append(sv)
    print(f'  Sample {i+1}/10 done.')

shap_matrix = np.array(all_shap)   # (10, n_features)
print('\nShapley matrix shape:', shap_matrix.shape)
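
Sampling-based estimates are easier to trust once the exact quantity has been checked on a case small enough to enumerate. The sketch below computes exact Shapley values for a hypothetical three-feature model by averaging over all 3! orderings, against a single all-zero baseline (the model, sample and baseline are illustrative assumptions):

```python
from itertools import permutations

def exact_shapley(f, x, baseline):
    """Exact Shapley values by averaging over all feature orderings."""
    d = len(x)
    phi = [0.0] * d
    orderings = list(permutations(range(d)))
    for order in orderings:
        current = list(baseline)
        prev = f(current)
        for j in order:
            current[j] = x[j]          # switch feature j to the sample's value
            new = f(current)
            phi[j] += new - prev       # marginal contribution of j
            prev = new
    return [v / len(orderings) for v in phi]

def toy_model(v):
    # Additive terms plus one interaction between features 0 and 2
    return v[0] + 2 * v[1] + v[0] * v[2]

x, baseline = [1.0, 1.0, 1.0], [0.0, 0.0, 0.0]
phi = exact_shapley(toy_model, x, baseline)
print(phi)
print(sum(phi), toy_model(x) - toy_model(baseline))
```

The interaction term is split equally between features 0 and 2, giving attributions of 1.5, 2.0 and 0.5. Their sum equals f(x) - f(baseline) = 4, which is the efficiency property the waterfall plot relies on.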

In [None]:
# ─── 10. Shapley beeswarm + waterfall plot ────────────────
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Beeswarm: mean |SHAP| across defective samples
mean_abs_shap = np.abs(shap_matrix).mean(axis=0)
ranked = np.argsort(mean_abs_shap)

top_n = min(12, len(ALL_FEATS))
top_idx = ranked[-top_n:]

# Beeswarm-style dot plot
for plot_pos, feat_idx in enumerate(top_idx):
    vals = shap_matrix[:, feat_idx]
    # Jitter y for visibility
    jitter = np.random.default_rng(plot_pos).uniform(-0.15, 0.15, len(vals))
    colors = ['#c44e52' if v > 0 else '#4c72b0' for v in vals]
    axes[0].scatter(vals, np.full(len(vals), plot_pos) + jitter,
                    c=colors, s=40, edgecolors='white', zorder=3)

axes[0].set_yticks(range(top_n))
axes[0].set_yticklabels([ALL_FEATS[i] for i in top_idx], fontsize=9)
axes[0].axvline(0, color='black', lw=0.8)
axes[0].set_xlabel('SHAP Value')
axes[0].set_title('Shapley Beeswarm — Top Features (red=↑defect risk)')

# Waterfall for the single highest-risk defective sample
worst_idx = np.argmax(sk_gbr.predict_proba(X_test[defect_idx])[:, 1])
worst_shap = shap_matrix[worst_idx]
order = np.argsort(np.abs(worst_shap))[::-1][:10]
cumulative = np.zeros(len(order)+1)
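# E[f(x)] over the background. Column 1 is P(defect); averaging both
# columns of predict_proba would always give 0.5.
base_val = sk_gbr.predict_proba(X_train[:500])[:, 1].mean()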
cumulative[0] = base_val
for i, feat_idx in enumerate(order):
    cumulative[i+1] = cumulative[i] + worst_shap[feat_idx]

labels_wf = ['Base'] + [ALL_FEATS[i] for i in order]
colors_wf = ['grey'] + ['#c44e52' if worst_shap[i]
                        > 0 else '#4c72b0' for i in order]

axes[1].bar(range(len(labels_wf)), cumulative,
            color=colors_wf, edgecolor='white', width=0.6)
axes[1].axhline(cumulative[-1], color='black', ls='--', lw=0.8, alpha=0.5)
axes[1].set_xticks(range(len(labels_wf)))
axes[1].set_xticklabels(labels_wf, rotation=30, ha='right', fontsize=8)
axes[1].set_ylabel('Defect Probability')
axes[1].set_title(f'Waterfall: Highest-Risk Sample (base + top-10 SHAP = {cumulative[-1]:.3f})')

plt.tight_layout()
plt.show()

In [None]:
# 5. Capability Forecasting — Predicting Future Cpk

# ─── 11. Rolling Cpk + forecasting ────────────────────────
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import GradientBoostingRegressor as GBReg

# Compute rolling Cpk for Wall_Thickness
USL_wt = 2.08   # upper spec limit
LSL_wt = 1.92   # lower spec limit

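WIN_CPK = 200
cpk_series = []
# Step of 50 with a 200-sample window: consecutive windows share 150 samples,
# so neighbouring Cpk values are autocorrelated by construction
for i in range(WIN_CPK, len(df), 50):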
    chunk = df['Wall_Thickness'].iloc[i-WIN_CPK:i]
    mu = chunk.mean()
    sig = chunk.std()
    cpu = (USL_wt - mu) / (3*sig) if sig > 0 else 99
    cpl = (mu - LSL_wt) / (3*sig) if sig > 0 else 99
    cpk_series.append({'idx': i, 'Cpk': min(cpu, cpl), 'mu': mu, 'std': sig})

cpk_df = pd.DataFrame(cpk_series)
print(f'Cpk time series: {len(cpk_df)} points')
print(f'  Mean Cpk: {cpk_df["Cpk"].mean():.3f}')
print(f'  Min Cpk:  {cpk_df["Cpk"].min():.3f}')

# Forecast Cpk using GBR on lagged Cpk values
LAG_CPK = 5
cpk_vals = cpk_df['Cpk'].values
X_cpk = np.column_stack([cpk_vals[i:len(cpk_vals)-LAG_CPK+i]
                        for i in range(LAG_CPK)])
y_cpk = cpk_vals[LAG_CPK:]

split_cpk = int(0.8 * len(X_cpk))
cpk_model = GBReg(n_estimators=100, max_depth=3, random_state=0)
cpk_model.fit(X_cpk[:split_cpk], y_cpk[:split_cpk])
cpk_pred = cpk_model.predict(X_cpk[split_cpk:])

cpk_rmse = np.sqrt(mean_squared_error(y_cpk[split_cpk:], cpk_pred))
print(f'  Cpk Forecast RMSE: {cpk_rmse:.4f}')
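
The rolling loop above applies Cpk = min(USL - μ, μ - LSL) / (3σ) to each window. A small worked example shows how a shift in the mean alone lowers Cpk even when the spread is unchanged. The helper and the five toy readings are illustrative; the spec limits are the ones used in the notebook.

```python
import statistics

def cpk(values, lsl, usl):
    """Cpk = min(USL - mu, mu - LSL) / (3 * sigma), with the sample std."""
    mu = statistics.fmean(values)
    sigma = statistics.stdev(values)
    if sigma == 0:
        raise ValueError("zero spread: Cpk is undefined")
    return min(usl - mu, mu - lsl) / (3 * sigma)

LSL, USL = 1.92, 2.08                       # spec limits from the notebook
centred = [1.98, 2.00, 2.02, 2.00, 2.00]    # toy readings (illustrative)
shifted = [v + 0.04 for v in centred]       # same spread, mean moved up

cpk_centred = cpk(centred, LSL, USL)
cpk_shifted = cpk(shifted, LSL, USL)
print(f"Centred: Cpk = {cpk_centred:.3f}")
print(f"Shifted: Cpk = {cpk_shifted:.3f}")
```

Moving the mean halfway towards the upper limit halves the distance to the nearer limit, and Cpk halves with it.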

In [None]:
# ─── 12. Cpk forecast plot ─────────────────────────────────
fig, ax = plt.subplots(figsize=(14, 5))

# Full Cpk series
ax.plot(cpk_df['idx'], cpk_df['Cpk'], lw=1.2,
        color='steelblue', label='Actual Cpk')

# Forecast region
test_indices = cpk_df['idx'].values[LAG_CPK + split_cpk:]
ax.plot(test_indices, cpk_pred, lw=1.5, color='crimson',
        ls='--', label='Forecasted Cpk')

# Reference lines
ax.axhline(1.33, color='green', ls=':', lw=1, label='Target (1.33)')
ax.axhline(1.00, color='orange', ls=':', lw=1, label='Minimum (1.00)')

# Shade forecast region
ax.axvspan(test_indices[0], test_indices[-1], alpha=0.08,
           color='crimson', label='Forecast window')

ax.set_title('Process Capability (Cpk) — Actual vs Forecasted', fontsize=13)
ax.set_xlabel('Sample Index')
ax.set_ylabel('Cpk')
ax.legend(loc='lower left')
plt.tight_layout()
plt.show()
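
A forecast RMSE is easier to interpret next to a naive benchmark. The following is a minimal sketch of a persistence (last-value) baseline, with a hypothetical helper and toy numbers. In the notebook the same comparison would use `cpk_vals`, with `LAG_CPK + split_cpk` as the start of the test window.

```python
import math

def persistence_rmse(series, start):
    """RMSE of predicting series[t] with series[t-1] for t >= start."""
    if start < 1 or start >= len(series):
        raise ValueError("start must satisfy 1 <= start < len(series)")
    errors = [series[t] - series[t - 1] for t in range(start, len(series))]
    return math.sqrt(sum(e * e for e in errors) / len(errors))

toy_cpk = [0.50, 0.52, 0.51, 0.55, 0.53, 0.54]   # illustrative values
baseline_rmse = persistence_rmse(toy_cpk, start=3)
print(f"Persistence RMSE: {baseline_rmse:.4f}")
```

Because the rolling windows overlap heavily, the last observed Cpk is already a strong predictor of the next one. A model that cannot beat this baseline adds little.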

In [None]:
# 6. Cost-Benefit Analysis — Optimal Inspection Policy

# ─── 13. Expected cost model ──────────────────────────────
# Cost parameters (illustrative values, in $)
COST_DEFECT = 500   # cost of a defect reaching customer ($)
COST_INSPECTION = 25    # cost of inspecting one part ($)
COST_SCRAP = 80    # cost of scrapping a detected defect ($)

# For each test sample, compute expected cost under two policies:
# Policy A: Ship without inspection
# Policy B: Inspect (if defect found → scrap; if not → ship).
#           Inspection is assumed to catch every defect.

proba_defect = sk_gbr.predict_proba(X_test)[:, 1]

cost_no_inspect = proba_defect * COST_DEFECT                  # expected cost if shipped
cost_inspect = COST_INSPECTION + proba_defect * COST_SCRAP    # expected cost if inspected

# Optimal policy: inspect if cost_inspect < cost_no_inspect
# → inspect when p > COST_INSPECTION / (COST_DEFECT - COST_SCRAP)
threshold_optimal = COST_INSPECTION / (COST_DEFECT - COST_SCRAP)
print(f'Optimal inspection threshold: p > {threshold_optimal:.4f}')
print(
    f'  (i.e., inspect when predicted defect prob > {threshold_optimal*100:.2f}%)')

optimal_policy = (proba_defect > threshold_optimal).astype(int)
actual_cost = np.where(optimal_policy == 1,
                       COST_INSPECTION + y_test * COST_SCRAP,
                       y_test * COST_DEFECT)
naive_cost = y_test * COST_DEFECT   # ship everything

total_optimal = actual_cost.sum()
total_naive = naive_cost.sum()
savings = total_naive - total_optimal
print(f'\n  Total cost (ship all):      ${total_naive:,.0f}')
print(f'  Total cost (optimal policy): ${total_optimal:,.0f}')
print(
    f'  Savings:                     ${savings:,.0f} ({savings/total_naive*100:.1f}%)')
print(
    f'  Parts inspected:            {optimal_policy.sum()} / {len(optimal_policy)} ({optimal_policy.mean()*100:.1f}%)')
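
The threshold follows from setting the two expected costs equal: inspection is cheaper when 25 + 80p < 500p, i.e. p > 25 / 420. A minimal standalone sketch of that break-even calculation, using the notebook's cost values as defaults (the function names are illustrative, and inspection is assumed to catch every defect):

```python
def expected_costs(p, c_defect=500.0, c_inspect=25.0, c_scrap=80.0):
    """Expected per-part cost of shipping blind vs inspecting first."""
    ship = p * c_defect
    inspect = c_inspect + p * c_scrap   # assumes inspection catches every defect
    return ship, inspect

def break_even(c_defect=500.0, c_inspect=25.0, c_scrap=80.0):
    """Defect probability above which inspection is cheaper than shipping."""
    return c_inspect / (c_defect - c_scrap)

p_star = break_even()
for p in (0.02, p_star, 0.20):
    ship, inspect = expected_costs(p)
    choice = "inspect" if inspect < ship else "ship"
    print(f"p={p:.4f}  ship=${ship:.2f}  inspect=${inspect:.2f}  -> {choice}")
```

With these costs the break-even probability is about 6%, so inspection pays for itself once a part's predicted defect probability exceeds that level. The threshold moves up if inspection becomes more expensive and down if escaped defects become more costly.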

In [None]:
# ─── 14. Cost analysis visualisation ──────────────────────
fig, axes = plt.subplots(1, 3, figsize=(17, 5))

# Total realised cost on the test set as the inspection threshold is swept
thresholds = np.linspace(0, 0.5, 100)
total_costs = []
for t in thresholds:
    policy = (proba_defect > t).astype(int)
    cost = np.where(policy == 1,
                    COST_INSPECTION + y_test * COST_SCRAP,
                    y_test * COST_DEFECT).sum()
    total_costs.append(cost)

axes[0].plot(thresholds, total_costs, lw=2, color='steelblue')
axes[0].axvline(threshold_optimal, color='crimson', ls='--', lw=1.5,
                label=f'Optimal threshold={threshold_optimal:.3f}')
axes[0].set_title('Total Cost vs Inspection Threshold')
axes[0].set_xlabel('Defect Probability Threshold')
axes[0].set_ylabel('Total Realised Cost ($)')
axes[0].legend()

# Breakdown: inspected vs shipped
categories = ['Ship All\n(naive)', 'Optimal\nPolicy']
defects_escaped = [y_test.sum(), ((1-optimal_policy) * y_test).sum()]
defects_caught = [0, (optimal_policy * y_test).sum()]
no_defect = [len(y_test)-y_test.sum(), len(y_test)-y_test.sum()]

x_pos = [0, 1]
axes[1].bar(x_pos, defects_escaped, color='#c44e52', label='Defects escaped')
axes[1].bar(x_pos, defects_caught, bottom=defects_escaped,
            color='#55a868', label='Defects caught')
axes[1].set_xticks(x_pos)
axes[1].set_xticklabels(categories)
axes[1].set_title('Defect Disposition by Policy')
axes[1].set_ylabel('Count')
axes[1].legend()

# Cost per part distribution
cost_per_part_naive = y_test * COST_DEFECT
cost_per_part_optimal = np.where(optimal_policy == 1,
                                 COST_INSPECTION + y_test * COST_SCRAP,
                                 y_test * COST_DEFECT)
axes[2].hist(cost_per_part_naive[cost_per_part_naive > 0], bins=20, alpha=0.5,
             color='#c44e52', label='Naive', density=True)
axes[2].hist(cost_per_part_optimal[cost_per_part_optimal > 0], bins=20, alpha=0.5,
             color='#4c72b0', label='Optimal', density=True)
axes[2].set_title('Per-Part Cost Distribution (non-zero only)')
axes[2].set_xlabel('Cost ($)')
axes[2].legend()

plt.suptitle('Cost-Benefit Analysis — Optimal Inspection Policy',
             fontsize=14, y=1.03)
plt.tight_layout()
plt.show()

---
## Summary & Portfolio Takeaways

| Technique | Value |
|---|---|
| **Kaplan-Meier** | Nonparametric survival curves compare time-to-defect across shifts and machine-age groups |
| **Propensity-Score Matching** | Estimates the effect of night shift on defect rate after balancing observed covariates; a **causal** reading assumes no unmeasured confounding |
| **Hand-Rolled GBM** | Shows the boosting internals: quantile-binned split search and second-order (gradient + Hessian) updates |
| **Shapley Attribution** | Beeswarm + waterfall plots provide root-cause explanations per defective part |
| **Cpk Forecasting** | Forecasts rolling Cpk from its own lags, an early-warning signal that supports **proactive** maintenance |
| **Cost-Benefit Policy** | Translates model output into a cost-minimising inspect-or-ship **business decision** under the stated cost assumptions |

This notebook completes the full quality analytics pipeline: **monitor → diagnose → explain → decide → forecast**.
