# BERT

## Imports

In [None]:
import matplotlib.pyplot as plt
import optuna
import torch
import torch.nn as nn
from datasets import load_dataset
from seqeval.metrics import classification_report
from sklearn.metrics import accuracy_score, f1_score
from torch.nn.utils.rnn import pad_sequence
from torch.optim import AdamW
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer

## Dataset

In [None]:
# Load the CoNLL-2003 dataset
dataset = load_dataset("conll2003")
# Names of the NER tags; the position in this list is the integer tag id,
# so `label_list[tag_id]` turns an id back into its string tag.
label_list = dataset["train"].features["ner_tags"].feature.names
# Number of distinct tags; used as the output size of the classifier head.
num_labels = len(label_list)

In [None]:
# Tokenizer
# Loaded from the same "bert-base-cased" checkpoint name that the models in
# this notebook are built from.
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")


# Tokenization and alignment
def tokenize_and_align_labels(batch):
    """Tokenize pre-split sentences and align word-level NER tags to sub-word tokens.

    Only the first sub-word piece of every word keeps that word's tag. Special
    tokens (whose word id is ``None``) and the remaining pieces of a word are
    labelled ``-100``, which is the default ``ignore_index`` of
    ``nn.CrossEntropyLoss`` and the value the evaluation code filters out.
    Copying the tag onto every piece would turn a single ``B-X`` word into
    several consecutive ``B-X`` tokens, i.e. an invalid IOB sequence that is
    counted as several entities.

    Args:
        batch: Mapping with a "tokens" entry (list of word lists) and an
            "ner_tags" entry (list of per-word integer tag lists).

    Returns:
        The tokenizer output with an added "labels" entry holding one list of
        aligned tag ids per sentence.
    """
    tokenized_inputs = tokenizer(batch["tokens"], truncation=True, is_split_into_words=True)
    labels = []
    for i, word_labels in enumerate(batch["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        aligned_labels = []
        previous_word_id = None
        for word_id in word_ids:
            if word_id is None or word_id == previous_word_id:
                # Special token, or a continuation piece of the previous word.
                aligned_labels.append(-100)
            else:
                aligned_labels.append(word_labels[word_id])
            previous_word_id = word_id
        labels.append(aligned_labels)
    tokenized_inputs["labels"] = labels
    return tokenized_inputs


# Run tokenization/label alignment over the dataset in batches; the resulting
# examples carry the "input_ids", "attention_mask" and "labels" fields that
# the collate function reads.
tokenized_datasets = dataset.map(tokenize_and_align_labels, batched=True)

In [None]:
# DataLoader collate function
def collate_fn(batch):
    """Pad a list of tokenized examples into equally long batch tensors.

    Each field is padded on the right to the longest example in the batch:
    "input_ids" with the tokenizer's pad token id, "attention_mask" with 0
    and "labels" with -100.

    Args:
        batch: List of examples, each providing "input_ids",
            "attention_mask" and "labels" as integer sequences.

    Returns:
        Dict with the three padded tensors, batch dimension first.
    """
    # Fill value per field, in the order the keys appear in the result.
    fill_values = {
        "input_ids": tokenizer.pad_token_id,
        "attention_mask": 0,
        "labels": -100,
    }
    return {
        field: pad_sequence(
            [torch.tensor(example[field]) for example in batch],
            batch_first=True,
            padding_value=fill,
        )
        for field, fill in fill_values.items()
    }


# Shuffle the training split so each epoch sees the batches in a new order;
# the validation split keeps its original order.
train_loader = DataLoader(tokenized_datasets["train"], batch_size=128, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(tokenized_datasets["validation"], batch_size=128, collate_fn=collate_fn)

## Model

In [None]:
class BertForNERWithLayerAttention(nn.Module):
    """Token classifier on top of a learned weighted sum of all encoder hidden states.

    The encoder is loaded with ``output_hidden_states=True``; its hidden
    states are stacked, scaled by one learnable scalar per state (initialised
    to 1, not normalised) and summed. Dropout and a linear layer then produce
    per-token logits.

    Args:
        pretrained_model_name: Checkpoint name passed to
            ``AutoModel.from_pretrained``.
        num_labels: Number of output classes of the linear classifier.
        dropout_rate: Dropout probability applied before the classifier.
            Defaults to 0.3.
    """

    def __init__(self, pretrained_model_name, num_labels, dropout_rate=0.3):
        super().__init__()
        self.bert = AutoModel.from_pretrained(pretrained_model_name, output_hidden_states=True)
        # One weight per returned hidden state: the encoder layers plus one
        # extra state (the embedding output, per the transformers docs).
        self.num_hidden_layers = self.bert.config.num_hidden_layers + 1
        self.layer_weights = nn.Parameter(torch.ones(self.num_hidden_layers))
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(self.bert.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Compute per-token logits and, if ``labels`` is given, the loss.

        Args:
            input_ids: Token id tensor forwarded to the encoder.
            attention_mask: Mask tensor forwarded to the encoder; positions
                equal to 1 are the ones that enter the loss.
            labels: Optional tag id tensor with the same leading shape as
                ``input_ids``. Entries of -100 are skipped by the default
                ``ignore_index`` of ``nn.CrossEntropyLoss``.

        Returns:
            Dict with "loss" (``None`` when no labels were passed) and
            "logits".
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = torch.stack(outputs.hidden_states, dim=0)  # (num_layers, batch_size, seq_len, hidden_size)
        weighted_hidden_states = torch.sum(self.layer_weights[:, None, None, None] * hidden_states, dim=0)
        sequence_output = self.dropout(weighted_hidden_states)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            # Restrict the loss to non-padding positions.
            active_loss = attention_mask.view(-1) == 1
            active_logits = logits.view(-1, logits.size(-1))[active_loss]
            active_labels = labels.view(-1)[active_loss]
            loss = loss_fn(active_logits, active_labels)

        return {"loss": loss, "logits": logits}

In [None]:
# Pick the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Build the layer-weighted BERT tagger and move it to the chosen device
model = BertForNERWithLayerAttention(pretrained_model_name="bert-base-cased", num_labels=num_labels)
model = model.to(device)
# AdamW over all model parameters with a fixed learning rate
optimizer = torch.optim.AdamW(params=model.parameters(), lr=5e-5)

## Training

In [None]:
# Fine-tune the model for a fixed number of passes over the training loader
epochs = 3
for epoch in range(epochs):
    model.train()
    total_loss = 0
    progress = tqdm(train_loader, desc=f"Epoch {epoch + 1}")
    for batch in progress:
        optimizer.zero_grad()
        # Move the three batch tensors to the model's device.
        input_ids, attention_mask, labels = (
            batch[field].to(device) for field in ("input_ids", "attention_mask", "labels")
        )

        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs["loss"]
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Report the mean per-batch loss of this epoch.
    mean_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch + 1} Loss: {mean_loss}")

## Evaluation

In [None]:
# Predict tags on the validation loader and print the entity-level report
model.eval()
predictions, true_labels = [], []
with torch.no_grad():
    for batch in tqdm(val_loader, desc="Evaluating"):
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs["logits"]
        preds = logits.argmax(dim=-1)

        for pred, label in zip(preds.cpu().numpy(), labels.cpu().numpy()):
            # Keep only positions that carry a real tag (label != -100).
            keep = label != -100
            predictions.append([label_list[p] for p in pred[keep]])
            true_labels.append([label_list[l] for l in label[keep]])

print(classification_report(true_labels, predictions))

## Hyperparameter Tuning

### Optuna Study

In [None]:
def objective(trial):
    """Optuna objective: fine-tune the tagger once and score it on validation.

    Samples a learning rate (log-uniform in [1e-5, 1e-3]) and a dropout rate
    (uniform in [0.1, 0.5]), trains for three epochs with batch size 64 and
    returns the weighted F1 score computed over all labelled validation
    tokens.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        Token-level weighted F1 score on the validation split.
    """
    # `suggest_float` replaces the deprecated `suggest_loguniform` /
    # `suggest_uniform`; parameter names and ranges are unchanged.
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-3, log=True)
    dropout_rate = trial.suggest_float("dropout_rate", 0.1, 0.5)
    batch_size = 64
    num_epochs = 3

    # Reuse the notebook's model class instead of redefining an identical
    # one per trial; only the dropout module is swapped for the trial's rate.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = BertForNERWithLayerAttention(pretrained_model_name="bert-base-cased", num_labels=num_labels)
    model.dropout = nn.Dropout(dropout_rate)
    model = model.to(device)

    # Name the optimizer class explicitly so a later re-import of `AdamW`
    # from another package cannot change what the study tunes.
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    # Shuffle only the training split.
    train_loader = DataLoader(
        tokenized_datasets["train"], batch_size=batch_size, shuffle=True, collate_fn=collate_fn
    )
    val_loader = DataLoader(tokenized_datasets["validation"], batch_size=batch_size, collate_fn=collate_fn)

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        for batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}"):
            optimizer.zero_grad()
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs["loss"]
            loss.backward()
            optimizer.step()

    # Evaluation
    model.eval()
    predictions, true_labels = [], []
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            preds = torch.argmax(outputs["logits"], dim=-1)

            for pred, label in zip(preds.cpu().numpy(), labels.cpu().numpy()):
                # Drop positions labelled -100 (no tag to score).
                keep = label != -100
                predictions.append([label_list[p] for p in pred[keep]])
                true_labels.append([label_list[l] for l in label[keep]])

    # Weighted F1 over the flattened token sequences.
    return f1_score(
        [tag for seq in true_labels for tag in seq],
        [tag for seq in predictions for tag in seq],
        average="weighted",
    )

In [None]:
# Create Optuna study
# direction="maximize" because `objective` returns an F1 score (higher is better).
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=20)  # Try 20 different combinations

# Print best parameters
# `best_params` holds the sampled values of the best-scoring trial.
print("Best hyperparameters:", study.best_params)

In [None]:
import matplotlib.pyplot as plt

# NOTE: this rebinds `classification_report` from seqeval's version (imported
# at the top of the notebook) to scikit-learn's, which the cells below call
# with flat per-token label lists.
from sklearn.metrics import accuracy_score, classification_report, f1_score

# `transformers.AdamW` is deprecated and is not importable from recent
# transformers releases; bind `AdamW` to the PyTorch implementation, which is
# also what the top-of-notebook import provides.
from torch.optim import AdamW
from tqdm import tqdm


# Define the final model
class BertForFinalNER(nn.Module):
    """Final NER tagger: weighted sum of encoder hidden states, dropout, linear head.

    Args:
        pretrained_model_name: Checkpoint name handed to
            ``AutoModel.from_pretrained``.
        num_labels: Output size of the linear classifier.
        dropout_rate: Dropout probability applied before the classifier.
    """

    def __init__(self, pretrained_model_name, num_labels, dropout_rate):
        super().__init__()
        self.bert = AutoModel.from_pretrained(pretrained_model_name, output_hidden_states=True)
        encoder_config = self.bert.config
        # One learnable scalar per returned hidden state, all starting at 1.
        self.num_hidden_layers = encoder_config.num_hidden_layers + 1
        self.layer_weights = nn.Parameter(torch.ones(self.num_hidden_layers))
        self.dropout = nn.Dropout(dropout_rate)
        self.classifier = nn.Linear(encoder_config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return a dict with per-token "logits" and "loss" (``None`` without labels)."""
        encoder_out = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        stacked = torch.stack(encoder_out.hidden_states, dim=0)
        # Broadcast the per-state weights over the remaining three dimensions.
        mix = self.layer_weights[:, None, None, None]
        logits = self.classifier(self.dropout((mix * stacked).sum(dim=0)))

        if labels is None:
            return {"loss": None, "logits": logits}

        # Only positions with attention_mask == 1 contribute to the loss.
        keep = attention_mask.view(-1) == 1
        kept_logits = logits.view(-1, logits.size(-1))[keep]
        kept_labels = labels.view(-1)[keep]
        criterion = nn.CrossEntropyLoss()
        return {"loss": criterion(kept_logits, kept_labels), "logits": logits}

In [None]:
# Specify hyperparameters directly
# NOTE(review): these look like values copied from an Optuna run (both fall
# inside the ranges searched by `objective`) — confirm they match the study.
learning_rate = 4.369484270668493e-05
dropout_rate = 0.33027803609683487
# Same batch size as the one fixed inside `objective`.
batch_size = 64

In [None]:
# Instantiate the model with specified dropout rate
model = BertForFinalNER(pretrained_model_name="bert-base-cased", num_labels=num_labels, dropout_rate=dropout_rate).to(
    device
)

# Define the optimizer with the specified learning rate.
# torch.optim.AdamW is named explicitly: the `AdamW` re-imported from
# transformers is deprecated and has a different default weight decay than
# the torch optimizer imported at the top of the notebook.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# Prepare data loaders with the specified batch size; only the training split
# is shuffled so every epoch sees the batches in a new order.
train_loader = DataLoader(tokenized_datasets["train"], batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(tokenized_datasets["validation"], batch_size=batch_size, collate_fn=collate_fn)

# Track the mean per-batch loss of every epoch
train_losses = []
val_losses = []

# Training and validation loops
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    total_train_loss = 0
    for batch in tqdm(train_loader, desc=f"Training Epoch {epoch + 1}"):
        optimizer.zero_grad()
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs["loss"]
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)
    train_losses.append(avg_train_loss)
    print(f"Epoch {epoch + 1} Training Loss: {avg_train_loss:.4f}")

    # Validation loop
    model.eval()
    total_val_loss = 0
    # Reset every epoch, so after the loop these hold the last epoch's output.
    predictions, true_labels = [], []
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs["loss"]
            total_val_loss += loss.item()

            logits = outputs["logits"]
            preds = torch.argmax(logits, dim=-1)

            for pred, label in zip(preds.cpu().numpy(), labels.cpu().numpy()):
                # Skip positions labelled -100 (no tag to compare against).
                keep = label != -100
                predictions.append([label_list[p] for p in pred[keep]])
                true_labels.append([label_list[l] for l in label[keep]])

    avg_val_loss = total_val_loss / len(val_loader)
    val_losses.append(avg_val_loss)
    print(f"Epoch {epoch + 1} Validation Loss: {avg_val_loss:.4f}")

In [None]:
import os

# Compute evaluation metrics on the flattened (per-token) validation output
flattened_preds = [label for seq in predictions for label in seq]
flattened_labels = [label for seq in true_labels for label in seq]

accuracy = accuracy_score(flattened_labels, flattened_preds)
f1 = f1_score(flattened_labels, flattened_preds, average="weighted")

# Print the complete classification report.
# `labels=label_list` fixes the row order; without it scikit-learn sorts the
# tags alphabetically and `target_names` (in dataset order) would be attached
# to the wrong rows, or raise when a tag never occurs.
print("Classification Report:")
print(classification_report(flattened_labels, flattened_preds, labels=label_list, target_names=label_list))

# Print accuracy and F1 score
print(f"Validation Accuracy: {accuracy:.4f}")
print(f"Validation F1 Score: {f1:.4f}")

# Plot training and validation loss curves
plt.figure(figsize=(10, 6))
plt.plot(range(1, len(train_losses) + 1), train_losses, label="Training Loss")
plt.plot(range(1, len(val_losses) + 1), val_losses, label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Training and Validation Loss Curves")
plt.legend()
plt.grid()
# savefig does not create missing directories, so make sure the target exists.
os.makedirs("results/bert", exist_ok=True)
plt.savefig("results/bert/loss_curves.png")
plt.show()

## Test Set Evaluation

In [None]:
# Evaluate on the tokenized test split. It is batched with the same
# `collate_fn` as the other splits because the examples have different
# lengths and must be padded before they can be stacked.
test_dataset = tokenized_datasets["test"]
test_loader = DataLoader(test_dataset, batch_size=64, collate_fn=collate_fn)

model.eval()
test_predictions, test_true_labels = [], []
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        # Only the logits are needed here, so no labels are passed to the model.
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs["logits"]
        preds = torch.argmax(logits, dim=-1)

        for pred, label in zip(preds.cpu().numpy(), labels.cpu().numpy()):
            # Skip positions labelled -100 (no tag to compare against).
            keep = label != -100
            test_predictions.append([label_list[p] for p in pred[keep]])
            test_true_labels.append([label_list[l] for l in label[keep]])

flattened_test_preds = [label for seq in test_predictions for label in seq]
flattened_test_labels = [label for seq in test_true_labels for label in seq]

test_accuracy = accuracy_score(flattened_test_labels, flattened_test_preds)
test_f1 = f1_score(flattened_test_labels, flattened_test_preds, average="weighted")

# `labels=label_list` pins the row order so that `target_names` lines up with
# the rows instead of scikit-learn's alphabetical label order.
print("Test Set Classification Report:")
print(classification_report(flattened_test_labels, flattened_test_preds, labels=label_list, target_names=label_list))

print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test F1 Score: {test_f1:.4f}")