In [1]:
!pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
!pip install scikit-learn
!pip install datasets

Looking in indexes: https://download.pytorch.org/whl/cu121



[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
# Presumably shuts the kernel down so the packages installed above are picked
# up on restart (the execution counter starts again at 1 in the next cell).
# NOTE(review): this stops "Run All" at this cell — confirm it is still needed.
exit()

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import math
import numpy as np
from tqdm import tqdm
import random
import os
from datasets import load_dataset

In [2]:
# Custom Transformer implementation
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position information to a batch of embeddings.

    A table of shape ``(1, max_seq_length, d_model)`` is computed once at
    construction: even feature columns hold sines, odd feature columns hold
    cosines, with wavelengths controlled by ``10000 ** (2i / d_model)``.

    Args:
        d_model: Size of the feature (last) dimension of the inputs.
        max_seq_length: Number of positions the table covers.
    """

    def __init__(self, d_model, max_seq_length=5000):
        super().__init__()

        # Create positional encoding matrix
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        # With an odd d_model there is one fewer cosine column than sine
        # columns, so trim the frequencies to fit; for even d_model the slice
        # is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0)

        # Register as buffer (not a parameter but part of the module)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        Args:
            x: Tensor whose dimension 1 is the sequence axis and whose last
                dimension matches ``d_model``.

        Raises:
            ValueError: If the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            # Fail with a readable message instead of an opaque broadcast error.
            raise ValueError(
                f"Sequence length {seq_len} exceeds the maximum supported "
                f"length {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_len]

class CustomTransformerModel(nn.Module):
    """Encoder-decoder transformer over a single shared token vocabulary.

    Source and target tokens share one embedding table, which is scaled by
    ``sqrt(d_model)`` and combined with sinusoidal positions. The encoder
    attends over the whole source sequence (no causal mask); the decoder uses
    a causal mask over the target and cross-attends to the encoder output.

    Args:
        vocab_size: Number of token ids (embedding rows and output logits).
        d_model: Embedding / hidden width.
        nhead: Attention heads per layer.
        num_encoder_layers: Number of stacked encoder layers.
        num_decoder_layers: Number of stacked decoder layers.
        dim_feedforward: Width of each layer's feed-forward sub-block.
        dropout: Dropout probability inside the transformer layers.
        pad_idx: Token id treated as padding when building padding masks.
            Must match the id the data pipeline pads with.
    """

    def __init__(self, vocab_size, d_model=256, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=1024, dropout=0.1, pad_idx=0):
        super().__init__()

        # Embedding layers
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model)

        # Transformer architecture (batch_first: tensors are batch x seq x feature)
        encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead,
                                                   dim_feedforward=dim_feedforward,
                                                   dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_encoder_layers)

        decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=nhead,
                                                   dim_feedforward=dim_feedforward,
                                                   dropout=dropout, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_decoder_layers)

        # Output layer
        self.output_layer = nn.Linear(d_model, vocab_size)

        self.d_model = d_model
        self.vocab_size = vocab_size
        self.pad_idx = pad_idx

    def _embed(self, tokens):
        """Look up token embeddings, scale by sqrt(d_model), add positions."""
        return self.positional_encoding(self.token_embedding(tokens) * math.sqrt(self.d_model))

    def create_mask(self, src, tgt):
        """Build the attention and padding masks for a ``(src, tgt)`` batch.

        Returns:
            Tuple ``(src_mask, tgt_mask, src_padding_mask, tgt_padding_mask)``:
            an all-False boolean source mask (nothing blocked), the float
            causal target mask, and boolean masks that are True where the
            token equals ``pad_idx``.
        """
        src_seq_len = src.shape[1]
        tgt_seq_len = tgt.shape[1]

        # Create masks
        src_mask = torch.zeros((src_seq_len, src_seq_len), dtype=torch.bool, device=src.device)
        tgt_mask = self.generate_square_subsequent_mask(tgt_seq_len, device=tgt.device)

        # Create padding masks
        src_padding_mask = (src == self.pad_idx)
        tgt_padding_mask = (tgt == self.pad_idx)

        return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask

    def generate_square_subsequent_mask(self, sz, device=None):
        """Return an ``sz x sz`` float causal mask.

        Entries on and below the diagonal are ``0.0``; entries above it are
        ``-inf``, so position ``i`` cannot attend to positions after ``i``.

        Args:
            sz: Sequence length.
            device: Device to allocate the mask on (default: torch's default
                device), which avoids a host-to-device copy for callers that
                know where the mask is needed.
        """
        blocked = torch.full((sz, sz), float('-inf'), device=device)
        # triu(diagonal=1) keeps the strictly-upper triangle and zeroes the rest.
        return torch.triu(blocked, diagonal=1)

    def forward(self, src, tgt):
        """Encode ``src``, decode ``tgt`` against it, and return vocabulary logits.

        Args:
            src: Integer token ids, batch x source length.
            tgt: Integer token ids, batch x target length.

        Returns:
            Logits of shape batch x target length x ``vocab_size``.
        """
        # Embedding and positional encoding
        src_emb = self._embed(src)
        tgt_emb = self._embed(tgt)

        # The all-False source mask blocks nothing, so it is not passed on.
        _, tgt_mask, src_padding_mask, tgt_padding_mask = self.create_mask(src, tgt)

        # Transformer encoding and decoding
        memory = self.transformer_encoder(src_emb, src_key_padding_mask=src_padding_mask)
        output = self.transformer_decoder(tgt_emb, memory,
                                          tgt_mask=tgt_mask,
                                          tgt_key_padding_mask=tgt_padding_mask,
                                          memory_key_padding_mask=src_padding_mask)

        # Project to vocabulary size
        return self.output_layer(output)

    def encode(self, src):
        """Run only the encoder and return its output for ``src`` token ids."""
        src_padding_mask = (src == self.pad_idx)
        return self.transformer_encoder(self._embed(src), src_key_padding_mask=src_padding_mask)

    def decode(self, tgt, memory, tgt_key_padding_mask=None, memory_key_padding_mask=None):
        """Run only the decoder against a precomputed encoder output.

        A causal mask over ``tgt`` is always applied. Padding masks are
        applied only when supplied; by default none is used, which is the
        right choice for unpadded inputs such as a single sequence during
        generation.

        Args:
            tgt: Integer token ids, batch x target length.
            memory: Encoder output, e.g. from :meth:`encode`.
            tgt_key_padding_mask: Optional boolean mask, True at padded
                target positions.
            memory_key_padding_mask: Optional boolean mask, True at padded
                source positions of ``memory``.

        Returns:
            Logits of shape batch x target length x ``vocab_size``.
        """
        tgt_mask = self.generate_square_subsequent_mask(tgt.size(1), device=tgt.device)

        output = self.transformer_decoder(self._embed(tgt), memory,
                                          tgt_mask=tgt_mask,
                                          tgt_key_padding_mask=tgt_key_padding_mask,
                                          memory_key_padding_mask=memory_key_padding_mask)
        return self.output_layer(output)

# Custom tokenizer
class SimpleTokenizer:
    """Whitespace- or character-level tokenizer backed by two lookup dicts.

    Ids 0-3 are reserved for the ``[PAD]``, ``[UNK]``, ``[BOS]`` and ``[EOS]``
    markers. Every other token is given the next free id the first time
    :meth:`fit` encounters it.
    """

    def __init__(self, tokenization_type='word'):
        # 'word' splits on whitespace; any other value means per-character.
        self.tokenization_type = tokenization_type

        # Special tokens
        self.pad_token = '[PAD]'
        self.unk_token = '[UNK]'
        self.bos_token = '[BOS]'
        self.eos_token = '[EOS]'

        self.word_to_idx = {}
        self.idx_to_word = {}
        self.add_special_tokens()

    def add_special_tokens(self):
        """Reset both lookup tables so they contain only the four markers."""
        markers = (self.pad_token, self.unk_token, self.bos_token, self.eos_token)
        self.word_to_idx = {marker: position for position, marker in enumerate(markers)}
        self.idx_to_word = {position: marker for marker, position in self.word_to_idx.items()}

    def tokenize(self, text):
        """Split ``text`` into a list of word or character tokens."""
        return text.split() if self.tokenization_type == 'word' else list(text)

    def fit(self, texts):
        """Extend the vocabulary with every unseen token found in ``texts``.

        New tokens are numbered in sorted order, continuing after the ids
        already assigned.
        """
        discovered = {piece for text in texts for piece in self.tokenize(text)}

        for piece in sorted(discovered):
            if piece in self.word_to_idx:
                continue
            self.word_to_idx[piece] = len(self.word_to_idx)
            self.idx_to_word[len(self.idx_to_word)] = piece

    def encode(self, text, add_special_tokens=True):
        """Convert ``text`` to a list of ids; unknown tokens map to ``[UNK]``.

        When ``add_special_tokens`` is true the result is wrapped in
        ``[BOS]`` ... ``[EOS]``.
        """
        pieces = self.tokenize(text)
        if add_special_tokens:
            pieces = [self.bos_token, *pieces, self.eos_token]

        table = self.word_to_idx
        return [table.get(piece, table[self.unk_token]) for piece in pieces]

    def decode(self, ids, skip_special_tokens=True):
        """Convert ids back to text; ids without an entry become ``[UNK]``.

        When ``skip_special_tokens`` is true the four marker tokens are
        dropped before joining (with spaces for word mode, nothing otherwise).
        """
        pieces = [self.idx_to_word.get(token_id, self.unk_token) for token_id in ids]

        if skip_special_tokens:
            markers = [self.pad_token, self.unk_token, self.bos_token, self.eos_token]
            pieces = [piece for piece in pieces if piece not in markers]

        separator = ' ' if self.tokenization_type == 'word' else ''
        return separator.join(pieces)

    def vocab_size(self):
        """Return the number of entries in the vocabulary, markers included."""
        return len(self.word_to_idx)

# Dataset for text generation using TinyStories
class TinyStoriesDataset(Dataset):
    """Next-token prediction samples drawn from one split of a story dataset.

    Stories are tokenized once at construction. Each item is a window of
    ``seq_length + 1`` token ids split into ``src`` (all but the last id) and
    ``tgt`` (all but the first id), i.e. ``tgt`` is ``src`` shifted by one.

    Short stories are right-padded with ``pad_idx``. Longer stories are
    cropped at a random offset, re-drawn on every access, so repeated reads
    of the same index can return different windows.

    Args:
        dataset: Mapping from split name to an indexable collection of
            records with a ``"text"`` field.
        tokenizer: Object exposing ``encode(text, add_special_tokens=True)``
            that returns a list of ids.
        seq_length: Length of ``src`` and ``tgt``.
        split: Which split of ``dataset`` to read.
        max_samples: Upper bound on the number of stories to tokenize;
            ``None`` uses the whole split.
        pad_idx: Id used to right-pad short stories.
    """

    def __init__(self, dataset, tokenizer, seq_length=64, split="train",
                 max_samples=10000, pad_idx=0):
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        self.dataset = dataset[split]
        self.pad_idx = pad_idx

        # Tokenize the stories
        print(f"Tokenizing {split} dataset...")

        # Process a subset for faster training (controlled by max_samples)
        available = len(self.dataset)
        if max_samples is None:
            num_samples = available
        else:
            num_samples = max(0, min(max_samples, available))

        self.tokenized_stories = [
            tokenizer.encode(self.dataset[i]["text"], add_special_tokens=True)
            for i in tqdm(range(num_samples))
        ]

    def __len__(self):
        """Number of tokenized stories."""
        return len(self.tokenized_stories)

    def __getitem__(self, idx):
        """Return the ``(src, tgt)`` pair of int64 tensors for story ``idx``."""
        tokens = self.tokenized_stories[idx]
        window = self.seq_length + 1  # one extra id so src and tgt can be offset by one

        if len(tokens) <= window:
            # Pad to the window length
            tokens = tokens + [self.pad_idx] * (window - len(tokens))
        else:
            # Choose a random starting point so the crop fits the window
            start = random.randint(0, len(tokens) - window)
            tokens = tokens[start:start + window]

        src = torch.tensor(tokens[:-1], dtype=torch.long)
        tgt = torch.tensor(tokens[1:], dtype=torch.long)

        return src, tgt

# Training function
def _next_token_loss(model, src, tgt, pad_idx):
    """Cross-entropy between the model's logits for ``src`` and the ids in ``tgt``."""
    # NOTE(review): the same tensor is fed to the encoder and the decoder.
    # CustomTransformerModel.forward runs the encoder without a causal mask,
    # so when tgt is src shifted by one (as TinyStoriesDataset produces) the
    # encoder output already contains most of the tokens being predicted.
    # Confirm this is intended; otherwise the reported loss is optimistic.
    logits = model(src, src)
    return F.cross_entropy(
        logits.reshape(-1, model.vocab_size),
        tgt.reshape(-1),
        ignore_index=pad_idx,  # padded target positions contribute no loss
    )


def train_custom_model(model, train_dataloader, val_dataloader, optimizer, scheduler, device,
                       epochs=10, pad_idx=0, checkpoint_path="best_model.pt"):
    """Train ``model`` and checkpoint it whenever validation loss improves.

    Every epoch runs one pass over ``train_dataloader`` (gradient-norm
    clipping at 1.0, one optimizer step and one scheduler step per batch)
    followed by one gradient-free pass over ``val_dataloader``.

    Args:
        model: Module callable as ``model(src, tgt)`` that returns logits and
            exposes a ``vocab_size`` attribute.
        train_dataloader: Iterable of ``(src, tgt)`` tensor batches.
        val_dataloader: Iterable of ``(src, tgt)`` tensor batches.
        optimizer: Optimizer over ``model.parameters()``.
        scheduler: Learning-rate scheduler stepped once per batch, or ``None``.
        device: Device the batches are moved to.
        epochs: Number of passes over the training data.
        pad_idx: Target id excluded from the loss.
        checkpoint_path: File the best checkpoint is written to.

    Returns:
        The same ``model`` object, left in eval mode after the last
        validation pass (when at least one epoch ran).
    """
    best_val_loss = float('inf')

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss_sum = 0.0
        train_batches = 0
        progress_bar = tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{epochs} [Train]")

        for src, tgt in progress_bar:
            src, tgt = src.to(device), tgt.to(device)

            optimizer.zero_grad()
            loss = _next_token_loss(model, src, tgt, pad_idx)

            # Backward pass and optimize
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            if scheduler is not None:
                scheduler.step()

            train_loss_sum += loss.item()
            train_batches += 1
            progress_bar.set_postfix({"loss": loss.item()})

        # Batches are counted while iterating, so an empty loader reports
        # NaN instead of raising ZeroDivisionError.
        avg_train_loss = train_loss_sum / train_batches if train_batches else float('nan')
        print(f"Epoch {epoch+1}/{epochs}, Training loss: {avg_train_loss:.4f}")

        # Validation phase
        model.eval()
        val_loss_sum = 0.0
        val_batches = 0
        progress_bar = tqdm(val_dataloader, desc=f"Epoch {epoch+1}/{epochs} [Val]")

        with torch.no_grad():
            for src, tgt in progress_bar:
                src, tgt = src.to(device), tgt.to(device)
                loss = _next_token_loss(model, src, tgt, pad_idx)

                val_loss_sum += loss.item()
                val_batches += 1
                progress_bar.set_postfix({"loss": loss.item()})

        avg_val_loss = val_loss_sum / val_batches if val_batches else float('nan')
        print(f"Epoch {epoch+1}/{epochs}, Validation loss: {avg_val_loss:.4f}")

        # Save best model (a NaN validation loss never compares as better)
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save({
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "val_loss": best_val_loss,
            }, checkpoint_path)
            print(f"Saved new best model with validation loss: {best_val_loss:.4f}")

    return model

# Text generation function
def generate_text(model, tokenizer, prompt, max_length=100, temperature=1.0, device="cuda",
                  context_length=128):
    """Autoregressively sample a continuation of ``prompt``.

    At every step the most recent ``context_length`` ids are passed through
    ``model.encode`` and ``model.decode``, and the logits at the last
    position are used to pick the next id. Generation stops after
    ``max_length`` new tokens or as soon as ``[EOS]`` is produced.

    Args:
        model: Object exposing ``eval()``, ``encode(src)`` and
            ``decode(tgt, memory)``.
        tokenizer: Object exposing ``encode``, ``decode``, ``word_to_idx``
            and the ``bos_token`` / ``eos_token`` / ``pad_token`` attributes.
        prompt: Text to continue.
        max_length: Maximum number of tokens to generate.
        temperature: Softmax temperature; values ``<= 0`` select the most
            likely token instead of sampling.
        device: Device the input tensors are created on.
        context_length: Number of trailing ids fed to the model per step.
            Should not exceed the sequence length used for training.

    Returns:
        The decoded text (prompt plus continuation) as produced by
        ``tokenizer.decode``.

    Raises:
        ValueError: If ``context_length`` is smaller than 1.
    """
    if context_length < 1:
        raise ValueError("context_length must be at least 1")

    model.eval()

    vocab = tokenizer.word_to_idx
    bos_id = vocab[tokenizer.bos_token]
    eos_id = vocab[tokenizer.eos_token]
    pad_id = vocab.get(tokenizer.pad_token)

    # Start from [BOS] + prompt only. Encoding with special tokens enabled
    # would also append [EOS], so the model would be asked to continue a
    # sequence that has already ended.
    output_ids = [bos_id] + tokenizer.encode(prompt, add_special_tokens=False)

    # No gradients are needed anywhere in the sampling loop.
    with torch.no_grad():
        for _ in range(max_length):
            # Keep only the most recent tokens (truncate if too long)
            context = torch.tensor([output_ids[-context_length:]], dtype=torch.long, device=device)

            memory = model.encode(context)

            # Feed the whole context to the decoder so every token keeps the
            # position it would have during training; only the final
            # position's logits are needed.
            logits = model.decode(context, memory)[0, -1, :]

            if pad_id is not None:
                # Padding is excluded from the training loss (ignore_index),
                # so its logit is untrained; never emit it.
                logits[pad_id] = float('-inf')

            if temperature > 0:
                probs = F.softmax(logits / temperature, dim=-1)
                next_token_id = torch.multinomial(probs, 1).item()
            else:
                # Greedy decoding avoids dividing by a zero temperature.
                next_token_id = torch.argmax(logits).item()

            output_ids.append(next_token_id)

            # Stop if end of sequence
            if next_token_id == eos_id:
                break

    return tokenizer.decode(output_ids)

# Main function
def main():
    """Train a small transformer on TinyStories, sample from it, and save it.

    Steps: seed the RNGs, load the dataset, fit a word-level tokenizer on the
    first training stories, build the train/validation loaders, train with
    Adam and cosine annealing, reload the best checkpoint written during
    training, print one generated sample, and write the weights, tokenizer
    tables and model configuration to ``./tinystories_model/model.pt``.
    """
    # Seed the RNGs used for weight init, shuffling, random crops and sampling
    # so that repeated runs are comparable.
    seed = 42
    random.seed(seed)
    torch.manual_seed(seed)

    # Check for GPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Load TinyStories dataset
    print("Loading TinyStories dataset...")
    ds = load_dataset("roneneldan/TinyStories")

    # Create tokenizer
    tokenizer = SimpleTokenizer(tokenization_type='word')

    # Fit tokenizer on a subset of the data
    print("Fitting tokenizer on dataset...")
    sample_size = min(1000, len(ds["train"]))  # Adjust based on your needs
    sample_texts = [ds["train"][i]["text"] for i in range(sample_size)]
    tokenizer.fit(sample_texts)

    vocab_size = tokenizer.vocab_size()
    print(f"Vocabulary size: {vocab_size}")

    # Create datasets and dataloaders
    seq_length = 128
    train_dataset = TinyStoriesDataset(ds, tokenizer, seq_length=seq_length, split="train")
    val_dataset = TinyStoriesDataset(ds, tokenizer, seq_length=seq_length, split="validation")

    batch_size = 32  # Adjust based on your GPU memory
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

    # Create model; the hyper-parameters are kept in one dict so the exact
    # architecture can be stored next to the weights.
    model_config = {
        "vocab_size": vocab_size,
        "d_model": 256,            # Embedding dimension
        "nhead": 8,                # Number of attention heads
        "num_encoder_layers": 4,
        "num_decoder_layers": 4,
        "dim_feedforward": 1024,
    }
    model = CustomTransformerModel(**model_config).to(device)

    # Setup optimizer and scheduler. The scheduler is stepped once per batch,
    # so its horizon is derived from the same epoch count used for training.
    epochs = 10  # Adjust based on your needs
    optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=len(train_dataloader) * epochs
    )

    # Train model
    train_custom_model(
        model,
        train_dataloader,
        val_dataloader,
        optimizer,
        scheduler,
        device,
        epochs=epochs
    )

    # Load best model; map_location keeps this working when the checkpoint
    # was written on a different device than the current one.
    checkpoint = torch.load("best_model.pt", map_location=device)
    model.load_state_dict(checkpoint["model_state_dict"])

    # Generate text
    prompt = "Once upon a time"
    generated_text = generate_text(
        model,
        tokenizer,
        prompt,
        max_length=200,
        temperature=0.8,
        device=device
    )

    print(f"Generated text:\n{generated_text}")

    # Save final model
    output_dir = "./tinystories_model"
    os.makedirs(output_dir, exist_ok=True)
    model_path = f"{output_dir}/model.pt"

    torch.save({
        "model_state_dict": model.state_dict(),
        "tokenizer": {
            "word_to_idx": tokenizer.word_to_idx,
            "idx_to_word": tokenizer.idx_to_word,
            "tokenization_type": tokenizer.tokenization_type
        },
        # Contains the original "d_model" and "vocab_size" keys plus the
        # remaining constructor arguments needed to rebuild the model.
        "config": dict(model_config)
    }, model_path)
    print(f"Model saved to {model_path}")

if __name__ == "__main__":
    main()

Using device: cuda
Loading TinyStories dataset...
Fitting tokenizer on dataset...
Vocabulary size: 10698
Tokenizing train dataset...


100%|██████████████████████████████████████████████████████████████████████████| 10000/10000 [00:01<00:00, 5936.78it/s]


Tokenizing validation dataset...


100%|██████████████████████████████████████████████████████████████████████████| 10000/10000 [00:01<00:00, 6418.37it/s]
Epoch 1/10 [Train]: 100%|█████████████████████████████████████████████████| 313/313 [11:18<00:00,  2.17s/it, loss=5.22]


Epoch 1/10, Training loss: 6.2434


  output = torch._nested_tensor_from_mask(
Epoch 1/10 [Val]: 100%|███████████████████████████████████████████████████| 313/313 [05:20<00:00,  1.02s/it, loss=4.78]


Epoch 1/10, Validation loss: 5.1652
Saved new best model with validation loss: 5.1652


Epoch 2/10 [Train]:   1%|▍                                                   | 3/313 [00:14<25:13,  4.88s/it, loss=5.2]


KeyboardInterrupt: 