In [1]:

# hide
from transformers import AutoTokenizer

# Define model checkpoint
model_ckpt = "bert-base-uncased"

In [14]:
# Initialize tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)

In [15]:
# Load configuration
config = AutoConfig.from_pretrained(model_ckpt)

In [16]:
# Token Embedding layer
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)

## Multi-headed Attention

In [18]:
import torch
from math import sqrt 
import torch.nn.functional as F

# Define scaled dot-product attention function
def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)

In [19]:
# Define AttentionHead class
class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(
            self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
        return attn_outputs

In [20]:
# Define MultiHeadAttention class
class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x

In [21]:
# Load BERT model with attentions
model = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

In [22]:
# Define sentences
sentence_a = 'I love programming'
sentence_b = 'PRON VERB NOUN'

In [23]:
# Tokenize inputs for the model
viz_inputs = tokenizer(sentence_a, sentence_b, return_tensors='pt')
attention = model(**viz_inputs).attentions
sentence_b_start = (viz_inputs['token_type_ids'] == 0).sum(dim=1)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs['input_ids'][0])

In [24]:
# Visualize attention heads
head_view(attention, tokens, sentence_b_start, heads=[8])

<IPython.core.display.Javascript object>

In [25]:
# Define FeedForward class
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [26]:
# Embedding inputs using BERT model
inputs_embeds = model.embeddings(viz_inputs['input_ids'])

# Perform scaled dot-product attention
query = key = value = inputs_embeds
dim_k = key.size(-1)
scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
weights = F.softmax(scores, dim=-1)
attn_outputs = torch.bmm(weights, value)

# Print shapes
print(f'Scores shape: {scores.size()}')
print(f'Weights sum (dim -1): {weights.sum(dim=-1)}')
print(f'Attention outputs shape: {attn_outputs.shape}')

Scores shape: torch.Size([1, 10, 10])
Weights sum (dim -1): tensor([[1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000]], grad_fn=<SumBackward1>)
Attention outputs shape: torch.Size([1, 10, 768])


In [27]:
# FeedForward pass
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_outputs)
print(f'FeedForward outputs shape: {ff_outputs.size()}')

FeedForward outputs shape: torch.Size([1, 10, 768])


In [30]:
import pandas as pd
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from math import sqrt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from transformers import BertTokenizerFast, TFBertModel, AutoConfig, AutoModel
from bertviz import head_view
import nltk
import string

# Download stopwords
nltk.download('stopwords')
stop_words = set(nltk.corpus.stopwords.words('english'))
punctuation = set(string.punctuation)

# Sample DataFrame creation
data = {
    'sentence': ['I love programming', 'Keras is great'],
    'tag': ['PRON VERB NOUN', 'PROPN AUX ADJ']
}
df = pd.DataFrame(data)

# Tokenize sentences and align tags
tokenizer = BertTokenizerFast.from_pretrained('bert-base-uncased')

def tokenize_and_align_labels(sentences, tags, tokenizer, max_length=128):
    tokenized_inputs = tokenizer(sentences, padding='max_length', truncation=True, return_tensors='pt')
    labels = []
    for i, label in enumerate(tags):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                label_ids.append(label[word_idx])
            else:
                label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)
    return tokenized_inputs, labels

# Prepare sentences and tags
sentences = df['sentence'].tolist()
tags = [nltk.pos_tag(sentence.split()) for sentence in sentences]
tag_encoder = LabelEncoder()
tags = [tag_encoder.fit_transform([t[1] for t in sentence]) for sentence in tags]

# Tokenize sentences and align tags
tokenized_inputs, labels = tokenize_and_align_labels(sentences, tags, tokenizer)
labels = torch.tensor(labels)

# Split the data into training and test sets
train_inputs, val_inputs, train_labels, val_labels = train_test_split(tokenized_inputs['input_ids'], labels, test_size=0.1, random_state=42)
train_masks, val_masks, _, _ = train_test_split(tokenized_inputs['attention_mask'], labels, test_size=0.1, random_state=42)


[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/linghuang/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [31]:
# Define scaled dot-product attention function
def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)

# Define AttentionHead class
class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(
            self.q(hidden_state), self.k(hidden_state), self.v(hidden_state))
        return attn_outputs

# Define MultiHeadAttention class
class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x

# Define FeedForward class
class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

# Define the POS tagging model
class PosTaggingModel(nn.Module):
    def __init__(self, bert_model, config, num_labels):
        super(PosTaggingModel, self).__init__()
        self.bert = bert_model
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)
        self.classifier = nn.Linear(config.hidden_size, num_labels)
        
    def forward(self, input_ids, attention_mask):
        bert_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        sequence_output = bert_outputs.last_hidden_state
        attention_output = self.attention(sequence_output)
        ff_output = self.feed_forward(attention_output)
        logits = self.classifier(ff_output)
        return logits


In [32]:
# Load BERT model
config = AutoConfig.from_pretrained('bert-base-uncased')
bert_model = AutoModel.from_pretrained('bert-base-uncased', config=config)

# Define number of labels
num_labels = len(tag_encoder.classes_)

# Initialize model
model = PosTaggingModel(bert_model, config, num_labels)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# Define optimizer and loss function
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
loss_fn = nn.CrossEntropyLoss()

# Training loop
def train(model, train_inputs, train_masks, train_labels, val_inputs, val_masks, val_labels, epochs=3):
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(train_inputs.to(device), train_masks.to(device))
        loss = loss_fn(outputs.view(-1, num_labels), train_labels.view(-1).to(device))
        loss.backward()
        optimizer.step()

        model.eval()
        with torch.no_grad():
            outputs = model(val_inputs.to(device), val_masks.to(device))
            val_loss = loss_fn(outputs.view(-1, num_labels), val_labels.view(-1).to(device))

        print(f'Epoch: {epoch+1}, Loss: {loss.item()}, Val Loss: {val_loss.item()}')

# Train the model
train(model, train_inputs, train_masks, train_labels, val_inputs, val_masks, val_labels, epochs=3)

# Evaluation
model.eval()
with torch.no_grad():
    outputs = model(val_inputs.to(device), val_masks.to(device))
    preds = torch.argmax(outputs, dim=2).cpu().numpy()

# Flatten predictions and labels
pred_labels = preds.flatten()
true_labels = val_labels.cpu().numpy().flatten()

# Filter out -100 values
pred_labels = pred_labels[true_labels != -100]
true_labels = true_labels[true_labels != -100]

# Print classification report
print(classification_report(true_labels, pred_labels, target_names=tag_encoder.classes_))


Epoch: 1, Loss: 1.0994853973388672, Val Loss: 1.098605751991272
Epoch: 2, Loss: 1.100171446800232, Val Loss: 1.098650336265564
Epoch: 3, Loss: 1.0938831567764282, Val Loss: 1.0986303091049194
              precision    recall  f1-score   support

          JJ       0.00      0.00      0.00         1
         NNP       0.33      1.00      0.50         1
         VBZ       0.00      0.00      0.00         1

    accuracy                           0.33         3
   macro avg       0.11      0.33      0.17         3
weighted avg       0.11      0.33      0.17         3



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
