In [None]:
import pandas as pd

import matplotlib.pyplot as plt

from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

import torch
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import ExponentialLR
from transformers import AutoModelForSequenceClassification, AutoTokenizer


# Run on the first CUDA device when PyTorch can see one, otherwise on the CPU.
cuda_available = torch.cuda.is_available()
device = torch.device("cuda:0") if cuda_available else torch.device("cpu")

# Ask PyTorch to hand back cached GPU memory it is not currently using.
torch.cuda.empty_cache()

In [None]:
# Checkpoint to fine-tune; the commented-out names are alternatives to try.
model_name = 'gpt2'
# model_name = "l3cube-pune/tamil-bert"
# model_name = "google-bert/bert-base-multilingual-uncased"

# Classification head with 7 outputs, one per entry of the label mapping used below.
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=7)

tokenizer = AutoTokenizer.from_pretrained(model_name)
model.to(device)

# Some tokenizers (GPT-2 among them) define no padding token, so EOS is reused
# for padding. Tokenizers that already have one (e.g. BERT-style checkpoints,
# which usually have no EOS token at all) must keep it: overwriting it with a
# missing EOS would leave the tokenizer unable to pad.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Keep the model's notion of the padding id in sync with the tokenizer's.
model.config.pad_token_id = tokenizer.pad_token_id

### Dataloader

In [None]:
# Both CSVs are parsed leniently: undecodable bytes are ignored and malformed
# rows are skipped rather than aborting the load.
csv_options = dict(encoding_errors="ignore", on_bad_lines='skip')
df = pd.read_csv("PS_train.csv", **csv_options)
df_test = pd.read_csv("PS_dev.csv", **csv_options)

print(df.shape, df_test.shape)

# Pool the two files, then carve out a reproducible 20% hold-out set
# (fixed random_state); the remaining rows stay in `df` for training.
df = pd.concat([df, df_test], ignore_index=True)
df_test = df.sample(frac=0.20, random_state=42)
df = df.drop(index=df_test.index)
df["labels"].value_counts()

In [None]:
# Class name -> integer id, numbered in the order listed here.
label_dict = {
    name: class_id
    for class_id, name in enumerate(
        [
            "Opinionated",
            "Sarcastic",
            "Neutral",
            "Positive",
            "Substantiated",
            "Negative",
            "None of the above",
        ]
    )
}

# Replace the string labels with their integer ids in both splits.
# A label missing from `label_dict` raises KeyError.
for frame in (df, df_test):
    frame["labels"] = frame["labels"].apply(lambda name: label_dict[name])


# Plain Python lists of texts and labels for each split.
train_texts, train_labels = df['content'].to_list(), df['labels'].to_list()
test_texts, test_labels = df_test['content'].to_list(), df_test['labels'].to_list()

In [None]:
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per lookup.

    Args:
        texts: Indexable collection of input texts.
        labels: Indexable collection of class ids, parallel to ``texts``;
            each entry must be convertible with ``int()``.
        tokenizer: Callable tokenizer (Hugging Face style) that accepts the
            keyword arguments used in ``__getitem__`` and returns a mapping
            with ``'input_ids'`` and ``'attention_mask'`` tensors.
        max_length: Length every example is padded or truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of examples (the length of ``texts``)."""
        return len(self.texts)

    def __getitem__(self, item):
        """Tokenize example ``item`` and return it as a dict of tensors.

        Returns:
            dict with ``'input_ids'`` and ``'attention_mask'`` (both
            flattened to 1-D) and ``'labels'`` (a scalar ``torch.long``
            tensor).
        """
        text = self.texts[item]
        label = int(self.labels[item])

        # Call the tokenizer directly: `encode_plus` is deprecated in favour
        # of `__call__`, which takes the same arguments for a single text.
        encoding = self.tokenizer(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        # flatten() collapses the tokenizer output to 1-D so the DataLoader
        # can stack examples into (batch, length) tensors.
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long),
        }

# Every text is padded/truncated to `max_length` tokens; batches hold `batch_size` examples.
max_length = 256
batch_size = 32

train_dataset = TextDataset(
    texts=train_texts, labels=train_labels, tokenizer=tokenizer, max_length=max_length
)
test_dataset = TextDataset(
    texts=test_texts, labels=test_labels, tokenizer=tokenizer, max_length=max_length
)

# Only the training loader is shuffled; the evaluation order stays fixed.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)


# Sanity check: print the first training batch (nothing is printed if the loader is empty).
batch = next(iter(train_dataloader), None)
if batch is not None:
    print(batch)

### Utils

In [None]:
def _token_count(text):
    """Return how many tokens `tokenizer` produces for `text`, special tokens included."""
    return len(tokenizer.encode(text, add_special_tokens=True))


# Token count of every text in the training frame.
sentence_lengths = df['content'].apply(_token_count)

# Histogram of the token counts.
fig, ax = plt.subplots(figsize=(10, 6))
ax.hist(sentence_lengths, bins=50, color='skyblue', edgecolor='black')
ax.set_title('Distribution of Sentence Lengths')
ax.set_xlabel('Sentence Length')
ax.set_ylabel('Frequency')
plt.show()

# Summary statistics of the same distribution.
print(f"Max sentence length: {max(sentence_lengths)}")
print(f"Median sentence length: {sentence_lengths.median()}")
print(f"Mean sentence length: {sentence_lengths.mean()}")
print(f"90th percentile sentence length: {sentence_lengths.quantile(0.9)}")

In [None]:
def compute_metrics(preds, labels):
    """Compute accuracy and macro-averaged precision, recall and F1.

    Args:
        preds: Tensor of predicted class ids; flattened before scoring.
        labels: Tensor of true class ids; flattened before scoring.

    Returns:
        Tuple ``(accuracy, precision, recall, f1)``.
    """
    # Move both tensors to host memory once instead of before every metric call.
    y_pred = preds.flatten().detach().cpu().numpy()
    y_true = labels.flatten().detach().cpu().numpy()

    # Macro averaging weights every class equally (multi-class setting).
    # zero_division=0: a class with an undefined score (e.g. never predicted)
    # contributes 0 rather than a perfect 1, which would inflate the macro
    # averages; it also keeps scikit-learn from emitting a warning.
    macro_options = dict(average='macro', zero_division=0)

    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, **macro_options)
    recall = recall_score(y_true, y_pred, **macro_options)
    f1 = f1_score(y_true, y_pred, **macro_options)

    return accuracy, precision, recall, f1


def calc_and_print_metrics(model, dataloader):
    """Evaluate ``model`` on ``dataloader`` and print the resulting metrics.

    The model is switched to evaluation mode and is not switched back.
    Each batch must provide ``'input_ids'``, ``'attention_mask'`` and
    ``'labels'``; tensors are moved to the global ``device``. Accuracy,
    precision, recall and F1 (from ``compute_metrics``) are printed with
    four decimals. Nothing is returned.
    """
    model.eval()

    predicted, expected = [], []

    # Inference only: no gradients are tracked.
    with torch.no_grad():
        for batch in dataloader:
            inputs = {
                key: batch[key].to(device)
                for key in ('input_ids', 'attention_mask', 'labels')
            }
            outputs = model(**inputs)

            # Predicted class = index of the largest logit.
            batch_preds = outputs.logits.argmax(dim=-1)

            predicted.extend(batch_preds.cpu().tolist())
            expected.extend(inputs['labels'].cpu().tolist())

    scores = compute_metrics(torch.tensor(predicted), torch.tensor(expected))

    for title, value in zip(("Accuracy", "Precision", "Recall", "F1-Score"), scores):
        print(f"{title}: {value:.4f}")

### Training

In [None]:
# AdamW over all model parameters, starting at this learning rate.
learning_rate = 5e-5
optimizer = torch.optim.AdamW(params=model.parameters(), lr=learning_rate)

# Each scheduler.step() multiplies the learning rate by `lr_decay`.
lr_decay = 0.9
scheduler = ExponentialLR(optimizer=optimizer, gamma=lr_decay)

In [None]:
epochs = 20
# lr_history[i] is the learning rate that was in effect during epoch i + 1.
lr_history = []

for epoch in range(epochs):
    # calc_and_print_metrics() leaves the model in eval mode, so re-enable
    # training mode at the start of every epoch.
    model.train()
    total_loss = 0
    all_preds = []
    all_labels = []

    for batch in train_dataloader:
        optimizer.zero_grad()

        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # Passing `labels` makes the model return the loss alongside the logits.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        logits = outputs.logits

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # Predicted class = index of the largest logit.
        preds = torch.argmax(logits, dim=-1)

        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    # Log the rate this epoch actually trained with BEFORE decaying it;
    # logging after scheduler.step() would record the next epoch's rate and
    # never capture the initial one.
    lr_history.append(optimizer.param_groups[0]['lr'])
    scheduler.step()

    # Mean loss per batch, plus metrics over the predictions made while training.
    avg_loss = total_loss / len(train_dataloader)
    accuracy, precision, recall, f1 = compute_metrics(torch.tensor(all_preds), torch.tensor(all_labels))

    print(f"Epoch {epoch+1}")
    print(f"Average Training Loss: {avg_loss:.4f}")
    print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

    print("------ on test dataset ------")
    calc_and_print_metrics(model, test_dataloader)
    print("\n")