<a href="https://colab.research.google.com/github/priyankadas1109/Test/blob/main/Copy_of_Priyanka_Das_HW6a.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install datasets
!pip install evaluate
!pip install scikit-learn
!pip install transformers
!pip install wandb
!pip install emoji

Collecting emoji
  Downloading emoji-2.14.0-py3-none-any.whl.metadata (5.7 kB)
Downloading emoji-2.14.0-py3-none-any.whl (586 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m586.9/586.9 kB[0m [31m28.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: emoji
Successfully installed emoji-2.14.0


In [None]:


import pandas as pd
import numpy as np
import torch
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.utils.class_weight import compute_class_weight
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, DataCollatorWithPadding
from datasets import Dataset, DatasetDict
from evaluate import load as load_metric
import wandb
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import multilabel_confusion_matrix, precision_score, recall_score, f1_score
from torch.utils.data import DataLoader
import emoji
import re  # For regular expressions


In [None]:
# Set file paths
train_path = '/content/drive/MyDrive/data/datasets/train.csv'
test_path = '/content/drive/MyDrive/data/datasets/test.csv'

# Load the data
df_train = pd.read_csv(train_path)
df_test = pd.read_csv(test_path)

In [None]:
# Replace 'NONE' with 0 in emotion columns
label_names = ['anger', 'anticipation', 'disgust', 'fear', 'joy', 'love', 'optimism', 'pessimism', 'sadness', 'surprise', 'trust']
df_train[label_names] = df_train[label_names].replace('NONE', 0).astype(int)
df_test[label_names] = df_test[label_names].replace('NONE', 0).astype(int)

In [None]:
# Encode labels using MultiLabelBinarizer
label_encoder = MultiLabelBinarizer()
df_train['label'] = list(label_encoder.fit_transform(df_train[label_names].values))
df_test['label'] = list(label_encoder.transform(df_test[label_names].values))

In [None]:
# Split data into train and validation sets
train_df, val_df = train_test_split(df_train, test_size=0.2, random_state=42)
train_dataset = Dataset.from_pandas(train_df)
val_dataset = Dataset.from_pandas(val_df)
test_dataset = Dataset.from_pandas(df_test)

In [None]:
# Model and Tokenizer
model_name = "roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
max_length = 128

In [None]:
# Function to preprocess text
def preprocess_text(text):
  # Expand contractions
  text = re.sub(r"n\'t", " not", text)
  text = re.sub(r"\'re", " are", text)
  # ... other contractions ...

  # Remove unnecessary characters and extra whitespace
  text = re.sub(r"[^\w\s]", "", text)
  text = re.sub(r"\s+", " ", text).strip()

  # Handle URLs and hashtags (you can customize this)
  text = re.sub(r"http\S+", "URL", text)
  text = re.sub(r"#\w+", "HASHTAG", text)

  # Handle emojis
  text = emoji.demojize(text)  # Convert emojis to text descriptions
  text = re.sub(r":[^:]+:", "", text)  # Remove emoji descriptions

  return text

# Tokenization function
def preprocess_function(examples):
    examples["Tweet"] = [preprocess_text(text) for text in examples["Tweet"]]
    return tokenizer(examples["Tweet"], padding="max_length", truncation=True, max_length=max_length)




In [None]:
# Tokenize datasets
train_dataset = train_dataset.map(preprocess_function, batched=True)
val_dataset = val_dataset.map(preprocess_function, batched=True)
test_dataset = test_dataset.map(preprocess_function, batched=True)

In [None]:

# Prepare labels for multi-label classification
def prepare_labels(batch):
    labels = torch.tensor([batch[label] for label in label_names], dtype=torch.float).T
    batch["labels"] = labels
    return batch

train_dataset = train_dataset.map(prepare_labels, remove_columns=label_names)
val_dataset = val_dataset.map(prepare_labels, remove_columns=label_names)
test_dataset = test_dataset.map(prepare_labels, remove_columns=label_names)

Map:   0%|          | 0/7724 [00:00<?, ? examples/s]

Map:   0%|          | 0/3259 [00:00<?, ? examples/s]

Map:   0%|          | 0/7724 [00:00<?, ? examples/s]

Map:   0%|          | 0/3259 [00:00<?, ? examples/s]

In [None]:
# DataCollator for dynamic padding
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
# Define the model
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(label_names),
    problem_type="multi_label_classification"
)

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Calculate class weights (modified for multi-label)
class_weights = []
for label in label_names:
    # Calculate class weights using sklearn's compute_class_weight
    y = df_train[label].values  # Get labels for this emotion
    classes = np.unique(y)  # Get all unique labels in y
    weights = compute_class_weight(class_weight='balanced', classes=classes, y=y)
    class_weights.append(weights[1] if 1 in classes else 1.0)

class_weights_tensor = torch.tensor(class_weights, dtype=torch.float)

# Define custom Trainer with weighted loss
class WeightedLossTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        labels = inputs.get("labels")
        # forward pass
        outputs = model(**inputs)
        logits = outputs.logits

        # Apply BCEWithLogitsLoss with pos_weight for each label
        loss_fct = torch.nn.BCEWithLogitsLoss(pos_weight=class_weights_tensor.to(logits.device))
        loss = loss_fct(logits, labels)
        return (loss, outputs) if return_outputs else loss

In [None]:
# Training arguments with wandb logging
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="steps",
    save_steps=100,
    eval_steps=100,
    logging_steps=100,
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=6,
    weight_decay=0.01,
    report_to="wandb",
    run_name="roberta_emotion_detection",
    metric_for_best_model="eval_f1_macro",
    greater_is_better=True
)

In [None]:
# Define evaluation metrics
metric = load_metric("f1")
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions
    preds = np.where(preds > 0.5, 1, 0) # convert probabilities to binary labels

    # Convert to the expected format
    predictions = {"predictions": preds.astype(np.int32)}
    references = {"references": labels.astype(np.int32)}

    # Calculate metrics using the converted predictions and references
    # precision, recall, f1, _ = precision_recall_fscore_support(references["references"], predictions["predictions"], average='micro')
    # acc = accuracy_score(references["references"], predictions["predictions"])

    # Get predicted and true labels for each class
    num_classes = predictions["predictions"].shape[1]
    all_true_labels = []
    all_predicted_labels = []

    for i in range(num_classes):
        true_labels = references["references"][:, i]
        predicted_labels = predictions["predictions"][:, i]
        all_true_labels.extend(true_labels)
        all_predicted_labels.extend(predicted_labels)

    # Calculate overall metrics
    precision, recall, f1, _ = precision_recall_fscore_support(all_true_labels, all_predicted_labels, average='micro')
    acc = accuracy_score(all_true_labels, all_predicted_labels)


    return {
        'accuracy': acc,
        'f1': f1,
        'precision': precision,
        'recall': recall
    }

In [None]:
# Initialize Trainer
trainer = WeightedLossTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics  # Add your compute_metrics function if needed
)

# Train the model
trainer.train()

Step,Training Loss
500,0.6815
1000,0.5411
1500,0.4671
2000,0.4112


TrainOutput(global_step=2415, training_loss=0.4987692128057065, metrics={'train_runtime': 482.6085, 'train_samples_per_second': 80.023, 'train_steps_per_second': 5.004, 'total_flos': 2540542517468160.0, 'train_loss': 0.4987692128057065, 'epoch': 5.0})

In [None]:
eval_results = trainer.evaluate()
print(eval_results)

{'eval_loss': 0.6687085032463074, 'eval_accuracy': 0.7790175458171776, 'eval_f1': 0.7790175458171776, 'eval_precision': 0.7790175458171776, 'eval_recall': 0.7790175458171776, 'eval_runtime': 11.2549, 'eval_samples_per_second': 289.563, 'eval_steps_per_second': 18.125, 'epoch': 5.0}
