In [1]:
# Show which Python interpreter backs this kernel; the install cell below
# reuses sys.executable so packages land in this same environment.
import sys
print(sys.executable)

d:\hcmus\HK4\MathAI\final\code\.my-env\Scripts\python.exe


In [None]:
# Install torch and transformers with the pip of the interpreter running this
# kernel (requires the cell that imports `sys` to have been executed first).
!{sys.executable} -m pip install torch transformers

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import numpy as np
from transformers import BertTokenizer
from torch.utils.data import Dataset, DataLoader
import random

  from .autonotebook import tqdm as notebook_tqdm


In [12]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    The model dimension is split evenly across ``n_heads`` heads; each head
    attends independently and the results are concatenated and projected
    back to ``d_model``.

    Args:
        d_model: Size of the input/output feature dimension.
        n_heads: Number of attention heads; must divide ``d_model``.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model, n_heads, dropout=0.1):
        super().__init__()
        assert d_model % n_heads == 0

        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads

        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

        self.dropout = nn.Dropout(dropout)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Compute ``softmax(Q K^T / sqrt(d_k)) V``.

        Args:
            Q, K, V: Per-head tensors whose last dimension is ``d_k``.
            mask: Optional tensor broadcastable to the score tensor;
                positions where ``mask == 0`` are excluded from attention.

        Returns:
            Tuple ``(output, attention_weights)``; the weights are returned
            after dropout has been applied.
        """
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)

        if mask is not None:
            # -1e9 is kept for float32/float64; for narrower dtypes (e.g.
            # float16) it is not representable, so fall back to the most
            # negative finite value of the score dtype.
            fill_value = max(-1e9, torch.finfo(scores.dtype).min)
            # Out-of-place fill: do not mutate the tensor produced by matmul.
            scores = scores.masked_fill(mask == 0, fill_value)

        attention_weights = F.softmax(scores, dim=-1)
        attention_weights = self.dropout(attention_weights)

        output = torch.matmul(attention_weights, V)
        return output, attention_weights

    def _split_heads(self, x):
        """Reshape ``(batch, length, d_model)`` to ``(batch, heads, length, d_k)``."""
        # Each tensor uses its OWN length so that key/value sequences may be
        # longer or shorter than the query sequence.
        return x.view(x.size(0), x.size(1), self.n_heads, self.d_k).transpose(1, 2)

    def forward(self, query, key, value, mask=None):
        """Apply multi-head attention.

        Args:
            query: Tensor of shape ``(batch, query_len, d_model)``.
            key: Tensor of shape ``(batch, key_len, d_model)``.
            value: Tensor of shape ``(batch, key_len, d_model)``.
            mask: Optional mask broadcastable to
                ``(batch, n_heads, query_len, key_len)``; zeros are masked out.

        Returns:
            Tensor of shape ``(batch, query_len, d_model)``.
        """
        batch_size = query.size(0)
        query_len = query.size(1)

        # Linear projections followed by the per-head reshape
        Q = self._split_heads(self.w_q(query))
        K = self._split_heads(self.w_k(key))
        V = self._split_heads(self.w_v(value))

        # Apply attention (the weights are not needed by callers of forward)
        attention_output, _ = self.scaled_dot_product_attention(Q, K, V, mask)

        # Concatenate heads
        attention_output = attention_output.transpose(1, 2).contiguous().view(
            batch_size, query_len, self.d_model
        )

        # Final linear layer
        return self.w_o(attention_output)

class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> GELU -> Dropout -> Linear.

    Args:
        d_model: Input and output feature size.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, d_model, d_ff, dropout=0.1):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Expand to ``d_ff``, apply GELU and dropout, project back to ``d_model``."""
        hidden = self.linear1(x)
        hidden = F.gelu(hidden)
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

class TransformerBlock(nn.Module):
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer output passes through dropout, is added to its input
    (residual connection) and is then layer-normalised.

    Args:
        d_model: Feature size of the hidden states.
        n_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability shared by both sub-layers.
    """

    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attention = MultiHeadAttention(d_model, n_heads, dropout)
        self.feed_forward = FeedForward(d_model, d_ff, dropout)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Run the block on ``x``; ``mask`` is forwarded to the attention layer."""
        # Self-attention sub-layer: residual add, then normalise
        attended = self.dropout(self.attention(x, x, x, mask))
        x = self.layer_norm1(x + attended)

        # Feed-forward sub-layer: residual add, then normalise
        transformed = self.dropout(self.feed_forward(x))
        return self.layer_norm2(x + transformed)

class BertEmbeddings(nn.Module):
    """Sum of word, position and token-type embeddings, then LayerNorm and dropout.

    Args:
        vocab_size: Number of rows in the word-embedding table.
        d_model: Embedding dimension.
        max_position_embeddings: Number of rows in the position table.
        type_vocab_size: Number of rows in the token-type (segment) table.
        dropout: Dropout probability applied to the normalised sum.
    """

    def __init__(self, vocab_size, d_model, max_position_embeddings=512,
                 type_vocab_size=2, dropout=0.1):
        super().__init__()
        # Index 0 is declared as the padding index of the word table.
        self.word_embeddings = nn.Embedding(vocab_size, d_model, padding_idx=0)
        self.position_embeddings = nn.Embedding(max_position_embeddings, d_model)
        self.token_type_embeddings = nn.Embedding(type_vocab_size, d_model)

        self.layer_norm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input_ids, token_type_ids=None, position_ids=None):
        """Embed ``input_ids``.

        Missing ``position_ids`` default to ``0..seq_len-1`` for every row;
        missing ``token_type_ids`` default to all zeros.
        """
        if position_ids is None:
            positions = torch.arange(input_ids.size(1), dtype=torch.long,
                                     device=input_ids.device)
            position_ids = positions.unsqueeze(0).expand_as(input_ids)

        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        summed = (
            self.word_embeddings(input_ids)
            + self.position_embeddings(position_ids)
            + self.token_type_embeddings(token_type_ids)
        )
        return self.dropout(self.layer_norm(summed))

class BertEncoder(nn.Module):
    """Stack of ``n_layers`` TransformerBlock modules applied in sequence.

    Args:
        n_layers: Number of blocks in the stack.
        d_model: Feature size of the hidden states.
        n_heads: Number of attention heads per block.
        d_ff: Hidden width of each block's feed-forward sub-layer.
        dropout: Dropout probability passed to every block.
    """

    def __init__(self, n_layers, d_model, n_heads, d_ff, dropout=0.1):
        super().__init__()
        blocks = []
        for _ in range(n_layers):
            blocks.append(TransformerBlock(d_model, n_heads, d_ff, dropout))
        self.layers = nn.ModuleList(blocks)

    def forward(self, x, attention_mask=None):
        """Feed ``x`` through every block, passing the same mask to each one."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, attention_mask)
        return hidden

class BertPooler(nn.Module):
    """Summarise a sequence by its first token: Linear followed by Tanh.

    Args:
        d_model: Feature size of the hidden states (input and output).
    """

    def __init__(self, d_model):
        super().__init__()
        self.dense = nn.Linear(d_model, d_model)
        self.activation = nn.Tanh()

    def forward(self, hidden_states):
        """Return the transformed hidden state at sequence position 0."""
        # Position 0 is the [CLS] token representation
        cls_state = hidden_states[:, 0]
        return self.activation(self.dense(cls_state))

class BertForMaskedLM(nn.Module):
    """Masked-language-model head: Linear -> GELU -> LayerNorm -> vocab projection.

    Args:
        vocab_size: Number of output logits per position.
        d_model: Feature size of the incoming hidden states.
    """

    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.dense = nn.Linear(d_model, d_model)
        self.layer_norm = nn.LayerNorm(d_model)
        self.decoder = nn.Linear(d_model, vocab_size)

    def forward(self, hidden_states):
        """Map hidden states to vocabulary logits (last dim becomes ``vocab_size``)."""
        transformed = F.gelu(self.dense(hidden_states))
        normalised = self.layer_norm(transformed)
        return self.decoder(normalised)

class BertForNextSentencePrediction(nn.Module):
    """Next-sentence-prediction head: one linear layer producing two logits.

    Args:
        d_model: Feature size of the pooled representation.
    """

    def __init__(self, d_model):
        super().__init__()
        self.seq_relationship = nn.Linear(d_model, 2)

    def forward(self, pooled_output):
        """Return the two-way logits for the pooled sequence representation."""
        return self.seq_relationship(pooled_output)

class BERT(nn.Module):
    """BERT encoder with masked-LM and next-sentence-prediction heads.

    Args:
        vocab_size: Size of the token vocabulary.
        d_model: Hidden size.
        n_layers: Number of encoder blocks.
        n_heads: Attention heads per block.
        d_ff: Feed-forward hidden width.
        max_position_embeddings: Longest supported sequence length.
        type_vocab_size: Number of token-type (segment) ids.
        dropout: Dropout probability used throughout the model.
    """

    def __init__(self, vocab_size, d_model=768, n_layers=12, n_heads=12,
                 d_ff=3072, max_position_embeddings=512, type_vocab_size=2,
                 dropout=0.1):
        super().__init__()

        self.embeddings = BertEmbeddings(
            vocab_size=vocab_size,
            d_model=d_model,
            max_position_embeddings=max_position_embeddings,
            type_vocab_size=type_vocab_size,
            dropout=dropout
        )

        self.encoder = BertEncoder(
            n_layers=n_layers,
            d_model=d_model,
            n_heads=n_heads,
            d_ff=d_ff,
            dropout=dropout
        )

        self.pooler = BertPooler(d_model)

        # Pre-training heads
        self.cls = BertForMaskedLM(vocab_size, d_model)
        self.nsp = BertForNextSentencePrediction(d_model)

        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and reset LayerNorm."""
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=0.02)
            # normal_() overwrites the zero row that nn.Embedding set up for
            # padding_idx; restore it so padding tokens embed to zeros.
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, input_ids, token_type_ids=None, attention_mask=None,
                masked_lm_labels=None, next_sentence_label=None):
        """Run the encoder and both pre-training heads.

        Args:
            input_ids: Token ids of shape ``(batch, seq_len)``.
            token_type_ids: Optional segment ids, same shape as ``input_ids``.
            attention_mask: Optional ``(batch, seq_len)`` mask, non-zero for
                tokens to attend to. When omitted, tokens with id 0 are
                treated as padding.
            masked_lm_labels: Optional ``(batch, seq_len)`` target ids;
                positions equal to -100 are ignored by the loss.
            next_sentence_label: Optional ``(batch,)`` class ids in {0, 1}.

        Returns:
            Dict with ``last_hidden_state``, ``pooler_output``,
            ``prediction_logits`` and ``seq_relationship_logits``. When
            labels are supplied it also holds the matching
            ``masked_lm_loss`` / ``next_sentence_loss`` and their sum under
            ``loss``.
        """
        # Create attention mask if not provided; shape (batch, 1, 1, seq_len)
        # so it broadcasts over heads and query positions.
        if attention_mask is None:
            attention_mask = (input_ids != 0).unsqueeze(1).unsqueeze(2)
        else:
            attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)

        # Embeddings
        embedding_output = self.embeddings(input_ids, token_type_ids)

        # Encoder
        encoder_output = self.encoder(embedding_output, attention_mask)

        # Pooler for [CLS] token
        pooled_output = self.pooler(encoder_output)

        # Pre-training outputs
        prediction_scores = self.cls(encoder_output)
        seq_relationship_score = self.nsp(pooled_output)

        outputs = {
            'last_hidden_state': encoder_output,
            'pooler_output': pooled_output,
            'prediction_logits': prediction_scores,
            'seq_relationship_logits': seq_relationship_score
        }

        # Calculate losses if labels provided
        total_loss = None
        if masked_lm_labels is not None:
            flat_labels = masked_lm_labels.view(-1)
            # Sum over supervised positions and divide by their count
            # (clamped to 1) instead of reduction='mean', which yields NaN
            # when every label in the batch is -100.
            loss_sum = F.cross_entropy(
                prediction_scores.view(-1, prediction_scores.size(-1)),
                flat_labels,
                ignore_index=-100,
                reduction='sum'
            )
            num_targets = (flat_labels != -100).sum().clamp(min=1)
            masked_lm_loss = loss_sum / num_targets
            total_loss = masked_lm_loss
            outputs['masked_lm_loss'] = masked_lm_loss

        if next_sentence_label is not None:
            loss_fct = nn.CrossEntropyLoss()
            next_sentence_loss = loss_fct(
                seq_relationship_score.view(-1, 2),
                next_sentence_label.view(-1)
            )
            total_loss = (next_sentence_loss if total_loss is None
                          else total_loss + next_sentence_loss)
            outputs['next_sentence_loss'] = next_sentence_loss

        # Expose 'loss' whenever any label was given. Testing the VALUE of
        # the loss (`> 0`) would drop the key for a zero/NaN loss and force a
        # device synchronisation.
        if total_loss is not None:
            outputs['loss'] = total_loss

        return outputs

# Dataset class for BERT pre-training
class BertPretrainingDataset(Dataset):
    """Dataset that tokenizes raw texts and applies BERT-style MLM masking.

    Args:
        texts: Sequence of raw strings, one training example each.
        tokenizer: Tokenizer exposing ``encode``, ``mask_token_id`` and
            ``vocab_size``. ``all_special_ids`` and ``pad_token_id`` are used
            when present.
        max_length: Every example is truncated/padded to this many tokens.
        mlm_probability: Probability that a maskable token is selected.
    """

    def __init__(self, texts, tokenizer, max_length=512, mlm_probability=0.15):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.mlm_probability = mlm_probability

    def __len__(self):
        return len(self.texts)

    def _pad_token_id(self):
        """Return the tokenizer's padding id, defaulting to 0 when it has none."""
        pad_id = getattr(self.tokenizer, 'pad_token_id', None)
        return 0 if pad_id is None else pad_id

    def _protected_token_ids(self):
        """Return ids that must never be masked (special tokens and padding)."""
        protected = set(getattr(self.tokenizer, 'all_special_ids', None) or ())
        protected.add(self._pad_token_id())
        return protected

    def create_masked_lm_predictions(self, tokens):
        """Create masked language model predictions.

        Selected tokens are replaced by [MASK] 80% of the time, by a random
        token 10% of the time and kept unchanged 10% of the time. Special
        and padding tokens are never selected.

        Args:
            tokens: List of token ids for one example.

        Returns:
            Tuple ``(output_tokens, output_labels)`` of lists as long as
            ``tokens``; labels are -100 except at selected positions, where
            they hold the original token id.
        """
        output_tokens = list(tokens)
        output_labels = [-100] * len(tokens)

        # Only real content tokens may be corrupted. Masking [CLS]/[SEP]/[PAD]
        # would spend most of the training signal on predicting padding.
        protected = self._protected_token_ids()
        candidates = [i for i, tok in enumerate(tokens) if tok not in protected]

        selected = [i for i in candidates
                    if random.random() < self.mlm_probability]
        # Guarantee at least one supervised position so the MLM loss of a
        # short example is never computed over zero targets.
        if not selected and candidates and self.mlm_probability > 0:
            selected = [random.choice(candidates)]

        for i in selected:
            prob = random.random()
            if prob < 0.8:
                # 80% replace with [MASK]
                output_tokens[i] = self.tokenizer.mask_token_id
            elif prob < 0.9:
                # 10% replace with random token
                output_tokens[i] = random.randint(1, self.tokenizer.vocab_size - 1)
            # 10% keep original

            output_labels[i] = tokens[i]

        return output_tokens, output_labels

    def __getitem__(self, idx):
        """Return the tensors for example ``idx``.

        Keys: ``input_ids`` (masked ids), ``attention_mask`` (1 for
        non-padding tokens), ``masked_lm_labels`` and ``next_sentence_label``
        (always 0 in this simplified dataset).
        """
        text = self.texts[idx]

        # Tokenize
        tokens = self.tokenizer.encode(text, add_special_tokens=True,
                                       max_length=self.max_length,
                                       truncation=True, padding='max_length')

        # Create masked LM predictions
        masked_tokens, mlm_labels = self.create_masked_lm_predictions(tokens)

        # The mask is derived from the ORIGINAL tokens so that a random
        # replacement can never turn a real position into padding.
        pad_id = self._pad_token_id()

        return {
            'input_ids': torch.tensor(masked_tokens, dtype=torch.long),
            'attention_mask': torch.tensor([1 if t != pad_id else 0 for t in tokens],
                                           dtype=torch.long),
            'masked_lm_labels': torch.tensor(mlm_labels, dtype=torch.long),
            'next_sentence_label': torch.tensor(0, dtype=torch.long)  # Simplified
        }

# Training function
def train_bert(model, dataloader, optimizer, device, epochs=1):
    """Train ``model`` on ``dataloader`` and return the mean loss per step.

    Args:
        model: Module whose forward accepts ``input_ids``, ``attention_mask``,
            ``masked_lm_labels`` and ``next_sentence_label`` and returns a
            dict containing ``'loss'``.
        dataloader: Iterable of batches (dicts of tensors with those keys).
        optimizer: Optimizer over the model's parameters.
        device: Device the batch tensors are moved to.
        epochs: Number of passes over ``dataloader``.

    Returns:
        Average loss over every optimisation step of every epoch, or ``0.0``
        when no step was executed (empty dataloader or ``epochs == 0``).
    """
    model.train()
    total_loss = 0.0
    num_steps = 0

    for epoch in range(epochs):
        for batch_idx, batch in enumerate(dataloader):
            # Move to device
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            masked_lm_labels = batch['masked_lm_labels'].to(device)
            next_sentence_labels = batch['next_sentence_label'].to(device)

            # Forward pass
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                masked_lm_labels=masked_lm_labels,
                next_sentence_label=next_sentence_labels
            )

            loss = outputs['loss']

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            step_loss = loss.item()
            total_loss += step_loss
            num_steps += 1

            if batch_idx % 100 == 0:
                print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {step_loss:.4f}')

    # Divide by the number of steps actually taken: the sum spans ALL epochs,
    # so len(dataloader) would overstate the average when epochs > 1.
    if num_steps == 0:
        return 0.0
    return total_loss / num_steps

# Example usage for fine-tuning on classification
class BertForSequenceClassification(nn.Module):
    """Sequence classifier: dropout and a linear layer on the pooled output.

    Args:
        bert_model: Encoder whose forward returns a dict containing
            ``'pooler_output'`` and which exposes ``pooler.dense``.
        num_labels: Number of target classes.
        dropout: Dropout probability applied to the pooled output.
    """

    def __init__(self, bert_model, num_labels, dropout=0.1):
        super().__init__()
        self.bert = bert_model
        self.dropout = nn.Dropout(dropout)
        # The classifier input width equals the pooler's output width.
        self.classifier = nn.Linear(bert_model.pooler.dense.out_features, num_labels)

    def forward(self, input_ids, attention_mask=None, labels=None,
                token_type_ids=None):
        """Classify each sequence in the batch.

        Args:
            input_ids: Token ids of shape ``(batch, seq_len)``.
            attention_mask: Optional padding mask forwarded to the encoder.
            labels: Optional ``(batch,)`` class ids; when given, the
                cross-entropy loss is returned as well.
            token_type_ids: Optional segment ids forwarded to the encoder,
                needed for sentence-pair inputs. Placed last so existing
                positional callers keep working.

        Returns:
            ``{'logits': ...}``, plus ``'loss'`` when ``labels`` is given.
        """
        outputs = self.bert(
            input_ids,
            token_type_ids=token_type_ids,
            attention_mask=attention_mask
        )
        pooled_output = outputs['pooler_output']
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, logits.size(-1)), labels.view(-1))
            return {'loss': loss, 'logits': logits}

        return {'logits': logits}

# Initialize model
def create_bert_model(vocab_size=30522):
    """Build a BERT model with the base-size hyper-parameters.

    Args:
        vocab_size: Size of the token vocabulary.

    Returns:
        A freshly initialised ``BERT`` instance with 12 layers, 12 heads,
        hidden size 768 and feed-forward width 3072.
    """
    base_config = {
        'd_model': 768,
        'n_layers': 12,
        'n_heads': 12,
        'd_ff': 3072,
        'max_position_embeddings': 512,
        'type_vocab_size': 2,
        'dropout': 0.1,
    }
    return BERT(vocab_size=vocab_size, **base_config)

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


Starting BERT training...
Epoch 0, Batch 0, Loss: 11.5031
Average training loss: 11.5031
BERT model created successfully!
Total parameters: 133,547,324


In [3]:
if __name__ == "__main__":
    # NOTE: this cell needs the import cell and the model-definition cell to
    # have been executed first (BertTokenizer, DataLoader, torch, random,
    # create_bert_model, ...); otherwise it fails with a NameError.

    # Seed every RNG used below (token masking uses `random`; weight init,
    # dropout and DataLoader shuffling use torch) so reruns are reproducible.
    seed = 42
    random.seed(seed)
    torch.manual_seed(seed)

    # Initialize tokenizer and model
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
    model = create_bert_model(vocab_size=tokenizer.vocab_size)

    # Example texts for training
    texts = [
        "The quick brown fox jumps over the lazy dog.",
        "BERT is a transformer-based machine learning technique.",
        "Natural language processing is a subfield of AI.",
        # Add more texts for actual training

    ]

    # Create dataset and dataloader
    dataset = BertPretrainingDataset(texts, tokenizer)
    dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

    # Setup training
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

    # Train model
    print("Starting BERT training...")
    avg_loss = train_bert(model, dataloader, optimizer, device, epochs=1)
    print(f"Average training loss: {avg_loss:.4f}")

    # Example: Create classification model that wraps the pre-trained encoder
    classification_model = BertForSequenceClassification(model, num_labels=2)
    print("BERT model created successfully!")
    print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")

NameError: name 'BertTokenizer' is not defined