In [37]:
import pandas as pd
import numpy as np
from sklearn.dummy import DummyClassifier
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from datasets import Dataset
import torch
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    DataCollatorWithPadding,
    TrainingArguments,
    Trainer,
)

In [None]:
### Use the GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Device: %s" % device)

## TASK 1


In [None]:
### Download links for the TSV files of Zenodo record 8248658.
### One file per split (training / validation / test) and per content type.

### Argument files, one per split
arguments_training_url = "https://zenodo.org/records/8248658/files/arguments-training.tsv?download=1"
arguments_validation_url = "https://zenodo.org/records/8248658/files/arguments-validation.tsv?download=1"
arguments_test_url = "https://zenodo.org/records/8248658/files/arguments-test.tsv?download=1"

### Human-value label files, one per split
labels_training_url = "https://zenodo.org/records/8248658/files/labels-training.tsv?download=1"
labels_validation_url = "https://zenodo.org/records/8248658/files/labels-validation.tsv?download=1"
labels_test_url = "https://zenodo.org/records/8248658/files/labels-test.tsv?download=1"

In [None]:
### Every split is stored as two tab-separated files (arguments and labels)
### that share the "Argument ID" key. Each pair is read and joined on that key.
### Naming: the `_nm` suffix stands for "not merged", i.e. the label columns
### have not yet been combined with a logical OR.

### Training split
arguments_tr_df = pd.read_csv(arguments_training_url, sep="\t")
labels_tr_df = pd.read_csv(labels_training_url, sep="\t")
train_df_nm = arguments_tr_df.merge(labels_tr_df, on="Argument ID")

### Validation split
arguments_va_df = pd.read_csv(arguments_validation_url, sep="\t")
labels_va_df = pd.read_csv(labels_validation_url, sep="\t")
validation_df_nm = arguments_va_df.merge(labels_va_df, on="Argument ID")

### Test split
arguments_te_df = pd.read_csv(arguments_test_url, sep="\t")
labels_te_df = pd.read_csv(labels_test_url, sep="\t")
test_df_nm = arguments_te_df.merge(labels_te_df, on="Argument ID")

In [None]:
### Summary statistics (pandas `describe`) of the merged training frame
train_df_nm.describe()

In [None]:
### First rows of the merged training frame (argument fields + label columns)
train_df_nm.head()

In [None]:
### Positional layout of the merged frames: the first four columns are the
### argument fields listed in `columns_to_keep`; the label columns start at
### position 4. Each (start, end) pair is a half-open `iloc` slice that
### selects the label columns OR-ed into one level-3 category.
###
### The last slice ends at 24 (previously 23). Assuming the label files hold
### 20 value categories (positions 4-23 after the merge - TODO confirm with
### `train_df_nm.shape`), an end of 23 left the final label column out of
### every level-3 category.
###
### NOTE(review): the earlier comment gave the label ranges as
### (0,3),(3,7),(7,13),(13,19), i.e. with shared end points. If those were
### meant as inclusive ranges (a category shared by two neighbouring groups),
### the first three slices would need end + 1 as well - confirm against the
### task description before changing them.
column_ranges = [(4, 7), (7, 11), (11, 17), (17, 24)]

### NOTE(review): "Conversation" is presumably a typo for "Conservation"; the
### name is kept because other cells address the column by this exact string.
level_3_cat = [
    "Openness_to_change",
    "Self_enhancement",
    "Conversation",
    "Self_transcendence",
]
columns_to_keep = ["Argument ID", "Conclusion", "Stance", "Premise"]


def _to_level_3(df_nm):
    """Return the argument columns of `df_nm` followed by the level-3 labels.

    Each level-3 column is the row-wise logical OR (`any(axis=1)`) of the
    label columns selected by the matching slice in `column_ranges`.
    """
    level_3 = pd.DataFrame(
        {
            cat: df_nm.iloc[:, start:end].any(axis=1)
            for (start, end), cat in zip(column_ranges, level_3_cat)
        }
    )
    return pd.concat([df_nm[columns_to_keep], level_3], axis=1)


### Creating final dataframes
train_df = _to_level_3(train_df_nm)
validation_df = _to_level_3(validation_df_nm)
test_df = _to_level_3(test_df_nm)

In [None]:
### First rows of the training frame after the level-3 aggregation
train_df.head()

In [None]:
### Summary statistics (pandas `describe`) of the aggregated test frame
test_df.describe()

In [None]:
### First rows of the aggregated validation frame
validation_df.head()

In [None]:
### Show the aggregated "Openness_to_change" column of the test frame
test_df["Openness_to_change"]

In [None]:
### Define a mapping for "Stance" column
stance_mapping = {"in favor of": True, "against": False}


def _encode_stance(stance):
    """Map the text values of a "Stance" column to booleans.

    A column that is already boolean is returned unchanged. This keeps the
    cell safe to re-run: mapping an already converted column through
    `stance_mapping` again would replace every value with NaN, because
    True/False are not keys of the mapping.
    """
    if pd.api.types.is_bool_dtype(stance):
        return stance
    return stance.map(stance_mapping)


### Apply the mapping to convert strings to boolean values
train_df["Stance"] = _encode_stance(train_df["Stance"])
validation_df["Stance"] = _encode_stance(validation_df["Stance"])
test_df["Stance"] = _encode_stance(test_df["Stance"])

In [None]:
### First rows of the training frame after the "Stance" conversion
train_df.head()

## TASK 2

### Uniform Baseline

In [None]:
### Seed NumPy's global random state.
### NOTE(review): the uniform DummyClassifier instances in the next cell are
### created without `random_state`, so they presumably draw from this global
### state; re-running the baseline cell without re-running this one would then
### give different predictions - confirm.
np.random.seed(12345678)

In [None]:
### One independent uniform-random classifier per level-3 category.
### The input columns (everything in `columns_to_keep` except the ID) are
### only passed to satisfy the estimator interface.
baseline_inputs = columns_to_keep[1:]

clf_list = []
for cat in level_3_cat:
    clf = DummyClassifier(strategy="uniform")
    clf.fit(X=train_df[baseline_inputs], y=train_df[cat])
    clf_list.append(clf)

### Stack the per-category predictions as columns: (n_test_samples, n_categories)
prediction_uniform = np.column_stack(
    [clf.predict(X=test_df[baseline_inputs]) for clf in clf_list]
)

### F1 score for Uniform Baseline

In [None]:
### Gold labels of all level-3 categories, used for the multi-label averages
y_test = test_df[level_3_cat]

### Overall F1, weighted average across categories
f1_overall = f1_score(y_true=y_test, y_pred=prediction_uniform, average="weighted")
print(f"Random Classifier F1 overall weighted : {f1_overall:.4f}")

### Overall F1, macro (unweighted) average across categories
f1_overall = f1_score(y_true=y_test, y_pred=prediction_uniform, average="macro")
print(f"Random Classifier F1 overall macro: {f1_overall:.4f}")


### Binary F1 of every category, in `level_3_cat` order
f1_per_cat = []
for cat, cat_prediction in zip(level_3_cat, prediction_uniform.T):
    f1_per_cat.append(f1_score(y_true=test_df[cat], y_pred=cat_prediction))
print(f"Random Classifier F1 per category: {f1_per_cat}")

### Majority Baseline

In [None]:
### One majority-class classifier per level-3 category: each always predicts
### the most frequent training value of its category.
majority_inputs = columns_to_keep[1:]

clf_list = []
for cat in level_3_cat:
    clf = DummyClassifier(strategy="most_frequent")
    clf.fit(X=train_df[majority_inputs], y=train_df[cat])
    clf_list.append(clf)

### Stack the per-category predictions as columns: (n_test_samples, n_categories)
prediction_majority = np.column_stack(
    [clf.predict(X=test_df[majority_inputs]) for clf in clf_list]
)

### F1 score for Majority Baseline

In [None]:
### Overall F1 of the majority baseline, weighted average across categories
f1_overall = f1_score(
    y_true=test_df[level_3_cat], y_pred=prediction_majority, average="weighted"
)
print(f"Majority Classifier F1 weighted : {f1_overall:.4f}")

### Overall F1 of the majority baseline, macro average across categories
f1_overall = f1_score(
    y_true=test_df[level_3_cat], y_pred=prediction_majority, average="macro"
)
print(f"Majority Classifier F1 macro: {f1_overall:.4f}")


### Binary F1 of every category, in `level_3_cat` order
f1_per_cat = [
    f1_score(y_true=test_df[cat], y_pred=prediction_majority[:, i])
    for i, cat in enumerate(level_3_cat)
]
### The label previously read "Random Classifier" (copied from the uniform
### baseline cell) although these are the majority-baseline scores.
print(f"Majority Classifier F1 per category: {f1_per_cat}")

### BERT Classifier

In [None]:
### Convert dataframes into datasets
train_dataset = Dataset.from_pandas(train_df)
validation_dataset = Dataset.from_pandas(validation_df)
test_dataset = Dataset.from_pandas(test_df)

In [None]:
### Index <-> category-name lookups, in `level_3_cat` order
id2label = dict(enumerate(level_3_cat))
label2id = {name: position for position, name in enumerate(level_3_cat)}

In [None]:
### Checkpoint shared by the tokenizer and the classifier
model_card = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_card)

### Sequence classifier with one output per level-3 category, configured as a
### multi-label problem (an argument can belong to several categories).
model = AutoModelForSequenceClassification.from_pretrained(
    model_card,
    num_labels=len(level_3_cat),
    id2label=id2label,
    label2id=label2id,
    problem_type="multi_label_classification",
)

### Conclusion Only Model

In [None]:
### Encoding for Conclusion only model
def tokenize_conclusion(example):
    ### Tokenize text columns
    text_tokens = tokenizer(
        example["Conclusion"],
        truncation=True,
        padding="max_length",
        max_length=tokenizer.model_max_length,
        return_tensors="pt",
    )

    ### Combine text tokens with non-text features
    encoded_example = {
        "input_ids": text_tokens["input_ids"],
        "token_type_ids": text_tokens["token_type_ids"],
        "attention_mask": text_tokens["attention_mask"],
        "Openness_to_change": torch.tensor(
            example["Openness_to_change"], dtype=torch.float
        ),
        "Self_enhancement": torch.tensor(
            example["Self_enhancement"], dtype=torch.float
        ),
        "Conversation": torch.tensor(example["Conversation"], dtype=torch.float),
        "Self_transcendence": torch.tensor(
            example["Self_transcendence"], dtype=torch.float
        ),
    }
    # print(encoded_example)
    return encoded_example

In [None]:
### Tokenize train, validation, test datasets
ds_list = [
    d.map(tokenize_conclusion, batched=True)
    for d in (train_dataset, validation_dataset, test_dataset)
]

### Set format for train, validation, test tokenized datasets
columns = [
    "input_ids",
    "token_type_ids",
    "attention_mask",
    "Openness_to_change",
    "Self_enhancement",
    "Conversation",
    "Self_transcendence",
]

for d in ds_list:
    d.set_format(type="torch", columns=columns)
train_tokenized_ds, valid_tokenized_ds, test_tokenized_ds = ds_list

# ### Tokenize training data
# test_tokenized_dataset = test_dataset.map(tokenize_conclusion, batched=True)
# test_tokenized_dataset.set_format(
#     "tensorflow",
#     columns=[
#         "input_ids",
#         "token_type_ids",
#         "attention_mask",
#         "Openness_to_change",
#         "Self_enhancement",
#         "Conversation",
#         "Self_transcendence",
#     ],
# )

In [None]:
### Sanity check
print(train_tokenized_ds["Conclusion"][50])
decoded_text = tokenizer.decode(train_tokenized_ds["input_ids"][50])
print(decoded_text)

In [None]:
### Collator
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
### Fresh model for the Conclusion-only run.
### `problem_type` is passed explicitly, exactly as in the first instantiation
### of the model, so the multi-label setup is fixed in the configuration
### instead of being left to the library's runtime inference.
model = AutoModelForSequenceClassification.from_pretrained(
    model_card,
    problem_type="multi_label_classification",
    num_labels=len(level_3_cat),
    id2label=id2label,
    label2id=label2id,
)

In [None]:
### Print the module structure of the loaded model
print(model)

## Metrics

In [36]:
def multi_label_metrics(predictions, labels, threshold=0.5):
    """Macro-averaged F1 of multi-label predictions given as raw logits.

    Args:
        predictions: Logits, array-like of shape (batch_size, num_labels).
        labels: Gold indicator values, passed unchanged to `f1_score`.
        threshold: A label counts as predicted when its sigmoid probability
            is greater than or equal to this value.

    Returns:
        The macro F1 score as returned by `f1_score`.
    """
    ### Logits -> probabilities
    probs = torch.sigmoid(torch.as_tensor(predictions, dtype=torch.float32))
    ### Probabilities -> 0/1 indicator matrix
    y_pred = (probs >= threshold).numpy().astype(float)
    return f1_score(y_true=labels, y_pred=y_pred, average="macro")


def compute_metrics(prediction):
    """Metric callback for `Trainer`.

    Args:
        prediction: Object exposing `predictions` (the logits, or a tuple
            whose first element holds the logits) and `label_ids`.

    Returns:
        dict with a single "f1" entry. `Trainer` requires a dict of named
        metrics (a bare float has no keys to log), and the "f1" key is what
        `metric_for_best_model="f1"` in the training arguments refers to.
    """
    logits = prediction.predictions
    if isinstance(logits, tuple):
        logits = logits[0]
    return {"f1": multi_label_metrics(predictions=logits, labels=prediction.label_ids)}

### Training Arguments

In [None]:
training_args = TrainingArguments(
    ### Output location and logging
    ### NOTE(review): absolute path at the filesystem root - confirm it is
    ### writable in the environment this notebook runs in.
    output_dir="/Models/BertBaseUncased",
    report_to="none",
    ### Optimisation
    num_train_epochs=1,
    learning_rate=2e-5,
    weight_decay=0.01,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    ### Evaluate and checkpoint once per epoch, then reload the checkpoint
    ### that scored best on the "f1" metric
    evaluation_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1",
)

In [None]:
### Trainer for the Conclusion-only model: trains on the tokenized training
### split and evaluates on the tokenized validation split.
trainer = Trainer(
    args=training_args,
    model=model,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    train_dataset=train_tokenized_ds,
    eval_dataset=valid_tokenized_ds,
)

In [None]:
### Fine-tune the model with the settings in `training_args`
trainer.train()

In [None]:
### Evaluate on the `eval_dataset` given to the Trainer (the validation split)
trainer.evaluate()

## Conclusion - Premise Model

In [25]:
### Encoding for Conclusion - Premise model
def tokenize_conclusion_premise(example):
    ### Tokenize text columns
    text_tokens = tokenizer(
        example["Conclusion"],
        example["Premise"],
        truncation=True,
        padding="max_length",
        max_length=tokenizer.model_max_length,
        return_tensors="pt",
    )

    ### Combine text tokens with non-text features
    encoded_example = {
        "input_ids": text_tokens["input_ids"],
        "token_type_ids": text_tokens["token_type_ids"],
        "attention_mask": text_tokens["attention_mask"],
        "Openness_to_change": torch.tensor(
            example["Openness_to_change"], dtype=torch.bool
        ),
        "Self_enhancement": torch.tensor(example["Self_enhancement"], dtype=torch.bool),
        "Conversation": torch.tensor(example["Conversation"], dtype=torch.bool),
        "Self_transcendence": torch.tensor(
            example["Self_transcendence"], dtype=torch.bool
        ),
    }

    return encoded_example

## Conclusion - Premise - Stance Model

In [26]:
### Encoding for Conclusion - Premise - Stance model
def tokenize_conclusion_premise_stance(example):
    ### Tokenize text columns
    text_tokens = tokenizer(
        example["Conclusion"],
        example["Premise"],
        truncation=True,
        padding="max_length",
        max_length=tokenizer.model_max_length,
        return_tensors="pt",
    )

    ### Combine text tokens with non-text features
    encoded_example = {
        "input_ids": text_tokens["input_ids"],
        "token_type_ids": text_tokens["token_type_ids"],
        "attention_mask": text_tokens["attention_mask"],
        "Stance": torch.tensor(
            example["Stance"], dtype=torch.bool
        ),  ### Assuming 'Stance' is represented as 0 or 1
        "Openness_to_change": torch.tensor(
            example["Openness_to_change"], dtype=torch.bool
        ),
        "Self_enhancement": torch.tensor(example["Self_enhancement"], dtype=torch.bool),
        "Conversation": torch.tensor(example["Conversation"], dtype=torch.bool),
        "Self_transcendence": torch.tensor(
            example["Self_transcendence"], dtype=torch.bool
        ),
    }

    return encoded_example