# Raghav Marwaha

# E23CSEU1229

# Batch: 41


In [None]:
## CELL 1: SETUP, INSTALLATION, AND DATA LOADING

print("--- Installing Libraries for Personality & Emotion NLP ---")
# Install necessary libraries: BERT/Transformers, PyTorch, metrics, and word2vec (gensim).
# The leading "!" is notebook shell-escape syntax, so these two lines only work
# when the cell is executed by Jupyter/Colab, not by a plain Python interpreter.
!pip install transformers torch numpy pandas scikit-learn tqdm datasets accelerate
!pip install gensim # Required for the Word2Vec task

# --- Drive Mount for Saving Checkpoints ---
# google.colab is only importable inside a Colab runtime.
from google.colab import drive
import os
drive.mount('/content/drive', force_remount=True)
# Directory on the mounted Drive where the fine-tuned model is written later on.
SAVE_DIR = '/content/drive/MyDrive/NLP_MBTI_Model'
# exist_ok=True makes re-running this cell safe when the folder already exists.
os.makedirs(SAVE_DIR, exist_ok=True)
print(f"Model will be saved to: {SAVE_DIR}")

# --- Imports ---
import pandas as pd
import numpy as np
import torch
from datasets import Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.preprocessing import MultiLabelBinarizer
from gensim.models import Word2Vec
import random

# Set seed for reproducibility
def set_seed(seed):
    """Seed Python's `random`, NumPy and PyTorch with the same value.

    Args:
        seed: Integer seed handed to every random number generator.

    When CUDA is available the seed is also applied to all CUDA devices.
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed_all(seed)


set_seed(42)

# --- Load Data (expects mbti_1.csv to have been uploaded next to the notebook) ---
FILE_NAME = 'mbti_1.csv'
try:
    df = pd.read_csv(FILE_NAME)
except FileNotFoundError:
    # Tell the user how to recover, then let the original error propagate.
    print(f"\nFATAL ERROR: Required file '{FILE_NAME}' not found.")
    print("Please manually upload 'mbti_1.csv' to Colab.")
    raise
else:
    # The CSV is treated as exactly two columns: the MBTI type and the raw posts.
    df.columns = ['type', 'posts']
    # Each user's posts are separated by '|||'; join them with single spaces so
    # every user becomes one text document, then discard the raw column.
    df = df.assign(
        text=df['posts'].map(lambda raw_posts: ' '.join(raw_posts.split('|||')))
    ).drop(columns=['posts'])

    print(f"Data loaded successfully. Total users: {len(df)}")

In [None]:
## CELL 2: MULTI-LABEL ENCODING AND BERT SETUP

# --- Multi-Label Encoding ---
# The task is modelled as four simultaneous binary decisions (num_labels=4),
# one per MBTI axis.
MBTI_AXES = ['I/E', 'N/S', 'T/F', 'J/P']
# First letter of every axis -> 0, second letter -> 1.
LABEL_MAP = {'I': 0, 'E': 1, 'N': 0, 'S': 1, 'T': 0, 'F': 1, 'J': 0, 'P': 1}


def _encode_type(mbti):
    """Turn a four-letter MBTI string into its list of four 0/1 axis labels."""
    return [LABEL_MAP[mbti[position]] for position in range(4)]


# One list of 4 binary integers per user.
df['label_tensors'] = df['type'].map(_encode_type)
# Float dtype because the labels feed BCEWithLogitsLoss.
labels = np.array(df['label_tensors'].tolist(), dtype=float)

# --- Split Data and Tokenizer Setup ---
MODEL_NAME = 'bert-base-uncased'
MAX_LENGTH = 512 # Longer length for concatenated posts

# 80/20 train/validation split with a fixed random_state.
_split = train_test_split(df['text'].tolist(), labels, test_size=0.2, random_state=42)
train_texts, val_texts, train_labels, val_labels = _split

# Tokenizer matching the pretrained checkpoint
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Sequence-classification head with four outputs, one per MBTI axis
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=4)

# --- Custom Dataset and Tokenization ---
class MBTIDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with float label vectors.

    Args:
        encodings: Mapping from field name to an indexable collection with one
            entry per sample (e.g. the tokenizer output).
        labels: Per-sample label vectors; stored as a float tensor because the
            labels are used with BCEWithLogitsLoss.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = torch.tensor(labels, dtype=torch.float)

    def __len__(self):
        """Return the number of samples (one per label row)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return a dict of tensors for sample `idx`, including its 'labels'."""
        sample = {}
        for field, column in self.encodings.items():
            sample[field] = torch.tensor(column[idx])
        sample['labels'] = self.labels[idx]
        return sample

# Tokenize both splits with the same settings (truncate/pad, capped at MAX_LENGTH)
_TOKENIZE_OPTIONS = {'truncation': True, 'padding': True, 'max_length': MAX_LENGTH}
train_encodings = tokenizer(train_texts, **_TOKENIZE_OPTIONS)
val_encodings = tokenizer(val_texts, **_TOKENIZE_OPTIONS)

# Wrap encodings and labels so the Trainer can index individual samples
train_dataset = MBTIDataset(train_encodings, train_labels)
val_dataset = MBTIDataset(val_encodings, val_labels)

print(f"Training Samples: {len(train_dataset)}")
print("Multi-Label Encoding and BERT Tokenization complete.")

In [None]:
## CELL 3: TRAINING THE MBTI DETECTION MODEL (FINALIZED LOGGING)

# Set logging verbosity for clearer console output.
# NOTE: from here on the name `logging` refers to the transformers logging
# helper module imported below, not the standard-library `logging` package.
from transformers.utils import logging
logging.set_verbosity_info()

# --- Custom Multi-Label Metrics ---
def compute_metrics(p):
    """Compute multi-label accuracy and micro-F1 from raw model logits.

    Args:
        p: Evaluation output exposing `predictions` (logits) and `label_ids`
            (binary 0/1 labels).

    Returns:
        Dict with keys "accuracy" and "f1_micro".
    """
    # Sigmoid turns logits into probabilities; 0.5 is the decision threshold.
    probabilities = 1 / (1 + np.exp(-p.predictions))
    y_pred = (probabilities > 0.5).astype(int)

    # NOTE: the Trainer reports these under 'eval_<name>', which is what
    # metric_for_best_model refers to.
    scores = {
        "accuracy": accuracy_score(p.label_ids, y_pred),
        "f1_micro": f1_score(p.label_ids, y_pred, average='micro', zero_division=0),
    }
    return scores

# --- Training Arguments (Load Best Model and Avoid Overfitting) ---
# Collected as a plain dict first so the individual settings are easy to scan.
_TRAINING_CONFIG = {
    'output_dir': './mbti_results',
    'num_train_epochs': 3,  # 3 epochs are standard for fine-tuning BERT
    'per_device_train_batch_size': 8,
    'per_device_eval_batch_size': 8,
    'warmup_steps': 100,
    'weight_decay': 0.01,
    'logging_dir': './mbti_logs',
    # Evaluate and checkpoint once per epoch so validation results are
    # reported after every epoch.
    'eval_strategy': "epoch",
    'save_strategy': "epoch",
    # Reload the checkpoint with the highest f1_micro when training finishes.
    'load_best_model_at_end': True,
    'report_to': "none",
    'metric_for_best_model': "f1_micro",
    'greater_is_better': True,
}
training_args = TrainingArguments(**_TRAINING_CONFIG)

# Initialize Trainer
# Wire model, arguments, datasets and metric function into the Trainer
_trainer_kwargs = {
    'model': model,
    'args': training_args,
    'train_dataset': train_dataset,
    'eval_dataset': val_dataset,
    'compute_metrics': compute_metrics,
}
trainer = Trainer(**_trainer_kwargs)

print("--- Starting BERT Fine-tuning for MBTI Classification ---")
# Run fine-tuning; validation metrics are computed at the end of each epoch.
trainer.train()

# --- Save Final Best Model ---
# Model and tokenizer go to the same Drive folder so they can be reloaded together.
MODEL_SAVE_PATH = os.path.join(SAVE_DIR, "best_mbti_model")
trainer.save_model(MODEL_SAVE_PATH)
tokenizer.save_pretrained(MODEL_SAVE_PATH)

# Final pass over the validation set with the model now held by the trainer
results = trainer.evaluate(val_dataset)

print("\n--- FINAL OPTIMIZED RESULTS ---")
for _caption, _metric_key in (
    ("Accuracy (Total Match)", 'eval_accuracy'),
    ("F1 Micro Score (Per-Axis Match)", 'eval_f1_micro'),
):
    print(f"{_caption}: {results[_metric_key]:.4f}")
print(f"Model saved to Drive at {MODEL_SAVE_PATH}")

In [None]:
## CELL 4: WORD2VEC, TESTING, AND EMOTION/CONTEXT GENERATION

# --- 1. Word2Vec Embedding Analysis (Required Task) ---
print("--- 1. Word2Vec Embedding Analysis ---")
# Word2Vec is trained on lists of tokens; each user's text is split on whitespace.
tokenized_sentences = [text.split() for text in df['text'].tolist()]

# Train Word2Vec model (Word2Vec is required for task 3)
w2v_model = Word2Vec(tokenized_sentences, vector_size=100, window=5, min_count=5, workers=4)
print("Word2Vec model trained successfully.")


def _print_similar_words(word, fallback=None, topn=5):
    """Print the `topn` nearest neighbours of `word` in the trained embedding.

    Args:
        word: Query word to look up.
        fallback: Optional replacement word to try when `word` is not in the
            vocabulary.
        topn: Number of neighbours to show.

    A missing word is reported instead of raising, so the rest of the cell
    still runs; the fallback word gets the same treatment.
    """
    try:
        print(w2v_model.wv.most_similar(word, topn=topn))
    except KeyError:
        if fallback is None:
            print(f"[Error: '{word}' not in vocab.]")
            return
        print(f"[Error: '{word}' not in vocab. Try '{fallback}']")
        _print_similar_words(fallback, topn=topn)


# Example: Find words semantically similar to 'logic' (T) and 'feel' (F)
print("\nWords closest to 'logic' (Thinking):")
_print_similar_words('logic', fallback='analysis')

print("\nWords closest to 'feel' (Feeling):")
# The result is printed explicitly: a bare expression in the middle of a cell
# is evaluated but never displayed.
_print_similar_words('feel')


# --- 2. Personality Prediction Function ---
# Reload the saved model and tokenizer from Drive for inference
FINAL_MODEL_PATH = os.path.join(SAVE_DIR, "best_mbti_model")

# Prefer the GPU when one is available
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

final_tokenizer = AutoTokenizer.from_pretrained(FINAL_MODEL_PATH)
final_model = AutoModelForSequenceClassification.from_pretrained(FINAL_MODEL_PATH)
final_model.to(device)
# Switch to inference mode before predicting
final_model.eval()

def predict_mbti(text):
    """Predict the MBTI type of `text` with the reloaded fine-tuned model.

    Args:
        text: The post (or concatenated posts) to classify.

    Returns:
        A tuple `(mbti_type, traits, logits)`: the four-letter type string, a
        dict mapping each axis name ('I/E', 'N/S', 'T/F', 'J/P') to the chosen
        letter, and the raw logits as a plain list.
    """
    batch = final_tokenizer(
        text,
        truncation=True,
        padding=True,
        max_length=MAX_LENGTH,
        return_tensors='pt',
    ).to(device)
    with torch.no_grad():
        output = final_model(**batch)
        logits = output.logits.cpu().numpy().flatten()

    # Sigmoid + 0.5 threshold: 1 selects the second letter of an axis, 0 the first.
    probabilities = 1 / (1 + np.exp(-logits))
    predictions = (probabilities > 0.5).astype(int)

    axes = ['I/E', 'N/S', 'T/F', 'J/P']
    # letters_by_bit[bit][position] is the letter for that axis and decision.
    letters_by_bit = ("INTJ", "ESFP")

    letters = [letters_by_bit[bit][position] for position, bit in enumerate(predictions)]
    traits = {axes[position]: letter for position, letter in enumerate(letters)}
    mbti_type = "".join(letters)

    return mbti_type, traits, logits.tolist()


# --- 3. Emotional Context Generation (Simulated Gemini API) ---
async def generate_emotion_context(text, mbti_type):
    """Request a short emotional analysis of a post from the Gemini API.

    Args:
        text: The user post to analyse; embedded in the user query.
        mbti_type: MBTI type string inserted into the system prompt.

    Returns:
        The generated analysis text when the endpoint answers with HTTP 200.
        Otherwise a bracketed diagnostic string:
        "[API ERROR: Status <code>]" for a non-200 response, or
        "[REWRITE SIMULATION FAILED: <exception>]" for any exception raised
        along the way (including `aiohttp` not being installed).
    """
    system_prompt = f"Act as a psychologist analyzing a social media post from an {mbti_type} personality type. Analyze the text for core emotion, underlying motivation, and linguistic markers. Provide a short, structured analysis."

    user_query = f"Analyze the following user post: '{text}'"
    # Left empty on purpose; fill in a real key before expecting a 200 response.
    api_key = ""

    # Python f-strings interpolate with {name}; the JavaScript-style ${name}
    # would put a literal "$" in front of the key and corrupt the query string.
    api_url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-09-2025:generateContent?key={api_key}"

    payload = {
        "contents": [{"parts": [{"text": user_query}]}],
        "systemInstruction": {"parts": [{"text": system_prompt}]},
    }

    try:
        # NOTE: Using aiohttp for robust async call simulation. Imported here so
        # a missing package is reported through the failure string below.
        import aiohttp
        async with aiohttp.ClientSession() as session:
            async with session.post(api_url, json=payload) as response:
                if response.status != 200:
                    return f"[API ERROR: Status {response.status}]"
                result = await response.json()
                return result['candidates'][0]['content']['parts'][0]['text']
    except Exception as e:
        # Deliberately broad: this demo step must never abort the notebook.
        return f"[REWRITE SIMULATION FAILED: {e}]"


# --- 4. Final Testing and Display ---
# Sample post used as a smoke test, with the type it was written to resemble
test_post = "I struggle to see the purpose of small talk. I spend most of my time analyzing abstract theories and trying to optimize complex systems, but I hate confrontation and prefer to decide things logically."
expected_type = "INTP"

predicted_mbti, traits, logits = predict_mbti(test_post)

_rule = "-" * 50

print("\n\n#####################################################")
print("### MBTI and Emotion Detection Results (SDG 3) ###")
print("#####################################################")
print(f"INPUT POST: {test_post}")
print(_rule)
print(f"PREDICTED MBTI TYPE: {predicted_mbti}")
print(f"EXPECTED MBTI TYPE: {expected_type}")
print(_rule)
print("TRAIT PREDICTION LIKELIHOOD (Logit Score):")
# traits preserves axis order, so the n-th entry pairs with the n-th logit.
# The value shown is the raw logit for that axis: positive values favour the
# second letter (E/S/F/P), negative values the first (I/N/T/J).
for _position, (axis, letter) in enumerate(traits.items()):
    print(f"  {axis} ({letter}): Logit Score = {logits[_position]:.4f}")

print("\n--- EMOTIONAL CONTEXT GENERATION (Simulated GPT/Gemini Analysis) ---")
# The notebook already runs an event loop; nest_asyncio lets asyncio.run()
# be called from inside it.
import asyncio
import nest_asyncio
nest_asyncio.apply() # Fix for running asyncio in Colab

analysis = asyncio.run(generate_emotion_context(test_post, predicted_mbti))
print(analysis)
print(_rule)