In [None]:
# Imports

import os
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import BertTokenizerFast, BertForTokenClassification, AdamW
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from tqdm import tqdm
from sklearn.model_selection import KFold
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Preprocessing the data: Loading, tokenizing, and labeling the text data

def load_data(filepath):
    """Parse one annotated file into parallel token and label lists.

    The first five lines of the file are skipped, as are blank lines and
    lines starting with ``#Text=``. Every remaining line is split on
    whitespace and must have exactly five fields; the third field is the
    token, the fourth the entity label and the fifth the chunk label.
    Lines with any other number of fields are ignored.

    Labelling rule:
      * entity label ``"Animated"`` with chunk ``"B"`` -> ``"B-Animated"``
      * entity label ``"Animated"`` with chunk ``"I"`` -> ``"I-Animated"``
      * entity label ``"Animated"`` with any other chunk value -> that chunk
        value is kept unchanged
      * any other entity label -> ``"O"``

    Args:
        filepath: Path of the annotation file to read.

    Returns:
        tuple[list[str], list[str]]: ``(tokens, labels)`` of equal length.
    """
    header_lines = 5
    chunk_to_label = {"B": "B-Animated", "I": "I-Animated"}

    tokens, labels = [], []
    # Explicit UTF-8 so umlauts/ß decode identically on every platform;
    # iterating the handle avoids loading the whole file into memory.
    with open(filepath, 'r', encoding='utf-8') as f:
        for line_no, raw_line in enumerate(f):
            if line_no < header_lines:
                continue
            line = raw_line.strip()
            if not line or line.startswith("#Text="):
                continue
            parts = line.split()
            if len(parts) != 5:
                continue
            _, _, token, entity_label, chunk_label = parts
            if entity_label == "Animated":
                # Unknown chunk markers are passed through unchanged.
                label = chunk_to_label.get(chunk_label, chunk_label)
            else:
                label = "O"
            tokens.append(token)
            labels.append(label)

    return tokens, labels

In [None]:
# Set up paths and tag values, load and split data, and initialize the German BERT model for token classification

# Dataset locations (Kaggle input mounts).
BASE_DIR = '/kaggle/input/animacyba/Metonym/'
NEW_TEST_DIR = '/kaggle/input/bttestset'
TWITTER_TEST_DIR = '/kaggle/input/twitterset/Twitter'
TORE_TEST_DIR = '/kaggle/input/toredataset/'

# Seed the global RNGs once, before anything stochastic runs, so that the file
# shuffle below, the randomly initialised classification head and any later
# use of the NumPy / torch global generators are reproducible across
# Restart & Run All.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

tokenizer = BertTokenizerFast.from_pretrained('bert-base-german-cased')

# Label inventory; the list position of a tag is its integer class id.
tag_values = ["B-Animated", "I-Animated", "O", "PAD"]
tag2id = {t: i for i, t in enumerate(tag_values)}

# os.listdir returns entries in arbitrary order, so sort first to make the
# seeded shuffle deterministic.
all_files = sorted(os.listdir(BASE_DIR))
np.random.shuffle(all_files)

# File-level split by fixed counts (first 20000 / next 2500 / remainder).
train_files = all_files[:20000]
val_files = all_files[20000:22500]
test_files = all_files[22500:]

# German BERT with a token-classification head sized to the tag inventory.
model = BertForTokenClassification.from_pretrained(
    "bert-base-german-cased",
    num_labels=len(tag2id),
    output_attentions=False,
    output_hidden_states=False
)

In [None]:
# Preprocessing the data for NER and suitable format for BERT

class EntityDataset(Dataset):
    """Dataset that encodes pre-split sentences and aligns word tags to sub-words.

    Each item is a dict of three ``torch.long`` tensors of length ``max_len``:
    ``input_ids``, ``attention_mask`` and ``labels``.

    Label layout per item:
      * position 0 and the position right after the last kept sub-word
        (the special-token slots) get the ``"O"`` id,
      * every sub-word piece of a word inherits that word's tag id
        (tags missing from ``tag2id`` fall back to ``"O"``),
      * all remaining positions get the ``"PAD"`` id.

    Args:
        texts: Sequence of sentences, each a sequence of word strings.
        tags: Sequence of tag sequences, parallel to ``texts``.
        tag2id: Mapping from tag string to class id; must contain ``"O"``
            and ``"PAD"``.
        tokenizer: Tokenizer providing ``encode_plus`` and ``tokenize``.
        max_len: Fixed sequence length (including the two special tokens)
            every item is truncated / padded to. Defaults to 128.
    """

    def __init__(self, texts, tags, tag2id, tokenizer, max_len=128):
        self.texts = texts
        self.tags = tags
        self.tag2id = tag2id
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of sentences."""
        return len(self.texts)

    def __getitem__(self, item):
        """Encode sentence ``item`` and build its aligned label vector."""
        text = self.texts[item]
        tags = self.tags[item]

        inputs = self.tokenizer.encode_plus(
            text,
            is_split_into_words=True,
            add_special_tokens=True,
            max_length=self.max_len,
            truncation=True,
            padding='max_length',
            return_attention_mask=True
        )

        input_ids = inputs['input_ids']
        attention_mask = inputs['attention_mask']

        outside_id = self.tag2id["O"]
        pad_id = self.tag2id["PAD"]
        # Two slots are reserved for the special tokens added by the tokenizer.
        budget = self.max_len - 2

        labels = []
        for word, label in zip(text, tags):
            if len(labels) >= budget:
                # Everything past the budget is truncated anyway.
                break
            n_subwords = len(self.tokenizer.tokenize(word))
            labels.extend([self.tag2id.get(label, outside_id)] * n_subwords)
        labels = labels[:budget]
        labels = [outside_id] + labels + [outside_id]
        labels = labels + (self.max_len - len(labels)) * [pad_id]

        return {
            'input_ids': torch.tensor(input_ids, dtype=torch.long),
            'attention_mask': torch.tensor(attention_mask, dtype=torch.long),
            'labels': torch.tensor(labels, dtype=torch.long)
        }

In [None]:
# Loading and preparing the training data

BATCH_SIZE = 32


def _read_record(filename):
    """Load one file from BASE_DIR as a {"tokens", "labels"} record."""
    file_tokens, file_labels = load_data(os.path.join(BASE_DIR, filename))
    return {"tokens": file_tokens, "labels": file_labels}


def _unzip_records(records):
    """Split records into (token sequences, label sequences)."""
    return zip(*((rec["tokens"], rec["labels"]) for rec in records))


# One record per annotated file, in the (shuffled) order of all_files.
all_data = [_read_record(name) for name in tqdm(all_files, desc="Loading data")]

# 80 % train, 10 % validation, remainder test.
n_records = len(all_data)
num_train = int(n_records * 0.8)
num_val = int(n_records * 0.1)
num_test = n_records - num_train - num_val

train_data, val_data, test_data = random_split(all_data, [num_train, num_val, num_test])

train_texts, train_tags = _unzip_records(train_data)
val_texts, val_tags = _unzip_records(val_data)
test_texts, test_tags = _unzip_records(test_data)

train_dataset = EntityDataset(train_texts, train_tags, tag2id, tokenizer)
val_dataset = EntityDataset(val_texts, val_tags, tag2id, tokenizer)
test_dataset = EntityDataset(test_texts, test_tags, tag2id, tokenizer)

# Sequential (unshuffled) loaders with a shared batch size.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=BATCH_SIZE)
    for split in (train_dataset, val_dataset, test_dataset)
)

In [None]:
# Training on initial training data with k-fold-cross validation

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# NOTE(review): EPOCHS is not used below — each fold makes exactly one pass
# over its training split. Kept so existing references keep working.
EPOCHS = 2
# One weight per class id, in tag2id order (B-Animated, I-Animated, O, PAD).
class_weights = torch.tensor([1, 1, 0.9, 0.1]).to(device)
loss_fct = torch.nn.CrossEntropyLoss(weight=class_weights)

k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True)
results = {}

optimizer = AdamW(model.parameters(), lr=2e-5)


def weighted_batch_loss(batch):
    """Run the model on one batch and return the class-weighted CE loss.

    Used for both training and validation so the two reported losses are
    measured with the same criterion. Labels are not passed to the model,
    so it does not compute its own loss in addition.
    """
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    labels = batch['labels'].to(device)

    outputs = model(input_ids=input_ids, attention_mask=attention_mask)
    return loss_fct(outputs.logits.view(-1, model.config.num_labels), labels.view(-1))


# NOTE(review): model and optimizer are created once, outside the fold loop,
# so weights carry over from fold to fold. Validation samples of later folds
# were training samples in earlier folds; the per-fold validation losses are
# therefore not independent cross-validation estimates.
for fold, (train_ids, test_ids) in enumerate(kfold.split(train_dataset)):
    print(f'Validation Fold: {fold}')

    trainloader = torch.utils.data.DataLoader(
        train_dataset, batch_size=10, sampler=torch.utils.data.SubsetRandomSampler(train_ids))
    valloader = torch.utils.data.DataLoader(
        train_dataset, batch_size=10, sampler=torch.utils.data.SubsetRandomSampler(test_ids))

    model.train()
    total_loss = 0

    # Training loop
    for data in tqdm(trainloader, total=len(trainloader)):
        optimizer.zero_grad()
        loss = weighted_batch_loss(data)
        loss.backward()

        total_loss += loss.item()
        optimizer.step()

    print("Average train loss: {}".format(total_loss / len(trainloader)))

    model.eval()
    eval_loss = 0

    # Validation loop: same weighted criterion as training, no gradients.
    for data in valloader:
        with torch.no_grad():
            eval_loss += weighted_batch_loss(data).item()

    print("Validation loss: {}".format(eval_loss / len(valloader)))
    results[fold] = eval_loss / len(valloader)

print(f'Results for: {k_folds} folds')
print(f'Average: {sum(results.values())/k_folds}')

for key, value in results.items():
    print(f'Fold {key}: {value}')


In [None]:
# Evaluation on the test set from the training data and creation of confusion matrix

model.eval()

# Per-example arrays of predicted / gold class ids (one array per sentence).
predictions, true_labels = [], []

for batch in test_loader:
    batch = {key: val.to(device) for key, val in batch.items()}

    with torch.no_grad():
        outputs = model(**batch)

    logits = outputs.logits.detach().cpu().numpy()
    label_ids = batch['labels'].to('cpu').numpy()

    # argmax over the class axis gives one predicted id per token position.
    predictions.extend(np.argmax(logits, axis=2))
    true_labels.extend(label_ids)

# Drop every position whose gold label is PAD; flatten once and mask instead
# of re-stacking and looping in Python.
PAD_TOKEN_ID = tag2id["PAD"]
flat_predictions = np.hstack(predictions)
flat_true_labels = np.hstack(true_labels)
non_pad = flat_true_labels != PAD_TOKEN_ID
predictions_no_pad = list(flat_predictions[non_pad])
true_labels_no_pad = list(flat_true_labels[non_pad])

accuracy = accuracy_score(true_labels_no_pad, predictions_no_pad)
# zero_division=1 matches the other evaluation cells so the scores are comparable.
precision, recall, f1, _ = precision_recall_fscore_support(
    true_labels_no_pad, predictions_no_pad, average='weighted', zero_division=1)

# Show the scores here: these names are overwritten by a later evaluation cell.
print(f"Accuracy: {accuracy:.4f} | Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f}")

def recode_labels(label_list, tag_map=None):
    """Collapse class ids into a binary animated / not-animated coding.

    Only the ``"B-Animated"`` and ``"I-Animated"`` ids map to 1; every other
    id maps to 0. In particular a ``"PAD"`` prediction at a real token
    position is counted as not-animated instead of as a positive hit.

    Args:
        label_list: Iterable of integer class ids.
        tag_map: Optional tag -> id mapping. Defaults to the notebook-level
            ``tag2id``.

    Returns:
        list[int]: One 0/1 value per input label.
    """
    mapping = tag2id if tag_map is None else tag_map
    animated_ids = {mapping["B-Animated"], mapping["I-Animated"]}
    return [1 if label in animated_ids else 0 for label in label_list]

recoded_predictions = recode_labels(predictions_no_pad)
recoded_true_labels = recode_labels(true_labels_no_pad)
# labels=[0, 1] keeps the matrix 2x2 even if one class never occurs.
recoded_cm = confusion_matrix(recoded_true_labels, recoded_predictions, labels=[0, 1])

plt.figure(figsize=(10, 7))
sns.heatmap(recoded_cm, annot=True, fmt='d')
plt.title('Recoded Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()

# Print a few examples with gold and predicted tags for every position.
id2tag = {i: t for t, i in tag2id.items()}
num_samples_to_display = 10
# Never index past the end when the set holds fewer examples than requested.
for i in range(min(num_samples_to_display, len(predictions))):
    print(f"Text: {tokenizer.decode(test_loader.dataset[i]['input_ids'])}")
    print(f"True labels: {' '.join([id2tag[tag_id] for tag_id in true_labels[i]])}")
    print(f"Predicted labels: {' '.join([id2tag[tag_id] for tag_id in predictions[i]])}")
    print("\n")


In [None]:
# Evaluation on the seen test set of german bundestag debates and creation of confusion matrix

model = model.to(device)

# Load every file in NEW_TEST_DIR as one {"tokens", "labels"} record.
test_files = os.listdir(NEW_TEST_DIR)
test_data = []
for file in tqdm(test_files, desc="Loading test data"):
    tokens, labels = load_data(os.path.join(NEW_TEST_DIR, file))
    test_data.append({"tokens": tokens, "labels": labels})

test_texts, test_tags = zip(*[(data_dict["tokens"], data_dict["labels"]) for data_dict in test_data])
test_dataset = EntityDataset(test_texts, test_tags, tag2id, tokenizer)
testbt_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

model.eval()
# Per-example arrays of predicted / gold class ids (one array per sentence).
bt_predictions, bt_true_labels = [], []

for batch in testbt_loader:
    batch = {key: val.to(device) for key, val in batch.items()}

    with torch.no_grad():
        outputs = model(**batch)

    logits = outputs.logits.detach().cpu().numpy()
    label_ids = batch['labels'].to('cpu').numpy()

    bt_predictions.extend(np.argmax(logits, axis=2))
    bt_true_labels.extend(label_ids)

# Drop every position whose gold label is PAD (flatten once, then mask).
PAD_TOKEN_ID = tag2id["PAD"]
bt_flat_predictions = np.hstack(bt_predictions)
bt_flat_true_labels = np.hstack(bt_true_labels)
bt_non_pad = bt_flat_true_labels != PAD_TOKEN_ID
bt_predictions_no_pad = list(bt_flat_predictions[bt_non_pad])
bt_true_labels_no_pad = list(bt_flat_true_labels[bt_non_pad])

bt_accuracy = accuracy_score(bt_true_labels_no_pad, bt_predictions_no_pad)
bt_precision, bt_recall, bt_f1, _ = precision_recall_fscore_support(bt_true_labels_no_pad, bt_predictions_no_pad, average='weighted', zero_division=1)

# Show the scores; they were previously computed but never displayed.
print(f"Accuracy: {bt_accuracy:.4f} | Precision: {bt_precision:.4f} | Recall: {bt_recall:.4f} | F1: {bt_f1:.4f}")

bt_recoded_predictions = recode_labels(bt_predictions_no_pad)
bt_recoded_true_labels = recode_labels(bt_true_labels_no_pad)
# labels=[0, 1] keeps the matrix 2x2 even if one class never occurs.
bt_recoded_cm = confusion_matrix(bt_recoded_true_labels, bt_recoded_predictions, labels=[0, 1])

plt.figure(figsize=(10, 7))
sns.heatmap(bt_recoded_cm, annot=True, fmt='d')
plt.title('Recoded Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()

id2tag = {i: t for t, i in tag2id.items()}
num_samples_to_display = 10
# Never index past the end when the set holds fewer examples than requested.
for i in range(min(num_samples_to_display, len(bt_predictions))):
    print(f"Text: {tokenizer.decode(testbt_loader.dataset[i]['input_ids'])}")
    print(f"True labels: {' '.join([id2tag[tag_id] for tag_id in bt_true_labels[i]])}")
    print(f"Predicted labels: {' '.join([id2tag[tag_id] for tag_id in bt_predictions[i]])}")
    print("\n")

In [None]:
# Evaluation on the unseen test set of Tweets and creation of confusion matrix

model = model.to(device)
test_files_path = os.listdir(TWITTER_TEST_DIR)
tw_test_data = []

# Load every file in TWITTER_TEST_DIR as one {"tokens", "labels"} record.
for file in tqdm(test_files_path, desc="Loading test data"):
    file_tokens, file_labels = load_data(os.path.join(TWITTER_TEST_DIR, file))
    tw_test_data.append({"tokens": file_tokens, "labels": file_labels})

tw_test_text, tw_test_tags = zip(*[(data["tokens"], data["labels"]) for data in tw_test_data])
tw_test_dataset = EntityDataset(tw_test_text, tw_test_tags, tag2id, tokenizer)
tw_test_dataloader = DataLoader(tw_test_dataset, batch_size=BATCH_SIZE)

model.eval()

# Per-example arrays of predicted / gold class ids (one array per sentence).
tw_predictions, tw_true_labels = [], []

for batch in tw_test_dataloader:
    batch = {key: val.to(device) for key, val in batch.items()}

    with torch.no_grad():
        outputs = model(**batch)

    logits = outputs.logits.detach().cpu().numpy()
    label_ids = batch['labels'].to('cpu').numpy()

    tw_predictions.extend(np.argmax(logits, axis=2))
    tw_true_labels.extend(label_ids)

# Drop every position whose gold label is PAD (flatten once, then mask).
PAD_TOKEN_ID = tag2id["PAD"]
tw_flat_predictions = np.hstack(tw_predictions)
tw_flat_true_labels = np.hstack(tw_true_labels)
tw_non_pad = tw_flat_true_labels != PAD_TOKEN_ID
tw_predictions_no_pad = list(tw_flat_predictions[tw_non_pad])
tw_true_labels_no_pad = list(tw_flat_true_labels[tw_non_pad])

tw_accuracy = accuracy_score(tw_true_labels_no_pad, tw_predictions_no_pad)
tw_precision, tw_recall, tw_f1, _ = precision_recall_fscore_support(tw_true_labels_no_pad, tw_predictions_no_pad, average='weighted', zero_division=1)

# Show the scores; they were previously computed but never displayed.
print(f"Accuracy: {tw_accuracy:.4f} | Precision: {tw_precision:.4f} | Recall: {tw_recall:.4f} | F1: {tw_f1:.4f}")

tw_recoded_predictions = recode_labels(tw_predictions_no_pad)
tw_recoded_true_labels = recode_labels(tw_true_labels_no_pad)
# labels=[0, 1] keeps the matrix 2x2 even if one class never occurs.
tw_recoded_cm = confusion_matrix(tw_recoded_true_labels, tw_recoded_predictions, labels=[0, 1])

plt.figure(figsize=(10, 7))
sns.heatmap(tw_recoded_cm, annot=True, fmt='d')
plt.title('Recoded Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()

id2tag = {i: t for t, i in tag2id.items()}
num_samples_to_display = 10
# Never index past the end when the set holds fewer examples than requested.
for i in range(min(num_samples_to_display, len(tw_predictions))):
    print(f"Text: {tokenizer.decode(tw_test_dataloader.dataset[i]['input_ids'])}")
    print(f"True labels: {' '.join([id2tag[tag_id] for tag_id in tw_true_labels[i]])}")
    print(f"Predicted labels: {' '.join([id2tag[tag_id] for tag_id in tw_predictions[i]])}")
    print("\n")

In [None]:
# Evaluation on the unseen test set of german bundestag debates and creation of confusion matrix

model = model.to(device)

# The original loaded files from an undefined `TEST_DIR` (NameError) while
# listing them from NEW_TEST_DIR. Listing and loading now share one directory.
# NOTE(review): TORE_TEST_DIR is defined in the setup cell but never used —
# confirm whether this cell was meant to evaluate that dataset instead.
EVAL_DIR = NEW_TEST_DIR
test_files_path = os.listdir(EVAL_DIR)
test_data = []

for file in tqdm(test_files_path, desc="Loading test data"):
    file_tokens, file_labels = load_data(os.path.join(EVAL_DIR, file))
    test_data.append({"tokens": file_tokens, "labels": file_labels})

# Prepare test data for DataLoader
test_text, test_tags = zip(*[(data["tokens"], data["labels"]) for data in test_data])
test_dataset = EntityDataset(test_text, test_tags, tag2id, tokenizer)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

model.eval()
# Per-example arrays of predicted / gold class ids (one array per sentence).
predictions, true_labels = [], []

for batch in test_dataloader:
    batch = {key: val.to(device) for key, val in batch.items()}

    with torch.no_grad():
        outputs = model(**batch)

    logits = outputs.logits.detach().cpu().numpy()
    label_ids = batch['labels'].to('cpu').numpy()

    predictions.extend(np.argmax(logits, axis=2))
    true_labels.extend(label_ids)

# Drop every position whose gold label is PAD (flatten once, then mask).
PAD_TOKEN_ID = tag2id["PAD"]
flat_predictions = np.hstack(predictions)
flat_true_labels = np.hstack(true_labels)
non_pad = flat_true_labels != PAD_TOKEN_ID
predictions_no_pad = list(flat_predictions[non_pad])
true_labels_no_pad = list(flat_true_labels[non_pad])

accuracy = accuracy_score(true_labels_no_pad, predictions_no_pad)
precision, recall, f1, _ = precision_recall_fscore_support(true_labels_no_pad, predictions_no_pad, average='weighted', zero_division=1)

# Show the scores; they were previously computed but never displayed.
print(f"Accuracy: {accuracy:.4f} | Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f}")

recoded_predictions = recode_labels(predictions_no_pad)
recoded_true_labels = recode_labels(true_labels_no_pad)
# labels=[0, 1] keeps the matrix 2x2 even if one class never occurs.
recoded_cm = confusion_matrix(recoded_true_labels, recoded_predictions, labels=[0, 1])

plt.figure(figsize=(10, 7))
sns.heatmap(recoded_cm, annot=True, fmt='d')
plt.title('Recoded Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()

id2tag = {i: t for t, i in tag2id.items()}
num_samples_to_display = 10
# Never index past the end when the set holds fewer examples than requested.
for i in range(min(num_samples_to_display, len(predictions))):
    print(f"Text: {tokenizer.decode(test_dataloader.dataset[i]['input_ids'])}")
    print(f"True labels: {' '.join([id2tag[tag_id] for tag_id in true_labels[i]])}")
    print(f"Predicted labels: {' '.join([id2tag[tag_id] for tag_id in predictions[i]])}")
    print("\n")
