# Klasifikacija tokena

## Uvoz potrebnih biblioteka

In [None]:
from datasets import load_dataset
from datasets import Features, ClassLabel, Sequence, Value
from datasets import Dataset, DatasetDict
from datasets.arrow_dataset import NonExistentDatasetError

from ast import literal_eval

import pandas as pd
import numpy as np
import random

from sklearn.metrics import classification_report
from sklearn.metrics import precision_recall_fscore_support
from sklearn.metrics import roc_auc_score, roc_curve 
import matplotlib.pyplot as plt 

from transformers import AutoTokenizer, AutoModel, AutoConfig, AutoModelForTokenClassification, Trainer, TrainingArguments, DataCollatorForTokenClassification
from transformers.modeling_outputs import TokenClassifierOutput

import torch
import torch.nn as nn
from torch.nn.functional import cross_entropy

import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["WANDB_DISABLED"] = "true"

## Definiranje argumenata, odabir jezičnog modela

In [None]:
# Random seed, applied to every random number generator used in this notebook
# (Python, NumPy, PyTorch on CPU and on all CUDA devices)
seed = 16
for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    seed_fn(seed)

# Dataset schema: one row per sentence, with parallel per-word lists for the
# tokens, the binary class label and the "pos" string
label_names = ["0", "1"]
features = Features(
    {
        "id": Value("int64"),
        "tokens": Sequence(Value("string")),
        "label": Sequence(ClassLabel(num_classes=2, names=label_names)),
        "pos": Sequence(Value("string")),
    }
)

# Language model checkpoint and training hyperparameters.
# Checkpoints listed as alternatives: EMBEDDIA/crosloengual-bert,
# bert-base-multilingual-uncased, classla/bcms-bertic, classla/xlm-r-bertic,
# xlm-roberta-base, xlm-roberta-large
model_ckpt = "classla/xlm-r-bertic"
train_batch_size = 32
eval_batch_size = 16
learning_rate = 3e-5
num_train_epoch = 3
warmup_steps = 0

## Učitavanje skupa podataka

In [None]:
# Select the train/test files (fold 7). The list-valued columns are parsed
# from their text form in the CSV with `literal_eval`.
list_columns = ["tokens", "label", "pos"]

df_train = pd.read_csv('data/fold_7/train_7.csv', encoding="UTF-8")
for column in list_columns:
    df_train[column] = df_train[column].apply(literal_eval)

df_test = pd.read_csv('data/fold_7/test_7.csv', encoding="UTF-8")
for column in list_columns:
    df_test[column] = df_test[column].apply(literal_eval)

# Wrap both frames as Hugging Face datasets that share the declared schema
splits = {"train": df_train, "test": df_test}
dataset = DatasetDict({
    split_name: Dataset.from_pandas(frame, features=features)
    for split_name, frame in splits.items()
})

# Class names as declared on the "label" feature of the training split
label_list = dataset["train"].features["label"].feature.names
print(dataset)

## Tokenizacija skupa podataka

In [None]:
# Load the tokenizer that belongs to the selected checkpoint
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

# Maximum tokenized sequence length; passed as `max_length` (together with
# truncation) to the tokenizer call in tokenize_and_align_labels
max_length = 235

def tokenize_and_align_labels(examples):
    """Tokenize pre-split sentences and align word labels to sub-tokens.

    Args:
        examples: Batch with a "tokens" entry (one list of words per
            sentence) and a "label" entry (one list of word labels per
            sentence, indexable by word position).

    Returns:
        The tokenizer output for the batch with an extra "labels" entry.
        For each sentence, the first sub-token of every word carries that
        word's label; special tokens, padding and the remaining sub-tokens
        of a word carry -100.
    """
    encoded = tokenizer(
        examples["tokens"],
        is_split_into_words=True,
        padding=True,
        truncation=True,
        max_length=max_length,
    )

    aligned = []
    for row, word_labels in enumerate(examples["label"]):
        row_labels = []
        last_word = None
        for word in encoded.word_ids(batch_index=row):
            # A position gets a real label only when it opens a new word;
            # `None` marks positions that do not belong to any word.
            opens_word = word is not None and word != last_word
            row_labels.append(word_labels[word] if opens_word else -100)
            last_word = word
        aligned.append(row_labels)

    encoded["labels"] = aligned
    return encoded

# Tokenize every split in batches and drop the word-level columns afterwards
tokenized_dataset = dataset.map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=["tokens", "label", "pos"],
)

# Collator used to assemble tokenized examples into model batches
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

# Two-way mapping between class ids and class names
id2label = {0: "0", 1: "1"}
label2id = {name: class_id for class_id, name in id2label.items()}

## Izračun metrika i prikaz izvještaja klasifikacije

In [None]:
def compute_metrics(p):
    """Compute token-level precision, recall, F1 and ROC-AUC.

    Also prints a per-class classification report.

    Args:
        p: Pair ``(predictions, labels)`` of NumPy arrays. ``predictions``
            holds the class scores with the class on the last axis (axis 2);
            ``labels`` holds one class id per position, where -100 marks
            positions that are excluded from every metric.

    Returns:
        Dict with weighted "precision", "recall" and "f1". "roc_auc" is
        added when more than one class occurs among the evaluated positions;
        it is 0.0 if scikit-learn rejects the input with a ValueError.
    """
    logits, labels = p

    # Positions labelled -100 are left out of every metric
    keep = labels != -100
    kept_labels = labels[keep]
    kept_logits = logits[keep]  # one row of class scores per kept position

    # ROC-AUC is only defined when both classes are present
    roc_auc = None
    if len(np.unique(kept_labels)) > 1:
        # Rank by the softmax probability of class 1, not by its raw logit:
        # the raw logit ignores the class-0 score, so it can order tokens
        # differently from the probability the ROC curve is drawn from.
        shifted = kept_logits - kept_logits.max(axis=-1, keepdims=True)
        exp_scores = np.exp(shifted)
        y_scores = (exp_scores / exp_scores.sum(axis=-1, keepdims=True))[:, 1]
        try:
            roc_auc = roc_auc_score(kept_labels, y_scores)
        except ValueError as e:
            print(f"Greška u izračunu ROC AUC: {e}")
            roc_auc = 0.0

    predicted_ids = np.argmax(logits, axis=2)

    # Boolean masking walks the arrays row by row, so labels and predictions
    # stay aligned position by position
    flat_labels = [label_list[class_id] for class_id in kept_labels]
    flat_predictions = [label_list[class_id] for class_id in predicted_ids[keep]]

    print(classification_report(flat_labels, flat_predictions, digits=6, zero_division=0))
    precision, recall, f1, _ = precision_recall_fscore_support(
        flat_labels, flat_predictions, average="weighted", zero_division=0
    )

    metrics_results = {
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }
    if roc_auc is not None:
        metrics_results["roc_auc"] = roc_auc

    return metrics_results


## Treniranje i evaluacija modela za klasifikaciju tokena

In [None]:
# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForTokenClassification.from_pretrained(
    model_ckpt,
    num_labels=len(label_list),
    id2label=id2label,
    label2id=label2id
).to(device)

# Log roughly once per epoch. Clamp to at least 1 so that a training split
# smaller than one batch does not produce logging_steps = 0.
logging_steps = max(1, len(tokenized_dataset['train']) // train_batch_size)
model_name = f"{model_ckpt}-finetuned-metaphor"

training_args = TrainingArguments(
    output_dir = model_name,
    overwrite_output_dir = True,
    num_train_epochs = num_train_epoch,
    learning_rate = learning_rate,
    per_device_train_batch_size = train_batch_size,
    per_device_eval_batch_size = eval_batch_size,
    weight_decay = 0.01,
    warmup_steps = warmup_steps,
    # No evaluation during training; the test split is evaluated once below
    evaluation_strategy = "no",
    #save_strategy = "epoch",
    disable_tqdm = False,
    logging_steps = logging_steps,
    push_to_hub = False,
    log_level = "error",
    #load_best_model_at_end = True,
    metric_for_best_model = "f1",
    greater_is_better = True,
    seed = seed
)

trainer = Trainer(
    model = model,
    args = training_args,
    compute_metrics = compute_metrics,
    train_dataset = tokenized_dataset['train'],
    eval_dataset = tokenized_dataset['test'],
    data_collator = data_collator,
    tokenizer = tokenizer
)

trainer.train()

# Evaluate the fine-tuned token classification model on the test split
eval_results = trainer.evaluate()
print(eval_results)


## Prikaz krivulje ROC

In [None]:
# ROC curve on the test split
predictions_output = trainer.predict(tokenized_dataset['test'])
logits = predictions_output.predictions
labels = predictions_output.label_ids

# Flatten predictions and labels, and filter out positions labelled -100
flat_logits = logits.reshape(-1, logits.shape[-1])
flat_labels = labels.reshape(-1)

valid_indices = flat_labels != -100
valid_flat_labels = flat_labels[valid_indices]
valid_flat_logits = flat_logits[valid_indices]

# Probability of class 1 for every evaluated token
y_scores = torch.softmax(torch.tensor(valid_flat_logits), dim=1)[:, 1].numpy()

fpr, tpr, thresholds = roc_curve(valid_flat_labels, y_scores)

# Take the AUC from the same scores the curve is drawn from, so the legend
# always matches the plot; fall back to the evaluation metric when the AUC
# is undefined because only one class is present.
if len(np.unique(valid_flat_labels)) > 1:
    roc_auc = roc_auc_score(valid_flat_labels, y_scores)
else:
    roc_auc = eval_results.get('eval_roc_auc', None)

# `is not None` rather than truthiness, so an AUC of 0.0 is still shown
curve_label = f'ROC krivulja (AUC = {roc_auc:.4f})' if roc_auc is not None else 'ROC curve'

plt.figure(figsize=(10, 8))
plt.plot(fpr, tpr, color='darkorange', lw=2, label=curve_label)
# Diagonal reference line (true positive rate equal to false positive rate)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('Stopa lažno pozitivnih rezultata')
plt.ylabel('Stopa točno pozitivnih rezultata')
plt.title('Krivulja ROC - stopa učenja 3e-5')
plt.legend(loc="lower right")
#plt.savefig('roc_curve_3e5-7.png')
plt.show()



## Analiza grešaka

In [None]:
def forward_pass_with_label(batch):
    """Predict a class id for every token position of a batch.

    Written to be passed to ``Dataset.map(..., batched=True)``.

    Args:
        batch: Dict mapping each column name to a list with one value per
            example. After collation it must provide "input_ids" and
            "attention_mask".

    Returns:
        Dict with "predicted_label": a NumPy array holding, per example and
        per token position of the collated batch, the index of the largest
        logit.
    """
    # Convert dict of lists to list of dicts suitable for the data collator
    examples = [dict(zip(batch, values)) for values in zip(*batch.values())]
    # Pad inputs and labels, then move the model inputs to the device
    collated = data_collator(examples)
    input_ids = collated["input_ids"].to(device)
    attention_mask = collated["attention_mask"].to(device)

    # Make sure dropout is off no matter what mode the model was left in
    trainer.model.eval()
    with torch.no_grad():
        output = trainer.model(input_ids=input_ids, attention_mask=attention_mask)
        # Logits are [batch_size, sequence_length, classes];
        # pick the class with the largest logit at every position
        predicted_label = torch.argmax(output.logits, dim=-1).cpu().numpy()

    return {"predicted_label": predicted_label}


# Run the model over the test split and collect its per-token predictions
valid_set = tokenized_dataset["test"]
valid_set = valid_set.map(forward_pass_with_label, batched=True, batch_size=32)
df = valid_set.to_pandas()

# NOTE(review): positions labelled -100 are written out as "0", so in the
# exported file they cannot be told apart from real class-0 tokens.
id2label[-100] = "0"
df["input_tokens"] = df["input_ids"].apply(
    lambda x: tokenizer.convert_ids_to_tokens(x))
df["predicted_label"] = df["predicted_label"].apply(
    lambda x: [id2label[i] for i in x])
df["labels"] = df["labels"].apply(
    lambda x: [id2label[i] for i in x])
# Trim the predictions to the length of the example's input ids
df['predicted_label'] = df.apply(
    lambda x: x['predicted_label'][:len(x['input_ids'])], axis=1)

# Create the output directory first; to_csv does not create missing folders
results_path = 'results/xlm-r-bertic-7-3e5.csv'
os.makedirs(os.path.dirname(results_path), exist_ok=True)
df.to_csv(results_path)


## Ispis broja parametara modela

In [None]:
# Add up the number of elements of every parameter tensor of the model
total_params = sum(tensor.numel() for tensor in model.parameters())

print(f"Broj parametara modela: {total_params}")