In [3]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import MultiLabelBinarizer
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments
import torch
import matplotlib.pyplot as plt
import seaborn as sns

sns.set(style="whitegrid")


In [4]:
def classify_issue(issue):
    stance_groups = {
        'brexit': ['pro_brexit', 'anti_brexit'],
        'ClimateChangeUK': ['pro_climateAction', 'anti_climateAction'],
        'HealthcareUK': ['pro_NHS', 'anti_NHS'],
        'IsraelPalestineUK': ['pro_israel', 'pro_palestine'],
        'TaxationUK': ['pro_company_taxation', 'pro_worker_taxation']
    }
    
    if issue not in stance_groups:
        raise ValueError(f"Unknown issue: {issue}")
    
    targets = stance_groups[issue] + ['neutral', 'irrelevant']

    file_path = '/Users/adamzulficar/Documents/year3/Bachelor Project/Thesis/Automated Annotation/Training Data/UK/{}_training.csv'.format(issue)
    df = pd.read_csv(file_path)

    mlb = MultiLabelBinarizer(classes=targets)
    df[targets] = mlb.fit_transform(df[targets])

    tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

    # Combine text_raw and context_raw with [SEP] token
    df['combined_text'] = df['text_raw'].fillna('') + " [SEP] " + df['context_raw'].fillna('')

    def tokenize_function(examples):
        return tokenizer(examples['combined_text'], padding='max_length', truncation=True, max_length=512)

    tokenized_datasets = df['combined_text'].apply(lambda x: tokenize_function({'combined_text': x}))
    labels = df[targets].values

    # Convert tokenized datasets to the format expected by the Dataset class
    tokenized_datasets = {key: torch.tensor([d[key] for d in tokenized_datasets]) for key in tokenized_datasets[0]}

    return tokenized_datasets, labels, targets

tokenized_datasets, labels, targets = classify_issue('ClimateChangeUK')




ValueError: Length of values (4) does not match length of index (400)

In [None]:
def train_roberta(tokenized_datasets, labels, targets, model_name='roberta-base', batch_size=8, epochs=3):
    # Convert the dataset to torch Dataset
    class Dataset(torch.utils.data.Dataset):
        def __init__(self, encodings, labels):
            self.encodings = encodings
            self.labels = labels

        def __getitem__(self, idx):
            item = {key: val[idx] for key, val in self.encodings.items()}
            item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
            return item

        def __len__(self):
            return len(self.labels)

    # Prepare the dataset
    dataset = Dataset(tokenized_datasets, labels)

    # Split the dataset
    train_size = 0.8
    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [int(train_size * len(dataset)), len(dataset) - int(train_size * len(dataset))])

    # Load model
    model = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=len(targets))

    training_args = TrainingArguments(
        output_dir='./results',
        num_train_epochs=epochs,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
        logging_steps=10,
        evaluation_strategy="epoch"
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset
    )

    # Train the model
    trainer.train()

    # Evaluate the model
    predictions = trainer.predict(test_dataset)
    preds = np.argmax(predictions.predictions, axis=1)

    return model, preds, predictions.label_ids

model, preds, true_labels = train_roberta(tokenized_datasets, labels, targets)

# Calculate performance metrics
accuracy = accuracy_score(true_labels, preds)
precision = precision_score(true_labels, preds, average='weighted', zero_division=0)
recall = recall_score(true_labels, preds, average='weighted', zero_division=0)
f1 = f1_score(true_labels, preds, average='weighted', zero_division=0)

print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")


In [None]:
def plot_learning_curve(model_name, tokenized_datasets, labels, train_sizes=np.linspace(0.1, 1.0, 5), batch_size=8, epochs=3):
    class Dataset(torch.utils.data.Dataset):
        def __init__(self, encodings, labels):
            self.encodings = encodings
            self.labels = labels

        def __getitem__(self, idx):
            item = {key: val[idx] for key, val in self.encodings.items()}
            item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
            return item

        def __len__(self):
            return len(self.labels)

    dataset = Dataset(tokenized_datasets, labels)
    train_size = 0.8
    train_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [int(train_size * len(dataset)), len(dataset) - int(train_size * len(dataset))]
    )

    train_results = []
    test_results = []

    for size in train_sizes:
        subset_size = int(size * len(train_dataset))
        subset_dataset = torch.utils.data.Subset(train_dataset, range(subset_size))

        model = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=len(targets))
        training_args = TrainingArguments(
            output_dir='./results',
            num_train_epochs=epochs,
            per_device_train_batch_size=batch_size,
            per_device_eval_batch_size=batch_size,
            warmup_steps=500,
            weight_decay=0.01,
            logging_dir='./logs',
            logging_steps=10,
            evaluation_strategy="epoch"
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=subset_dataset,
            eval_dataset=test_dataset
        )

        trainer.train()
        train_results.append(trainer.evaluate(subset_dataset)['eval_f1'])
        test_results.append(trainer.evaluate(test_dataset)['eval_f1'])

    plt.figure(figsize=(12, 8))
    plt.plot(train_sizes, train_results, label='Training score', marker='o')
    plt.plot(train_sizes, test_results, label='Cross-validation score', marker='o')
    plt.title('Learning Curve')
    plt.xlabel('Training Size')
    plt.ylabel('F1 Score')
    plt.legend(loc='best')
    plt.grid()
    plt.show()

plot_learning_curve('roberta-base', tokenized_datasets, labels)
