In [1]:
%pip install evaluate

Note: you may need to restart the kernel to use updated packages.


In [1]:
import gc
import torch
import pandas as pd
import ast  # for list-like labels, if needed
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from transformers import (
    DebertaTokenizer,
    DebertaForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
)
from evaluate import load
import numpy as np
import optuna  # Using optuna for hyperparameter optimization

In [2]:
# Clean up GPU memory
gc.collect()
torch.cuda.empty_cache()

In [3]:
import re

In [None]:
# Initialize the tokenizer and model
tokenizer = DebertaTokenizer.from_pretrained("microsoft/deberta-base")

In [4]:
# -------------------------------
# Data Preparation and Tokenization
# -------------------------------

# Column names are supplied here because the paragraph TSV is read with header=None
columns = ['par_id', 'art_id', 'keyword', 'country_code', 'text', 'label']
df_labels = pd.read_csv("train_semeval_parids-labels.csv")
df_texts = pd.read_csv("dontpatronizeme_pcl.tsv", sep="\t", header=None, names=columns)

# From the train-split file only par_id is kept (as a string merge key); its label column is dropped
df_labels = df_labels.drop(columns=["label"])
df_labels["par_id"] = df_labels["par_id"].astype(str)

# Binary target from the raw label: 1 when the value is >= 2, otherwise 0
df_texts["par_id"] = df_texts["par_id"].astype(str)
df_texts["binary_label"] = df_texts["label"].apply(lambda x: 1 if x >= 2 else 0)
df_texts = df_texts.drop(columns=["label"])

# Left-join on par_id, expose the binary target as "label", then discard rows lacking text or label
df = (
    df_labels
    .merge(df_texts, on="par_id", how="left")
    .rename(columns={"binary_label": "label"})
    .dropna(subset=["text", "label"])
)


def clean_text(text):
    """Collapse each run of whitespace into a single space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
# Rows with missing text were already dropped above, so astype(str) only normalises the dtype here
df["text"] = df["text"].astype(str).apply(clean_text)
# NOTE(review): this cell merges, binarises and whitespace-cleans; no class balancing is done here
print("Preprocessing and balancing complete!")



# Tokenization function
def tokenize_function(text):
    """Encode `text` with the global `tokenizer`, truncated/padded to 512 tokens, as PyTorch tensors."""
    encode_options = {
        "truncation": True,
        "padding": "max_length",
        "max_length": 512,
        "return_tensors": "pt",
    }
    return tokenizer(text, **encode_options)

# Encode each paragraph on its own, then concatenate the per-row tensors along dim 0
df["tokenized"] = df["text"].apply(tokenize_function)
input_ids = torch.cat(tuple(enc["input_ids"] for enc in df["tokenized"]), dim=0)
attention_masks = torch.cat(tuple(enc["attention_mask"] for enc in df["tokenized"]), dim=0)
labels = torch.tensor(df["label"].values, dtype=torch.long)

Preprocessing and balancing complete!


Some weights of DebertaForSequenceClassification were not initialized from the model checkpoint at microsoft/deberta-base and are newly initialized: ['classifier.bias', 'classifier.weight', 'pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [5]:
# -------------------------------
# Create Dataset and Sampler
# -------------------------------

class PCLDataset(Dataset):
    """Map-style dataset over pre-tokenized input ids, attention masks and labels."""

    def __init__(self, input_ids, attention_masks, labels):
        # The three containers are stored as given and indexed in parallel
        self.input_ids, self.attention_masks, self.labels = input_ids, attention_masks, labels

    def __len__(self):
        # The label container determines the dataset size
        return len(self.labels)

    def __getitem__(self, idx):
        # One example as a dict with the keys input_ids / attention_mask / labels
        sample = dict(
            input_ids=self.input_ids[idx],
            attention_mask=self.attention_masks[idx],
            labels=self.labels[idx],
        )
        return sample

# 80/20 train/validation split over row positions, with a fixed random_state for repeatability
indices = list(range(len(df)))
train_idx, val_idx = train_test_split(indices, test_size=0.2, random_state=42)

# Slice the stacked tensors with the two position lists
train_ids, train_masks, train_labels = input_ids[train_idx], attention_masks[train_idx], labels[train_idx]
val_ids, val_masks, val_labels = input_ids[val_idx], attention_masks[val_idx], labels[val_idx]

train_dataset = PCLDataset(train_ids, train_masks, train_labels)
val_dataset = PCLDataset(val_ids, val_masks, val_labels)


In [6]:
# -------------------------------
# Define a Custom Trainer with Weighted Sampling
# -------------------------------

class WeightedSamplerTrainer(Trainer):
    """Trainer whose training batches are drawn with inverse-class-frequency sampling.

    Every training example is sampled (with replacement) with a weight of
    1 / (number of examples sharing its label), so each label that occurs in
    the data receives the same total sampling mass.
    """

    def get_train_dataloader(self):
        """Build the training DataLoader around a WeightedRandomSampler.

        The dataset must expose a 1-D ``labels`` attribute of non-negative
        integer class ids (tensor, array or list).

        Returns:
            DataLoader drawing ``len(train_dataset)`` samples per epoch.

        Raises:
            ValueError: if the trainer was created without a train_dataset.
        """
        if self.train_dataset is None:
            raise ValueError("Trainer: training requires a train_dataset.")

        # as_tensor reuses an existing tensor instead of copy-constructing it
        # (torch.tensor(tensor) emits a UserWarning); .cpu() keeps the weight
        # computation valid even if the labels were moved to another device.
        labels_tensor = torch.as_tensor(self.train_dataset.labels, dtype=torch.long).cpu()

        # Per-class inverse frequency, then one weight per example. Indexing
        # with the tensor itself avoids the round-trip through NumPy.
        class_counts = torch.bincount(labels_tensor)
        class_weights = 1.0 / class_counts.float()
        sample_weights = class_weights[labels_tensor]

        sampler = WeightedRandomSampler(
            weights=sample_weights,
            num_samples=len(self.train_dataset),
            replacement=True,
        )

        # Honour the trainer's own collator and dataloader settings so this
        # override differs from the stock loader only in the sampler used.
        # train_batch_size equals per_device_train_batch_size on a single device.
        return DataLoader(
            self.train_dataset,
            batch_size=self.args.train_batch_size,
            sampler=sampler,
            collate_fn=self.data_collator,
            drop_last=self.args.dataloader_drop_last,
            num_workers=self.args.dataloader_num_workers,
            pin_memory=self.args.dataloader_pin_memory,
        )

In [7]:
# -------------------------------
# Define Metric Computation
# -------------------------------

# Metric objects are created once and reused, so load() is not repeated
# on every evaluation pass.
_METRIC_CACHE = {}


def _get_metric(name):
    """Return the cached `evaluate` metric called `name`, loading it on first use."""
    if name not in _METRIC_CACHE:
        _METRIC_CACHE[name] = load(name)
    return _METRIC_CACHE[name]


def compute_metrics(eval_pred):
    """Compute accuracy and F1 for one evaluation pass.

    Args:
        eval_pred: object exposing ``predictions`` (per-class scores, argmax
            is taken over axis 1) and ``label_ids`` (reference class ids).

    Returns:
        dict with the keys ``"accuracy"`` and ``"f1"``.
    """
    predictions = np.argmax(eval_pred.predictions, axis=1)
    references = eval_pred.label_ids
    accuracy_score = _get_metric("accuracy").compute(predictions=predictions, references=references)
    f1_score = _get_metric("f1").compute(predictions=predictions, references=references)
    return {"accuracy": accuracy_score["accuracy"], "f1": f1_score["f1"]}


**Test the DeBERTa model trained with the weighted random sampler (keyword and country-code variant)**

In [8]:
# Load the saved classifier and its tokenizer from the local model directory.
# This rebinds the global `tokenizer` that tokenize_function looks up when called.
model = DebertaForSequenceClassification.from_pretrained("./deberta__keyword_WS_model")
tokenizer = DebertaTokenizer.from_pretrained("./deberta__keyword_WS_model")
# Column names passed to read_csv below, where the paragraph TSV is read with header=None
columns = ['par_id', 'art_id', 'keyword', 'country_code', 'text', 'label']
def tokenize_function(text):
    """Encode `text` with the global `tokenizer`, truncated/padded to 512 tokens, as PyTorch tensors."""
    encode_options = {
        "truncation": True,
        "padding": "max_length",
        "max_length": 512,
        "return_tensors": "pt",
    }
    return tokenizer(text, **encode_options)

# Full paragraph TSV (read with header=None) and the dev-split ID file
df_texts = pd.read_csv("dontpatronizeme_pcl.tsv", sep="\t", header=None, names=columns)
df_dev = pd.read_csv("dev_semeval_parids-labels.csv")

# From the split file only par_id is kept (as a string merge key); its label column is dropped
df_dev = df_dev.drop(columns=["label"])
df_dev["par_id"] = df_dev["par_id"].astype(str)

# Binary target from the raw label: 1 when the value is >= 2, otherwise 0
df_texts["par_id"] = df_texts["par_id"].astype(str)
df_texts["binary_label"] = df_texts["label"].apply(lambda x: 1 if x >= 2 else 0)
df_texts = df_texts.drop(columns=["label"])

# Left-join on par_id so every dev row is kept, then expose the binary target as "label"
df_dev = (
    df_dev
    .merge(df_texts, on="par_id", how="left")
    .rename(columns={"binary_label": "label"})
)

# Missing paragraphs become empty strings instead of being dropped, so the row count is unchanged
df_dev["text"] = df_dev["text"].fillna("")
def clean_text(text):
    """Collapse each run of whitespace into a single space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()

# NaN texts were already replaced with "" above; astype(str) is a dtype safeguard before whitespace cleaning
df_dev["text"] = df_dev["text"].astype(str).apply(clean_text)
def tokenize_function(text):
    """Encode `text` with the global `tokenizer`, truncated/padded to 512 tokens, as PyTorch tensors."""
    # NOTE(review): same definition as the tokenize_function given earlier in this cell
    encode_options = {
        "truncation": True,
        "padding": "max_length",
        "max_length": 512,
        "return_tensors": "pt",
    }
    return tokenizer(text, **encode_options)

# Encode each dev paragraph on its own, then concatenate the per-row tensors along dim 0
df_dev["tokenized"] = df_dev["text"].apply(tokenize_function)

dev_input_ids = torch.cat(tuple(enc["input_ids"] for enc in df_dev["tokenized"]), dim=0)
dev_attention_masks = torch.cat(tuple(enc["attention_mask"] for enc in df_dev["tokenized"]), dim=0)
dev_labels = torch.tensor(df_dev["label"].values, dtype=torch.long)

class PCLDataset(Dataset):
    """Map-style dataset over pre-tokenized input ids, attention masks and labels."""

    def __init__(self, input_ids, attention_masks, labels):
        # The three containers are stored as given and indexed in parallel
        self.input_ids, self.attention_masks, self.labels = input_ids, attention_masks, labels

    def __len__(self):
        # The label container determines the dataset size
        return len(self.labels)

    def __getitem__(self, idx):
        # One example as a dict with the keys input_ids / attention_mask / labels
        sample = dict(
            input_ids=self.input_ids[idx],
            attention_mask=self.attention_masks[idx],
            labels=self.labels[idx],
        )
        return sample

# Wrap the dev tensors in the PCLDataset class defined earlier in this cell
dev_dataset = PCLDataset(dev_input_ids, dev_attention_masks, dev_labels)
# NOTE(review): in the cells shown, this trainer is only used for evaluate()/predict();
# the optimisation settings below would matter only if train() were called.
final_training_args = TrainingArguments(
       output_dir="./results",
       learning_rate=9.468961305919929e-06,
       optim="adamw_torch",
       warmup_ratio=0.1,
       evaluation_strategy="epoch",
       save_strategy="no", # checkpoint saving is turned off
       per_device_train_batch_size=4,  # kept small (reduced from 8 or 16)
       per_device_eval_batch_size=4,   # Match train batch size
       num_train_epochs=2,
       weight_decay=0.03430359963348227,
       logging_dir="./logs",
       logging_steps=50,
       save_total_limit=2,
       lr_scheduler_type="linear",
       load_best_model_at_end=False,
       metric_for_best_model="f1",
       report_to="none",
       fp16=False,  # fp16 mixed precision is disabled for this run
       bf16=False,  # Keep this False unless on Ampere GPUs (A100, RTX 30xx)
   )

# train_dataset comes from the earlier split cell; dev_dataset is what evaluate() scores
final_trainer = WeightedSamplerTrainer(
    model=model,
    args=final_training_args,
    train_dataset=train_dataset,
    eval_dataset=dev_dataset,
    compute_metrics=compute_metrics

)

# Evaluate on eval_dataset; compute_metrics contributes the accuracy and F1 entries of the result
final_results = final_trainer.evaluate()
print("Final evaluation results:", final_results)




Final evaluation results: {'eval_loss': 0.558684766292572, 'eval_model_preparation_time': 0.003, 'eval_accuracy': 0.9293218720152817, 'eval_f1': 0.5956284153005464, 'eval_runtime': 119.5852, 'eval_samples_per_second': 17.511, 'eval_steps_per_second': 4.382}


**Generate labels for the dev dataset**

In [9]:
# Run the trainer over dev_dataset and keep, per sample, the index of the largest score
predictions_output = final_trainer.predict(dev_dataset)
predicted_labels = predictions_output.predictions.argmax(axis=1)

# Write one predicted label per line
with open("dev.txt", "w") as f:
    f.writelines(f"{label}\n" for label in predicted_labels)


In [23]:
# Read the saved predictions back, one integer per line
with open("dev.txt", "r") as f:
    loaded_labels = [int(line.strip()) for line in f]

# Reference labels held by dev_dataset, converted to a plain Python list
true_labels = dev_dataset.labels.tolist()

# Both sequences must have the same length before comparing position by position
assert len(loaded_labels) == len(true_labels), "Number of predictions does not match the number of true labels!"

# Accuracy = share of positions where prediction and reference agree
correct = sum(pred == true for pred, true in zip(loaded_labels, true_labels))
accuracy = correct / len(true_labels)
print("Accuracy: {:.2f}%".format(accuracy * 100))

Accuracy: 92.93%


In [12]:
print(len(loaded_labels))

2094


**Generate labels for the test set**

In [13]:
# The test TSV is read with header=None and these five names; there is no label column
columns = ['par_id', 'art_id', 'keyword', 'country_code', 'text']
df_test = pd.read_csv("task4_test.tsv", sep="\t", header=None, names=columns)
# par_id is cast to string, as is done for the train and dev frames
df_test["par_id"] = df_test["par_id"].astype(str)
def clean_text(text):
    """Collapse each run of whitespace into a single space and trim both ends."""
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
# Replace missing paragraphs with "" first: astype(str) alone would turn NaN into
# the literal text "nan", which would then be tokenized and scored as a real paragraph.
# This mirrors the fillna("") applied to the dev split.
df_test["text"] = df_test["text"].fillna("").astype(str).apply(clean_text)
print("Preprocessing and balancing complete!")

# Tokenization function
def tokenize_function(text):
    """Encode `text` with the global `tokenizer`, truncated/padded to 512 tokens, as PyTorch tensors."""
    encode_options = {
        "truncation": True,
        "padding": "max_length",
        "max_length": 512,
        "return_tensors": "pt",
    }
    return tokenizer(text, **encode_options)

# Create a custom Dataset class for the test set (without labels)
class PCLDatasetTest(Dataset):
    """Label-free map-style dataset over pre-tokenized input ids and attention masks."""

    def __init__(self, input_ids, attention_masks):
        # Both containers are stored as given and indexed in parallel
        self.input_ids, self.attention_masks = input_ids, attention_masks

    def __len__(self):
        # With no labels available, the input ids determine the dataset size
        return len(self.input_ids)

    def __getitem__(self, idx):
        # One example as a dict with the keys input_ids / attention_mask
        sample = dict(
            input_ids=self.input_ids[idx],
            attention_mask=self.attention_masks[idx],
        )
        return sample

# Encode each test paragraph on its own, then concatenate the per-row tensors along dim 0
df_test["tokenized"] = df_test["text"].apply(tokenize_function)
test_input_ids = torch.cat(tuple(enc["input_ids"] for enc in df_test["tokenized"]), dim=0)
test_attention_masks = torch.cat(tuple(enc["attention_mask"] for enc in df_test["tokenized"]), dim=0)

# Wrap the tensors in the label-free dataset class
test_dataset = PCLDatasetTest(test_input_ids, test_attention_masks)

# Run the trainer over the test set and keep, per sample, the index of the largest score
predictions_output_test = final_trainer.predict(test_dataset)
predicted_labels_test = predictions_output_test.predictions.argmax(axis=1)

# Write one predicted label per line
with open("test.txt", "w") as f:
    f.writelines(f"{label}\n" for label in predicted_labels_test)


Preprocessing and balancing complete!


In [14]:
print(len(predicted_labels_test))

3832
