# Mental‑Health Chatbot: Happy Brain

## Project Objective

This project aims to build an emotion-aware mental health chatbot that is both conversational and empathetic. The system leverages:
- **Emotion Classification:** Fine-tunes `SamLowe/roberta-base-go_emotions` to detect multi-label emotions in user inputs.
- **Text Generation:** Two T5 models are fine-tuned for:
  - **Response Generation:** To generate supportive, therapist-style responses.
  - **Question-Answering (QA):** To provide factual answers when questions are asked.
- **Integrated Pipeline:** A routing mechanism that chooses the proper response mode (QA vs. supportive response) based on detected emotions and input type.
- **Deployment:** A Gradio interface enabling voice and text-based interactions.
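
The routing idea in the list above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the notebook's actual router: `route_message`, `looks_like_question`, `DISTRESS_EMOTIONS`, and `QUESTION_WORDS` are hypothetical names, and the rule (factual questions go to QA unless a distress emotion is detected) is one plausible policy.

```python
from typing import List

# Hypothetical set of emotions that suggest the user needs support rather than facts.
DISTRESS_EMOTIONS = {"sadness", "fear", "grief", "nervousness", "remorse", "disappointment"}
# Hypothetical list of words that typically open a question.
QUESTION_WORDS = ("what", "how", "why", "when", "where", "who", "which", "can", "does", "is", "are")


def looks_like_question(text: str) -> bool:
    """Heuristic input-type check: a trailing '?' or a leading question word."""
    stripped = text.strip().lower()
    if not stripped:
        return False
    return stripped.endswith("?") or stripped.split()[0] in QUESTION_WORDS


def route_message(text: str, emotions: List[str]) -> str:
    """Return 'qa' for factual questions without distress, otherwise 'support'."""
    distressed = any(e in DISTRESS_EMOTIONS for e in emotions)
    if looks_like_question(text) and not distressed:
        return "qa"
    return "support"


print(route_message("What is cognitive behavioural therapy?", ["curiosity"]))  # qa
print(route_message("Why does nobody understand me?", ["sadness"]))            # support
print(route_message("I feel really down today.", ["sadness"]))                 # support
```

Checking the emotion before the input type keeps a distressed question such as "Why does nobody understand me?" on the supportive path instead of treating it as a factual query.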

## Notebook Structure and Sections

1. **Library Imports & Environment Setup**  
   Sets up all the necessary imports from standard libraries, data handling, PyTorch, Hugging Face, and Gradio.

2. **Data Loading & Preprocessing**  
   - Loads multiple mental health-related CSV datasets.
   - Performs cleaning and mapping to uniform "question"/"answer" columns.
   - Splits the combined dataset into training and testing sets.

3. **Multi-Label Emotion Annotation & Custom Trainer Setup**  
   - Processes multi-label emotion annotations using `MultiLabelBinarizer`.
   - Defines custom data collators and trainers to support binary cross-entropy loss for multi-label classification.
   - Tokenizes and prepares data for the emotion classifier model.

4. **Model Training: Emotion Classification**  
   - Fine-tunes the `SamLowe/roberta-base-go_emotions` model on the processed data.
   - Logs evaluation metrics such as micro F1, precision, recall, and subset accuracy.
   - Saves the fine-tuned emotion classifier model.

5. **Model Training: T5 for Response Generation**  
   - Constructs input/target pairs for response generation (user input -> supportive response).
   - Fine-tunes a FLAN-T5 model (`google/flan-t5-large`) with evaluation metrics like ROUGE and BERTScore.
   - Saves the fine-tuned T5 response generator model.

6. **Model Training: T5 for Question-Answering (QA)**  
   - Constructs QA pairs with a specific prompt format.
   - Fine-tunes a separate T5 model for QA tasks.
   - Saves the fine-tuned T5 QA model.

7. **Model Deployment Setup with Combined Metadata**  
   - Saves a combined metadata file (`combined_model_metadata.pt`) that stores the paths to all three fine-tuned models.
   - Updates the loading code to reference this metadata file, ensuring that future training runs build on previous improvements rather than overwriting models.

8. **Unified Pipeline & Gradio Interface**  
   - Integrates the emotion classifier, T5 response generator, and T5 QA models into a single pipeline.
   - Uses helper functions (for prompt formatting, emotion detection, audio transcription, etc.) to generate responses.
   - Provides a fully-functional Gradio interface supporting both text and voice inputs with options for language, chat history, and conversation logging.

9. **Evaluation**  
   - Evaluates the emotion classifier using metrics such as micro F1, precision, recall, and subset accuracy.
   - Evaluates the generation models (both response generation and QA) using metrics including ROUGE, BERTScore, perplexity, and loss.
   - Outputs formatted evaluation results for easy monitoring.
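
The classifier metrics named in the outline (micro precision, recall, F1, and subset accuracy) can be computed by hand on a toy example. The sketch below is pure Python for illustration; `multilabel_metrics` is a hypothetical helper, and the notebook itself relies on scikit-learn for these scores.

```python
from typing import Dict, List


def multilabel_metrics(y_true: List[List[int]], y_pred: List[List[int]]) -> Dict[str, float]:
    """Micro-averaged precision/recall/F1 plus exact-match (subset) accuracy."""
    tp = fp = fn = exact = 0
    for true_row, pred_row in zip(y_true, y_pred):
        for t, p in zip(true_row, pred_row):
            tp += int(t == 1 and p == 1)  # label present and predicted
            fp += int(t == 0 and p == 1)  # label predicted but absent
            fn += int(t == 1 and p == 0)  # label present but missed
        exact += int(list(true_row) == list(pred_row))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    subset_accuracy = exact / len(y_true) if y_true else 0.0
    return {
        "micro_precision": precision,
        "micro_recall": recall,
        "micro_f1": f1,
        "subset_accuracy": subset_accuracy,
    }


y_true = [[1, 0, 1], [0, 1, 0], [1, 1, 0]]
y_pred = [[1, 0, 0], [0, 1, 0], [1, 1, 1]]
print(multilabel_metrics(y_true, y_pred))
```

In this toy case four label decisions are true positives, one is a false positive, and one is a false negative, so micro precision, recall, and F1 are all about 0.8, while only one of three rows matches exactly. Subset accuracy is therefore much stricter than micro F1.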

## Summary

This notebook offers a complete end-to-end pipeline for an emotion-aware mental health chatbot, from data preprocessing and model fine-tuning to evaluation and interactive deployment via Gradio. The modular structure lets each component (emotion detection, response generation, QA) be optimized individually, and a combined metadata system supports incremental training and easy deployment.

## 1. Imports & Configuration

In [None]:
# ================================
# Environment Setup
# ================================
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""  # Force CPU use
import warnings
import logging
from pathlib import Path
from dataclasses import dataclass

# ================================
# Standard Library Imports
# ================================
import glob
import json
import time
import random
import itertools
import tempfile
import datetime
import pprint  # Pretty-printing

# ================================
# Scientific & Data Libraries
# ================================
import numpy as np
import pandas as pd
import math

import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader

from sklearn.metrics import (
    f1_score,
    precision_score,
    recall_score,
    accuracy_score
)
from sklearn.preprocessing import MultiLabelBinarizer

# ================================
# Audio Processing
# ================================
from pydub import AudioSegment
import speech_recognition as sr

# ================================
# NLP & Transformers (Hugging Face)
# ================================
import evaluate
import gradio as gr
import streamlit as st
from datasets import load_dataset, Dataset, concatenate_datasets

# Transformers - Tokenizers & Models
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    T5ForConditionalGeneration
)

# Transformers - Training Tools
from transformers import (
    Trainer,
    TrainingArguments,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
    TrainerCallback,
    DataCollatorWithPadding,
    DataCollatorForSeq2Seq,
    default_data_collator
)

# Transformers - Logging
from transformers import logging as hf_logging

# ================================
# PEFT / LoRA
# ================================
from peft import get_peft_model, LoraConfig, TaskType


In [None]:
# Single device switch for the whole session.
# Note: CUDA_VISIBLE_DEVICES is set to "" in the import cell, which hides all GPUs
# regardless of this flag; clear it there as well before setting USE_CPU = False.
USE_CPU = True  # Set to False to use CUDA when it is available
device = torch.device("cpu" if USE_CPU else ("cuda" if torch.cuda.is_available() else "cpu"))
print("Device in use:", device)

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)


# Save directory setup

SAVE_ROOT = Path("./saved_models")
for sub in ["emotion_classifier", "flan_t5_response_generator", "t5_qa", "final_combined"]:
    (SAVE_ROOT / sub).mkdir(parents=True, exist_ok=True)


## 2. Data Loading and Preprocessing

In [None]:
# Toggle individual CSVs and provide their column mapping
# Format: name: (enabled, path, question_col, answer_col)

DATASETS = {
    "ds1": (True,  "./data/ds1_transformed_mental_health_chatbot_dataset.csv",  "question", "answer"),
    "ds2": (False, "./data/ds2_transformed_mental_health_chatbot.csv",         "question", "answer"),
    "ds3": (False, "./data/ds3_mental_health_faq_cleaned.csv",                 "Question", "Answer"),
    "ds4": (False, "./data/ds4_mental_health_chatbot_dataset_merged_modes.csv","prompt",   "response"),
    "ds5": (False, "./data/ds5_Mental_Health_FAQ.csv",                         "Question", "Answer"),
    "ds6": (False, "./data/ds6_mental_health_counseling.csv",                  "query",    "completion"),
}

In [None]:
# Cleaner that auto-maps columns to 'question' / 'answer'

def load_and_clean(path, q_col, a_col):
    df = pd.read_csv(path)

    # Normalize headers
    df.columns = [c.lower().strip() for c in df.columns]
    q_col = q_col.lower().strip()
    a_col = a_col.lower().strip()

    # Common renames
    rename_map = {
        "prompt": "question",
        "response": "answer",
        "questions": "question",
        "answers": "answer",
    }
    df = df.rename(columns=rename_map)

    # If provided cols exist, rename them to standard names
    
    if q_col in df.columns:
        df = df.rename(columns={q_col: "question"})
    if a_col in df.columns:
        df = df.rename(columns={a_col: "answer"})

    # Try to map 'context' -> 'question' if needed
    if "question" not in df.columns and "context" in df.columns:
        df = df.rename(columns={"context": "question"})

    # Verify if necessary columns exist
    if not {"question", "answer"}.issubset(df.columns):
        raise ValueError(f"Could not find 'question'/'answer' in {path}. Available columns: {list(df.columns)}")

    # Retain only required columns, drop missing values and duplicates
    df = df[["question", "answer"]].dropna()
    df["question"] = df["question"].astype(str).str.strip().str.replace(r"\s+", " ", regex=True)
    df["answer"]   = df["answer"].astype(str).str.strip().str.replace(r"\s+", " ", regex=True)
    df = df.drop_duplicates()

    # Convert to Hugging Face Dataset
    return Dataset.from_pandas(df.reset_index(drop=True))
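
To make the column-mapping rules of `load_and_clean` easier to follow, here is a pure-Python sketch that applies the same steps to a list of headers instead of a DataFrame. `resolve_columns` and `clean_text` are hypothetical helpers written for illustration only; they mirror the logic above but are not part of the notebook.

```python
import re
from typing import Dict, List

COMMON_RENAMES = {
    "prompt": "question",
    "response": "answer",
    "questions": "question",
    "answers": "answer",
}


def resolve_columns(headers: List[str], q_col: str, a_col: str) -> Dict[str, str]:
    """Map each raw header to the name it carries after normalization."""
    q_col, a_col = q_col.lower().strip(), a_col.lower().strip()
    mapping = {}
    for raw in headers:
        name = raw.lower().strip()              # normalize header
        name = COMMON_RENAMES.get(name, name)   # apply common renames
        if name == q_col:
            name = "question"
        elif name == a_col:
            name = "answer"
        mapping[raw] = name
    # Fall back to 'context' when no question column was found
    if "question" not in mapping.values() and "context" in mapping.values():
        mapping = {raw: ("question" if name == "context" else name) for raw, name in mapping.items()}
    return mapping


def clean_text(value: str) -> str:
    """Strip the ends and collapse internal whitespace runs to one space."""
    return re.sub(r"\s+", " ", value.strip())


print(resolve_columns([" Question ", "Answer"], "Question", "Answer"))
print(resolve_columns(["query", "completion", "id"], "query", "completion"))
print(resolve_columns(["Context", "Response"], "question", "answer"))
print(clean_text("  I feel\n\n really   down  "))
```

Columns that end up as neither `question` nor `answer` (such as `id` in the second call) are the ones the real function drops when it selects `df[["question", "answer"]]`.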

In [None]:
# Load enabled datasets and create a unified dataset
datasets = []
for key, (enabled, path, q_col, a_col) in DATASETS.items():
    if enabled:
        print(f"Loading dataset '{key}' from {path} ...")
        ds = load_and_clean(path, q_col, a_col)
        print(f"Loaded {len(ds)} examples from '{key}'.")
        datasets.append(ds)
    else:
        print(f"Skipping dataset '{key}' as its toggle is off.")

In [None]:
if datasets:
    combined_dataset = concatenate_datasets(datasets)
    print(f"\nCombined dataset contains {len(combined_dataset)} examples.")
else:
    raise ValueError("No datasets enabled. Please enable at least one dataset in DATASETS.")

# Shuffle and split into training and testing datasets (e.g., 90% train, 10% test)
combined_dataset = combined_dataset.shuffle(seed=42)
split_dataset = combined_dataset.train_test_split(test_size=0.1, seed=42)
train_dataset = split_dataset['train']
test_dataset = split_dataset['test']
print(f"Training set: {len(train_dataset)} examples, Testing set: {len(test_dataset)} examples.")
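
The seeded shuffle-then-split step can be illustrated without any dependencies. This is a simplified sketch: `shuffle_and_split` is a hypothetical helper, and because the `datasets` library uses its own random generator and rounding, the exact rows it selects will differ from this version. Only the idea (fixed seed, shuffle, hold out a fraction) carries over.

```python
import random
from typing import List, Sequence, Tuple


def shuffle_and_split(
    rows: Sequence[str], test_size: float = 0.1, seed: int = 42
) -> Tuple[List[str], List[str]]:
    """Shuffle with a fixed seed, then hold out `test_size` of the rows for testing."""
    rng = random.Random(seed)  # local generator so global random state is untouched
    shuffled = list(rows)
    rng.shuffle(shuffled)
    n_test = max(1, round(len(shuffled) * test_size))
    return shuffled[n_test:], shuffled[:n_test]


rows = [f"row{i}" for i in range(20)]
train, test = shuffle_and_split(rows)
print(len(train), len(test))
# Re-running with the same seed reproduces the same split
print(shuffle_and_split(rows) == (train, test))
```

Fixing the seed matters here because the emotion classifier and both T5 models are trained on the same `train_dataset` and `test_dataset`, so a stable split keeps their evaluation numbers comparable across runs.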

## 3. Multi-Label Emotion Annotation & Custom Trainer Setup

In [None]:

# Multi-label Emotion Annotation & Trainer Setup


# Set a seed if not already defined
SEED = 42


# Data Collator: casts labels to float32

def float_label_collator(features):
    """
    Wrap the default HF collator but cast the `labels` tensor to float32
    so BCEWithLogitsLoss gets the right dtype.
    """
    batch = default_data_collator(features)
    if "labels" in batch:
        batch["labels"] = batch["labels"].to(torch.float32)
    # Uncomment next line to print label info during debugging
    # print("collator labels dtype/shape:", batch["labels"].dtype, batch["labels"].shape)
    return batch

In [None]:

# Custom Trainer for Multi-Label Classification

class MultiLabelTrainer(Trainer):
    """
    Custom Trainer that computes loss using binary cross‑entropy with logits.
    This ensures multi‑label targets (e.g., emotions) are correctly processed.
    """
    def compute_loss(self, model, inputs, return_outputs: bool = False, **kwargs):
        labels = inputs.pop("labels").float()
        outputs = model(**inputs)
        logits = outputs.logits

        # If labels' shape does not match logits, reshape to match
        if labels.shape != logits.shape:
            labels = labels.view_as(logits)
        
        loss = F.binary_cross_entropy_with_logits(logits, labels, reduction="mean")
        return (loss, outputs) if return_outputs else loss
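
For readers unfamiliar with the loss used by `MultiLabelTrainer`, the following pure-Python sketch computes mean binary cross-entropy directly from logits using the usual numerically stable form. `bce_with_logits` is a hypothetical stand-in written for illustration; the trainer itself calls `F.binary_cross_entropy_with_logits` from PyTorch.

```python
import math
from typing import List


def bce_with_logits(logits: List[float], targets: List[float]) -> float:
    """Mean binary cross-entropy over independent labels, computed from raw logits."""
    total = 0.0
    for x, y in zip(logits, targets):
        # Stable rewrite of -[y*log(sigmoid(x)) + (1-y)*log(1-sigmoid(x))]:
        #   max(x, 0) - x*y + log(1 + exp(-|x|))
        total += max(x, 0.0) - x * y + math.log1p(math.exp(-abs(x)))
    return total / len(logits)


# A zero logit means "50/50", so each label contributes log(2)
print(bce_with_logits([0.0, 0.0], [1.0, 0.0]))
# Confident and correct predictions give a small loss
print(bce_with_logits([6.0, -6.0], [1.0, 0.0]))
# Confident and wrong predictions give a large loss
print(bce_with_logits([6.0, -6.0], [0.0, 1.0]))
```

Each of the 28 emotions is scored as its own yes/no decision, which is why several labels can be active for one input. A softmax-based cross-entropy would instead force the labels to compete for a single slot.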

In [None]:

# Callback: Print training progress (optional)

class StepPrinter(TrainerCallback):
    """
    A Trainer callback that prints step-wise loss and evaluation metrics
    while keeping the tqdm progress bar.
    """
    def on_log(self, args, state, control, logs=None, **kwargs):
        if not logs or not state.is_local_process_zero:
            return
        if "loss" in logs:
            print(f"Step {state.global_step:>6} • loss {logs['loss']:.4f}")
        if "eval_loss" in logs:
            # Trainer prefixes evaluation metrics with "eval_"; report the first one present.
            # Checking key membership (not truthiness) keeps a legitimate 0.0 score visible.
            metric = next(
                (logs[k] for k in ("eval_micro_f1", "eval_bertscore_f1", "eval_rougeL") if k in logs),
                None,
            )
            metric_str = f" • metric {metric:.4f}" if metric is not None else ""
            epoch = int(state.epoch or 0)
            print(f"Epoch {epoch}/{int(args.num_train_epochs)}"
                  f" • eval_loss {logs['eval_loss']:.4f}{metric_str}")

In [None]:

# Define Emotion Labels (28 total: 27 + neutral)

GO_EMOTION_LABELS = [
    'admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring',
    'confusion', 'curiosity', 'desire', 'disappointment', 'disapproval',
    'disgust', 'embarrassment', 'excitement', 'fear', 'gratitude', 'grief',
    'joy', 'love', 'nervousness', 'optimism', 'pride', 'realization', 'relief',
    'remorse', 'sadness', 'surprise', 'neutral'
]
num_labels = len(GO_EMOTION_LABELS)

In [None]:

# Annotate each example with multi-label emotion annotations.
# For this demo, we simulate emotion annotations: if an example has no emotion,
# we default to "neutral". Replace this with your real emotion annotations if available.

def annotate_emotions(example):
    emos = example.get("emotions", [])
    
    # If no explicit emotion annotations, default to ["neutral"]
    if not emos:
        emos = ["neutral"]
    
    # Save original emotions for visibility (optional)
    example["emotions"] = emos
    
    # Create a binary label vector for each of the 28 emotions
    example["labels"] = [1.0 if lbl in emos else 0.0 for lbl in GO_EMOTION_LABELS]
    return example
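
The multi-hot encoding produced by `annotate_emotions` is easy to inspect on a shortened label list. The sketch below is illustrative only: `LABELS`, `to_multi_hot`, and `from_multi_hot` are hypothetical names, and the five-label list stands in for the 28 GoEmotions labels.

```python
from typing import List

LABELS = ["anger", "fear", "joy", "sadness", "neutral"]  # shortened list for illustration


def to_multi_hot(emotions: List[str], labels: List[str] = LABELS) -> List[float]:
    """Encode a list of emotion names as a 0/1 vector; empty input defaults to 'neutral'."""
    active = set(emotions) or {"neutral"}
    return [1.0 if lbl in active else 0.0 for lbl in labels]


def from_multi_hot(vector: List[float], labels: List[str] = LABELS) -> List[str]:
    """Decode a 0/1 vector back into emotion names."""
    return [lbl for lbl, v in zip(labels, vector) if v == 1.0]


print(to_multi_hot(["fear", "sadness"]))             # [0.0, 1.0, 0.0, 1.0, 0.0]
print(to_multi_hot([]))                              # [0.0, 0.0, 0.0, 0.0, 1.0]
print(from_multi_hot([0.0, 1.0, 0.0, 1.0, 0.0]))     # ['fear', 'sadness']
print(to_multi_hot(["boredom"]))                     # [0.0, 0.0, 0.0, 0.0, 0.0]
```

The last call shows one edge case worth knowing: a label that is not in the list is silently ignored, which yields an all-zero vector even though the "neutral" default exists. A filter on non-zero label vectors is what removes such rows.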

In [None]:

# Helper to retrieve input text from an example (for debugging)

def get_input_text(example):
    return example.get("text") or example.get("question") or "[NO TEXT FOUND]"


# (Re‑)load datasets for emotion annotation if not already loaded.

if 'train_dataset' not in globals() or 'test_dataset' not in globals():
    print("Reloading datasets for emotion annotation ...")
    datasets_list = []
    for name, (enabled, path, q_col, a_col) in DATASETS.items():
        if not enabled:
            continue
        ds = load_and_clean(path=path, q_col=q_col, a_col=a_col)
        datasets_list.append(ds)
    if not datasets_list:
        print("No datasets were enabled, using a fallback test dataset.")
        fallback_data = {
            "text": [
                "How are you?",
                "I feel really down today.",
                "I'm so happy with my progress!",
                "Why does nobody understand me?",
                "I'm feeling anxious about school.",
                "Life is good lately.",
                "Sometimes I just want to cry.",
                "Everything is falling apart.",
                "I’m grateful for my therapist.",
                "Can someone please just listen?"
            ]
        }
        ds = Dataset.from_dict(fallback_data)
        datasets_list.append(ds)
    full_ds = concatenate_datasets(datasets_list) if len(datasets_list) > 1 else datasets_list[0]
    full_ds = full_ds.shuffle(seed=SEED)
    split = full_ds.train_test_split(test_size=0.1, seed=SEED)
    train_dataset, test_dataset = split["train"], split["test"]

print(f"Train dataset: {len(train_dataset):,} examples • Test dataset: {len(test_dataset):,} examples")

In [None]:

# Map emotion annotations onto training and testing datasets

emo_train = train_dataset.map(annotate_emotions)
emo_test  = test_dataset.map(annotate_emotions)

print("Sample annotation:")
print(get_input_text(emo_train[0]), "->", emo_train[0]["emotions"])

In [None]:

# Filter out any examples without at least one positive label (shouldn't happen after defaulting to neutral)

def has_nonzero_labels(example):
    return sum(example["labels"]) > 0

emo_train = emo_train.filter(has_nonzero_labels)
emo_test = emo_test.filter(has_nonzero_labels)

# Check for any problematic labels
bad_labels = [ex for ex in emo_train if "labels" not in ex or sum(ex["labels"]) == 0]
print("Number of examples with problematic labels:", len(bad_labels))
if bad_labels:
    print("Example with problematic labels:", bad_labels[0])

In [None]:

# Load the tokenizer for emotion classification.
# We'll be using the 'SamLowe/roberta-base-go_emotions' tokenizer.

emo_tokenizer = AutoTokenizer.from_pretrained("SamLowe/roberta-base-go_emotions")

# Print columns before renaming for clarity
print("Before column rename:", emo_train.column_names)
# Rename 'question' to 'text' if needed for tokenization
if "question" in emo_train.column_names:
    emo_train = emo_train.rename_column("question", "text")
if "question" in emo_test.column_names:
    emo_test = emo_test.rename_column("question", "text")
print("After column rename:", emo_train.column_names)

In [None]:

# Tokenization: tokenize using the 'text' column.

def emo_tokenize(batch):
    return emo_tokenizer(batch["text"], padding=True, truncation=True)

In [None]:

# Function to cast labels to float32 using numpy

def cast_to_float(example):
    example["labels"] = np.array(example["labels"], dtype=np.float32)
    return example

In [None]:
# Tokenize and cast labels for both training and testing sets (batched processing)
emo_train_tok = emo_train.map(emo_tokenize, batched=True).map(cast_to_float)
emo_test_tok  = emo_test.map(emo_tokenize, batched=True).map(cast_to_float)

# Set the dataset format for PyTorch
emo_train_tok.set_format("torch", columns=["input_ids", "attention_mask", "labels"])
emo_test_tok.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

print(f"Tokenized training examples: {len(emo_train_tok)} • Tokenized testing examples: {len(emo_test_tok)}")


In [None]:
model_paths = {
    "t5_qa": "./saved_models/t5_qa",
    "emotion_classifier": "./saved_models/emotion_classifier",
    "flan_t5_response_generator": "./saved_models/flan_t5_response_generator"
}


## 4. Train Emotion Classifier (RoBERTa)

In [None]:
emotion_model_path = SAVE_ROOT / "emotion_classifier"

try:
    if emotion_model_path.is_dir() and any(emotion_model_path.iterdir()):
        print("Loading previously fine-tuned emotion model...")
        emo_model = AutoModelForSequenceClassification.from_pretrained(emotion_model_path).to(device)
    else:
        raise FileNotFoundError
except Exception:
    print("Loading base emotion model...")
    emo_model = AutoModelForSequenceClassification.from_pretrained(
        "SamLowe/roberta-base-go_emotions",
        problem_type="multi_label_classification",
        num_labels=len(GO_EMOTION_LABELS)
    ).to(device)


In [None]:

# Model Training: Emotion Classification with RoBERTa


# Define device
device = torch.device("cpu" if USE_CPU else ("cuda" if torch.cuda.is_available() else "cpu"))
print("Training device:", device)


In [None]:

# Tokenization for Model Training: Use fixed max_length

def emo_tokenize(batch):
    return emo_tokenizer(
        batch["text"],
        truncation=True,
        padding="max_length",
        max_length=64
    )

In [None]:
# Float conversion: ensures labels become float32 arrays.
def cast_to_float(example):
    example["labels"] = np.array(example["labels"], dtype=np.float32)
    return example

In [None]:
# Re-tokenize the datasets using the new tokenization function
emo_train_tok = emo_train.map(emo_tokenize, batched=True).map(cast_to_float)
emo_test_tok  = emo_test.map(emo_tokenize, batched=True).map(cast_to_float)

# Set format to PyTorch tensors
emo_train_tok.set_format("torch", columns=["input_ids", "attention_mask", "labels"])
emo_test_tok.set_format("torch", columns=["input_ids", "attention_mask", "labels"])

print(f"After tokenization: {len(emo_train_tok)} training examples; {len(emo_test_tok)} testing examples.")

In [None]:

# Load the base emotion classification model only if no model was loaded above.
# Reloading it unconditionally would discard a previously fine-tuned checkpoint.

if "emo_model" not in globals():
    emo_model = AutoModelForSequenceClassification.from_pretrained(
        "SamLowe/roberta-base-go_emotions",
        problem_type="multi_label_classification",
        num_labels=num_labels,  # equals len(GO_EMOTION_LABELS)
    ).to(device)
print("Loaded model:", emo_model.config._name_or_path)

In [None]:

# Define Evaluation Metrics for Emotion Classification

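def compute_emo_metrics(pred):
    logits, labels = pred
    # Compute probabilities using sigmoid on logits
    probs = torch.sigmoid(torch.tensor(logits))
    # Apply threshold (0.3) to decide positive labels
    preds = (probs > 0.3).int().numpy()
    # Cast to int so scikit-learn sees matching multi-label indicator arrays
    labels = np.array(labels).astype(int)

    # Defensive check: only consider rows with at least one positive label
    mask = labels.sum(axis=1) > 0
    if mask.sum() == 0:
        print("Warning: all evaluation labels are empty")
        return {"micro_f1": 0.0, "micro_precision": 0.0, "micro_recall": 0.0, "subset_accuracy": 0.0}

    y_true, y_pred = labels[mask], preds[mask]
    return {
        "micro_f1": f1_score(y_true, y_pred, average="micro", zero_division=0),
        "micro_precision": precision_score(y_true, y_pred, average="micro", zero_division=0),
        "micro_recall": recall_score(y_true, y_pred, average="micro", zero_division=0),
        # On multi-label indicator arrays, accuracy_score is exact-match (subset) accuracy
        "subset_accuracy": accuracy_score(y_true, y_pred),
    }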
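
The 0.3 probability threshold used in `compute_emo_metrics` is easier to reason about when translated back into logit space. The sketch below is a pure-Python illustration with hypothetical helper names (`sigmoid`, `logit`, `predict_labels`); it shows that thresholding the sigmoid at 0.3 is the same as thresholding the raw logit at about -0.85, whereas a zero-logit cut-off corresponds to a probability of 0.5.

```python
import math
from typing import List


def sigmoid(x: float) -> float:
    """Numerically stable logistic function."""
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    e = math.exp(x)
    return e / (1.0 + e)


def logit(p: float) -> float:
    """Inverse of sigmoid: the logit at which the probability equals p."""
    return math.log(p / (1.0 - p))


def predict_labels(logits: List[float], threshold: float = 0.3) -> List[int]:
    """Mark a label as present when its probability exceeds the threshold."""
    return [int(sigmoid(x) > threshold) for x in logits]


example_logits = [-2.0, -0.5, 0.4]
print(predict_labels(example_logits, threshold=0.3))  # [0, 1, 1]
print(predict_labels(example_logits, threshold=0.5))  # [0, 0, 1]
print(round(logit(0.3), 4))                           # -0.8473
```

A lower threshold trades precision for recall: the middle label (probability about 0.38) is reported at 0.3 but not at 0.5. Metrics computed with different thresholds are therefore not directly comparable.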

In [None]:

# Set Up Training Arguments

# Define a root folder for saving model checkpoints if not defined already
from pathlib import Path
SAVE_ROOT = Path("./saved_models")
emo_args = TrainingArguments(
    output_dir=str(SAVE_ROOT / "emotion_classifier"),
    
    # Logging & Reporting
    logging_strategy="steps",
    logging_steps=10,
    logging_dir="./logs",
    report_to="none",
    
    # Training hyper‑parameters
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    learning_rate=2e-5,
    num_train_epochs=3,
    
    # Evaluation and checkpointing settings
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="micro_f1",
    greater_is_better=True,
    
    seed=SEED,
)

In [None]:

# Instantiate the Custom Trainer for Multi-Label Classification

trainer_emo = MultiLabelTrainer(
    model=emo_model,
    args=emo_args,
    train_dataset=emo_train_tok,
    eval_dataset=emo_test_tok,
    tokenizer=emo_tokenizer,
    data_collator=float_label_collator,
    compute_metrics=compute_emo_metrics,
    callbacks=[StepPrinter],
)

In [None]:
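# Baseline: train a plain RoBERTa emotion classifier with the stock Trainer.
# Note: this cell writes to the same directory as trainer_emo, so whichever
# training run finishes last determines the saved checkpoint.
num_labels = len(emo_train_tok[0]['labels'])
emotion_model = AutoModelForSequenceClassification.from_pretrained(
    "roberta-base",
    num_labels=num_labels,
    problem_type="multi_label_classification",  # BCE loss over independent labels
).to(device)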

training_args = TrainingArguments(
    output_dir="./saved_models/emotion_classifier",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_strategy="steps",
    logging_steps=10,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="f1"
)

def compute_metrics(pred):
    logits, labels = pred
    preds = (logits > 0).astype(int)
    return {
        "f1": f1_score(labels, preds, average="micro"),
        "precision": precision_score(labels, preds, average="micro"),
        "recall": recall_score(labels, preds, average="micro"),
        "accuracy": accuracy_score(labels, preds)
    }

trainer = Trainer(
    model=emotion_model,
    args=training_args,
    train_dataset=emo_train_tok,
    eval_dataset=emo_test_tok,
    tokenizer=emo_tokenizer,
    compute_metrics=compute_metrics
)

trainer.train()
emotion_model.save_pretrained("./saved_models/emotion_classifier")

In [None]:

# Start training (comment out the next line to skip training)

trainer_emo.train()


# Save the Best Model and Tokenizer

# Create the directory structure if it doesn't exist
SAVE_ROOT.mkdir(exist_ok=True, parents=True)
(SAVE_ROOT / "emotion_classifier").mkdir(exist_ok=True, parents=True)

emo_model.save_pretrained(SAVE_ROOT / "emotion_classifier")
emo_tokenizer.save_pretrained(SAVE_ROOT / "emotion_classifier")

# Also save the trainer's final model state
trainer_emo.save_model()
print("Emotion classifier model and tokenizer saved to", SAVE_ROOT / "emotion_classifier")


## 5. Train T5 for Response Generation

In [None]:
MODEL_NAME = "google/flan-t5-large"
SAVE_DIR = SAVE_ROOT / "flan_t5_response_generator"

try:
    if SAVE_DIR.is_dir() and any(SAVE_DIR.iterdir()):
        print("Loading previously fine-tuned flan-t5-large response generator...")
        resp_model = T5ForConditionalGeneration.from_pretrained(SAVE_DIR).to(device)
    else:
        raise FileNotFoundError
except Exception:
    print("Loading base flan-t5-large model...")
    resp_model = T5ForConditionalGeneration.from_pretrained(MODEL_NAME).to(device)



In [None]:

# Train T5 for Response Generation

# Build input/target pairs: user text -> helpful response
# For now we use 'question' as input and 'answer' as target.
def build_t5_pairs(example):
    # Retrieve the question from either "question" or "text" columns,
    # and the corresponding answer from either "answer" or "response" columns.
    question = example.get("question") or example.get("text") or ""
    answer = example.get("answer") or example.get("response") or ""
    example["input_text"] = "respond: " + question
    example["target_text"] = answer
    return example

In [None]:
# Map the original training and testing datasets to build T5 pairs.
# We're using the original QA datasets (train_dataset and test_dataset) from our data-loading cell.
resp_train = train_dataset.map(build_t5_pairs)
resp_test  = test_dataset.map(build_t5_pairs)

In [None]:
# Load the FLAN-T5 tokenizer (the model itself is loaded in a later cell).
t5_resp_model_name = "google/flan-t5-large"
tokenizer_t5 = AutoTokenizer.from_pretrained(t5_resp_model_name)

In [None]:
# Tokenize the input and target texts for T5.
def t5_tokenize(batch):
    # Tokenize the input text.
    model_inputs = tokenizer_t5(batch["input_text"], max_length=64, truncation=True)
    # Tokenize the targets; `text_target` replaces the deprecated
    # as_target_tokenizer() context manager.
    labels = tokenizer_t5(text_target=batch["target_text"], max_length=64, truncation=True)
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs
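
`t5_tokenize` leaves sequences unpadded, so padding happens later at batch time. `DataCollatorForSeq2Seq` pads label sequences with `-100` by default, the value PyTorch's cross-entropy loss ignores, so padded positions do not contribute to the loss. The sketch below imitates only that label-padding step in pure Python; `pad_labels` is a hypothetical helper and the token ids are made up for illustration.

```python
from typing import List

IGNORE_INDEX = -100  # label value skipped by PyTorch's cross-entropy loss by default


def pad_labels(label_ids: List[List[int]], ignore_index: int = IGNORE_INDEX) -> List[List[int]]:
    """Right-pad every label sequence to the longest one in the batch."""
    width = max(len(seq) for seq in label_ids)
    return [seq + [ignore_index] * (width - len(seq)) for seq in label_ids]


# Made-up token ids for two targets of different length
batch_labels = [[363, 19, 8, 1], [27, 1]]
print(pad_labels(batch_labels))  # [[363, 19, 8, 1], [27, 1, -100, -100]]
```

Before decoding labels for a text metric, the `-100` entries must be mapped back to the tokenizer's pad id, because `-100` is not a valid token id.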

In [None]:
# Tokenize the training and testing T5 pairs.
resp_train_tok = resp_train.map(t5_tokenize, batched=True, remove_columns=resp_train.column_names)
resp_test_tok  = resp_test.map(t5_tokenize, batched=True, remove_columns=resp_test.column_names)

# Set datasets' format to output PyTorch tensors.
resp_train_tok.set_format("torch")
resp_test_tok.set_format("torch")

print(f"Tokenized training examples: {len(resp_train_tok)}; Tokenized testing examples: {len(resp_test_tok)}")

In [None]:
# Load FLAN-T5 Response Generation Model & Tokenizer from metadata
from transformers import T5ForConditionalGeneration, AutoTokenizer

flan_resp_path = Path(model_paths["flan_t5_response_generator"])

try:
    if flan_resp_path.is_dir() and any(flan_resp_path.iterdir()):
        print("Loading fine-tuned FLAN-T5 model...")
        resp_model = T5ForConditionalGeneration.from_pretrained(flan_resp_path).to(device)
        resp_tokenizer = AutoTokenizer.from_pretrained(flan_resp_path)
    else:
        raise FileNotFoundError
except Exception:
    print("Loading base flan-t5-large model...")
    resp_model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large").to(device)
    resp_tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-large")


In [None]:
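# Load the ROUGE metric here so compute_metrics does not depend on a later cell.
rouge = evaluate.load("rouge")

def compute_metrics(eval_preds):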
    preds, labels = eval_preds

    if isinstance(preds, tuple):
        preds = preds[0]

    preds = np.argmax(preds, axis=-1)

    if isinstance(labels, tuple):
        labels = labels[0]
    labels = np.where(labels != -100, labels, resp_tokenizer.pad_token_id)

    decoded_preds = resp_tokenizer.batch_decode(preds, skip_special_tokens=True)
    decoded_labels = resp_tokenizer.batch_decode(labels, skip_special_tokens=True)

    decoded_preds = [pred.strip() for pred in decoded_preds]
    decoded_labels = [label.strip() for label in decoded_labels]

    result = rouge.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)

    return {
        "rougeL": result["rougeL"]
    }
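
ROUGE-L, the score returned by `compute_metrics`, is based on the longest common subsequence (LCS) between prediction and reference. The sketch below is a simplified pure-Python version for intuition only: `lcs_length` and `rouge_l_f1` are hypothetical helpers that split on whitespace, whereas the `evaluate` implementation applies its own tokenization and optional stemming, so its numbers will differ.

```python
from typing import List


def lcs_length(a: List[str], b: List[str]) -> int:
    """Length of the longest common subsequence, using one rolling DP row."""
    prev = [0] * (len(b) + 1)
    for x in a:
        curr = [0]
        for j, y in enumerate(b, start=1):
            curr.append(prev[j - 1] + 1 if x == y else max(prev[j], curr[j - 1]))
        prev = curr
    return prev[-1]


def rouge_l_f1(prediction: str, reference: str) -> float:
    """F1 of LCS-based precision and recall over whitespace tokens."""
    pred, ref = prediction.lower().split(), reference.lower().split()
    if not pred or not ref:
        return 0.0
    lcs = lcs_length(pred, ref)
    if lcs == 0:
        return 0.0
    precision, recall = lcs / len(pred), lcs / len(ref)
    return 2 * precision * recall / (precision + recall)


score = rouge_l_f1(
    "talking to a therapist can help",
    "talking to someone can really help",
)
print(round(score, 4))  # 0.6667
```

Here the common subsequence is "talking to can help" (4 of 6 tokens on each side). ROUGE-L rewards shared word order but says nothing about whether a response is empathetic or safe, which is a limitation to keep in mind when it is used to select checkpoints.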

In [None]:
# Set up the Seq2Seq training arguments.

resp_training_args = Seq2SeqTrainingArguments(
    output_dir="./saved_models/flan_t5_response_generator",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=3e-4,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    predict_with_generate=False,
    logging_dir="./logs",
    logging_strategy="steps",
    logging_steps=10,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="rougeL"
)


# Assert that tokenized response data exists

assert 'resp_train_tok' in globals() and 'resp_test_tok' in globals(), "Tokenized response data is not defined."

# Create the Seq2SeqTrainer instance

resp_trainer = Seq2SeqTrainer(
    model=resp_model,  
    args=resp_training_args,
    train_dataset=resp_train_tok,  
    eval_dataset=resp_test_tok,    
    tokenizer=resp_tokenizer,      
    data_collator=DataCollatorForSeq2Seq(tokenizer=resp_tokenizer, model=resp_model),
    compute_metrics=compute_metrics  
)

# Begin training

resp_trainer.train()
resp_model.save_pretrained("./saved_models/flan_t5_response_generator")


In [None]:
# Load the base model only if no response model exists in this session.
# Reloading it unconditionally after training would discard the fine-tuned weights.

if "resp_model" not in globals():
    resp_model = T5ForConditionalGeneration.from_pretrained(t5_resp_model_name).to(device)
    print("Loaded T5 model:", t5_resp_model_name)

In [None]:

# Save the trained FLAN-T5 response generation model and tokenizer.

(SAVE_ROOT / "flan_t5_response_generator").mkdir(exist_ok=True, parents=True)
resp_model.save_pretrained(SAVE_ROOT / "flan_t5_response_generator")
tokenizer_t5.save_pretrained(SAVE_ROOT / "flan_t5_response_generator")
resp_trainer.save_model()  # Save the trainer's model state.

print("FLAN-T5 response generator model and tokenizer saved to", SAVE_ROOT / "flan_t5_response_generator")


## 6. Train T5 for Question Answering

In [None]:
# Train or load T5 QA Model
qa_model_path = Path(model_paths["t5_qa"])

try:
    if qa_model_path.is_dir() and any(qa_model_path.iterdir()):
        print("Loading previously fine-tuned T5 QA model...")
        t5_qa_model = T5ForConditionalGeneration.from_pretrained(qa_model_path).to(device)
        t5_qa_tokenizer = AutoTokenizer.from_pretrained(qa_model_path)
    else:
        raise FileNotFoundError
except Exception:
    print("Loading base T5 QA model (t5-base)...")
    t5_qa_model = T5ForConditionalGeneration.from_pretrained("t5-base").to(device)
    t5_qa_tokenizer = AutoTokenizer.from_pretrained("t5-base")


In [None]:
# Build QA pairs: "question: <text>" -> answer.
def build_qa_pairs(example):
    # Retrieve the question from 'question' (or 'text') and answer from 'answer' (or 'response').
    question = example.get("question") or example.get("text") or ""
    answer = example.get("answer") or example.get("response") or ""
    example["input_text"] = "question: " + question
    example["target_text"] = answer
    return example

In [None]:
# Reuse the QA model and tokenizer loaded above under the names used by qa_tokenize.
# Loading from model_paths["t5_qa"] directly fails while that directory is still empty.
qa_model = t5_qa_model
qa_tokenizer = t5_qa_tokenizer

In [None]:
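def qa_tokenize(batch):
    # Optional second segment; None when the dataset has no 'context' column
    context = batch["context"] if "context" in batch else None

    # Tokenize the prompts built by build_qa_pairs ("question: <text>")
    inputs = qa_tokenizer(
        batch["input_text"],
        context,
        padding="max_length",
        truncation=True,
        max_length=64
    )

    # Tokenize targets (text_target replaces the deprecated as_target_tokenizer())
    labels = qa_tokenizer(
        text_target=batch["target_text"],
        padding="max_length",
        truncation=True,
        max_length=64
    )

    # Replace padding ids with -100 so the loss ignores padded positions
    pad_id = qa_tokenizer.pad_token_id
    inputs["labels"] = [
        [tok if tok != pad_id else -100 for tok in seq]
        for seq in labels["input_ids"]
    ]
    return inputs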



In [None]:
qa_train = train_dataset.map(build_qa_pairs)
qa_test  = test_dataset.map(build_qa_pairs)

qa_train_tok = qa_train.map(qa_tokenize, batched=True, remove_columns=qa_train.column_names)
qa_test_tok  = qa_test.map(qa_tokenize, batched=True, remove_columns=qa_test.column_names)

qa_train_tok.set_format("torch")
qa_test_tok.set_format("torch")


In [None]:
# # Map the QA pair-building function onto the training and testing datasets.
# qa_train = train_dataset.map(build_qa_pairs)
# qa_test  = test_dataset.map(build_qa_pairs)

In [None]:
# qa_train_tok = qa_train.map(qa_tokenize, batched=True, remove_columns=qa_train.column_names)
# qa_test_tok  = qa_test.map(qa_tokenize, batched=True, remove_columns=qa_test.column_names)

# # Set the format for PyTorch tensors.
# qa_train_tok.set_format("torch")
# qa_test_tok.set_format("torch")

In [None]:
rouge = evaluate.load("rouge")

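def compute_metrics(eval_preds):
    preds, labels = eval_preds
    # Some models return a tuple; the first element holds the predictions.
    if isinstance(preds, tuple):
        preds = preds[0]
    # If raw logits were returned instead of generated ids, take the most likely token.
    if preds.ndim == 3:
        preds = preds.argmax(axis=-1)

    # Replace -100 (positions ignored by the loss) with the pad id so decoding works.
    pad_id = t5_qa_tokenizer.pad_token_id
    preds = np.where(preds != -100, preds, pad_id)
    labels = np.where(labels != -100, labels, pad_id)

    # Decode predictions and references
    decoded_preds = t5_qa_tokenizer.batch_decode(preds, skip_special_tokens=True)
    decoded_labels = t5_qa_tokenizer.batch_decode(labels, skip_special_tokens=True)

    decoded_preds = [pred.strip() for pred in decoded_preds]
    decoded_labels = [label.strip() for label in decoded_labels]

    # Compute ROUGE
    result = rouge.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)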

    return {
        "rougeL": result["rougeL"]
    }
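
To make the reported number concrete: ROUGE-L scores a prediction by the longest common subsequence (LCS) it shares with the reference. The sketch below computes an LCS-based F-measure on lowercased whitespace tokens. It is a simplified illustration, not the `rouge` implementation loaded through `evaluate`, which applies its own tokenization and optional stemming; `lcs_length` and `rouge_l_f1` are hypothetical names.

```python
def lcs_length(a, b):
    """Length of the longest common subsequence of two token lists."""
    prev = [0] * (len(b) + 1)
    for x in a:
        curr = [0]
        for j, y in enumerate(b, start=1):
            if x == y:
                curr.append(prev[j - 1] + 1)
            else:
                curr.append(max(prev[j], curr[j - 1]))
        prev = curr
    return prev[-1]

def rouge_l_f1(prediction, reference):
    """LCS-based F-measure between two strings, using whitespace tokens."""
    pred_tokens = prediction.lower().split()
    ref_tokens = reference.lower().split()
    lcs = lcs_length(pred_tokens, ref_tokens)
    if lcs == 0:
        return 0.0
    precision = lcs / len(pred_tokens)
    recall = lcs / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)

score = rouge_l_f1(
    "try to keep a regular sleep schedule",
    "keep a regular sleep schedule every night",
)
print(round(score, 3))
```

Here five of the seven tokens on each side form the common subsequence, so precision, recall, and F-measure all equal 5/7.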

In [None]:
# Set up training arguments
qa_training_args = Seq2SeqTrainingArguments(
    output_dir="./saved_models/t5_qa",
    evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=3e-4,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    predict_with_generate=True,  # generate token ids at eval time so compute_metrics can decode them
    logging_dir="./logs",
    logging_strategy="steps",
    logging_steps=10,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="rougeL"
)

# Assert to ensure tokenized data is defined
assert 'qa_train_tok' in globals() and 'qa_test_tok' in globals(), "Tokenized QA data is not defined."

# Create the Seq2SeqTrainer instance
qa_trainer = Seq2SeqTrainer(
    model=t5_qa_model,
    args=qa_training_args,
    train_dataset=qa_train_tok,
    eval_dataset=qa_test_tok,
    tokenizer=t5_qa_tokenizer,
    data_collator=DataCollatorForSeq2Seq(tokenizer=t5_qa_tokenizer, model=t5_qa_model),
    compute_metrics=compute_metrics
)

qa_trainer.train()
t5_qa_model.save_pretrained("./saved_models/t5_qa")

In [None]:
# Before training, check if a fine-tuned model exists and load it.

if os.path.isdir(SAVE_ROOT / "t5_qa"):
    print("Loading previously fine-tuned T5 QA model...")
    qa_model = T5ForConditionalGeneration.from_pretrained(SAVE_ROOT / "t5_qa").to(device)
else:
    print("Loading base T5 QA model...")
    qa_model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large").to(device)

In [None]:

# Train T5 for Question‑Answer

print("Training T5 for the Question‑Answer task.")

# Load (or initialize) the QA model and tokenizer.
# Here we fine-tune the base T5 model (t5-small) for QA.
qa_model = T5ForConditionalGeneration.from_pretrained(t5_resp_model_name).to(device)
# Note: We're reusing tokenizer_t5 from the response generation section.

# Create a DataCollator for Seq2Seq tasks.
qa_collator = DataCollatorForSeq2Seq(
    tokenizer=tokenizer_t5,
    model=qa_model,
    padding=True
)

In [None]:
# Save the trained QA model and tokenizer

(SAVE_ROOT / "t5_qa").mkdir(exist_ok=True, parents=True)
qa_trainer.model.save_pretrained(SAVE_ROOT / "t5_qa")  # save the model that qa_trainer actually trained
tokenizer_t5.save_pretrained(SAVE_ROOT / "t5_qa")
qa_trainer.save_model()
tokenizer_t5.save_pretrained(qa_training_args.output_dir)

print("T5 QA model and tokenizer saved to", SAVE_ROOT / "t5_qa")

## Model Deployment Setup with Combined Metadata

In [None]:

# Save Combined Model Metadata

metadata = {
    "emotion_classifier": str(SAVE_ROOT / "emotion_classifier"),
    "flan_t5_response_generator": str(SAVE_ROOT / "flan_t5_response_generator"), 
    "t5_qa": str(SAVE_ROOT / "t5_qa")
}

metadata_path = SAVE_ROOT / "combined_model_metadata.pt"
torch.save(metadata, metadata_path)
print("Saved combined model metadata to:", metadata_path)
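
The metadata is a plain dict mapping model names to directory paths, so it could also be stored as JSON, which is human-readable and loads without PyTorch. The sketch below shows that alternative; it is not what the notebook does, and a temporary directory stands in for `SAVE_ROOT`.

```python
import json
import tempfile
from pathlib import Path

# A temporary directory stands in for SAVE_ROOT in this sketch.
save_root = Path(tempfile.mkdtemp())

metadata = {
    "emotion_classifier": str(save_root / "emotion_classifier"),
    "flan_t5_response_generator": str(save_root / "flan_t5_response_generator"),
    "t5_qa": str(save_root / "t5_qa"),
}

# Write the mapping as JSON (an alternative to torch.save, assumed for illustration).
metadata_path = save_root / "combined_model_metadata.json"
metadata_path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")

# Read it back and confirm the round trip preserves every entry.
model_paths = json.loads(metadata_path.read_text(encoding="utf-8"))
print(sorted(model_paths))
```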



In [None]:

# Load Models Using Combined Metadata

metadata_path = SAVE_ROOT / "combined_model_metadata.pt"

if metadata_path.exists():
    model_paths = torch.load(metadata_path, map_location=device)
    print("Loaded model metadata:", model_paths)
else:
    raise FileNotFoundError(f"Metadata file not found at {metadata_path}. Please ensure it exists.")

# Load Emotion Classification Model & Tokenizer from metadata
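emo_model = AutoModelForSequenceClassification.from_pretrained(model_paths["emotion_classifier"]).to(device)
emo_tokenizer = AutoTokenizer.from_pretrained(model_paths["emotion_classifier"])

# Load Response Generation Model & Tokenizer from metadata
resp_model = T5ForConditionalGeneration.from_pretrained(model_paths["flan_t5_response_generator"]).to(device)
resp_tokenizer = AutoTokenizer.from_pretrained(model_paths["flan_t5_response_generator"])

# Load T5 QA Model & Tokenizer from metadata
qa_model = T5ForConditionalGeneration.from_pretrained(model_paths["t5_qa"]).to(device)
qa_tokenizer = AutoTokenizer.from_pretrained(model_paths["t5_qa"])
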
print("All models loaded from metadata.")


## Unified Pipeline & Gradio for the Mental Health Chatbot

In [None]:

# Unified Pipeline & Gradio for the Mental Health Chatbot


from transformers import AutoTokenizer, AutoModelForSequenceClassification, T5ForConditionalGeneration

# ---- Load Fine‑Tuned Models Using Combined Metadata ----
# Define the path to the combined metadata file.
metadata_path = SAVE_ROOT / "combined_model_metadata.pt"

if metadata_path.exists():
    model_paths = torch.load(metadata_path, map_location=device)
    print("Loaded model metadata:", model_paths)
else:
    raise FileNotFoundError(f"Metadata file not found at {metadata_path}. Please ensure it exists.")

In [None]:
# Load Emotion Classification Model & Tokenizer from metadata.
emo_model = AutoModelForSequenceClassification.from_pretrained(model_paths["emotion_classifier"]).to(device)
emo_tokenizer = AutoTokenizer.from_pretrained(model_paths["emotion_classifier"])
emo_model.eval()  # Set emotion classifier to evaluation mode

# Load FLAN-T5 Response Generation Model & Tokenizer from metadata.
resp_model = T5ForConditionalGeneration.from_pretrained(model_paths["flan_t5_response_generator"]).to(device)
resp_tokenizer = AutoTokenizer.from_pretrained(model_paths["flan_t5_response_generator"])


In [None]:


# ---- Define Emotion Labels ----
# These labels match those used in our emotion annotation step.
DEFAULT_LABELS = [
    'admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring', 'confusion', 'curiosity',
    'desire', 'disappointment', 'disapproval', 'disgust', 'embarrassment', 'excitement', 'fear',
    'gratitude', 'grief', 'joy', 'love', 'nervousness', 'optimism', 'pride', 'realization', 'relief',
    'remorse', 'sadness', 'surprise', 'neutral'
]
NUM_EMO_LABELS = emo_model.config.num_labels
EMOTION_LABELS = DEFAULT_LABELS[:NUM_EMO_LABELS]

In [None]:
# Optionally, define a subset for emotion-based routing.
emotion_router_labels = {'confusion', 'caring', 'nervousness', 'grief', 'sadness', 'fear', 'remorse', 'love', 'anger'}

In [None]:
# ---- Helper Functions ----

def detect_emotions(text):
    """
    Detects emotions in the provided text using the fine‑tuned emotion classifier.
    Returns a list of emotion labels whose corresponding probabilities exceed 0.3.
    """
    inputs = emo_tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(emo_model.device)
    with torch.no_grad():
        logits = emo_model(**inputs).logits
    probs = torch.sigmoid(logits).cpu().numpy()[0]
    detected = [EMOTION_LABELS[i] for i, p in enumerate(probs) if p > 0.3]
    return detected if detected else ["neutral"]
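
The thresholding step can be illustrated without the model. Because the task is multi-label, each logit passes through its own sigmoid (not a softmax across labels), and every label whose probability exceeds 0.3 is kept. The sketch below uses made-up logits and a shortened label list; `select_labels` is a hypothetical helper name.

```python
import math

def select_labels(logits, labels, threshold=0.3):
    """Apply a per-label sigmoid and keep labels whose probability exceeds threshold."""
    probs = [1.0 / (1.0 + math.exp(-z)) for z in logits]
    detected = [label for label, p in zip(labels, probs) if p > threshold]
    # Fall back to "neutral" when nothing clears the threshold.
    return detected if detected else ["neutral"]

labels = ["anger", "fear", "joy", "sadness"]  # shortened label list for illustration
first = select_labels([-2.0, 0.1, -3.0, 1.5], labels)
second = select_labels([-4.0, -4.0, -4.0, -4.0], labels)
print(first)
print(second)
```

With these inputs the first call keeps `fear` and `sadness` (probabilities of roughly 0.52 and 0.82), and the second falls back to `neutral`.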

In [None]:
def format_input_prompt(user_input, language="English", history=None, emotions=None):
    emotion_note = ""
    if emotions and emotions != ["neutral"]:
        emotion_note = f"The user seems to feel {' and '.join(emotions)}. "
    if history:
        combined = "\n".join(history + [user_input])
        return (f"respond: {emotion_note}The conversation so far:\n{combined}")
    return (f"respond: {emotion_note}{user_input}")
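
For reference, this is the prompt the function produces for a short conversation. The function body is copied from the cell above so the snippet runs on its own; the messages are invented.

```python
def format_input_prompt(user_input, language="English", history=None, emotions=None):
    emotion_note = ""
    if emotions and emotions != ["neutral"]:
        emotion_note = f"The user seems to feel {' and '.join(emotions)}. "
    if history:
        combined = "\n".join(history + [user_input])
        return f"respond: {emotion_note}The conversation so far:\n{combined}"
    return f"respond: {emotion_note}{user_input}"

# Hypothetical conversation used only to show the prompt layout.
prompt = format_input_prompt(
    "I can't sleep.",
    history=["User: Hi", "Bot: Hello, how are you feeling?"],
    emotions=["nervousness", "sadness"],
)
print(prompt)

# Without history or non-neutral emotions, only the prefix is added.
plain = format_input_prompt("Hello", emotions=["neutral"])
print(plain)
```

Note that `language` is accepted but never appears in the prompt, so the selected response language has no effect on generation as written.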

In [None]:
# Unified Chatbot Pipeline Class

class MentalHealthChatbotPipeline:
    def __init__(self, labels, device="cpu"):
        self.device = device
        self.labels = labels
        self.chat_history = []  
        
        # Ensure models are in evaluation mode.
        self.emo_model = emo_model.eval()
        self.qa_model = qa_model.eval()
        self.resp_model = resp_model.eval()

    def __call__(self, text, max_length=64):
        """
        Processes user text, detects emotions, routes the input to the appropriate model,
        and returns a dictionary with detected emotions, the reply, and the updated conversation history.
        """
        self.chat_history.append(("User", text))
        
        # Emotion Detection
        emotions = detect_emotions(text)
        
        # Model Selection Logic
        # Simple decision: if the input contains a question mark, use the QA model; otherwise, use the response generation model.
        # Each branch builds its prompt with the same prefix used elsewhere in this notebook.
        if "?" in text:
            model, tokenizer = self.qa_model, qa_tokenizer
            prompt = "question: " + text
        else:
            model, tokenizer = self.resp_model, resp_tokenizer
            prompt = format_input_prompt(text, emotions=emotions)

        # Generate Response
        inputs = tokenizer(prompt, return_tensors="pt", truncation=True, padding=True).to(self.device)
        with torch.no_grad():
            output_ids = model.generate(**inputs, max_length=max_length)
        reply = tokenizer.decode(output_ids[0], skip_special_tokens=True)

        self.chat_history.append(("Bot", reply))
        
        return {"Detected Emotions": emotions, "Response": reply, "History": self.chat_history}

In [None]:
# Instantiate the chatbot pipeline.
chatbot = MentalHealthChatbotPipeline(labels=EMOTION_LABELS, device=device)


## Gradio Interface for the Mental Health Chatbot (for testing)

In [None]:

# Full-Functionality Gradio Interface for the Mental Health Chatbot


# ---- Load Fine-Tuned Models Using Combined Metadata ----
metadata_path = SAVE_ROOT / "combined_model_metadata.pt"

if metadata_path.exists():
    model_paths = torch.load(metadata_path, map_location=device)
    print("Loaded model metadata:", model_paths)
else:
    raise FileNotFoundError(f"Metadata file not found at {metadata_path}. Please ensure it exists.")

In [None]:
# Load Response Generation Model & Tokenizer
from peft import get_peft_model, LoraConfig, TaskType

resp_tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-large")
base_model = T5ForConditionalGeneration.from_pretrained("google/flan-t5-large").to(device)

# Apply LoRA
lora_config = LoraConfig(
    r=8,
    lora_alpha=16,
    target_modules=["q", "v"],  # This may vary depending on architecture
    lora_dropout=0.1,
    bias="none",
    task_type=TaskType.SEQ_2_SEQ_LM
)

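# Note: get_peft_model wraps the base checkpoint with newly initialized LoRA adapters,
# so this resp_model replaces the fine-tuned model loaded from the metadata above.
resp_model = get_peft_model(base_model, lora_config)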
resp_model.print_trainable_parameters()  # Optional: view trainable params

In [None]:
# Define Emotion Labels
# Define the complete list of 28 emotion labels (as in the GoEmotions baseline)
DEFAULT_LABELS = [
    'admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring', 'confusion', 'curiosity',
    'desire', 'disappointment', 'disapproval', 'disgust', 'embarrassment', 'excitement', 'fear',
    'gratitude', 'grief', 'joy', 'love', 'nervousness', 'optimism', 'pride', 'realization', 'relief',
    'remorse', 'sadness', 'surprise', 'neutral'
]

In [None]:
NUM_EMO_LABELS = emo_model.config.num_labels
EMOTION_LABELS = DEFAULT_LABELS[:NUM_EMO_LABELS]

# Define a subset of emotions for routing decisions (if needed)
emotion_router_labels = set(EMOTION_LABELS) & {'confusion', 'caring', 'nervousness', 'grief', 'sadness', 'fear', 'remorse', 'love', 'anger'}

In [None]:
def format_input_prompt(user_input, language="English", history=None, emotions=None):
    emotion_note = ""
    if emotions and emotions != ["neutral"]:
        emotion_note = f"The user seems to feel {' and '.join(emotions)}. "
    if history:
        combined = "\n".join(history + [user_input])
        return (f"respond: {emotion_note}The conversation so far:\n{combined}")
    return (f"respond: {emotion_note}{user_input}")

In [None]:
def detect_emotions(text):
    """
    Detects emotions in the provided text using the fine‑tuned emotion classifier.
    Returns a list of emotion labels whose corresponding probabilities exceed 0.3.
    """
    inputs = emo_tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(emo_model.device)
    with torch.no_grad():
        logits = emo_model(**inputs).logits
    probs = torch.sigmoid(logits).cpu().numpy()[0]
    
    # Safety check: trim probabilities if there are more than expected.
    if len(probs) > len(EMOTION_LABELS):
        print(f"Warning: Received {len(probs)} probabilities; expected {len(EMOTION_LABELS)}. Trimming extra values.")
        probs = probs[:len(EMOTION_LABELS)]
    
    detected = [EMOTION_LABELS[i] for i, p in enumerate(probs) if p > 0.3]
    return detected if detected else ["neutral"]

In [None]:
def transcribe_audio(audio_file):
    """
    Converts an input audio file to text using speech recognition.
    Returns the transcribed text or an error message.
    """
    recognizer = sr.Recognizer()
    audio = AudioSegment.from_file(audio_file)
    # Reserve a temporary .wav path, close the handle so it can be reopened on any
    # platform, and remove the file once transcription has finished.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
        wav_path = tmp.name
    try:
        audio.export(wav_path, format="wav")
        with sr.AudioFile(wav_path) as source:
            audio_data = recognizer.record(source)
        return recognizer.recognize_google(audio_data)
    except sr.UnknownValueError:
        return "[Unrecognized speech]"
    except sr.RequestError:
        return "[Speech recognition failed]"
    finally:
        os.remove(wav_path)

In [None]:
def generate_chatbot_response(user_text, audio_input, mode, language, use_history, history, route_by_emotion, persist):
    history = history or []
    user_input = user_text if mode == "text" else transcribe_audio(audio_input)
    emotions = detect_emotions(user_input)

    use_resp_model = any(e in emotion_router_labels for e in emotions) if route_by_emotion else False

    if use_resp_model:
        prompt = format_input_prompt(
            user_input,
            language=language,
            history=history if use_history else None,
            emotions=emotions
        )
        inputs = resp_tokenizer(prompt, return_tensors="pt", truncation=True, padding=True).to(resp_model.device)
        model, tokenizer = resp_model, resp_tokenizer
    else:
        prompt = "question: " + user_input
        inputs = qa_tokenizer(prompt, return_tensors="pt", truncation=True, padding=True).to(qa_model.device)
        model, tokenizer = qa_model, qa_tokenizer

    with torch.no_grad():
        output_ids = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_length=64,
            num_beams=4,
            no_repeat_ngram_size=2,
            early_stopping=True
        )

    response = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    full_history = history + [f"User: {user_input}", f"Bot: {response}"]

    if persist:
        with open("chatlog.txt", "a", encoding="utf-8") as log:
            log.write(f"\n[{datetime.datetime.now()}]\n{full_history[-2]}\n{full_history[-1]}\nDetected emotions: {emotions}\n")

    return response, ", ".join(emotions), full_history
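
The routing rule inside `generate_chatbot_response` reduces to a small pure function, sketched below so it can be checked without loading any model. `choose_route` and the route names `"response"` and `"qa"` are hypothetical; the label set is the one defined in `emotion_router_labels`.

```python
ROUTER_LABELS = {"confusion", "caring", "nervousness", "grief", "sadness",
                 "fear", "remorse", "love", "anger"}

def choose_route(emotions, route_by_emotion=True):
    """Return 'response' for the supportive model or 'qa' for the QA model."""
    if route_by_emotion and any(e in ROUTER_LABELS for e in emotions):
        return "response"
    return "qa"

routes = [
    choose_route(["sadness", "curiosity"]),            # one routed emotion is enough
    choose_route(["curiosity"]),                       # no routed emotion
    choose_route(["sadness"], route_by_emotion=False)  # routing switched off
]
print(routes)
```

One consequence worth knowing: with emotion routing switched off, every message goes to the QA model, including statements that are not questions.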

In [None]:
# Build the Gradio Interface

demo = gr.Interface(
    fn=generate_chatbot_response,
    inputs=[
        gr.Textbox(label="Type your message here (if using text mode)"),
        gr.Audio(type="filepath", label="Or speak here (if using voice mode)"),
        gr.Radio(["text", "voice"], value="text", label="Input Mode"),
        gr.Dropdown(choices=["English", "German", "Spanish", "French"], value="English", label="Response Language"),
        gr.Checkbox(label="Include chat history in response", value=True),
        gr.State(value=[]),
        gr.Checkbox(label="Route by detected emotion", value=True),
        gr.Checkbox(label="Save conversation to chatlog.txt", value=True)
    ],
    outputs=[
        gr.Textbox(label="Therapist Response"),
        gr.Textbox(label="Detected Emotions"),
        gr.State()
    ],
    title="Voice + Text Enabled Emotion-Aware Mental Health Chatbot",
    description="You can type or speak your message. Emotion-aware routing decides between Q&A and therapist-style support."
)

# Launch the interface.
demo.launch()

In [None]:
# # Streamlit Interface
# st.title("Voice + Text Enabled Emotion-Aware Mental Health Chatbot")
# st.write("You can type or speak your message. Emotion-aware routing decides between Q&A and therapist-style support.")

# # Initialize conversation history
# history = []

# # Input mode
# mode = st.radio("Input Mode", ["text", "voice"])

# # Language selection
# language = st.selectbox("Response Language", ["English", "German", "Spanish", "French"])

# # Include chat history in response
# use_history = st.checkbox("Include chat history in response")

# # Route by detected emotion
# route_by_emotion = st.checkbox("Route by detected emotion")

# # Save conversation to chatlog.txt
# persist = st.checkbox("Save conversation to chatlog.txt")

# # Generate chatbot response
# if mode == "text":
#     user_input = st.text_input("Type your message here")
# else:
#     audio_input = st.file_uploader("Or speak here", type=["wav", "mp3", "ogg"])

# if st.button("Generate Response"):
#     if mode == "text":
#         response, emotions, history = generate_chatbot_response(user_input, None, mode, language, use_history, history, route_by_emotion, persist)
#     else:
#         response, emotions, history = generate_chatbot_response(None, audio_input, mode, language, use_history, history, route_by_emotion, persist)

#     st.write(f"Therapist Response: {response}")
#     st.write(f"Detected Emotions: {emotions}")

## Metrics and Evaluation

In [None]:

# Model Evaluation Cell


### Evaluation for Emotion Classification ###

def evaluate_emotion_classifier(model, tokenizer, dataset, batch_size=16):
    """
    Evaluate the emotion classifier over the provided dataset.
    Computes micro-averaged F1, Precision, Recall, and Subset Accuracy.
    Assumes that dataset is formatted with columns "input_ids", "attention_mask", "labels"
    and that labels is a multi-label binary vector.
    """
    model.eval()
    all_preds = []
    all_labels = []
    
    # Create a DataLoader for batch processing (if dataset is not huge, you can loop through it directly)
    dataloader = DataLoader(dataset, batch_size=batch_size)
    
    for batch in dataloader:
        # Move inputs and labels to device
        inputs = {k: v.to(device) for k, v in batch.items() if k != "labels"}
        labels = batch["labels"].numpy()
        
        with torch.no_grad():
            logits = model(**inputs).logits
        # Apply sigmoid for multi-label classification and threshold at 0.3
        preds = (torch.sigmoid(logits) > 0.3).cpu().numpy().astype(int)
        
        all_preds.append(preds)
        all_labels.append(labels)
    
    all_preds = np.concatenate(all_preds, axis=0)
    all_labels = np.concatenate(all_labels, axis=0)
    
    # Compute micro-averaged metrics
    micro_f1 = f1_score(all_labels, all_preds, average="micro", zero_division=0)
    micro_precision = precision_score(all_labels, all_preds, average="micro", zero_division=0)
    micro_recall = recall_score(all_labels, all_preds, average="micro", zero_division=0)
    subset_acc = accuracy_score(all_labels, all_preds)  # subset accuracy is strict
    
    return {
        "Emotion Classifier Micro-F1": micro_f1,
        "Emotion Classifier Micro-Precision": micro_precision,
        "Emotion Classifier Micro-Recall": micro_recall,
        "Emotion Classifier Subset Accuracy": subset_acc,
    }
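
To show what these numbers mean, the sketch below computes the same four quantities by hand on a toy label matrix. Micro-averaging pools true positives, false positives, and false negatives over all labels before computing the ratios, while subset accuracy counts a sample as correct only if its whole label vector matches. `micro_metrics` is a hypothetical helper, not part of scikit-learn.

```python
def micro_metrics(y_true, y_pred):
    """Micro-averaged precision/recall/F1 and subset accuracy for binary label matrices."""
    tp = fp = fn = 0
    exact = 0
    for true_row, pred_row in zip(y_true, y_pred):
        for t, p in zip(true_row, pred_row):
            tp += int(t == 1 and p == 1)
            fp += int(t == 0 and p == 1)
            fn += int(t == 1 and p == 0)
        # Subset accuracy requires every label of the sample to match.
        exact += int(list(true_row) == list(pred_row))
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1,
            "subset_accuracy": exact / len(y_true)}

# Toy data: 3 samples, 3 labels.
y_true = [[1, 0, 1], [0, 1, 0], [1, 1, 0]]
y_pred = [[1, 0, 0], [0, 1, 0], [1, 1, 1]]
m = micro_metrics(y_true, y_pred)
print(m)
```

In this example there are 4 true positives, 1 false positive, and 1 false negative, so micro precision, recall, and F1 are all 0.8, yet only one of the three samples matches exactly. Subset accuracy is therefore usually much lower than micro-F1 for multi-label models.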

In [None]:
print("Evaluating Emotion Classifier...")
emo_metrics = evaluate_emotion_classifier(emo_model, emo_tokenizer, emo_test_tok)
for metric, value in emo_metrics.items():
    print(f"{metric}: {value:.4f}")

In [None]:
# Evaluation for Generation Models (Response Generation and QA)

# We already defined a compute_resp_metrics function in the training cells.
# Here, we define a helper to compute additional perplexity based on the evaluation loss.

def evaluate_generation_model(trainer, test_dataset):
    """
    Uses the Seq2SeqTrainer to compute evaluation metrics over the given test dataset.
    Adds perplexity (exp(eval_loss)) to the standard metrics.
    """
    # predict() returns a PredictionOutput; its metrics are prefixed with "test_" by default,
    # so request the "eval" prefix to make "eval_loss" available below.
    result = trainer.predict(test_dataset, metric_key_prefix="eval")
    eval_loss = result.metrics.get("eval_loss")
    
    # Compute perplexity if loss is available. (If eval_loss is zero or not available, perplexity is undefined.)
    if eval_loss is not None and eval_loss > 0:
        perplexity = math.exp(eval_loss)
    else:
        perplexity = float("inf")
    
    # Add perplexity to the metrics dictionary.
    result.metrics["perplexity"] = perplexity
    return result.metrics
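
Perplexity is the exponential of the mean per-token negative log-likelihood, which is why `exp(eval_loss)` is used above (the trainer's loss is a cross-entropy in natural-log units). The sketch below works through a small case with made-up losses; `perplexity_from_losses` is a hypothetical helper.

```python
import math

def perplexity_from_losses(token_losses):
    """Perplexity = exp(mean negative log-likelihood per token), natural log."""
    mean_loss = sum(token_losses) / len(token_losses)
    return math.exp(mean_loss)

# Hypothetical case: the model assigns probability 0.5 to every target token.
losses = [-math.log(0.5)] * 4
ppl = perplexity_from_losses(losses)
print(ppl)
```

A model that assigns probability 0.5 to each target token has a perplexity of about 2, meaning it is as uncertain as a uniform choice between two tokens. Lower values indicate a better fit to the references.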

In [None]:
print("\nEvaluating T5 Response Generation Model...")
resp_metrics = evaluate_generation_model(trainer_resp, resp_test_tok)
for metric, value in resp_metrics.items():
    print(f"T5 Response Generation {metric}: {value:.4f}")

print("\nEvaluating T5 QA Model...")
qa_metrics = evaluate_generation_model(qa_trainer, qa_test_tok)
for metric, value in qa_metrics.items():
    print(f"T5 QA {metric}: {value:.4f}")

In [None]:
# Additionally, you can evaluate other metrics such as ROUGE and BERTScore separately using the `evaluate` library,
# if desired. For example:

rouge_metric = evaluate.load("rouge")
bertscore_metric = evaluate.load("bertscore")

def compute_generation_metrics(trainer, test_dataset, tokenizer):
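    result = trainer.predict(test_dataset)
    predictions, labels = result.predictions, result.label_ids
    # Some models return a tuple; the first element holds the predictions.
    if isinstance(predictions, tuple):
        predictions = predictions[0]
    # Replace -100 in predictions and labels by the tokenizer pad id before decoding.
    predictions = np.where(predictions != -100, predictions, tokenizer.pad_token_id)
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    decoded_preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)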
    
    r = rouge_metric.compute(predictions=decoded_preds, references=decoded_labels, use_stemmer=True)
    b = bertscore_metric.compute(predictions=decoded_preds, references=decoded_labels, lang="en")
    
    # Here, we only extract the ROUGE-L and average BERTScore F1.
    metrics = {
        "rougeL": r["rougeL"],
        "bertscore_f1": np.mean(b["f1"])
    }
    return metrics

In [None]:
print("\nAdditional Generation Metrics for T5 Response Generation:")
additional_resp_metrics = compute_generation_metrics(trainer_resp, resp_test_tok, tokenizer_t5)
for metric, value in additional_resp_metrics.items():
    print(f"T5 Response Generation {metric}: {value:.4f}")

print("\nAdditional Generation Metrics for T5 QA Model:")
additional_qa_metrics = compute_generation_metrics(qa_trainer, qa_test_tok, t5_qa_tokenizer)
for metric, value in additional_qa_metrics.items():
    print(f"T5 QA {metric}: {value:.4f}")