In [None]:
# - Trains two independent RandomForest models (Vitality, Stress)
# - Uses raw PlanetScope bands + original indices + 8 extra indices
# - Generates feature importance, confusion matrix, and per-class performance graphs.
import json
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    f1_score,
    balanced_accuracy_score,
)
from joblib import dump, load

In [None]:
# -------------------- CONFIG --------------------
# Input workbook; must contain every raw band column and both target columns.
DATA_PATH = r"/content/Data_To_Train_3_vitality.xlsx"

# Raw band column names B1..B8, in the order they are placed in the feature matrix.
RAW_BANDS = [f"B{band_no}" for band_no in range(1, 9)]

# Label columns, one per model.
TARGET_VITALITY, TARGET_STRESS = "VitalityCategory", "StressCategory"

# Hold-out fraction and the seed shared by the splits and the forests.
TEST_SIZE, RANDOM_STATE = 0.2, 42

# Small constant guarding the divisions in the index formulas.
EPS = 1e-6

# Optional per-sample patch columns, keyed by the quantity they summarize.
TEXTURE_PATCH_COLS = {source: f"{source}_patch" for source in ("NDVI", "B7", "B8")}

# Output root, with one sub-folder per target for the saved model/encoder files.
MODEL_DIR = Path("model_artifacts_extended")
MODEL_DIR.joinpath("vitality").mkdir(parents=True, exist_ok=True)
MODEL_DIR.joinpath("stress").mkdir(parents=True, exist_ok=True)

In [None]:
# -------------------- FEATURE ENGINEERING --------------------
def _safe_div(numer, denom):
    return numer / np.clip(denom, EPS, None)

def build_indices_base(df_bands: pd.DataFrame) -> pd.DataFrame:
    """Compute the eight baseline spectral indices.

    Reads columns B2, B4, B6, B7 and B8 from ``df_bands`` (cast to float) and
    returns a frame on the same index with the columns NDVI, EVI, NDRE, CIre,
    GNDVI, ratio_RE_Red, ratio_Red_NIR and ratio_Blue_Red, in that order.
    Every division goes through ``_safe_div``.
    """
    blue, green, red, red_edge, nir = (
        df_bands[col].astype(float) for col in ("B2", "B4", "B6", "B7", "B8")
    )
    nir_minus_red = nir - red

    indices = {
        # Normalized differences and chlorophyll-style indices.
        "NDVI": _safe_div(nir_minus_red, nir + red),
        "EVI": 2.5 * _safe_div(nir_minus_red, (nir + 6.0 * red - 7.5 * blue + 1.0)),
        "NDRE": _safe_div(nir - red_edge, nir + red_edge),
        "CIre": _safe_div(nir, red_edge) - 1.0,
        "GNDVI": _safe_div(nir - green, nir + green),
        # Simple band ratios.
        "ratio_RE_Red": _safe_div(red_edge, red),
        "ratio_Red_NIR": _safe_div(red, nir),
        "ratio_Blue_Red": _safe_div(blue, red),
    }
    return pd.DataFrame(indices)

def build_indices_extra(df_bands: pd.DataFrame) -> pd.DataFrame:
    """Compute the eight additional spectral indices.

    Reads columns B2, B3, B4, B6, B7 and B8 from ``df_bands`` (cast to float)
    and returns a frame on the same index with the columns SAVI, MSAVI2, RDVI,
    PRI, VARI, CIgreen, MTCI and NDWI, in that order. Square-root arguments
    are clipped so they cannot go negative, and divisions use ``_safe_div``.
    """
    band = {
        col: df_bands[col].astype(float)
        for col in ("B2", "B3", "B4", "B6", "B7", "B8")
    }
    b8_minus_b6 = band["B8"] - band["B6"]
    b8_plus_b6 = band["B8"] + band["B6"]

    # MSAVI2 is a closed form; keep the discriminant non-negative before sqrt.
    two_b8_plus_one = 2.0 * band["B8"] + 1.0
    discriminant = np.clip(two_b8_plus_one ** 2 - 8.0 * b8_minus_b6, 0, None)

    indices = {
        "SAVI": 1.5 * _safe_div(b8_minus_b6, b8_plus_b6 + 0.5),
        "MSAVI2": 0.5 * (two_b8_plus_one - np.sqrt(discriminant)),
        "RDVI": _safe_div(b8_minus_b6, np.sqrt(np.clip(b8_plus_b6, EPS, None))),
        "PRI": _safe_div(band["B3"] - band["B4"], band["B3"] + band["B4"]),
        "VARI": _safe_div(band["B4"] - band["B6"], band["B4"] + band["B6"] - band["B2"]),
        "CIgreen": _safe_div(band["B8"], band["B4"]) - 1.0,
        "MTCI": _safe_div(band["B8"] - band["B7"], band["B7"] - band["B6"]),
        "NDWI": _safe_div(band["B4"] - band["B8"], band["B4"] + band["B8"]),
    }
    return pd.DataFrame(indices)

def _patch_mean_std(patch):
    """Return ``(mean, std)`` over the non-NaN values of one patch, else ``(nan, nan)``.

    Only list, tuple and ndarray patches are summarized; anything else (or a
    patch with no non-NaN value) yields NaNs. The std is the population std
    (ddof=0), matching ``np.nanstd``.
    """
    if isinstance(patch, (list, tuple, np.ndarray)):
        values = np.asarray(patch, dtype=float).ravel()
        values = values[~np.isnan(values)]
        # Filtering first avoids the RuntimeWarning nanmean emits on all-NaN input.
        if values.size:
            return float(values.mean()), float(values.std())
    return np.nan, np.nan


def build_texture_optional(df_source: pd.DataFrame, idx_df: pd.DataFrame) -> pd.DataFrame:
    """Summarize the optional patch columns into per-row mean/std features.

    For every ``TEXTURE_PATCH_COLS`` entry whose column exists in
    ``df_source``, two columns ``tex_<name>_mean`` and ``tex_<name>_std`` are
    produced. Rows without a usable patch are imputed with the column median.

    Feature columns that contain no usable patch at all are dropped: their
    median is NaN, so imputation could not fill them and they would otherwise
    reach the model as all-NaN features.

    Parameters
    ----------
    df_source : frame that may contain the patch columns.
    idx_df : frame whose index the result is built on; it must have as many
        rows as ``df_source``.

    Returns
    -------
    Frame indexed like ``idx_df``; it has zero columns when no texture
    feature could be computed.
    """
    features = {}
    for name, col in TEXTURE_PATCH_COLS.items():
        if col not in df_source.columns:
            continue
        stats = [_patch_mean_std(patch) for patch in df_source[col]]
        features[f"tex_{name}_mean"] = [mean for mean, _ in stats]
        features[f"tex_{name}_std"] = [std for _, std in stats]

    if not features:
        return pd.DataFrame(index=idx_df.index)

    tex_df = pd.DataFrame(features, index=idx_df.index).dropna(axis=1, how="all")
    # NOTE(review): the median is taken over every row passed in, so if this is
    # called before a train/test split the imputation value includes test rows.
    return tex_df.fillna(tex_df.median(numeric_only=True))

def make_feature_matrix(df_full: pd.DataFrame):
    """Assemble the model input matrix from a cleaned data frame.

    The matrix is the column-wise concatenation of the raw bands (as float),
    the baseline indices, the extra indices and any texture features.

    Returns
    -------
    tuple ``(X, base_cols, extra_cols, tex_cols)`` where ``X`` is the feature
    frame and the three lists hold the column names of each derived group.
    """
    bands = df_full[RAW_BANDS]
    base_idx = build_indices_base(bands)
    extra_idx = build_indices_extra(bands)
    # Texture features are aligned on the index of the derived-index frame.
    tex_feats = build_texture_optional(df_full, base_idx)

    parts = [bands.astype(float), base_idx, extra_idx, tex_feats]
    X = pd.concat(parts, axis=1)
    base_cols, extra_cols, tex_cols = (
        list(part.columns) for part in (base_idx, extra_idx, tex_feats)
    )
    return X, base_cols, extra_cols, tex_cols

In [None]:
# -------------------- VISUALIZATION HELPERS --------------------
def plot_confusion_matrix(cm, class_names, title, save_path):
    """Render a confusion matrix as an annotated heatmap and save it.

    ``cm`` is drawn with integer annotations, ``class_names`` label both axes,
    and the figure is written to ``save_path`` at 300 dpi and then closed.
    """
    fig, ax = plt.subplots(figsize=(6, 5))
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
        ax=ax,
    )
    ax.set_title(f"{title} - Confusion Matrix")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    fig.tight_layout()
    fig.savefig(save_path, dpi=300)
    # Close explicitly so repeated calls do not accumulate open figures.
    plt.close(fig)

def plot_class_report(report_dict, title, save_path):
    """Plot per-class precision, recall and F1 as grouped bars and save them.

    ``report_dict`` is a dict-form classification report. The ``accuracy``
    entry and every row whose label contains "avg" (case-insensitive) are
    left out, so only per-class rows are plotted. The figure is written to
    ``save_path`` at 300 dpi and then closed.
    """
    report = pd.DataFrame(report_dict).T.drop(["accuracy"], errors="ignore")
    is_summary_row = report.index.str.contains("avg", case=False)
    per_class = report.loc[~is_summary_row, ["precision", "recall", "f1-score"]]

    ax = per_class.plot(kind="bar", figsize=(7, 5))
    ax.set_title(f"{title} - Per-Class Performance")
    ax.set_ylabel("Score")
    ax.set_ylim(0, 1)

    fig = ax.get_figure()
    fig.tight_layout()
    fig.savefig(save_path, dpi=300)
    plt.close(fig)

def plot_feature_importance(model, feature_names, title, save_path, top_n=20):
    """Plot the most important features of a fitted model as horizontal bars.

    Parameters
    ----------
    model : fitted estimator exposing ``feature_importances_``.
    feature_names : sequence of names, positionally matching the importances.
    title : prefix for the figure title.
    save_path : destination image path (written at 300 dpi).
    top_n : maximum number of features to show; if the model has fewer
        features, all of them are shown and the title states the real count.
    """
    importances = np.asarray(model.feature_importances_)
    shown = min(top_n, importances.size)
    ranking = np.argsort(importances)[::-1][:shown]
    labels = [feature_names[i] for i in ranking]

    fig, ax = plt.subplots(figsize=(8, 6))
    # Recent seaborn deprecates `palette` without `hue`; mapping hue to the
    # bar labels keeps one colour per bar. dodge=False keeps full-width bars.
    sns.barplot(
        x=importances[ranking],
        y=labels,
        hue=labels,
        palette="mako",
        dodge=False,
        ax=ax,
    )
    # Older seaborn draws a legend for the hue mapping; it only repeats the
    # y-axis labels, so remove it when present.
    legend = ax.get_legend()
    if legend is not None:
        legend.remove()

    ax.set_title(f"{title} - Top {shown} Feature Importances")
    ax.set_xlabel("Importance")
    ax.set_ylabel("Feature")
    fig.tight_layout()
    fig.savefig(save_path, dpi=300)
    plt.close(fig)

In [None]:
# -------------------- LOAD DATA --------------------
print("Loading data...")
df = pd.read_excel(DATA_PATH)

# Fail early if any band or target column is absent from the workbook.
required = [*RAW_BANDS, TARGET_VITALITY, TARGET_STRESS]
missing = [col for col in required if col not in df.columns]
if missing:
    raise ValueError(f"Missing columns in data: {missing}")

# Keep only the columns the pipeline uses (patch columns are optional).
optional_cols = [col for col in TEXTURE_PATCH_COLS.values() if col in df.columns]
before = len(df)

# Drop rows lacking any band or label, then renumber the rows from zero.
df = (
    df[required + optional_cols]
    .dropna(subset=required)
    .reset_index(drop=True)
)
after = len(df)
print(f"Rows before/after NA drop: {before}/{after}")

In [None]:
# -------------------- FEATURES & TARGETS --------------------
X, base_cols, extra_cols, tex_cols = make_feature_matrix(df)
# Column names in the same group order used to assemble X.
FEATURE_COLS = [*RAW_BANDS, *base_cols, *extra_cols, *tex_cols]
print(f"Total features: {X.shape[1]} (raw={len(RAW_BANDS)}, base={len(base_cols)}, extra={len(extra_cols)}, texture={len(tex_cols)})")

# Labels are handled as strings so the encoders see a single dtype.
y_vital = df[TARGET_VITALITY].astype(str)
y_stress = df[TARGET_STRESS].astype(str)

# -------------------- ENCODERS --------------------
# One encoder per target; each is kept so predictions can be mapped back to labels.
le_vital = LabelEncoder()
yv = le_vital.fit_transform(y_vital)
le_stress = LabelEncoder()
ys = le_stress.fit_transform(y_stress)

print("Vitality classes:", list(le_vital.classes_))
print("Stress classes:",   list(le_stress.classes_))


In [None]:
# -------------------- SPLITS --------------------
# Each target gets its own split, stratified on that target's labels.
Xv_tr, Xv_te, yv_tr, yv_te = train_test_split(
    X, yv, stratify=yv, test_size=TEST_SIZE, random_state=RANDOM_STATE
)
Xs_tr, Xs_te, ys_tr, ys_te = train_test_split(
    X, ys, stratify=ys, test_size=TEST_SIZE, random_state=RANDOM_STATE
)

# -------------------- MODELS --------------------
# Both forests share one hyper-parameter set.
rf_params = {
    "n_estimators": 500,
    "max_depth": None,
    "min_samples_leaf": 2,
    "class_weight": "balanced_subsample",
    "n_jobs": -1,
    "random_state": RANDOM_STATE,
}

print("Training Vitality model...")
rf_vital = RandomForestClassifier(**rf_params).fit(Xv_tr, yv_tr)
print("Training Stress model...")
rf_stress = RandomForestClassifier(**rf_params).fit(Xs_tr, ys_tr)

In [None]:
# -------------------- EVALUATION --------------------
def eval_target(X_te, y_te, y_hat, label_encoder, name, model, feature_cols):
    """Print hold-out metrics for one target and save its diagnostic plots.

    Prints accuracy, balanced accuracy, macro-F1, the text classification
    report and the confusion matrix, then writes the confusion-matrix,
    per-class-report and feature-importance figures to ``MODEL_DIR / name``.

    Parameters
    ----------
    X_te : test features; accepted for signature symmetry but not used here.
    y_te, y_hat : encoded true and predicted labels.
    label_encoder : fitted encoder; its ``classes_`` supply the class names.
    name : label used in the printed header, plot titles and file names.
    model : fitted estimator passed to the feature-importance plot.
    feature_cols : feature names matching the model's importances.
    """
    class_names = label_encoder.classes_
    class_ids = np.arange(len(class_names))
    # Arguments shared by the dict-form and the text-form report.
    report_kwargs = dict(
        labels=class_ids,
        target_names=class_names,
        digits=4,
        zero_division=0,
    )

    acc = (y_hat == y_te).mean()
    bal = balanced_accuracy_score(y_te, y_hat)
    f1m = f1_score(y_te, y_hat, average="macro", zero_division=0)
    rep_dict = classification_report(y_te, y_hat, output_dict=True, **report_kwargs)

    print(f"\n==== {name} ====")
    print(f"Accuracy: {acc:.4f} | Balanced Acc: {bal:.4f} | Macro-F1: {f1m:.4f}")
    print(classification_report(y_te, y_hat, **report_kwargs))

    cm = confusion_matrix(y_te, y_hat, labels=class_ids)
    cm_table = pd.DataFrame(
        cm,
        index=[f"true_{c}" for c in class_names],
        columns=[f"pred_{c}" for c in class_names],
    )
    print("Confusion matrix:")
    print(cm_table)

    # --- Visualization ---
    outdir = MODEL_DIR / name
    outdir.mkdir(parents=True, exist_ok=True)
    plot_confusion_matrix(cm, class_names, name, outdir / f"{name}_confusion_matrix.png")
    plot_class_report(rep_dict, name, outdir / f"{name}_class_report.png")
    plot_feature_importance(model, feature_cols, name, outdir / f"{name}_feature_importance.png")

# Predict on each target's own hold-out split, then report and plot.
yv_hat = rf_vital.predict(Xv_te)
ys_hat = rf_stress.predict(Xs_te)
eval_target(Xv_te, yv_te, yv_hat, le_vital, "VitalityCategory", rf_vital, FEATURE_COLS)
eval_target(Xs_te, ys_te, ys_hat, le_stress, "StressCategory", rf_stress, FEATURE_COLS)

# -------------------- SAVE --------------------
print("\nSaving artifacts...")
# Each target's model and label encoder are stored side by side.
vital_dir = MODEL_DIR / "vitality"
stress_dir = MODEL_DIR / "stress"
dump(rf_vital, vital_dir / "rf_vitality_ext.joblib")
dump(le_vital, vital_dir / "label_encoder_vitality.joblib")
dump(rf_stress, stress_dir / "rf_stress_ext.joblib")
dump(le_stress, stress_dir / "label_encoder_stress.joblib")

print(f"Artifacts saved to: {MODEL_DIR.resolve()}")
