## BERT

In [None]:
from abc import ABC, abstractmethod
import pandas as pd


# Directory prefix for the plot images and metric files written by the models.
MAIN_DIR = "/kaggle/working/"


class Model(ABC):
    """Abstract interface shared by the notebook's models.

    A concrete subclass has to implement every method below before it can
    be instantiated.
    """

    @abstractmethod
    def fit(self, *args) -> None:
        """Train the model."""

    @abstractmethod
    def evaluate(self, *args) -> float:
        """Evaluate the model and return a score."""

    @abstractmethod
    def make_plots(self, *args) -> None:
        """Produce the model's plots."""

    @abstractmethod
    def create_confusion_matrix(self, *args) -> None:
        """Build the model's confusion matrix."""

### Callbacks

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, Callback
import numpy as np
from sklearn.metrics import f1_score


class F1ScoreCallbackBert(Callback):
    """Keras callback that records the weighted F1-score on a validation set after every epoch.

    The scores are appended to ``self.f1_scores`` in epoch order.
    """

    def __init__(self, val_dataset):
        """Materialise ``val_dataset`` into NumPy arrays held on the callback.

        Args:
            val_dataset: Dataset exposing ``as_numpy_iterator()`` whose elements
                are ``(features, labels)`` pairs, ``features`` being a dict of
                arrays. All batches are concatenated, so the whole validation
                set is kept in memory.
        """
        # Initialise the base Callback state before adding our own attributes.
        super().__init__()

        val_data = list(val_dataset.as_numpy_iterator())

        # Concatenate each feature across batches; the first batch supplies the feature names.
        self.x_val = {key: np.concatenate([batch[0][key] for batch in val_data]) for key in val_data[0][0].keys()}
        self.y_val = np.concatenate([batch[1] for batch in val_data])
        self.f1_scores = []

    def on_epoch_end(self, epoch, logs=None):
        """Predict on the stored validation data and append the weighted F1-score."""
        y_pred_logits = self.model.predict(self.x_val).logits

        # The predicted class is the index of the largest logit in each row.
        y_pred = np.argmax(y_pred_logits, axis=1)

        f1 = f1_score(self.y_val, y_pred, average='weighted')
        self.f1_scores.append(f1)

    # The three overrides below report that this callback has no per-batch hooks.
    def _implements_train_batch_hooks(self):
        return False

    def _implements_test_batch_hooks(self):
        return False

    def _implements_predict_batch_hooks(self):
        return False

### Imports

In [None]:
from platform import win32_edition

import pandas as pd

import tensorflow as tf
import numpy as np
from imblearn.over_sampling import RandomOverSampler

from transformers import TFAutoModelForSequenceClassification, AutoTokenizer
from transformers.tokenization_utils_fast import PreTrainedTokenizerFast
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from transformers import TFAutoModelForSequenceClassification
from sklearn.utils.class_weight import compute_class_weight

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.utils.class_weight import compute_class_weight
from tensorflow.python.data.ops.dataset_ops import DatasetV2
from tensorflow.keras.callbacks import History

import matplotlib.pyplot as plt
import seaborn as sns

from tensorflow.keras.callbacks import EarlyStopping, Callback
import numpy as np
from sklearn.metrics import f1_score

### Model Parameters Description

- **model**: The pre-trained model that will be fine-tuned.
- **tokenizer**: The tokenizer that matches the model type. For BERT this is a WordPiece tokenizer, which splits text into sub-word units.

In [None]:
# Checkpoint name passed to ``from_pretrained`` when a BertModel is created.
MODEL_TYPE = "bert-base-uncased"


class BertModel:
    """Wrapper around a pre-trained sequence-classification model and its tokenizer."""

    tokenizer: PreTrainedTokenizerFast
    history: History
    # Confusion matrix of the most recent evaluation; stays None until one is created.
    cm: np.ndarray | None = None

    def __init__(self, model_type: str, model_name: str, num_labels: int = 4) -> None:
        """Load the pre-trained model and the tokenizer for ``model_type``.

        Args:
            model_type: Checkpoint identifier handed to ``from_pretrained``.
            model_name: Name of this run; used as a prefix for output files.
            num_labels: Number of target classes of the classification head.
                Defaults to 4, the value that was previously hard-coded.
        """
        self.model_name = model_name
        self.model_type = model_type
        self.model = TFAutoModelForSequenceClassification.from_pretrained(model_type, num_labels=num_labels)
        self.tokenizer = AutoTokenizer.from_pretrained(model_type)

### Data preprocessing

In [None]:
def tokenize_data(self, data: list[list[str]], max_length: int):
        """Run ``self.tokenizer`` on ``data`` and return its output unchanged.

        The tokenizer is asked to pad to ``max_length`` (``padding='max_length'``)
        and to truncate (``truncation=True``).

        Args:
            data: Texts handed to the tokenizer as-is.
            max_length: Target sequence length passed to the tokenizer.
        """
        tokenizer_options = {
            "max_length": max_length,
            "padding": 'max_length',
            "truncation": True,
        }
        return self.tokenizer(data, **tokenizer_options)

### Training and predictions

Podczas trenowania modelu, zostały przetestowane dwa różne podejścia, lecz niestety nie przyniosły one spodziewanych rezultatów.

- Tak jak w poprzednich modelach zostały najpierw użyte wagi, aby wyrównać nierówność klas, lecz model ten nie reagował na to podejście
- Po wielu nieudanych próbach użycia wag, podejście zostało zmienione na oversampling danych, lecz niestety także i to okazało się bezskuteczne

In [None]:
def fit(self, train_dataset: DatasetV2, val_dataset: DatasetV2, class_weights_dict: dict | None = None,
        epochs: int = 30, learning_rate: float = 2e-5) -> None:
        """Compile and train ``self.model``, recording the history and per-epoch validation F1.

        Args:
            train_dataset: Batched training dataset passed to ``model.fit``.
            val_dataset: Batched validation dataset; also used by the F1 callback.
            class_weights_dict: Optional mapping from class index to loss weight.
                It is forwarded to ``model.fit``; previously it was only printed,
                so supplied weights had no effect on training.
            epochs: Number of training epochs (default 30, as before).
            learning_rate: Adam learning rate. Defaults to 2e-5; the earlier
                ``optimizer='adam'`` used Adam's default 1e-3, which is far above
                the range normally used to fine-tune BERT.

        Sets ``self.history`` and ``self.f1_scores``.
        """
        print("Class weights:", class_weights_dict)

        self.model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            # The model outputs raw logits, so the loss applies the softmax itself.
            loss=SparseCategoricalCrossentropy(from_logits=True),
            metrics=['accuracy']
        )

        f1_callback = F1ScoreCallbackBert(val_dataset)

        self.history = self.model.fit(
            train_dataset,
            validation_data=val_dataset,
            epochs=epochs,
            class_weight=class_weights_dict,
            callbacks=[f1_callback]
        )

        self.f1_scores = f1_callback.f1_scores



def predict(self, to_predict: list[list[str]], max_length: int = 250, batch_size: int = 8) -> np.ndarray:
        """Return the predicted class index for every input text.

        Args:
            to_predict: Texts to classify; passed to ``tokenize_data`` unchanged.
            max_length: Sequence length used for tokenization. Defaults to 250,
                the value that was previously hard-coded; pass the length used
                during training if it differs.
            batch_size: Batch size used for inference (default 8, as before).

        Returns:
            One-dimensional array holding the arg-max class index per input.
        """
        tokenized_data = self.tokenize_data(to_predict, max_length=max_length)
        test_dataset = tf.data.Dataset.from_tensor_slices(dict(tokenized_data)).batch(batch_size)
        logits = self.model.predict(test_dataset).logits

        # Convert logits to labels: take the index of the largest logit for each sample.
        return tf.argmax(logits, axis=1).numpy()

### Making plots and evaluation

In [None]:
def evaluate(self, test_dataset: DatasetV2) -> None:
        """Evaluate ``self.model`` on ``test_dataset`` and print the loss and accuracy.

        The result of ``self.model.evaluate`` is unpacked into exactly two
        values. Nothing is returned.
        """
        results = self.model.evaluate(test_dataset)
        loss, accuracy = results
        print(f"loss: {loss}, accuracy: {accuracy}")


def create_confusion_matrix(self, x_test: list[list[str]], y_test: np.ndarray) -> None:
        """Predict on ``x_test`` and store the confusion matrix against ``y_test`` in ``self.cm``.

        Args:
            x_test: Texts to classify; passed to ``predict`` unchanged.
            y_test: True class indices, one per text.
        """
        y_pred = self.predict(x_test)

        # Pin the label set so the matrix always has one row/column per class,
        # even when a rare class is missing from both y_test and y_pred;
        # otherwise its shape would not match the fixed tick labels used when
        # plotting. Assumes classes are encoded as 0..num_labels-1, which is
        # what the sparse cross-entropy loss and the arg-max predictions use.
        class_ids = list(range(self.model.config.num_labels))
        self.cm = confusion_matrix(y_test, y_pred, labels=class_ids)


def make_plots(self) -> None:
        """Save the training curves, the confusion-matrix heatmap and the final metrics.

        Writes up to three files under ``MAIN_DIR``, each prefixed with
        ``self.model_name``:

        * ``-accuracy.png``: accuracy, loss and validation F1-score per epoch;
        * ``-confusion-matrix.png``: only when ``self.cm`` has been created;
        * ``-final-metrics.txt``: last-epoch training accuracy and validation F1.

        Reads ``self.history`` and ``self.f1_scores``, both assigned by ``fit``.
        """
        history = self.history.history

        # One entry per panel: (metric name, [(values, legend label), ...]).
        panels = [
            ('Accuracy', [
                (history['accuracy'], 'Train Accuracy'),
                (history['val_accuracy'], 'Validation Accuracy'),
            ]),
            ('Loss', [
                (history['loss'], 'Train Loss'),
                (history['val_loss'], 'Validation Loss'),
            ]),
            ('F1-score', [
                (self.f1_scores, 'Validation F1-score'),
            ]),
        ]

        plt.figure(figsize=(18, 6))
        # All panels share one 1xN grid. The original mixed a 1x2 and a 1x3
        # grid, which placed the F1 panel on top of the loss panel.
        for position, (metric, curves) in enumerate(panels, start=1):
            plt.subplot(1, len(panels), position)
            for values, legend_label in curves:
                plt.plot(values, label=legend_label)
            plt.title(f'{metric} Over Epochs')
            plt.xlabel('Epoch')
            plt.ylabel(metric)
            plt.legend()

        plt.tight_layout()
        plt.savefig(f"{MAIN_DIR}{self.model_name}-accuracy.png")

        if self.cm is not None:
            class_names = ['legal', 'spam', 'phishing', 'fraud']
            plt.figure(figsize=(8, 6))
            sns.heatmap(self.cm, annot=True, fmt='d', cmap='Blues',
                        xticklabels=class_names,
                        yticklabels=class_names)
            plt.xlabel('Predicted')
            plt.ylabel('True')
            plt.title('Confusion Matrix')
            plt.savefig(f"{MAIN_DIR}{self.model_name}-confusion-matrix.png")
        else:
            print("Cannot save confusion-matrix, because it was not previously created")

        with open(f"{MAIN_DIR}{self.model_name}-final-metrics.txt", "w", encoding="utf-8") as file:
            file.write(f"Accuracy: {history['accuracy'][-1]}\n")
            file.write(f"F1-Score: {self.f1_scores[-1]}\n")

### Running model

In [None]:
def train_bert_model(self, train_data: pd.DataFrame, train_labels: pd.DataFrame, max_length: int) -> None:
        """Split the data, oversample, tokenize, then train, evaluate and plot.

        The data is split 70/15/15 into train/validation/test. The train and
        validation splits are randomly oversampled; the test split is left as is.

        Args:
            train_data: Texts; converted with ``.values.tolist()``.
            train_labels: Class labels aligned with ``train_data``.
            max_length: Sequence length used when tokenizing all three splits.
        """
        batch_size = 8

        X_train, X_temp, Y_train, Y_temp = train_test_split(
            train_data.values.tolist(),
            train_labels.values.tolist(),
            test_size=0.3,
            random_state=28
        )
        X_val, X_test, Y_val, Y_test = train_test_split(
            X_temp, Y_temp, test_size=0.5, random_state=28
        )

        oversampler = RandomOverSampler(random_state=40)

        def oversample(texts, targets):
            # Resample row indices instead of the texts themselves. Turning
            # the texts into a NumPy string array allocates a fixed-width
            # buffer sized by the longest text for every row, which can need
            # gigabytes for long documents. The sampler draws rows from the
            # targets and its seed only, so the selected rows are the same.
            row_ids = np.arange(len(texts)).reshape(-1, 1)
            picked_rows, resampled_targets = oversampler.fit_resample(row_ids, targets)
            return [texts[row] for row in picked_rows.ravel()], resampled_targets

        X_train_resampled, Y_train_resampled = oversample(X_train, Y_train)
        # NOTE(review): the validation split is oversampled too, so validation
        # metrics are computed on a balanced set while test metrics are not;
        # confirm this is intended.
        X_val_resampled, Y_val_resampled = oversample(X_val, Y_val)

        # Keep the raw test texts: the confusion matrix re-tokenizes them via predict().
        X_test_df: list[list[str]] = X_test

        # Data tokenization
        X_train, X_val, X_test = (
            self.tokenize_data(X_train_resampled, max_length),
            self.tokenize_data(X_val_resampled, max_length),
            self.tokenize_data(X_test, max_length)
        )

        train_dataset = tf.data.Dataset.from_tensor_slices((dict(X_train), Y_train_resampled)).batch(batch_size)
        val_dataset = tf.data.Dataset.from_tensor_slices((dict(X_val), Y_val_resampled)).batch(batch_size)
        test_dataset = tf.data.Dataset.from_tensor_slices((dict(X_test), Y_test)).batch(batch_size)

        self.fit(train_dataset, val_dataset)

        self.evaluate(test_dataset)
        self.create_confusion_matrix(X_test_df, np.array(Y_test))
        self.make_plots()




# One entry per experiment: run name, source CSV and which extra fields the run is meant to use.
MODELS_PARAMS = [
    {"model_name": "only-body", "dataset": "/kaggle/input/final-2-csv/final.csv", "add_subject": False, "add_domain": False},
    {"model_name": "stop-words-body", "dataset": "/kaggle/input/final-2-csv/final-with-stop-words.csv", "add_subject": False, "add_domain": False},
    {"model_name": "body-subject-stop", "dataset": "/kaggle/input/final-2-csv/final-with-stop-words.csv", "add_subject": True, "add_domain": False},
    {"model_name": "body-domain-stop", "dataset": "/kaggle/input/final-2-csv/final-with-stop-words-domain-only.csv", "add_subject": False, "add_domain": True},
    {"model_name": "full-data-stop", "dataset": "/kaggle/input/final-2-csv/final-with-stop-words-domain-only.csv", "add_subject": True, "add_domain": True}
]

# Sequence length used for tokenization in every run.
max_length = 250

for model_param in MODELS_PARAMS:
    # Read the CSV named by this configuration. The loop used to load
    # final.csv for every entry, so all runs trained on identical data.
    data = pd.read_csv(model_param["dataset"]).sample(n=16000, random_state=29)
    body = data['body']
    labels = data['label']

    # NOTE(review): "add_subject" / "add_domain" are not applied anywhere in
    # this loop; only the 'body' column is used. TODO: combine the extra
    # columns once their names in the CSVs are confirmed.

    bert_model = BertModel(MODEL_TYPE, model_param["model_name"])
    bert_model.train_bert_model(body, labels, max_length)
    # NOTE(review): save_model is not defined in the visible part of this
    # notebook; confirm it exists before starting a long training run.
    bert_model.save_model()