In [1]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import joblib

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

import tensorflow as tf
from tensorflow.keras.models import load_model

# -----------------------------
# Paths
# -----------------------------
# Directory that receives every artifact written by this script, and the
# location of the raw CSV dataset.
OUT_DIR = r"C:\Users\sagni\Downloads\Eco Habit"
DATA_PATH = r"C:\Users\sagni\Downloads\archive\sustainable_fashion_trends_2024.csv"

# Create the output directory up front so later writes cannot fail on a
# missing folder; exist_ok makes re-runs harmless.
os.makedirs(OUT_DIR, exist_ok=True)

# Files expected inside OUT_DIR: the joblib bundle, the Keras .h5 model and
# the per-epoch training history CSV.
PKL_PATH, H5_PATH, HIST_CSV = (
    os.path.join(OUT_DIR, artifact_name)
    for artifact_name in (
        "mindpal_preprocess.pkl",
        "mindpal_model.h5",
        "training_history.csv",
    )
)

# -------------------------------------------------
# Define the SAME helper classes used during training
# (needed so joblib can unpickle the pipeline)
# -------------------------------------------------
class ColumnSelector(BaseEstimator, TransformerMixin):
    """Stateless transformer that keeps a single column of its input.

    Parameters
    ----------
    column
        Label used to index ``X`` in :meth:`transform`.
    """

    def __init__(self, column):
        self.column = column

    def fit(self, X, y=None):
        """Learn nothing; return ``self`` so the object can sit in a pipeline."""
        return self

    def transform(self, X):
        """Return ``X`` indexed with a one-element list holding ``self.column``.

        For a pandas DataFrame this list-style selection yields a
        single-column DataFrame rather than a Series.
        """
        wanted = [self.column]
        return X[wanted]

class To1DString(BaseEstimator, TransformerMixin):
    """Stateless transformer that turns its input into a flat array of strings."""

    def fit(self, X, y=None):
        """Learn nothing; return ``self``."""
        return self

    def transform(self, X):
        """Return the values of ``X`` cast to ``str`` as a 1-D array.

        A DataFrame contributes only its first column; any other input is
        converted with ``np.asarray`` and flattened.
        """
        if not isinstance(X, pd.DataFrame):
            return np.asarray(X).astype(str).ravel()
        first_column = X.iloc[:, 0]
        return first_column.astype(str).values

class DateTimeExpand(BaseEstimator, TransformerMixin):
    """Expand date-like columns into integer year / month / day / day-of-week.

    Parameters
    ----------
    columns
        Iterable of column labels to expand. Each label ``c`` produces the
        output columns ``c_year``, ``c_month``, ``c_day`` and ``c_dow``.
    """

    def __init__(self, columns):
        self.columns = columns
        self.out_cols = []

    def fit(self, X, y=None):
        """Record the generated column names in ``self.out_cols``; return ``self``."""
        suffixes = ("year", "month", "day", "dow")
        self.out_cols = [
            f"{col}_{suffix}" for col in self.columns for suffix in suffixes
        ]
        return self

    def transform(self, X):
        """Return the expanded date parts for every configured column.

        Values that cannot be parsed become ``NaT`` (``errors="coerce"``) and
        all of their parts are filled with ``0``. With no configured columns
        an empty ``(len(X), 0)`` NumPy array is returned instead of a
        DataFrame.
        """
        frames = []
        for col in self.columns:
            parsed = pd.to_datetime(X[col], errors="coerce")
            raw_parts = {
                f"{col}_year": parsed.dt.year,
                f"{col}_month": parsed.dt.month,
                f"{col}_day": parsed.dt.day,
                f"{col}_dow": parsed.dt.dayofweek,
            }
            # NaT yields NaN parts; replace with 0 so the int cast succeeds.
            frames.append(pd.DataFrame({
                name: part.fillna(0).astype(int)
                for name, part in raw_parts.items()
            }))
        if not frames:
            return np.empty((len(X), 0))
        return pd.concat(frames, axis=1)

# -----------------------------
# Helpers
# -----------------------------
def ensure_dense_if_small(X, max_feats=50000):
    """Densify ``X`` when it offers ``toarray`` and is narrow enough.

    Parameters
    ----------
    X
        Any object. Only objects exposing a ``toarray`` attribute are
        considered for conversion.
    max_feats
        Largest ``X.shape[1]`` for which ``X.toarray()`` is returned.

    Returns
    -------
    ``X.toarray()`` when ``X`` has ``toarray`` and ``X.shape[1] <= max_feats``;
    otherwise ``X`` itself, unchanged.
    """
    if not hasattr(X, "toarray"):
        return X
    if X.shape[1] > max_feats:
        # Too wide: leave it in its original form.
        return X
    return X.toarray()

def compile_loaded_model(model, n_classes: int):
    """Compile ``model`` in place with Adam, an accuracy metric and a loss
    picked from the class count.

    Parameters
    ----------
    model
        Object exposing a Keras-style ``compile(optimizer=, loss=, metrics=)``.
    n_classes
        Number of target classes. Two or fewer selects
        ``"binary_crossentropy"``; more selects
        ``"sparse_categorical_crossentropy"``.

    Returns
    -------
    None
    """
    if n_classes <= 2:
        loss_name = "binary_crossentropy"
    else:
        loss_name = "sparse_categorical_crossentropy"
    model.compile(optimizer="adam", loss=loss_name, metrics=["accuracy"])

# -----------------------------
# 1) Plot Accuracy & Loss from history CSV (if available)
# -----------------------------
if os.path.exists(HIST_CSV):
    hist_df = pd.read_csv(HIST_CSV)

    def plot_curve(x, y1, y2, title, ylabel, out_path):
        """Save a train (and optional validation) curve as a PNG.

        Parameters
        ----------
        x
            Values for the horizontal axis (labelled "Epoch").
        y1
            Training-series values, plotted with the label "Train".
        y2
            Validation-series values, or ``None`` to omit that line.
        title, ylabel
            Figure title and vertical-axis label.
        out_path
            Destination file passed to ``plt.savefig``.
        """
        plt.figure(figsize=(8,5))
        plt.plot(x, y1, label="Train")
        if y2 is not None:
            plt.plot(x, y2, label="Validation")
        plt.title(title)
        plt.xlabel("Epoch")
        plt.ylabel(ylabel)
        plt.legend()
        plt.grid(True, linestyle="--", linewidth=0.5)
        plt.tight_layout()
        plt.savefig(out_path, dpi=150)
        # Close the figure so repeated calls do not accumulate open figures.
        plt.close()

    # Use the logged epoch column when present, otherwise number rows from 1.
    epochs = hist_df["epoch"] if "epoch" in hist_df.columns else np.arange(1, len(hist_df)+1)

    # Accuracy: accept either the long or the short column spelling.
    acc_col = next((c for c in ("accuracy", "acc") if c in hist_df.columns), None)
    val_acc_col = next((c for c in ("val_accuracy", "val_acc") if c in hist_df.columns), None)
    if acc_col:
        plot_curve(
            epochs,
            hist_df[acc_col],
            hist_df[val_acc_col] if val_acc_col else None,
            "Model Accuracy",
            "Accuracy",
            os.path.join(OUT_DIR, "accuracy.png")
        )
    else:
        print("[WARN] no accuracy column in training_history.csv — skipping accuracy curve.")

    # Loss: guarded like accuracy, so a history file without a "loss" column
    # skips this plot instead of raising KeyError and aborting the script.
    val_loss_series = hist_df["val_loss"] if "val_loss" in hist_df.columns else None
    if "loss" in hist_df.columns:
        plot_curve(
            epochs,
            hist_df["loss"],
            val_loss_series,
            "Model Loss",
            "Loss",
            os.path.join(OUT_DIR, "loss.png")
        )
    else:
        print("[WARN] no loss column in training_history.csv — skipping loss curve.")
else:
    print("[WARN] training_history.csv not found — skipping accuracy/loss curves.")

# -----------------------------
# 2) Recreate test split, predict, and plot confusion matrices
# -----------------------------
# Fail fast with one error that lists every missing input file.
missing = [p for p in [PKL_PATH, H5_PATH, DATA_PATH] if not os.path.exists(p)]
if missing:
    raise FileNotFoundError(f"Missing required files: {missing}")

# Load bundle & model.
# NOTE(security): joblib.load unpickles arbitrary objects — only load a
# bundle that comes from a trusted source.
bundle = joblib.load(PKL_PATH)
model  = load_model(H5_PATH)

label_encoder = bundle["label_encoder"]

# Compile to silence absl warning and ensure metrics available
n_classes = len(label_encoder.classes_)
compile_loaded_model(model, n_classes)

# Rebuild the test split. This only matches the training-time split if
# training used the same test_size / random_state / stratify settings —
# TODO confirm against the training script.
df = pd.read_csv(DATA_PATH)
target_col = bundle["target_col"]
if target_col not in df.columns:
    raise ValueError(f"Target column '{target_col}' not found in data file.")

X = df.drop(columns=[target_col])
y = label_encoder.transform(df[target_col].astype(str))

X_train_df, X_test_df, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y if n_classes > 1 else None
)

preprocess = bundle["preprocess"]
X_test_t = preprocess.transform(X_test_df)
X_test_t = ensure_dense_if_small(X_test_t)

probs = model.predict(X_test_t, verbose=0)
# Threshold only when the model really emits one score per sample. A
# two-class model with a 2-unit output must go through argmax, otherwise
# ravel() would give twice as many predictions as there are test rows.
single_output = probs.ndim == 1 or probs.shape[-1] == 1
if n_classes <= 2 and single_output:
    y_pred = (probs.ravel() >= 0.5).astype(int)
else:
    y_pred = np.argmax(probs, axis=1)

# Save predictions CSV and classification report
pred_csv = os.path.join(OUT_DIR, "predictions.csv")
pred_df = pd.DataFrame({
    "true_label": label_encoder.inverse_transform(y_test),
    "pred_label": label_encoder.inverse_transform(y_pred)
})
pred_df.to_csv(pred_csv, index=False, encoding="utf-8")

# Class names and their encoded ids, shared by the report and the matrices.
labels = [str(c) for c in label_encoder.classes_]
class_ids = np.arange(len(labels))

report_txt = os.path.join(OUT_DIR, "classification_report.txt")
with open(report_txt, "w", encoding="utf-8") as f:
    # Pass labels explicitly: without it, classification_report raises when
    # the test split / predictions do not contain every encoder class and so
    # disagree in length with target_names. zero_division=0 keeps the same
    # 0.0 scores for absent classes without emitting warnings.
    f.write(classification_report(
        y_test,
        y_pred,
        labels=class_ids,
        target_names=labels,
        zero_division=0,
    ))

# Confusion matrices: raw counts, and each row divided by its total. The
# small epsilon avoids division by zero for classes absent from y_test.
cm = confusion_matrix(y_test, y_pred, labels=class_ids)
cm_norm = cm.astype(float) / (cm.sum(axis=1, keepdims=True) + 1e-12)

def plot_cm(cm_mat, labels, title, out_path, normalize=False):
    """Draw a confusion matrix as an annotated heat map and save it.

    Parameters
    ----------
    cm_mat
        2-D array of cell values; rows are drawn as "True label" and columns
        as "Predicted label".
    labels
        Tick labels used on both axes.
    title
        Figure title.
    out_path
        Destination file passed to ``plt.savefig``.
    normalize
        When true, cells are annotated with two decimals; otherwise each
        value is truncated to ``int``.
    """
    plt.figure(figsize=(8,6))
    image = plt.imshow(cm_mat, interpolation="nearest", aspect="auto")
    plt.title(title)
    plt.colorbar(image, fraction=0.046, pad=0.04)

    positions = np.arange(len(labels))
    plt.xticks(positions, labels, rotation=45, ha="right")
    plt.yticks(positions, labels)

    # Cells above half of the maximum get white text, the rest black.
    if cm_mat.size:
        thresh = cm_mat.max() / 2.0
    else:
        thresh = 0.5

    n_rows, n_cols = cm_mat.shape[0], cm_mat.shape[1]
    for row, col in np.ndindex(n_rows, n_cols):
        cell = cm_mat[row, col]
        if normalize:
            caption = f"{cell:.2f}"
        else:
            caption = f"{int(cell)}"
        text_color = "white" if cell > thresh else "black"
        plt.text(col, row, caption, ha="center", va="center",
                 color=text_color, fontsize=9)

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()

# Raw-count confusion matrix.
plot_cm(
    cm,
    labels,
    "Confusion Matrix (Counts)",
    os.path.join(OUT_DIR, "confusion_matrix.png"),
    normalize=False,
)
# Row-normalized confusion matrix.
plot_cm(
    cm_norm,
    labels,
    "Confusion Matrix (Row-Normalized)",
    os.path.join(OUT_DIR, "confusion_matrix_norm.png"),
    normalize=True,
)

# Summary of the artifacts this script is expected to have written.
print("\n=== Saved plots & reports to:", OUT_DIR, "===")
print("\n".join([
    " - accuracy.png (if history was present)",
    " - loss.png (if history was present)",
    " - confusion_matrix.png",
    " - confusion_matrix_norm.png",
    " - predictions.csv",
    " - classification_report.txt",
]))





=== Saved plots & reports to: C:\Users\sagni\Downloads\Eco Habit ===
 - accuracy.png (if history was present)
 - loss.png (if history was present)
 - confusion_matrix.png
 - confusion_matrix_norm.png
 - predictions.csv
 - classification_report.txt
