In [None]:
import os
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_recall_fscore_support
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import Dataset
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    EarlyStoppingCallback,
    Trainer,
    TrainerCallback,
    TrainingArguments,
    get_scheduler,
)

In [None]:
# Read the training and validation splits of the drug-review data.
df, df_test = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/kaggle/working/ML_week_SA_drugs/data/drug_review_train.csv",
        "/kaggle/working/ML_week_SA_drugs/data/drug_review_validation.csv",
    )
)
### mapping ratings to sentiment categories
def map_rating_to_sentiment(rating):
    """Bucket a 1-10 rating into a sentiment label.

    The bands are contiguous so that fractional ratings are classified too:

    * ``1 <= rating < 5``   -> ``'negative'``
    * ``5 <= rating < 8``   -> ``'neutral'``
    * ``8 <= rating <= 10`` -> ``'positive'``

    For whole-number ratings this is the same 1-4 / 5-7 / 8-10 split as
    before; values such as 4.5 or 7.5 previously matched no band at all.

    Args:
        rating: Numeric rating.

    Returns:
        The sentiment label, or ``None`` when the rating lies outside
        ``[1, 10]`` (NaN included, since every comparison with NaN is False).
    """
    if not 1 <= rating <= 10:
        return None
    if rating >= 8:
        return 'positive'
    if rating >= 5:
        return 'neutral'
    return 'negative'


# Attach the textual sentiment label to both splits.
df['rating_category'], df_test['rating_category'] = (
    frame['rating'].apply(map_rating_to_sentiment) for frame in (df, df_test)
)

In [None]:
# Pretrained checkpoint used for both the tokenizer and the classifier.
checkpoint_path = "/kaggle/input/biobert-084" 

# NOTE(review): only best_learning_rate is consumed by TrainingArguments
# further down; batch size (16) and epoch count (6) are hard-coded there, so
# best_num_epochs / best_batch_size are currently unused - confirm intended.
best_learning_rate = 1.0935406790014464e-05 
best_num_epochs = 9 
best_batch_size = 8
# Map sentiments to numeric values
label_mapping = {"positive": 2, "neutral": 1, "negative": 0}


def _encode_sentiment(ratings):
    """Turn a Series of raw ratings into numeric class ids via label_mapping.

    Raises:
        ValueError: if any rating cannot be mapped to a sentiment class.
    """
    encoded = ratings.apply(map_rating_to_sentiment).map(label_mapping)
    n_unmapped = int(encoded.isna().sum())
    if n_unmapped:
        raise ValueError(
            f"{n_unmapped} rating(s) could not be mapped to a sentiment class"
        )
    return encoded


# Derive the ids from the raw 'rating' column instead of re-mapping
# 'rating_category' in place: mapping the already-encoded integers through
# label_mapping a second time (e.g. on a cell re-run) would turn them into NaN.
df["rating_category"] = _encode_sentiment(df["rating"])
df_test["rating_category"] = _encode_sentiment(df_test["rating"])
# Review texts and labels for the training and evaluation splits
X_full = df["review"]
y_full = df["rating_category"]
X_test = df_test["review"]
y_test = df_test["rating_category"]
# Load Tokenizer
tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)

# Function to Tokenize Text
def tokenize_function(texts):
    """Tokenize `texts` with the module-level tokenizer.

    Every sequence is padded or truncated to 256 tokens and the result is
    returned as PyTorch tensors.
    """
    encode_options = dict(
        padding="max_length",
        truncation=True,
        max_length=256,
        return_tensors="pt",
    )
    return tokenizer(texts, **encode_options)

# Tokenize both splits (training first, then test)
train_encodings, test_encodings = (
    tokenize_function(reviews.tolist()) for reviews in (X_full, X_test)
)

In [None]:
class SentimentDataset(Dataset):
    """Torch dataset pairing tokenizer encodings with integer class labels.

    Args:
        encodings: Mapping of field name to a tensor (anything exposing
            ``.numpy()``) indexed by example along its first axis. Must
            contain an ``"input_ids"`` entry.
        labels: pandas Series of labels that can be cast to ``int``.

    If the number of encoded rows and labels differ, the dataset is truncated
    to the shorter of the two and a warning is emitted.
    """

    def __init__(self, encodings, labels):
        arrays = {key: val.numpy() for key, val in encodings.items()}
        label_array = labels.reset_index(drop=True).astype(int).to_numpy()

        n_encoded = len(arrays["input_ids"])
        n_labels = len(label_array)

        # Ensure dataset sizes match
        self.size = min(n_encoded, n_labels)
        if n_encoded != n_labels:
            warnings.warn(
                f"SentimentDataset: {n_encoded} encoded rows but {n_labels} labels; "
                f"truncating to {self.size} examples",
                stacklevel=2,
            )

        # Trim everything to the common length. Without this, a negative index
        # would count from the end of the *untrimmed* arrays and pair an
        # encoding row with the label of a different example.
        self.encodings = {key: arr[: self.size] for key, arr in arrays.items()}
        self.labels = label_array[: self.size]

    def __getitem__(self, idx):
        """Return one example as a dict of ``torch.long`` tensors plus ``"labels"``.

        Raises:
            IndexError: if ``idx`` is outside ``[-len(self), len(self))``.
        """
        if not -self.size <= idx < self.size:  # Prevent out-of-bounds errors
            raise IndexError(f"Index {idx} out of bounds for dataset of size {self.size}")

        item = {key: torch.tensor(val[idx], dtype=torch.long) for key, val in self.encodings.items()}
        item["labels"] = torch.tensor(int(self.labels[idx]), dtype=torch.long)
        return item

    def __len__(self):
        return self.size  # Use the minimum size to prevent mismatches

# Wrap the tokenized splits as torch datasets
train_dataset = SentimentDataset(encodings=train_encodings, labels=y_full)
test_dataset = SentimentDataset(encodings=test_encodings, labels=y_test)

# ===================== Load Model & Compute Class Weights =====================
# Note: the trainer defined later is constructed with model_init, so it builds
# its own model instance; this `model` object is not the one that gets trained.
model = AutoModelForSequenceClassification.from_pretrained(
    checkpoint_path,
    num_labels=3,
)

# "balanced" class weights computed from the training labels, stored as a
# float tensor for use in the weighted loss
y_full_np = y_full.to_numpy()
class_weights = torch.tensor(
    compute_class_weight(class_weight="balanced", classes=np.unique(y_full_np), y=y_full_np),
    dtype=torch.float,
)

In [None]:
# ===================== Custom Trainer (Adds Weighted Loss & Confusion Matrix) =====================
class _EpochEndBridge(TrainerCallback):
    """Relay the training loop's end-of-epoch event to CustomTrainer.on_epoch_end()."""

    def __init__(self, trainer):
        self._trainer = trainer

    def on_epoch_end(self, args, state, control, **kwargs):
        self._trainer.on_epoch_end()


class CustomTrainer(Trainer):
    """Trainer with class-weighted cross-entropy and a per-epoch confusion matrix.

    Relies on the module-level ``class_weights`` tensor (loss weighting) and
    ``test_dataset`` (data used for the confusion matrix).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The training loop only dispatches end-of-epoch events to registered
        # callbacks; a method named on_epoch_end on a Trainer subclass is never
        # called by itself, so hook it up explicitly.
        self.add_callback(_EpochEndBridge(self))

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """
        Custom loss function that applies class weights.

        Args:
            model: Model to run the forward pass with.
            inputs: Batch dict; its "labels" entry is removed before the
                forward pass so the loss is computed here, not by the model.
            return_outputs: When True, return ``(loss, outputs)``.
            **kwargs: Extra arguments passed by the training loop; ignored.

        Returns:
            The weighted cross-entropy loss, or ``(loss, outputs)``.
        """
        labels = inputs.pop("labels")  # Extract labels
        outputs = model(**inputs)  # Forward pass
        logits = outputs.logits

        # Apply class weights
        loss_fct = torch.nn.CrossEntropyLoss(weight=class_weights.to(logits.device))
        loss = loss_fct(logits, labels)

        return (loss, outputs) if return_outputs else loss

    def on_epoch_end(self):
        """Predict on ``test_dataset`` and plot the resulting confusion matrix."""
        # Generate Predictions
        predictions = self.predict(test_dataset)
        y_pred = np.argmax(predictions.predictions, axis=1)  # Convert logits to predicted class
        y_true = predictions.label_ids  # True labels

        # Compute Confusion Matrix; pin the label set so the matrix is always
        # 3x3 and lines up with the three tick labels even if a class is
        # absent from both y_true and y_pred.
        cm = confusion_matrix(y_true, y_pred, labels=[0, 1, 2])

        # Plot Confusion Matrix
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
                    xticklabels=["Negative", "Neutral", "Positive"],
                    yticklabels=["Negative", "Neutral", "Positive"])
        plt.xlabel("Predicted Label")
        plt.ylabel("True Label")
        plt.title(f"Confusion Matrix - Epoch {self.state.epoch}")
        plt.show()

# ===================== Define Metrics Function =====================
def compute_metrics(pred):
    """Compute accuracy and macro-averaged precision/recall/F1.

    Args:
        pred: A ``(logits, labels)`` pair.

    Returns:
        Dict with the keys "accuracy", "f1", "precision" and "recall".
        Scores that hit a zero division are reported as 1 (``zero_division=1``).
    """
    logits, labels = pred
    predicted_classes = logits.argmax(-1)  # highest-scoring class per example

    macro_precision, macro_recall, macro_f1, _support = precision_recall_fscore_support(
        labels, predicted_classes, average="macro", zero_division=1
    )

    metrics = {"accuracy": accuracy_score(labels, predicted_classes)}
    metrics["f1"] = macro_f1
    metrics["precision"] = macro_precision
    metrics["recall"] = macro_recall
    return metrics

# ===================== Model Factory (passed to the Trainer as model_init) =====================
def model_init():
    """Load a fresh 3-label sequence classifier from `checkpoint_path`."""
    fresh_model = AutoModelForSequenceClassification.from_pretrained(
        checkpoint_path,
        num_labels=3,
    )
    return fresh_model

# ===================== Training Arguments (step-based evaluation/checkpointing, best model selected on F1) =====================
# NOTE(review): best_num_epochs (9) and best_batch_size (8) are defined earlier
# in the notebook, but the batch size and epoch count below are hard-coded to
# 16 and 6 - confirm which values are intended.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` - confirm against the installed version.
training_args = TrainingArguments(
    output_dir="./final_results",
    evaluation_strategy="steps",  # Evaluate after a set number of steps instead of every epoch
    save_strategy="steps",  # Save model checkpoints at regular intervals
    eval_steps=1000,  # Evaluate every 1000 steps (adjust based on dataset size)
    save_steps=1000,  # Save checkpoints every 1000 steps
    save_total_limit=3,  # Keep only the last 3 checkpoints to save storage
    learning_rate=best_learning_rate,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=6,
    weight_decay=0.01,
    logging_dir="./final_logs",
    logging_steps=500,  # Log loss and metrics every 500 steps
    load_best_model_at_end=True,  # Ensures we use the best checkpoint
    metric_for_best_model="f1",  # Save the model that gives the best F1-score
    greater_is_better=True,  # Higher F1-score is better
    report_to="none",  # Disable external tracking (e.g., WandB)
    # Mixed precision only when a CUDA device is present, so the notebook still
    # runs on a CPU-only runtime where an fp16 request may be rejected.
    fp16=torch.cuda.is_available(),
)

In [None]:
# Initialize final trainer
final_trainer = CustomTrainer(
    model_init=model_init,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,  # Separate test dataset for evaluation
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]  # Stops if no improvement for 2 evaluations
)

# Train on full dataset
final_trainer.train()

# Save final trained model.
# The trainer was built with model_init, so the weights that were trained live
# in final_trainer.model; the module-level `model` object was never passed to
# the trainer and would save the untouched checkpoint.
final_trainer.save_model("./final_biobert_model")
tokenizer.save_pretrained("./final_biobert_model")

# =====================  Run One Final Evaluation  =====================
print("Final Evaluation on Full Training Set...")
evaluation_results = final_trainer.evaluate(eval_dataset=train_dataset)  # Manual evaluation
print("Final Training Dataset Evaluation Results:", evaluation_results)