### Google Colab Drive Mounting and Directory Change:

In [None]:
import os

from google.colab import drive

# Mount Google Drive inside the Colab filesystem.
drive.mount('/content/drive')

# Make the Drive root the working directory so that the relative paths used
# later in the notebook (e.g. 'dataset/...', 'models/...') resolve inside Drive.
os.chdir('/content/drive/MyDrive/')

### Installation of Packages

In [None]:
# Install the `accelerate` package; -U upgrades it if it is already present.
# NOTE(review): not imported directly in this notebook - presumably needed by transformers' Trainer; confirm.
!pip install accelerate -U

In [None]:
# Install `transformers` (tokenizer, model, Trainer and TrainingArguments are imported from it below).
!pip install transformers

In [None]:
# Install `datasets` (provides the `Dataset` class used for loading and preprocessing below).
!pip install datasets

In [None]:
# Install `evaluate` (used below to load the precision, recall and f1 metrics).
!pip install evaluate


### Data Processing and Experiment Automation Functions:

In [None]:
import os.path as osp
import re
import datetime
from collections import Counter

import pandas as pd
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding, Trainer, \
    TrainingArguments
from datasets import Dataset
import evaluate

from masked_absa_concat_model import MaskedABSAConcatModel


def tokenize(example, tokenizer):
    """
    Mask the entity inside each text of a batch and tokenize the masked texts.
    @param example: batch with parallel lists under 'Subtext' and 'Entity'
    @param tokenizer: callable tokenizer applied to the list of masked texts
    @return: whatever the tokenizer returns for the masked texts
    """
    masked_texts = [
        # The entity is escaped so it is matched literally, not as a regex.
        re.sub(re.escape(entity), '[MASK]', text)
        for text, entity in zip(example['Subtext'], example['Entity'])
    ]
    return tokenizer(
        masked_texts,
        return_token_type_ids=True,
        padding='max_length',
        truncation=True,
    )


def map_label(example, labels_dict):
    """
    Replace the example's 'label' value with its entry in ``labels_dict``.
    @param example: record with a 'label' key; modified in place
    @param labels_dict: mapping from original label value to new label value
    @return: the same example with the mapped label
    """
    original_label = example['label']
    example['label'] = labels_dict[original_label]
    return example


def lowercase(example):
    """
    Lowercase the 'Subtext' and 'Entity' fields of an example.
    @param example: record with string values under 'Subtext' and 'Entity'; modified in place
    @return: the same example with both fields lowercased
    """
    for field in ('Subtext', 'Entity'):
        example[field] = example[field].lower()
    return example


def remove_special_characters(example):
    """
    Delete the characters ®, °, ™ and £ from the 'Subtext' and 'Entity' fields.
    @param example: record with string values under 'Subtext' and 'Entity'; modified in place
    @return: the same example with the characters removed from both fields
    """
    for field in ('Subtext', 'Entity'):
        example[field] = re.sub(r'[®°™£]', '', example[field])
    return example


def remove_whitespace_code(example):
    """
    Strip every literal '_x000D_' marker from the 'Subtext' field.
    @param example: record with a string value under 'Subtext'; modified in place
    @return: the same example with the markers removed
    """
    example['Subtext'] = re.sub(r'_x000D_', '', example['Subtext'])
    # The example must be returned: a mapping function that returns None
    # gives the caller (e.g. Dataset.map) nothing to write back, so the
    # cleanup would otherwise be lost.
    return example


def load_and_process_data(ds_type, label_name, tokenizer, cased=False) -> Dataset:
    """
    Load one dataset split from ``dataset/{ds_type}_dataset.csv`` (';'-separated) and preprocess it.

    Steps, in order: rename the label column to 'label', map label strings to ids,
    apply ``remove_whitespace_code`` and ``remove_special_characters``, keep only
    rows whose 'Entity' occurs in 'Subtext', then tokenize in batches.
    @param ds_type: dataset type (train, validation, test); selects the CSV file
    @param label_name: original name of the label column
    @param tokenizer: pretrained tokenizer
    @param cased: whether the model was trained on cased or uncased text
                  (currently has no effect: the lowercasing step is disabled)
    @return: processed HuggingFace dataset
    """
    if ds_type != 'train':
        dataset = Dataset.from_csv(f'dataset/{ds_type}_dataset.csv', sep=';', encoding='utf-8')
    else:
        frame = pd.read_csv("dataset/train_dataset.csv", sep=';', encoding='utf-8')
        # Re-attach the label column after dropping it, which moves it to the last position.
        label_column = frame[label_name]
        features = frame.drop([label_name], axis=1)
        features[label_name] = label_column
        dataset = Dataset.from_pandas(features)

    # NOTE: lowercasing for uncased checkpoints (dataset.map(lowercase) when
    # `cased` is False) is intentionally switched off here.

    # ambivalent class removed, samples merged with neutral
    labels_dict = {'negativ': 0, 'neutral': 1, 'positiv': 2, 'ambivalent': 1}

    return (
        dataset
        .rename_column(original_column_name=label_name, new_column_name='label')
        .map(lambda e: map_label(e, labels_dict))
        .map(remove_whitespace_code)
        .map(remove_special_characters)
        .filter(lambda e: e['Entity'] in e['Subtext'])
        .map(lambda s: tokenize(s, tokenizer), batched=True)
    )




def experiments_automation() -> None:
    """
        Run the configured training experiments back to back, one per
        (model type, checkpoint configuration) combination.
    """
    experiment_name = 'target_sentiment_DE'
    model_types = ['concat']
    # Each entry: (checkpoint, cased, batch_size, accumulation_steps)
    checkpoint_configs = [('bert-base-german-cased', True, 16, 4)]

    # Model type varies slowest, so all checkpoints run for one type before the next.
    runs = [(model_type, config) for model_type in model_types for config in checkpoint_configs]
    for model_type, (checkpoint, cased, batch_size, accumulation_steps) in runs:
        training_experiment(
            checkpoint=checkpoint,
            experiment_name=experiment_name,
            cased=cased,
            batch_size=batch_size,
            accumulation_steps=accumulation_steps,
            use_custom_model=True,
            model_type=model_type,
        )


### Training Experiment Function:

In [None]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm
from collections import Counter
from sklearn.metrics import f1_score
from transformers import TrainingArguments, Trainer, DataCollatorWithPadding

def training_experiment(checkpoint, num_labels=3, experiment_name='Default', cased=False,
                        batch_size=16, accumulation_steps=4, use_custom_model=False, model_type='') -> None:
    """
    The full training experiment: data loading and preprocessing, model instantiation, training and evaluation.

    Results are printed (weighted F1 on train and test, then per-class precision/recall/F1 on test);
    nothing is returned. The trained model is written to disk.
    @param checkpoint: Huggingface checkpoint for fine-tuning
    @param num_labels: number of labels in target
    @param experiment_name: name of the experiment (used in the Trainer output directory name)
    @param cased: whether the checkpoint was trained on cased or uncased text (forwarded to load_and_process_data)
    @param batch_size: batch size for training
    @param accumulation_steps: number of steps for gradient accumulation
    @param use_custom_model: whether to use Huggingface AutoModel or custom model that uses the base from Huggingface
    @param model_type: type of the model; only 'concat' maps to a custom model in this function, any other value
                       (or use_custom_model=False) falls back to AutoModelForSequenceClassification
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    target_sequence = 'entity'
    run_name = f'{model_type}-{target_sequence}-{checkpoint}'

    tokenizer = AutoTokenizer.from_pretrained(checkpoint)

    # All three splits use the 'Sentiments' column as the label.
    train = load_and_process_data('train', label_name='Sentiments', tokenizer=tokenizer, cased=cased)
    validation = load_and_process_data('validation', label_name='Sentiments', tokenizer=tokenizer, cased=cased)
    test = load_and_process_data('test', label_name='Sentiments', tokenizer=tokenizer, cased=cased)
    # Keep only the model inputs and the label on the test split (it is iterated manually further down).
    test = test.remove_columns([c for c in test.column_names if c not in
                                ['input_ids', 'token_type_ids', 'attention_mask', 'label']])

    # Class weights: most frequent class gets 1.0, rarer classes get (max count / own count).
    # Ordered by ascending label id; only labels present in the training split are included.
    label_counts = Counter(train['label'])
    class_weights = [max(label_counts.values()) / label_counts[cls] for cls in sorted(set(train['label']))]

    custom_models = {'concat': MaskedABSAConcatModel}
    if use_custom_model and model_type in custom_models:
        # Assumes encoding the single '[MASK]' token yields exactly three ids
        # (leading special token, mask, trailing special token) - otherwise this unpacking raises.
        cls_id, mask_id, sep_id = tokenizer.encode('[MASK]')
        model = custom_models[model_type](checkpoint, num_labels=num_labels, class_weights=class_weights,
                                          cls_id=cls_id, sep_id=sep_id, mask_id=mask_id)
    else:
        model = AutoModelForSequenceClassification.from_pretrained(checkpoint, num_labels=num_labels)

    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
    # Trainer output directory: models/custom/<model_type>1/<experiment_name>_<run_name>
    model_dir = osp.join('models', 'custom', model_type + '1', '_'.join([experiment_name, run_name]))
    print(f'model_dir: {model_dir}')
    training_args = TrainingArguments(
        model_dir,
        per_device_train_batch_size=batch_size,
        num_train_epochs=1,
        evaluation_strategy='steps',
        logging_steps=200,
        save_steps=200,
        save_total_limit=10,
        load_best_model_at_end=True,
        gradient_accumulation_steps=accumulation_steps,
        metric_for_best_model='eval_loss',
        # NOTE(review): hard-coded checkpoint path, independent of experiment_name/model_type/checkpoint.
        # The transformers docs describe this TrainingArguments field as not used by Trainer directly - confirm
        # whether it has any effect here; the actual resume request is the trainer.train(...) call below.
        resume_from_checkpoint=os.path.join('models/custom/concat1/target_sentiment_DE_concat-entity-bert-base-german-cased/', 'checkpoint-5600')
    )
    trainer = Trainer(
        model,
        training_args,
        train_dataset=train,
        eval_dataset=validation,
        data_collator=data_collator,
        tokenizer=tokenizer
    )
    # Try to resume from an existing checkpoint first; on ValueError start training from scratch.
    # NOTE(review): any ValueError raised while resuming/training triggers the restart, not only
    # a "no checkpoint found" error - verify this is intended.
    try:
        trainer.train(resume_from_checkpoint=True)
    except ValueError:
        trainer.train()

    # NOTE(review): both save locations are hard-coded and do not follow model_dir, so runs with a
    # different experiment_name/model_type/checkpoint write to the same files - confirm this is intended.
    trainer.save_model("models/custom/concat1/target_sentiment_DE_concat-entity-bert-base-german-cased/")
    #torch.save(model, "models/bestmodel.pth")
    torch.save(model.state_dict(), "models/bestmodel_state_dict.pth")

    # Evaluation

    model.eval()
    # Return torch tensors from the test split so it can be fed to the DataLoader below.
    test.set_format('torch')
    # Load metrics for evaluation
    metrics = {
        'precision': evaluate.load('precision'),
        'recall': evaluate.load('recall'),
        'f1': evaluate.load('f1')
    }
    # Calculate weighted F1 score for training data (predicted class = argmax over the prediction scores)
    train_predictions = trainer.predict(train).predictions.argmax(axis=1)
    train_labels = train['label']
    train_f1 = f1_score(train_labels, train_predictions, average='weighted')

    # Calculate F1 score for testing data
    test_predictions = trainer.predict(test).predictions.argmax(axis=1)
    test_labels = test['label']
    test_f1 = f1_score(test_labels, test_predictions, average='weighted')

    # Print or log the F1 scores
    print(f'Training F1 Score: {train_f1}')
    print(f'Testing F1 Score: {test_f1}')


    # Second pass over the test split: run the model batch by batch without gradients and
    # feed predictions/references into each `evaluate` metric for the per-class report.
    dataloader = DataLoader(test, batch_size=64)
    for batch in tqdm(dataloader):
        with torch.no_grad():
            # Only the three model input tensors are moved to the device and passed to the model.
            outputs = model(**{k: v.to(device) for k, v in batch.items()
                               if k in ['input_ids', 'token_type_ids', 'attention_mask']}).logits.argmax(dim=1)
        for v in metrics.values():
            v.add_batch(predictions=outputs,
                        references=batch['label'])
    # Display names for label ids 0, 1, 2 (index i of each metric array is printed with labels[i]).
    labels = ['negative', 'neutral', 'positive']
    # Print metrics (replace with your preferred logging or result handling)
    for k, v in metrics.items():
        # average=None requests one score per class instead of a single aggregate.
        metric_values = v.compute(average=None)
        for i, mv in enumerate(metric_values[k]):
            print(f'{k}.{labels[i]}: {mv}')



### Running the Training Experiment:

In [None]:
# Hugging Face checkpoint to fine-tune.
checkpoint_path = 'bert-base-german-cased'

# Launch one training run of the custom 'concat' model on that checkpoint.
training_experiment(
    checkpoint=checkpoint_path,
    experiment_name='target_sentiment_DE_long',
    cased=True,
    batch_size=16,
    accumulation_steps=4,
    use_custom_model=True,
    model_type='concat',
)