In [1]:
!apt-get update
!apt-get install -y libmariadb-dev libmariadb-dev-compat build-essential
!pip install mariadb


0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
0% [Connecting to archive.ubuntu.com (185.125.190.81)] [Connected to cloud.r-pr                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as reposit

In [2]:
!pip install transformers datasets scikit-learn sqlalchemy mariadb --quiet
!git clone https://github.com/Horizontal-Labs/Argument-Mining.git
import sys
sys.path.append("/content/Argument-Mining")


fatal: destination path 'Argument-Mining' already exists and is not an empty directory.


In [3]:
!pip install mysql-connector-python




In [4]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from db.models import ADU, Relationship
import pandas as pd

# Database credentials
DB = "argument-mining"
HOST = "argumentmining.ddns.net:3306"
USER = "guidedproject"
PW = "guidedproject"
DB_URI = f'mysql+mysqlconnector://{USER}:{PW}@{HOST}/{DB}'

# Set up session
engine = create_engine(DB_URI)
Session = sessionmaker(bind=engine)
session = Session()

# Import query helpers
from db.queries import get_training_data, get_test_data

# Fetch training data
claims_train, premises_train, stances_train = get_training_data()

# Limit to first N pairs (e.g. 1000 claim-premise pairs = 2000 ADUs)
LIMIT = 1000
claims_train = claims_train[:LIMIT]
premises_train = premises_train[:LIMIT]
stances_train = stances_train[:LIMIT]





# Convert to dataframe rows
def adu_to_dict(adu_obj, label_type, stance):
    return {
        'text': adu_obj.text,
        'type': label_type,
        'stance': 1 if stance == 'stance_pro' else 0
    }

train_rows = []
for claim, premise, stance in zip(claims_train, premises_train, stances_train):
    if claim and premise:
        train_rows.append(adu_to_dict(claim, 1, stance))
        train_rows.append(adu_to_dict(premise, 0, stance))

# Final dataframe
df = pd.DataFrame(train_rows).dropna().reset_index(drop=True)




In [5]:
claims_test, premises_test, stances_test = get_test_data()

claims_test = claims_test[:5000]
premises_test = premises_test[:5000]
stances_test = stances_test[:5000]

In [6]:
from transformers import AutoTokenizer
from datasets import Dataset

tokenizer = AutoTokenizer.from_pretrained('microsoft/deberta-v3-base')
dataset = Dataset.from_pandas(df)

def tokenize(example):
    return tokenizer(example['text'], padding='max_length', truncation=True, max_length=128)

dataset = dataset.map(tokenize, batched=True)

dataset = dataset.remove_columns(['text', '__index_level_0__'] if '__index_level_0__' in dataset.column_names else ['text'])
encoded_train = dataset

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Map:   0%|          | 0/2000 [00:00<?, ? examples/s]

In [7]:
test_rows = []
for claim, premise, stance in zip(claims_test, premises_test, stances_test):
    if claim is not None:
        test_rows.append(adu_to_dict(claim, 1, stance))
    if premise is not None:
        test_rows.append(adu_to_dict(premise, 0, stance))

df_test = pd.DataFrame(test_rows)

# Convert test dataframe to Hugging Face Dataset
test_dataset = Dataset.from_pandas(df_test)

# Tokenize the test dataset
encoded_test = test_dataset.map(tokenize, batched=True)

# Remove original text column and potential index column
encoded_test = encoded_test.remove_columns(['text', '__index_level_0__'] if '__index_level_0__' in encoded_test.column_names else ['text'])


Map:   0%|          | 0/10000 [00:00<?, ? examples/s]

In [8]:
import torch
from torch import nn
from transformers import AutoModel

class MultiTaskModel(nn.Module):
    def __init__(self, model_name):
        super().__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        hidden_size = self.bert.config.hidden_size
        self.classifier_type = nn.Linear(hidden_size, 2)
        self.classifier_stance = nn.Linear(hidden_size, 2)

    def forward(self, input_ids, attention_mask=None, token_type_ids=None, type=None, stance=None): # Include token_type_ids
        # Pass token_type_ids to the BERT model if provided
        outputs = self.bert(input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        pooled = outputs.last_hidden_state[:, 0]
        return {
            'logits_type': self.classifier_type(pooled),
            'logits_stance': self.classifier_stance(pooled)
        }

model = MultiTaskModel('microsoft/deberta-v3-base')


In [9]:

from transformers import TrainingArguments, Trainer
from sklearn.metrics import accuracy_score

class MultiTaskTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        # Access 'type' and 'stance' directly from inputs
        labels_type = inputs.get("type")
        labels_stance = inputs.get("stance")
        outputs = model(**inputs)
        loss_fn = nn.CrossEntropyLoss()
        loss_type = loss_fn(outputs["logits_type"], labels_type)
        loss_stance = loss_fn(outputs["logits_stance"], labels_stance)
        return (loss_type + loss_stance, outputs) if return_outputs else loss_type + loss_stance




In [10]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""


In [11]:
def adu_to_dict(adu_obj, label_type, stance):
    return {
        'text': adu_obj.text,
        'type': label_type,
        'stance': 1 if stance == 'stance_pro' else 0
    }

# Prepare test data
test_rows = []
for claim, premise, stance in zip(claims_test, premises_test, stances_test):
    if claim is not None:
        test_rows.append(adu_to_dict(claim, 1, stance))
    if premise is not None:
        test_rows.append(adu_to_dict(premise, 0, stance))

df_test = pd.DataFrame(test_rows)

In [1]:
encoded_test = tokenizer(
    list(df_test['text']),
    padding=True,
    truncation=True,
    return_tensors='pt'
)

# Add labels
encoded_test['type'] = torch.tensor(df_test['type'].values)
encoded_test['stance'] = torch.tensor(df_test['stance'].values)


NameError: name 'tokenizer' is not defined

In [None]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(eval_pred):
    logits, labels = eval_pred

    # If you have multiple heads, adjust this logic accordingly
    if isinstance(logits, tuple):
        # For multitask setup (e.g., type and stance)
        type_logits, stance_logits = logits
        type_preds = type_logits.argmax(axis=-1)
        stance_preds = stance_logits.argmax(axis=-1)

        type_labels = labels["type"]
        stance_labels = labels["stance"]

        # Type classification metrics
        type_acc = accuracy_score(type_labels, type_preds)
        type_precision, type_recall, type_f1, _ = precision_recall_fscore_support(
            type_labels, type_preds, average="weighted", zero_division=0
        )

        # Stance classification metrics
        stance_acc = accuracy_score(stance_labels, stance_preds)
        stance_precision, stance_recall, stance_f1, _ = precision_recall_fscore_support(
            stance_labels, stance_preds, average="weighted", zero_division=0
        )

        return {
            "type_accuracy": type_acc,
            "type_precision": type_precision,
            "type_recall": type_recall,
            "type_f1": type_f1,
            "stance_accuracy": stance_acc,
            "stance_precision": stance_precision,
            "stance_recall": stance_recall,
            "stance_f1": stance_f1,
        }

    else:
        # For single-task case (just one head)
        preds = logits.argmax(axis=-1)
        acc = accuracy_score(labels, preds)
        precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average="weighted", zero_division=0)

        return {
            "accuracy": acc,
            "precision": precision,
            "recall": recall,
            "f1": f1
        }


In [None]:
training_args = TrainingArguments(
    output_dir='./results',  # Directory to save results
    num_train_epochs=3,      # Number of training epochs
    per_device_train_batch_size=16, # Batch size for training
    per_device_eval_batch_size=16,  # Batch size for evaluation
    warmup_steps=500,        # Number of warmup steps
    weight_decay=0.01,       # Strength of weight decay
    logging_dir='./logs',    # Directory for storing logs
    logging_steps=10,
)

trainer = MultiTaskTrainer(
    model=model,
    args=training_args,
    train_dataset=encoded_train,
    eval_dataset=encoded_test,  # ← test set
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

trainer.train()

In [None]:
trainer.evaulate()
