In [2]:
# === predict_mood.py (Jupyter-safe with interactive fallback) ===

import os
import sys
import argparse
import numpy as np
import pandas as pd
import joblib

from sklearn.base import BaseEstimator, TransformerMixin
from tensorflow.keras.models import load_model

# -----------------------------
# Default artifact locations
# -----------------------------
# Folder that holds the saved artifacts; main() also writes its default
# predictions CSV here when no --out path is supplied.
OUT_DIR = r"C:\Users\sagni\Downloads\Mind Pal"
# Preprocessing bundle, read with joblib.load in load_bundle_and_model().
PKL_PATH = os.path.join(OUT_DIR, "mindpal_preprocess.pkl")
# Keras model file, read with load_model in load_bundle_and_model().
H5_PATH = os.path.join(OUT_DIR, "mindpal_model.h5")

# -----------------------------
# Helper classes (must match training)
# -----------------------------
class ColumnSelector(BaseEstimator, TransformerMixin):
    """Transformer that narrows its input down to one named column.

    The class name and the ``column`` attribute are left untouched: per the
    section header, these helpers have to match the definitions used at
    training time.
    """

    def __init__(self, column):
        # Label of the column to keep in transform().
        self.column = column

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Return ``X[[self.column]]`` (list indexing, so a DataFrame stays 2-D)."""
        return X[[self.column]]

class To1DString(BaseEstimator, TransformerMixin):
    """Stateless transformer that turns its input into a 1-D array of strings."""

    def fit(self, X, y=None):
        """Nothing is learned; return the transformer itself."""
        return self

    def transform(self, X):
        """Stringify the input.

        For a DataFrame only the first column is used; anything else is
        converted with ``np.asarray``, cast to ``str`` and flattened.
        """
        if not isinstance(X, pd.DataFrame):
            return np.asarray(X).astype(str).ravel()
        first_column = X.iloc[:, 0]
        return first_column.astype(str).values

class DateTimeExpand(BaseEstimator, TransformerMixin):
    """Expand each configured column into year / month / day / day-of-week integers."""

    def __init__(self, columns):
        # Columns to expand, in output order.
        self.columns = columns
        # Generated feature names; rebuilt on every fit().
        self.out_cols = []

    def fit(self, X, y=None):
        """Record the output column names (four per input column) and return self."""
        names = []
        for col in self.columns:
            names.extend((f"{col}_year", f"{col}_month", f"{col}_day", f"{col}_dow"))
        self.out_cols = names
        return self

    def transform(self, X):
        """Return the expanded date parts for every configured column.

        Values that cannot be parsed are coerced to NaT by ``pd.to_datetime``
        and then show up as 0 in all four derived columns. With no configured
        columns an empty ``(len(X), 0)`` array is returned.
        """
        frames = []
        for col in self.columns:
            parsed = pd.to_datetime(X[col], errors="coerce")
            parts = parsed.dt
            frames.append(pd.DataFrame({
                f"{col}_year":  parts.year.fillna(0).astype(int),
                f"{col}_month": parts.month.fillna(0).astype(int),
                f"{col}_day":   parts.day.fillna(0).astype(int),
                f"{col}_dow":   parts.dayofweek.fillna(0).astype(int),
            }))
        if not frames:
            return np.empty((len(X), 0))
        return pd.concat(frames, axis=1)

# -----------------------------
# Utilities
# -----------------------------
def load_bundle_and_model(pkl_path: str, h5_path: str):
    """Load the preprocessing bundle and the Keras model, then compile the model.

    Args:
        pkl_path: Path of the joblib-serialized preprocessing bundle.
        h5_path: Path of the saved Keras model.

    Returns:
        A ``(bundle, model)`` tuple.

    Raises:
        FileNotFoundError: If either path does not exist (the bundle path is
            checked first).
    """
    if not os.path.exists(pkl_path):
        raise FileNotFoundError(f"Missing preprocess bundle: {pkl_path}")
    if not os.path.exists(h5_path):
        raise FileNotFoundError(f"Missing model file: {h5_path}")

    # NOTE(review): joblib.load unpickles arbitrary objects -- only point this
    # at bundles from a trusted source.
    bundle = joblib.load(pkl_path)
    model = load_model(h5_path)

    # Compile to silence "compiled metrics not built" warning; the loss is
    # picked from the number of classes known to the label encoder.
    class_count = len(bundle["label_encoder"].classes_)
    loss_name = "binary_crossentropy" if class_count <= 2 else "sparse_categorical_crossentropy"
    model.compile(optimizer="adam", loss=loss_name, metrics=["accuracy"])
    return bundle, model

def build_df_for_text(single_text: str, bundle: dict) -> pd.DataFrame:
    """Build a one-row DataFrame that carries ``single_text`` for prediction.

    The frame gets one column for every name listed under the bundle keys
    ``numeric_cols``, ``cat_cols``, ``text_cols`` and ``datetime_cols``
    (duplicates removed, first occurrence wins), all initialised to NaN.
    The text is stored in the first entry of ``text_cols``; when the bundle
    lists no text columns it is stored in a column named ``"text"``.

    Args:
        single_text: The sentence to predict on.
        bundle: Preprocessing bundle; missing or ``None`` column lists are
            treated as empty.

    Returns:
        A DataFrame with exactly one row.
    """
    def _column_list(key):
        # Accept a missing key, None, a bare string, or any iterable of names.
        cols = bundle.get(key)
        if cols is None:
            return []
        if isinstance(cols, str):
            return [cols]
        return list(cols)

    numeric_cols = _column_list("numeric_cols")
    cat_cols = _column_list("cat_cols")
    text_cols = _column_list("text_cols")
    datetime_cols = _column_list("datetime_cols")

    # dict.fromkeys de-duplicates while preserving order.
    all_cols = list(dict.fromkeys(numeric_cols + cat_cols + text_cols + datetime_cols)) or ["text"]
    df = pd.DataFrame({c: [np.nan] for c in all_cols})

    # Replace (or create) the whole column instead of writing a string into a
    # float64 NaN cell with .loc: that element-wise upcast is deprecated in
    # pandas 2.1+ and is slated to become an error.
    text_column = text_cols[0] if text_cols else "text"
    df[text_column] = [single_text]
    return df

def ensure_dense_if_small(X):
    """Densify ``X`` via ``toarray()`` when it has at most 50000 columns.

    Objects without a ``toarray`` method, and objects whose second shape
    dimension exceeds 50000, are returned unchanged.
    """
    if not hasattr(X, "toarray"):
        return X
    if X.shape[1] > 50000:
        return X
    return X.toarray()

def predict_dataframe(df_in: pd.DataFrame, bundle: dict, model) -> pd.DataFrame:
    """Run the preprocessing pipeline and the model over ``df_in``.

    Args:
        df_in: Input rows; the bundle's ``target_col`` is dropped if present.
        bundle: Must provide ``"preprocess"`` (object with ``transform``) and
            ``"label_encoder"`` (object with ``classes_``); ``"target_col"``
            is optional.
        model: Object whose ``predict(X, verbose=0)`` returns probabilities.

    Returns:
        A DataFrame with ``id`` (row position), ``top1_label``, ``top1_prob``
        and one ``prob_<class>`` column per label-encoder class.
    """
    pipeline = bundle["preprocess"]
    encoder = bundle["label_encoder"]
    target = bundle.get("target_col", None)

    features = df_in.copy()
    if target and target in features.columns:
        features = features.drop(columns=[target])

    matrix = ensure_dense_if_small(pipeline.transform(features))
    raw = model.predict(matrix, verbose=0)
    class_names = [str(c) for c in encoder.classes_]

    single_output = raw.ndim == 1 or raw.shape[1] == 1
    if single_output:
        # One output per row is read as P(class index 1); derive P(class index 0).
        positive = raw.ravel()
        probs_mat = np.column_stack([1.0 - positive, positive])
        winners = (positive >= 0.5).astype(int)
    else:
        probs_mat = raw
        winners = np.argmax(probs_mat, axis=1)

    row_numbers = np.arange(len(probs_mat))
    result = pd.DataFrame({
        "id": np.arange(len(df_in)),
        "top1_label": [class_names[k] for k in winners],
        "top1_prob": probs_mat[row_numbers, winners],
    })
    for position, name in enumerate(class_names):
        result[f"prob_{name}"] = probs_mat[:, position]
    return result

def parse_args(argv=None):
    """Parse the command-line options of the prediction script.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, which is the previous behaviour;
            passing a list lets notebooks and tests supply arguments without
            touching ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with ``in_csv``, ``out_csv``,
        ``single_text`` (each ``None`` when omitted) and ``do_print``
        (``False`` unless ``--print`` is given).
    """
    parser = argparse.ArgumentParser(description="Predict mood/emotion labels with MindPal model.")
    parser.add_argument("--in",   dest="in_csv",   type=str, default=None, help="Path to input CSV.")
    parser.add_argument("--out",  dest="out_csv",  type=str, default=None, help="Output CSV path.")
    parser.add_argument("--text", dest="single_text", type=str, default=None, help="Single text to predict.")
    parser.add_argument("--print", dest="do_print", action="store_true", help="Print predictions to console.")
    return parser.parse_args(argv)

def _prompt_line(message: str) -> str:
    """Read one stripped line from stdin; return "" when stdin is closed (EOF)."""
    try:
        return input(message).strip()
    except EOFError:
        return ""


def main():
    """Predict for a single text or a CSV file and save the results as CSV.

    Input comes from ``--text`` or ``--in``; when neither is given the user is
    prompted for a sentence and then, if that is blank, for a CSV path.
    Predictions go to ``--out`` or, by default, ``predictions_mood.csv`` in
    ``OUT_DIR``; ``--print`` additionally prints the first 20 rows.

    Raises:
        ValueError: If no text and no CSV path were provided.
        FileNotFoundError: If the input CSV or a model artifact is missing.
    """
    args = parse_args()

    # If no args supplied (e.g., user just ran the file in Jupyter), prompt interactively
    if args.single_text is None and args.in_csv is None:
        user_text = _prompt_line("Enter a sentence to predict mood (or leave blank to use a CSV): ")
        if user_text:
            args.single_text = user_text
        else:
            # Both prompts go through _prompt_line so a closed stdin ends in
            # the ValueError below rather than an unhandled EOFError.
            csv_guess = _prompt_line("Enter path to input CSV (or press Enter to cancel): ")
            if not csv_guess:
                raise ValueError("Provide either --text 'your sentence' or --in path\\to\\file.csv")
            args.in_csv = csv_guess

    # Validate the CSV path before the (slow) model load so a typo fails fast.
    if args.single_text is None and not os.path.exists(args.in_csv):
        raise FileNotFoundError(f"Input CSV not found: {args.in_csv}")

    bundle, model = load_bundle_and_model(PKL_PATH, H5_PATH)

    if args.single_text is not None:
        df_in = build_df_for_text(args.single_text, bundle)
    else:
        df_in = pd.read_csv(args.in_csv)

    preds = predict_dataframe(df_in, bundle, model)

    out_csv = args.out_csv or os.path.join(OUT_DIR, "predictions_mood.csv")
    # Create the destination folder if needed so to_csv does not fail on a
    # fresh --out location; a bare filename has no parent to create.
    out_parent = os.path.dirname(out_csv)
    if out_parent:
        os.makedirs(out_parent, exist_ok=True)
    preds.to_csv(out_csv, index=False, encoding="utf-8")
    print(f"[SAVE] Predictions -> {out_csv}")

    if args.do_print:
        cols_show = ["id", "top1_label", "top1_prob"] + [c for c in preds.columns if c.startswith("prob_")]
        print(preds[cols_show].head(20).to_string(index=False))

# -----------------------------
# Jupyter arg-strip + run
# -----------------------------
if __name__ == "__main__":
    # Jupyter injects its own arguments, which argparse would reject. Such a
    # launch is recognised by the ipykernel_launcher script name or a "-f"
    # flag; in that case keep only the script name.
    launched_by_kernel = "ipykernel_launcher" in sys.argv[0]
    if launched_by_kernel or "-f" in sys.argv:
        sys.argv = sys.argv[:1]
        # You can also prefill defaults for quick tests, e.g.:
        # sys.argv += ["--text", "I feel focused and calm.", "--print"]

    main()


Enter a sentence to predict mood (or leave blank to use a CSV):  i am not very happy




[SAVE] Predictions -> C:\Users\sagni\Downloads\Mind Pal\predictions_mood.csv
