In [None]:
import torch
import pandas as pd
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, TensorDataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import AdamW
import os
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
# Seed PyTorch's and NumPy's global random number generators with the same
# fixed value (42) so that repeated runs draw the same random numbers.
torch.manual_seed(42)
np.random.seed(42)

In [None]:
from google.colab import drive

# Mount Google Drive into the Colab VM's filesystem.
drive.mount('/content/drive')

# Path of the training CSV inside the mounted Drive.
# With the mount point above, Colab exposes the personal "My Drive" folder at
# /content/drive/MyDrive, so project folders must be addressed below it.
# NOTE(review): if "StonyBrook" is a shared drive rather than a folder in
# My Drive, the prefix should be /content/drive/Shareddrives instead — confirm.
file_path = '/content/drive/MyDrive/StonyBrook/Conversational MBTI Classifier/Train.csv'


In [None]:
# --- Training hyperparameters ---
BATCH_SIZE = 16
EPOCHS = 3

# --- Dataset CSV files (relative paths) ---
TRAIN_PATH = "traindata.csv"
EVAL_PATH = "evaldata.csv"
TEST_PATH = "testdata.csv"

# --- Output location ---
# NOTE(review): presumably the path handed to Trainer as `savepath` — confirm.
SAVE_PATH = "data/Save"

In [None]:
def import_data(path):
    """Read the CSV file at ``path`` and return its contents as a pandas DataFrame."""
    return pd.read_csv(path)
# Load the three CSV splits; the files are read in the order train, test, eval.
train_data, test_data, eval_data = (
    import_data(csv_path) for csv_path in (TRAIN_PATH, TEST_PATH, EVAL_PATH)
)

In [None]:
class Big5Classifier:
    """Bundles a pretrained tokenizer with a sequence-classification model.

    Args:
        model_name: Identifier passed to both ``from_pretrained`` calls.
        num_classes: Forwarded to the model as ``num_labels``.
    """

    def __init__(self, model_name='distilbert-base-uncased', num_classes=2):
        # The tokenizer is loaded first, then the classification model.
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_classes,
        )

    def get_tokenizer_and_model(self):
        """Return the pair ``(model, tokenizer)``.

        Note that, despite the method name, the model is the FIRST element of
        the returned tuple and the tokenizer is the second.
        """
        model_and_tokenizer = (self.model, self.tokenizer)
        return model_and_tokenizer

In [None]:
class DatasetLoader(Dataset):
    """Turns a frame of essays into padded token tensors for one trait.

    Each essay is prefixed with the interview question that belongs to the
    selected trait, encoded with the supplied tokenizer, and paired with a
    binary label taken from the ``decision`` column.

    Args:
        data: Table with an ``essay`` column (free text) and a ``decision``
            column holding ``'y'`` or ``'n'``.
        category: Trait key; one of "OPN", "CON", "EXT", "AGR", "NEU".
        tokenizer: Object exposing ``encode(text, max_length=..., truncation=...)``
            that returns a list of token ids.
    """

    # category can be "OPN", "CON", "EXT", "AGR", "NEU"
    def __init__(self, data, category, tokenizer):
        self.data = data
        self.tokenizer = tokenizer
        self.category = category
        self.questionMap =  {"OPN":"This is a question about openness: Describe a time when you tried something completely new—whether it was a different activity, way of thinking, or environment. What motivated you to try it, and how did you feel about the experience afterward? ",
                "CON":"This is a question about conscientiousness: Think of a goal you set for yourself that required sustained effort over time. How did you manage your time and resources to stay on track, and what strategies helped you stay committed, even when challenges came up? What did you find challenging or rewarding about the experience?",
                "EXT":"This is a question about extraversion: Recall a memorable social experience that either energized you or left you feeling drained. What do you think made the interaction fulfilling or draining? How did it shape your understanding of your social preferences or needs? ",
                "AGR":"This is a question about agreeableness: Describe a situation where you found yourself in disagreement with someone. How did you handle the situation, and what were your priorities in resolving or understanding the conflict? ",
                "NEU":"This is a question about neuroticism: Think of a time when you felt particularly stressed or anxious. How did you respond initially, and what steps did you take to manage your emotions and approach the situation constructively? "
               }

    def tokenize_data(self, max_length=400):
        """Encode every essay and return a ``TensorDataset(tokens, labels)``.

        Args:
            max_length: Maximum number of token ids kept per example; longer
                encodings are truncated by the tokenizer.

        Returns:
            TensorDataset whose first tensor holds the padded token ids
            (one row per essay) and whose second tensor holds the 0/1 labels.

        Raises:
            KeyError: If ``self.category`` is not a known trait key or a
                ``decision`` value is neither ``'y'`` nor ``'n'``.
        """
        print("Processing data")
        label_dict = {'y': 1, 'n': 0}

        answers = self.data['essay'].to_list()
        label_list = self.data['decision'].to_list()
        question = self.questionMap[self.category]

        tokens = []
        labels = []
        for answer, decision in tqdm(zip(answers, label_list), total=len(answers)):
            # Concatenate the question asked before the answer.
            qa_concat = f"{question} [SEP] {answer} "
            encoded_text = self.tokenizer.encode(qa_concat, max_length=max_length, truncation=True)
            tokens.append(torch.tensor(encoded_text))
            labels.append(label_dict[decision])

        if tokens:
            # Pad with the tokenizer's own pad id when it declares one, so the
            # padding is recognisable downstream; fall back to 0 otherwise.
            pad_id = getattr(self.tokenizer, "pad_token_id", None)
            tokens = pad_sequence(
                tokens,
                batch_first=True,
                padding_value=0 if pad_id is None else pad_id,
            )
        else:
            # pad_sequence rejects an empty list; return an empty dataset instead.
            tokens = torch.empty((0, 0), dtype=torch.long)

        labels = torch.tensor(labels, dtype=torch.long)
        return TensorDataset(tokens, labels)

    def get_data_loaders(self, batch_size = 32, shuffle = True):
        """Tokenize the data and wrap the result in a ``DataLoader``.

        Args:
            batch_size: Number of examples per batch.
            shuffle: Whether the loader reshuffles examples every epoch.

        Returns:
            DataLoader yielding ``(tokens, labels)`` batches.
        """
        processed_dataset = self.tokenize_data()

        data_loader = DataLoader(
            processed_dataset,
            shuffle=shuffle,
            batch_size=batch_size
        )

        return data_loader

In [None]:
class Trainer:
    """Fine-tunes a sequence-classification model and reports metrics per epoch.

    Batches coming out of the data loaders may be either mappings with
    "input_ids" and "labels" keys (plus an optional "attention_mask"), or
    tuples/lists shaped ``(input_ids, labels)`` or
    ``(input_ids, attention_mask, labels)`` such as a ``TensorDataset`` yields.
    """

    # Accepted values for ``training_type``.
    TRAINING_TYPES = ("fully_frozen", "top_4_training", "bottom_4_training", "all_training")

    def __init__(self, model, train_data, eval_data, batch_size, epochs, optimizer, criterion, device, savepath, training_type):
        """
        Initializes the Trainer class with training and evaluation parameters.
        Args:
        - model: PyTorch model to be trained (e.g., DistilBERT).
        - train_data: Training dataset.
        - eval_data: Evaluation dataset.
        - batch_size: Batch size for DataLoader.
        - epochs: Number of training epochs.
        - optimizer: Optimizer (e.g., Adam).
        - criterion: Loss function (e.g., CrossEntropyLoss).
        - device: Device to run the model on (e.g., 'cuda' or 'cpu').
        - savepath: Path to save the model after training.
        - training_type: Type of training ("fully_frozen", "top_4_training", "bottom_4_training", "all_training").
        Raises:
        - ValueError: If training_type is not one of the values above.
        """
        self.model = model
        self.batch_size = batch_size
        self.epochs = epochs
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device
        self.savepath = savepath
        self.training_type = training_type

        # Create DataLoaders from datasets
        self.train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
        self.eval_dataloader = DataLoader(eval_data, batch_size=batch_size, shuffle=False)

        # Send model to device
        self.model.to(self.device)

        # Set the training parameters based on the training type
        self.set_training_parameters()

    def _head_modules(self):
        """Return the classification-head modules that sit on top of the encoder."""
        head = [self.model.classifier]
        # DistilBERT's sequence-classification head places a `pre_classifier`
        # layer in front of `classifier`; it is not part of the pretrained
        # encoder, so it is trained together with the classifier when present.
        pre_classifier = getattr(self.model, "pre_classifier", None)
        if pre_classifier is not None:
            head.append(pre_classifier)
        return head

    def set_training_parameters(self):
        """
        Freezes or unfreezes model parameters based on the specified training type.
        Raises:
        - ValueError: If self.training_type is not a supported value (otherwise
          every parameter would silently stay frozen and nothing could train).
        """
        if self.training_type not in self.TRAINING_TYPES:
            raise ValueError(
                f"Unknown training_type {self.training_type!r}; "
                f"expected one of {self.TRAINING_TYPES}"
            )

        # Freeze all parameters by default
        for param in self.model.parameters():
            param.requires_grad = False

        if self.training_type == "all_training":
            # Unfreeze all layers
            for param in self.model.parameters():
                param.requires_grad = True
        else:
            if self.training_type == "top_4_training":
                # Last four encoder layers (layers 2-5 of a 6-layer DistilBERT)
                encoder_layers = list(self.model.distilbert.transformer.layer)[-4:]
            elif self.training_type == "bottom_4_training":
                # First four encoder layers
                encoder_layers = list(self.model.distilbert.transformer.layer)[:4]
            else:
                # "fully_frozen": only the classification head is trainable
                encoder_layers = []

            for module in encoder_layers + self._head_modules():
                for param in module.parameters():
                    param.requires_grad = True

        print(f"Training type set to: {self.training_type}")

    def get_performance_metrics(self, preds, labels):
        """
        Calculate performance metrics: accuracy, precision, recall, and F1-score.
        Args:
        - preds: Predictions from the model. Either a 1-D array of predicted
          class ids, or a 2-D array of per-class scores (argmax is taken over
          axis 1).
        - labels: Ground truth labels.
        Returns:
        - metrics: Dictionary containing accuracy, precision, recall, and F1-score.
        """
        preds = np.asarray(preds)
        if preds.ndim > 1:
            pred_flat = np.argmax(preds, axis=1).flatten()
        else:
            # Already class ids (this is what train() and evaluate() pass in).
            pred_flat = preds.flatten()
        labels_flat = np.asarray(labels).flatten()

        metrics = {
            "accuracy": accuracy_score(labels_flat, pred_flat),
            "precision": precision_score(labels_flat, pred_flat, zero_division=0, average="weighted"),
            "recall": recall_score(labels_flat, pred_flat, zero_division=0, average="weighted"),
            "f1": f1_score(labels_flat, pred_flat, zero_division=0, average="weighted")
        }
        return metrics

    def _infer_attention_mask(self, input_ids):
        """Build an attention mask for batches that do not carry one."""
        # NOTE(review): assumes the padding in input_ids uses the model
        # config's pad_token_id — confirm against the dataset construction.
        pad_id = getattr(getattr(self.model, "config", None), "pad_token_id", None)
        if pad_id is None:
            return torch.ones_like(input_ids)
        return (input_ids != pad_id).long()

    def _unpack_batch(self, batch):
        """Return (input_ids, attention_mask, labels) on self.device for any supported batch layout."""
        if hasattr(batch, "keys"):
            input_ids = batch["input_ids"]
            labels = batch["labels"]
            attention_mask = batch["attention_mask"] if "attention_mask" in batch.keys() else None
        elif len(batch) == 3:
            input_ids, attention_mask, labels = batch
        elif len(batch) == 2:
            input_ids, labels = batch
            attention_mask = None
        else:
            raise ValueError(
                "Expected a batch mapping or a (input_ids, [attention_mask,] labels) "
                f"sequence, got a sequence of length {len(batch)}"
            )

        input_ids = input_ids.to(self.device)
        labels = labels.to(self.device)
        if attention_mask is None:
            attention_mask = self._infer_attention_mask(input_ids)
        else:
            attention_mask = attention_mask.to(self.device)
        return input_ids, attention_mask, labels

    def _save_checkpoint(self):
        """Write the model's state dict to self.savepath, creating its folder if needed."""
        save_dir = os.path.dirname(self.savepath)
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)
        torch.save(self.model.state_dict(), self.savepath)

    def train(self):
        """
        Trains the model and evaluates on validation data after each epoch.
        The model's state dict is written to self.savepath after every epoch.
        """
        for epoch in range(self.epochs):
            print(f"Epoch {epoch + 1}/{self.epochs}")
            self.model.train()  # Set model to training mode
            total_loss = 0
            all_preds = []
            all_labels = []

            for batch in self.train_dataloader:
                input_ids, attention_mask, labels = self._unpack_batch(batch)

                # Forward pass
                self.optimizer.zero_grad()
                # Keyword argument: the second positional parameter is not
                # attention_mask for every model architecture.
                outputs = self.model(input_ids, attention_mask=attention_mask)
                logits = outputs[0]  # Assuming logits are the first output

                # Calculate loss
                loss = self.criterion(logits, labels)
                total_loss += loss.item()

                # Backward pass and optimization
                loss.backward()
                self.optimizer.step()

                # Collect predicted class ids and labels for metrics
                all_preds.extend(torch.argmax(logits, dim=1).cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

            avg_loss = total_loss / len(self.train_dataloader)
            print(f"Training Loss: {avg_loss:.4f}")

            # Calculate and print training metrics
            metrics = self.get_performance_metrics(np.array(all_preds), np.array(all_labels))
            print(f"Training Metrics: {metrics}")

            # Evaluate on validation set
            print("Evaluating on validation set...")
            self.evaluate()

            # Save model after every epoch (each save overwrites the previous one)
            self._save_checkpoint()

    def evaluate(self):
        """
        Evaluates the model on the validation dataset.
        Returns:
        - metrics: Dictionary containing accuracy, precision, recall, and F1-score.
        """
        self.model.eval()  # Set the model to evaluation mode
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for batch in self.eval_dataloader:
                input_ids, attention_mask, labels = self._unpack_batch(batch)

                # Forward pass
                outputs = self.model(input_ids, attention_mask=attention_mask)
                logits = outputs[0]  # Assuming logits are the first output

                # Collect predicted class ids and labels for metrics
                all_preds.extend(torch.argmax(logits, dim=1).cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        # Calculate and print evaluation metrics
        metrics = self.get_performance_metrics(np.array(all_preds), np.array(all_labels))
        print(f"Evaluation Metrics: {metrics}")
        return metrics



In [None]:
params = {}
params['batch_size']=BATCH_SIZE
params['device'] = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
params['train_data']= 
