In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, TensorDataset
from sklearn.metrics import accuracy_score
import random
import numpy as np
from transformers import BertTokenizer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# BERT (Bidirectional Encoder Representations from Transformers)

BERT is a language model introduced in October 2018 by researchers at Google. It learns to represent text as a sequence of vectors using self-supervised learning. It uses the encoder-only transformer architecture. It is notable for its dramatic improvement over previous state-of-the-art models, and as an early example of a large language model. As of 2020, BERT is a ubiquitous baseline in natural language processing (NLP) experiments.

Key highlights of BERT include:
- Only encoder layers are used (no decoder).
- It is pretrained on two tasks:
  - Masked Language Modeling (MLM) – Predict masked tokens.
  - Next Sentence Prediction (NSP) – Classify whether two sentences follow each other.

The key innovation is its truly bidirectional nature, allowing it to consider context from both directions simultaneously during pre-training.

## Model Variants

- BERT-Base: 12 transformer layers, 12 attention heads, 768 hidden dimensions (110M parameters)
- BERT-Large: 24 transformer layers, 16 attention heads, 1024 hidden dimensions (340M parameters)



## Model architecture


In [2]:
# Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a batch of embeddings.

    The table is computed once in ``__init__`` and stored as a non-persistent
    buffer, so it moves with the module on ``.to()`` / ``.cuda()`` but is
    neither a trainable parameter nor part of ``state_dict``.

    Args:
        d_model: Size of the embedding dimension (odd values are supported).
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = 10000 ** (-torch.arange(0, d_model, 2, dtype=torch.float) / d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one cosine column fewer than sine columns,
        # so slice the frequencies; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Shape (1, max_len, d_model) so it broadcasts over the batch dimension.
        self.register_buffer("pe", pe.unsqueeze(0), persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        ``x`` is indexed as (batch, seq_len, d_model).
        """
        # .to() is a no-op when the module already lives on x's device; it keeps
        # stand-alone use working when the module itself was never moved.
        return x + self.pe[:, :x.size(1), :].to(x.device)

# Multi-Head Attention
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention evaluated over several heads in parallel.

    Args:
        d_model: Width of the input and output features; must be divisible
            by ``num_heads``.
        num_heads: Number of attention heads.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.fc = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` to ``k``/``v``.

        Args:
            q, k, v: Tensors indexed as (batch, seq_len, d_model).
            mask: Optional (batch, key_len) tensor; keys whose entry is 0 are
                excluded from attention.

        Returns:
            Tensor of shape (batch, query_len, d_model).
        """
        batch_size = q.shape[0]

        # batch, seq_len, d_model -> batch, num_heads, seq_len, d_k
        q = self.w_q(q).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        k = self.w_k(k).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        v = self.w_v(v).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        scores = torch.matmul(q, k.transpose(-2, -1)) / (self.d_k ** 0.5)

        if mask is not None:
            # (batch, key_len) -> (batch, 1, 1, key_len): broadcast over heads and queries.
            mask = mask.unsqueeze(1).unsqueeze(2)
            # Fill with the most negative finite value instead of -inf: masked keys
            # still get zero weight, but a row whose keys are all masked yields a
            # uniform distribution rather than NaN from softmax.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)
        attn = torch.softmax(scores, dim=-1)

        # batch, num_heads, seq_len, d_k -> batch, seq_len, d_model
        output = torch.matmul(attn, v).transpose(1, 2).contiguous().view(batch_size, -1, self.d_k * self.num_heads)
        return self.fc(output)


# Feed Forward Layer
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension: Linear -> ReLU -> Linear.

    Args:
        d_model: Input and output feature width.
        d_ff: Width of the hidden layer.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        # Expansion followed by projection back to the model width.
        self.fc1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.fc2 = nn.Linear(in_features=d_ff, out_features=d_model)

    def forward(self, x):
        """Return the transformed features; the shape of ``x`` is preserved."""
        hidden = self.fc1(x)
        activated = F.relu(hidden)
        return self.fc2(activated)


# Encoder Layer
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer is wrapped in a residual connection followed by
    LayerNorm (normalisation is applied after the residual sum).

    Args:
        d_model: Feature width of the block.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
    """

    def __init__(self, d_model, num_heads, d_ff):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        """Run the block on ``x``; ``mask`` is passed through to the attention module."""
        # Self-attention sub-layer: queries, keys and values all come from x.
        attended = self.norm1(x + self.attention(x, x, x, mask))
        # Feed-forward sub-layer.
        return self.norm2(attended + self.ff(attended))


# BERT Model
class BERTModel(nn.Module):
    """Small BERT-style encoder with masked-LM and sentence-pair heads.

    Args:
        vocab_size: Number of token ids; also the width of the MLM logits.
        d_model: Embedding / hidden width.
        num_heads: Attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
        d_ff: Hidden width of each feed-forward sub-layer.
        max_len: Number of positions covered by the positional encoding.
    """

    def __init__(self, vocab_size, d_model=128, num_heads=4, num_layers=2, d_ff=256, max_len=5000):
        super().__init__()
        self.word_embedding = nn.Embedding(vocab_size, d_model)
        # Two segment ids: 0 for the first sentence, 1 for the second.
        self.segment_embedding = nn.Embedding(2, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
        self.mlm_head = nn.Linear(d_model, vocab_size)
        self.nsp_head = nn.Linear(d_model, 2)

    def forward(self, x, segment_ids=None, attention_mask=None):
        """Encode token ids and return both head outputs.

        Args:
            x: Token ids indexed as (batch, seq_len).
            segment_ids: Segment id per token, same shape as ``x``. When
                omitted, every token is assigned segment 0.
            attention_mask: Optional (batch, seq_len) tensor in which 0 marks
                positions that must not be attended to. When omitted, all
                positions are attended to.

        Returns:
            Tuple ``(mlm_logits, nsp_logits)`` with shapes
            (batch, seq_len, vocab_size) and (batch, 2).
        """
        if segment_ids is None:
            # Single-sentence input: everything belongs to segment 0.
            segment_ids = torch.zeros_like(x)

        hidden = self.word_embedding(x) + self.segment_embedding(segment_ids)
        hidden = self.pos_encoding(hidden)
        for layer in self.encoder_layers:
            hidden = layer(hidden, attention_mask)
        hidden = self.norm(hidden)

        # The first position ([CLS] in the data built by this notebook) feeds
        # the sentence-level head.
        cls_token_output = hidden[:, 0, :]
        return self.mlm_head(hidden), self.nsp_head(cls_token_output)


In [3]:
# BERT-Base-sized configuration of the model defined above.
# The total counts every trainable tensor, including the MLM output projection,
# which is a separate d_model x vocab_size matrix in this implementation.
base_bert_model = BERTModel(
    vocab_size=30000,
    d_model=768,
    num_heads=12,
    num_layers=12,
    d_ff=3072,
)
total_params = sum(param.numel() for param in base_bert_model.parameters())
print(f"Number of parameters: {total_params/1000000}M")

Number of parameters: 131.169074M


In [4]:
# BERT-Large-sized configuration of the model defined above.
large_bert_model = BERTModel(
    vocab_size=30000,
    d_model=1024,
    num_heads=16,
    num_layers=24,
    d_ff=4096,
)

total_params = sum(param.numel() for param in large_bert_model.parameters())
print(f"Number of parameters: {total_params/1000000}M")

Number of parameters: 363.785522M


## Pretraining

In [5]:
def prepare_pretrain_data(sentence_pairs, tokenizer, seq_len=32, mask_prob=0.15):
    """Turn sentence pairs into fixed-length MLM + NSP training tensors.

    Each pair becomes ``[CLS] sentence1 [SEP] sentence2 [SEP]`` padded to
    ``seq_len``. Pairs that do not fit are truncated by repeatedly dropping the
    last token of the longer sentence. A fraction ``mask_prob`` of the ordinary
    tokens (never [CLS], [SEP] or padding) is selected for prediction; of
    those, 80% are replaced by [MASK], 10% by a random token id and 10% are
    left unchanged (section 3.1 of the BERT paper).

    Args:
        sentence_pairs: Iterable of dicts with keys ``"sentence1"``,
            ``"sentence2"`` and ``"is_next"``.
        tokenizer: Object providing ``encode(text, add_special_tokens=False)``
            and ``vocab_size``. Special-token ids are read from
            ``cls_token_id`` / ``sep_token_id`` / ``mask_token_id`` /
            ``pad_token_id`` when present, otherwise 101 / 102 / 103 / 0.
        seq_len: Length of every produced sequence (at least 3).
        mask_prob: Fraction of ordinary tokens selected for prediction.

    Returns:
        Dict of int64 tensors: ``input_ids``, ``segment_ids``,
        ``attention_masks`` and ``mlm_labels`` of shape (num_pairs, seq_len),
        and ``nsp_labels`` of shape (num_pairs,). Positions that are not
        prediction targets carry the label -100.

    Raises:
        ValueError: If ``seq_len`` is smaller than 3.
    """
    if seq_len < 3:
        raise ValueError("seq_len must be at least 3 to hold [CLS] and two [SEP] tokens")

    def _special_id(attribute, default):
        # Use the tokenizer's own id when it exposes one.
        value = getattr(tokenizer, attribute, None)
        return default if value is None else value

    cls_id = _special_id("cls_token_id", 101)
    sep_id = _special_id("sep_token_id", 102)
    mask_id = _special_id("mask_token_id", 103)
    pad_id = _special_id("pad_token_id", 0)

    # Room left for real tokens once [CLS] and both [SEP] are accounted for.
    token_budget = seq_len - 3

    input_rows, segment_rows, attention_rows, label_rows, nsp_labels = [], [], [], [], []

    for pair in sentence_pairs:
        sentence1 = list(tokenizer.encode(pair["sentence1"], add_special_tokens=False))
        sentence2 = list(tokenizer.encode(pair["sentence2"], add_special_tokens=False))

        # Truncate so every row has exactly seq_len entries.
        while len(sentence1) + len(sentence2) > token_budget:
            longer = sentence1 if len(sentence1) >= len(sentence2) else sentence2
            longer.pop()

        tokens = [cls_id] + sentence1 + [sep_id] + sentence2 + [sep_id]
        segment_id = [0] * (len(sentence1) + 2) + [1] * (len(sentence2) + 1)
        num_real = len(tokens)

        # Pad sequences to fixed length
        padding_length = seq_len - num_real
        tokens += [pad_id] * padding_length
        segment_id += [0] * padding_length

        # Built from lengths rather than token values so a random replacement
        # can never be mistaken for padding.
        attention_mask = [1] * num_real + [0] * padding_length

        # Prepare MLM labels (default -100 means no prediction)
        mlm_labels_example = [-100] * seq_len

        # Only ordinary tokens are candidates: skip [CLS] at 0, the middle
        # [SEP] at len(sentence1) + 1 and the final [SEP] at num_real - 1.
        first_sep = len(sentence1) + 1
        candidate_indexes = list(range(1, first_sep)) + list(range(first_sep + 1, num_real - 1))

        # At least one target when possible, never more than there are candidates.
        num_to_mask = min(len(candidate_indexes), max(1, int(len(candidate_indexes) * mask_prob)))

        for idx in random.sample(candidate_indexes, num_to_mask):
            mlm_labels_example[idx] = tokens[idx]
            prob = random.random()
            if prob < 0.8:  # 80% replace with [MASK]
                tokens[idx] = mask_id
            elif prob < 0.9:  # 10% replace with a random token
                tokens[idx] = random.randint(1, tokenizer.vocab_size - 1)
            # remaining 10%: keep the token unchanged

        input_rows.append(tokens)
        segment_rows.append(segment_id)
        attention_rows.append(attention_mask)
        label_rows.append(mlm_labels_example)
        nsp_labels.append(int(pair["is_next"]))

    def _as_matrix(rows):
        # The explicit reshape keeps the (0, seq_len) shape for empty input.
        return torch.tensor(rows, dtype=torch.long).reshape(len(rows), seq_len)

    return {
        'input_ids': _as_matrix(input_rows),
        'segment_ids': _as_matrix(segment_rows),
        'attention_masks': _as_matrix(attention_rows),
        'mlm_labels': _as_matrix(label_rows),
        'nsp_labels': torch.tensor(nsp_labels, dtype=torch.long)
    }

In [6]:
def pretrain_bert(model, dataloader, optimizer, criterion_mlm, criterion_nsp, epochs=5):
    """Jointly train ``model`` on the masked-LM and sentence-pair objectives.

    The function is self-contained: batches are moved to the device the model's
    parameters live on, and the vocabulary width is read from the MLM logits,
    so it does not rely on notebook-level ``device`` or ``tokenizer`` variables.

    Args:
        model: Module called as ``model(input_ids, segment_ids, attention_masks)``
            and returning ``(mlm_logits, nsp_logits)``.
        dataloader: Sized iterable of batches, each a sequence
            ``(input_ids, segment_ids, attention_masks, mlm_labels, nsp_labels)``.
        optimizer: Optimizer over the model's parameters.
        criterion_mlm: Loss applied to the flattened MLM logits and labels.
        criterion_nsp: Loss applied to the sentence-pair logits and labels.
        epochs: Number of passes over ``dataloader``.

    Returns:
        The same ``model`` object, left in training mode.
    """
    model.train()

    # Follow the model's own placement; fall back to CPU for parameter-free models.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    # Guard the per-epoch average against an empty loader.
    num_batches = max(len(dataloader), 1)

    for epoch in range(epochs):
        total_loss = 0.0

        for batch in dataloader:
            input_ids, segment_ids, attention_masks, mlm_labels, nsp_labels = [b.to(target_device) for b in batch]

            optimizer.zero_grad()

            # Forward pass
            mlm_outputs, nsp_outputs = model(input_ids, segment_ids, attention_masks)

            # Flatten (batch, seq_len, vocab) -> (batch * seq_len, vocab) for the token-level loss.
            mlm_loss = criterion_mlm(mlm_outputs.reshape(-1, mlm_outputs.size(-1)), mlm_labels.reshape(-1))
            nsp_loss = criterion_nsp(nsp_outputs, nsp_labels)

            loss = mlm_loss + nsp_loss  # Combined loss

            # Backward pass
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / num_batches
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {avg_loss:.4f}")

    return model

In [7]:
# Seed every RNG used below (token masking uses `random`, weight init and
# DataLoader shuffling use torch) so a Restart & Run All reproduces the run.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Define sentence pairs
sentence_pairs = [
    {"sentence1": "The weather is nice today.", "sentence2": "Let's go for a walk.", "is_next": 1},
    {"sentence1": "He went to the market.", "sentence2": "The sky looks cloudy.", "is_next": 0},
    {"sentence1": "I enjoy reading books.", "sentence2": "I love science fiction.", "is_next": 1},
    {"sentence1": "She prepared a delicious meal.", "sentence2": "The guests were impressed.", "is_next": 1},
    {"sentence1": "He studies very hard.", "sentence2": "The exams are approaching.", "is_next": 1},
    {"sentence1": "My dog is very playful.", "sentence2": "I need to buy some groceries.", "is_next": 0},
    {"sentence1": "The concert was amazing.", "sentence2": "We had a great time.", "is_next": 1},
    {"sentence1": "The coffee machine is broken.", "sentence2": "I need to call a technician.", "is_next": 1},
    {"sentence1": "The restaurant is famous.", "sentence2": "The service was terrible.", "is_next": 0},
    {"sentence1": "He likes to play soccer.", "sentence2": "His team won the championship.", "is_next": 1},
]

# Load BERT tokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Load pretraining data
pretrain_data = prepare_pretrain_data(sentence_pairs, tokenizer)

# Create DataLoader
batch_size = 2
pretrain_dataset = TensorDataset(pretrain_data["input_ids"], pretrain_data["segment_ids"],
                                 pretrain_data["attention_masks"], pretrain_data["mlm_labels"],
                                 pretrain_data["nsp_labels"])
pretrain_loader = DataLoader(pretrain_dataset, batch_size=batch_size, shuffle=True)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
vocab_size = tokenizer.vocab_size

# Initialize model and optimizer
bert_model = BERTModel(vocab_size).to(device)
optimizer = optim.Adam(bert_model.parameters(), lr=5e-5)
# -100 is the label prepare_pretrain_data gives to every position that is not
# an MLM prediction target (padding included), so those positions add no loss.
criterion_mlm = nn.CrossEntropyLoss(ignore_index=-100)
criterion_nsp = nn.CrossEntropyLoss()

# Start pretraining
pretrained_bert_model = pretrain_bert(bert_model, pretrain_loader, optimizer, criterion_mlm, criterion_nsp, epochs=30)

Epoch [1/30], Loss: 10.8917
Epoch [2/30], Loss: 10.7370
Epoch [3/30], Loss: 10.6251
Epoch [4/30], Loss: 10.4796
Epoch [5/30], Loss: 10.3927
Epoch [6/30], Loss: 10.2092
Epoch [7/30], Loss: 10.1402
Epoch [8/30], Loss: 9.9670
Epoch [9/30], Loss: 9.9057
Epoch [10/30], Loss: 9.7889
Epoch [11/30], Loss: 9.7430
Epoch [12/30], Loss: 9.5793
Epoch [13/30], Loss: 9.4266
Epoch [14/30], Loss: 9.3539
Epoch [15/30], Loss: 9.2606
Epoch [16/30], Loss: 9.1283
Epoch [17/30], Loss: 9.0709
Epoch [18/30], Loss: 9.0162
Epoch [19/30], Loss: 8.8572
Epoch [20/30], Loss: 8.7454
Epoch [21/30], Loss: 8.6200
Epoch [22/30], Loss: 8.6164
Epoch [23/30], Loss: 8.4726
Epoch [24/30], Loss: 8.4096
Epoch [25/30], Loss: 8.2672
Epoch [26/30], Loss: 8.2396
Epoch [27/30], Loss: 8.1575
Epoch [28/30], Loss: 8.0223
Epoch [29/30], Loss: 7.9591
Epoch [30/30], Loss: 7.9420


## Fine-tuning

In [8]:
# Replace the pretraining sentence-pair head with a freshly initialised
# 2-class classification head. The input width is read from the existing head
# instead of being hard-coded, so the cell keeps working if d_model changes.
pretrained_bert_model.nsp_head = nn.Linear(pretrained_bert_model.nsp_head.in_features, 2)
# The new layer is created on the CPU; move the whole model back to `device`.
pretrained_bert_model.to(device)

BERTModel(
  (word_embedding): Embedding(30522, 128)
  (segment_embedding): Embedding(2, 128)
  (pos_encoding): PositionalEncoding()
  (encoder_layers): ModuleList(
    (0-1): 2 x EncoderLayer(
      (attention): MultiHeadAttention(
        (w_q): Linear(in_features=128, out_features=128, bias=True)
        (w_k): Linear(in_features=128, out_features=128, bias=True)
        (w_v): Linear(in_features=128, out_features=128, bias=True)
        (fc): Linear(in_features=128, out_features=128, bias=True)
      )
      (ff): FeedForward(
        (fc1): Linear(in_features=128, out_features=256, bias=True)
        (fc2): Linear(in_features=256, out_features=128, bias=True)
      )
      (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
    )
  )
  (norm): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
  (mlm_head): Linear(in_features=128, out_features=30522, bias=True)
  (nsp_head): Linear(in_features=128, 

In [9]:
# Prepare fine-tuning data: (text, label) pairs, where label 1 marks the
# positive sentences and 0 the negative ones.
fine_tune_data = [
    {"text": text, "label": label}
    for text, label in (
        ("I love this movie!", 1),
        ("This product is terrible.", 0),
        ("The food was fantastic.", 1),
        ("It was a boring experience.", 0),
    )
]

# Prepare fine-tuning dataset
def prepare_finetune_data(fine_tune_data, tokenizer, seq_len=32):
    """Convert labelled single-sentence examples into stacked tensors.

    Args:
        fine_tune_data: Iterable of dicts with keys ``"text"`` and ``"label"``.
        tokenizer: Object whose ``encode`` accepts ``max_length``, ``padding``
            and ``truncation`` and returns a list of token ids.
        seq_len: Length requested from the tokenizer and length of every
            segment-id row.

    Returns:
        Dict with ``input_ids``, ``attention_masks`` and ``segments`` (one row
        per example) and ``labels`` (one entry per example).
    """
    id_rows, mask_rows, segment_rows, label_rows = [], [], [], []

    for example in fine_tune_data:
        token_ids = tokenizer.encode(
            example["text"],
            max_length=seq_len,
            padding="max_length",
            truncation=True,
        )
        id_rows.append(torch.tensor(token_ids))
        # Token id 0 is treated as padding: 1 where there is a real token, else 0.
        mask_rows.append(torch.tensor([int(token_id != 0) for token_id in token_ids]))
        # A single sentence, so every position belongs to segment 0.
        segment_rows.append(torch.tensor([0] * seq_len))
        label_rows.append(torch.tensor(example["label"]))

    return {
        'input_ids': torch.stack(id_rows),
        'attention_masks': torch.stack(mask_rows),
        'segments': torch.stack(segment_rows),
        'labels': torch.tensor(label_rows)
    }

def finetune_model(model, dataloader, optimizer, criterion, epochs=3):
    """Fine-tune ``model`` for classification using its second output.

    Batches are moved to the device the model's parameters live on, so the
    function does not rely on a notebook-level ``device`` variable.

    Args:
        model: Module called as ``model(input_ids, segments, attention_masks)``
            and returning a pair whose second element holds the class logits.
        dataloader: Sized iterable of batches, each a sequence
            ``(input_ids, attention_masks, segments, labels)``.
        optimizer: Optimizer over the model's parameters.
        criterion: Loss applied to ``(logits, labels)``.
        epochs: Number of passes over ``dataloader``.

    Returns:
        The same ``model`` object, left in training mode.
    """
    model.train()

    # Follow the model's own placement; fall back to CPU for parameter-free models.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    # Guard the per-epoch average against an empty loader.
    num_batches = max(len(dataloader), 1)

    for epoch in range(epochs):
        total_loss = 0.0

        for batch in dataloader:
            input_ids, attention_masks, segments, labels = [b.to(target_device) for b in batch]

            optimizer.zero_grad()

            # Forward pass; the first (token-level) output is not needed here.
            _, logits = model(input_ids, segments, attention_masks)

            # Compute loss
            loss = criterion(logits, labels)

            # Backward pass
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / num_batches
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {avg_loss:.4f}")

    return model




In [10]:
# Tensorise the labelled sentences and wrap them in a shuffling loader.
fine_tune_dataset = prepare_finetune_data(fine_tune_data, tokenizer)

# Column order must match the unpacking order inside finetune_model.
fine_tune_tensors = TensorDataset(
    *(fine_tune_dataset[key] for key in ("input_ids", "attention_masks", "segments", "labels"))
)
fine_tune_dataloader = DataLoader(fine_tune_tensors, batch_size=16, shuffle=True)

# Fresh loss and optimizer for the classification objective.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(pretrained_bert_model.parameters(), lr=2e-5)

finetuned_model = finetune_model(
    pretrained_bert_model,
    fine_tune_dataloader,
    optimizer,
    criterion,
    epochs=200,
)

Epoch [1/200], Loss: 0.6980
Epoch [2/200], Loss: 0.6943
Epoch [3/200], Loss: 0.6911
Epoch [4/200], Loss: 0.6882
Epoch [5/200], Loss: 0.6856
Epoch [6/200], Loss: 0.6831
Epoch [7/200], Loss: 0.6807
Epoch [8/200], Loss: 0.6784
Epoch [9/200], Loss: 0.6760
Epoch [10/200], Loss: 0.6736
Epoch [11/200], Loss: 0.6711
Epoch [12/200], Loss: 0.6685
Epoch [13/200], Loss: 0.6660
Epoch [14/200], Loss: 0.6634
Epoch [15/200], Loss: 0.6609
Epoch [16/200], Loss: 0.6584
Epoch [17/200], Loss: 0.6559
Epoch [18/200], Loss: 0.6533
Epoch [19/200], Loss: 0.6508
Epoch [20/200], Loss: 0.6482
Epoch [21/200], Loss: 0.6456
Epoch [22/200], Loss: 0.6429
Epoch [23/200], Loss: 0.6402
Epoch [24/200], Loss: 0.6374
Epoch [25/200], Loss: 0.6346
Epoch [26/200], Loss: 0.6318
Epoch [27/200], Loss: 0.6289
Epoch [28/200], Loss: 0.6260
Epoch [29/200], Loss: 0.6231
Epoch [30/200], Loss: 0.6201
Epoch [31/200], Loss: 0.6171
Epoch [32/200], Loss: 0.6141
Epoch [33/200], Loss: 0.6109
Epoch [34/200], Loss: 0.6078
Epoch [35/200], Loss: 0

## Evaluate

In [11]:
# Evaluate model
def evaluate_model(model, dataloader):
    """Print accuracy, precision, recall and F1 of ``model`` on ``dataloader``.

    Batches are moved to the device the model's parameters live on, so the
    function does not rely on a notebook-level ``device`` variable.

    Args:
        model: Module called as ``model(input_ids, segments, attention_masks)``
            and returning a pair whose second element holds the class logits.
        dataloader: Iterable of batches, each a sequence
            ``(input_ids, attention_masks, segments, labels)``.

    Returns:
        None. The metrics are printed; the model is left in eval mode.
    """
    model.eval()

    # Follow the model's own placement; fall back to CPU for parameter-free models.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    all_preds, all_labels = [], []
    with torch.no_grad():
        for batch in dataloader:
            input_ids, attention_masks, segments, labels = [b.to(target_device) for b in batch]

            _, logits = model(input_ids, segments, attention_masks)
            # Predicted class = index of the largest logit.
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    prec = precision_score(all_labels, all_preds)
    rec = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    print(f"Evaluation Results:\nAccuracy: {acc:.4f}\nPrecision: {prec:.4f}\nRecall: {rec:.4f}\nF1 Score: {f1:.4f}")

In [12]:
# Held-out sentences: close paraphrases of the fine-tuning examples
# (e.g. "film" for "movie"), not verbatim copies of them.
evaluate_data = [
    {"text": "I love this film!", "label": 1},
    {"text": "This product is bad.", "label": 0},
    {"text": "The food was excellent.", "label": 1},
    {"text": "It was a dull experience.", "label": 0},
]

evaluate_dataset = prepare_finetune_data(evaluate_data, tokenizer)

# Column order must match the unpacking order inside evaluate_model.
evaluate_tensors = TensorDataset(
    *(evaluate_dataset[key] for key in ("input_ids", "attention_masks", "segments", "labels"))
)
evaluate_dataloader = DataLoader(evaluate_tensors, batch_size=2, shuffle=True)

evaluate_model(finetuned_model, evaluate_dataloader)

Evaluation Results:
Accuracy: 1.0000
Precision: 1.0000
Recall: 1.0000
F1 Score: 1.0000
