<a href="https://colab.research.google.com/github/mazen200555/curriculum/blob/master/Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import pandas as pd
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

In [2]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length=5000):
        super(PositionalEncoding, self).__init__()

        # Create positional encoding matrix
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Apply sine to even indices
        pe[:, 0::2] = torch.sin(position * div_term)
        # Apply cosine to odd indices
        pe[:, 1::2] = torch.cos(position * div_term)

        # Add batch dimension and register as buffer (not a parameter)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Add positional encoding to input
        return x + self.pe[:, :x.size(1)]

In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads, dropout=0.1):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Single linear layer for all projections
        self.c_attn = nn.Linear(d_model, 3 * d_model)
        self.c_proj = nn.Linear(d_model, d_model)

        self.attn_dropout = nn.Dropout(dropout)
        self.proj_dropout = nn.Dropout(dropout)

        self.scale = 1 / math.sqrt(self.d_k)

    def forward(self, x, mask=None):
        batch_size, seq_length, _ = x.size()

        # Project inputs to queries, keys, and values
        q, k, v = self.c_attn(x).chunk(3, dim=2)

        # Reshape for multi-head attention
        q = q.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        k = k.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        v = v.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

        # Compute attention scores
        scores = torch.matmul(q, k.transpose(-2, -1)) * self.scale

        # Apply causal (triangular) mask
        if mask is None:
            mask = torch.tril(torch.ones(seq_length, seq_length)).view(1, 1, seq_length, seq_length).to(x.device)

        # Set masked positions to negative infinity
        scores = scores.masked_fill(mask == 0, -1e10)

        # Apply softmax to get attention weights
        attn_weights = F.softmax(scores, dim=-1)
        attn_weights = self.attn_dropout(attn_weights)

        # Apply attention weights to values
        context = torch.matmul(attn_weights, v)

        # Reshape back to original dimensions
        context = context.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

        # Final projection
        output = self.c_proj(context)
        output = self.proj_dropout(output)

        return output

In [4]:
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super(FeedForward, self).__init__()
        self.c_fc = nn.Linear(d_model, d_ff)
        self.c_proj = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Apply first linear transformation and GELU activation
        x = F.gelu(self.c_fc(x))
        # Apply second linear transformation and dropout
        x = self.c_proj(x)
        x = self.dropout(x)
        return x

In [5]:
class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-12):
        super(LayerNorm, self).__init__()
        self.weight = nn.Parameter(torch.ones(d_model))
        self.bias = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        return self.weight * (x - mean) / (std + self.eps) + self.bias

In [6]:
class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff=2048, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.ln_1 = LayerNorm(d_model)
        self.attn = MultiHeadAttention(d_model, num_heads, dropout)
        self.ln_2 = LayerNorm(d_model)
        self.ff = FeedForward(d_model, d_ff, dropout)

    def forward(self, x, mask=None):
        # Self-attention with residual connection and layer normalization
        x = x + self.attn(self.ln_1(x), mask)
        # Feed-forward with residual connection and layer normalization
        x = x + self.ff(self.ln_2(x))
        return x

In [7]:
class GPTModel(nn.Module):
    def __init__(self, vocab_size, d_model=768, num_heads=12, num_layers=12,
                 max_seq_length=1024, dropout=0.1, d_ff=3072):
        super(GPTModel, self).__init__()

        # Token embeddings
        self.token_embedding = nn.Embedding(vocab_size, d_model)
        # Positional encoding
        self.position_embedding = nn.Embedding(max_seq_length, d_model)

        # Dropout for embeddings
        self.emb_dropout = nn.Dropout(dropout)

        # Stack of transformer blocks
        self.blocks = nn.ModuleList(
            [TransformerBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )

        # Final layer normalization
        self.ln_f = LayerNorm(d_model)

        # Output projection to vocabulary
        self.head = nn.Linear(d_model, vocab_size, bias=False)

        # Initialize weights
        self.apply(self._init_weights)

        # Store dimensions
        self.d_model = d_model
        self.vocab_size = vocab_size
        self.max_seq_length = max_seq_length

    def _init_weights(self, module):
        if isinstance(module, (nn.Linear, nn.Embedding)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def get_block_size(self):
        return self.max_seq_length

    def forward(self, idx, targets=None):
        b, t = idx.size()
        assert t <= self.max_seq_length, f"Cannot forward sequence of length {t}, max is {self.max_seq_length}"

        # Get token embeddings
        token_embeddings = self.token_embedding(idx)  # (b, t, d_model)

        # Get position embeddings
        positions = torch.arange(0, t, dtype=torch.long, device=idx.device).unsqueeze(0)  # (1, t)
        position_embeddings = self.position_embedding(positions)  # (1, t, d_model)

        # Combine token and position embeddings
        x = token_embeddings + position_embeddings  # (b, t, d_model)
        x = self.emb_dropout(x)

        # Create causal mask for self-attention
        mask = torch.tril(torch.ones(t, t)).view(1, 1, t, t).to(idx.device)

        # Pass through transformer blocks
        for block in self.blocks:
            x = block(x, mask)

        # Apply final layer normalization
        x = self.ln_f(x)

        # Project to vocabulary size
        logits = self.head(x)  # (b, t, vocab_size)

        # Calculate loss if targets are provided
        loss = None
        if targets is not None:
            # Reshape logits and targets for loss calculation
            loss = F.cross_entropy(logits.view(-1, self.vocab_size), targets.view(-1))

        return logits, loss

In [12]:
class PromptResponseDataset(Dataset):
    def __init__(self, csv_file, tokenizer, max_length=1024):
        """
        Args:
            csv_file (str): Path to the CSV file with prompt-response pairs
            tokenizer: Tokenizer to encode the text
            max_length (int): Maximum sequence length
        """
        # Load CSV data
        self.data = pd.read_csv(csv_file)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # Get prompt and response for the index
        prompt = self.data.iloc[idx]['question']
        response = self.data.iloc[idx]['answer']

        # Combine prompt and response with separators
        full_text = f"{prompt}\n{response}"

        # Tokenize the text
        encodings = self.tokenizer(full_text,
                                  truncation=True,
                                  max_length=self.max_length,
                                  padding="max_length",
                                  return_tensors="pt")

        # Get input_ids and create target labels (shifted right)
        input_ids = encodings['input_ids'].squeeze()

        # For causal language modeling, targets are input_ids shifted right
        targets = input_ids.clone()

        # Shift targets right (first token predicts second token, etc.)
        targets[:-1] = input_ids[1:]
        # Last target is padding (will be masked by loss function)
        targets[-1] = -100

        return {
            'input_ids': input_ids,
            'targets': targets
        }

In [9]:
class SimpleTokenizer:
    def __init__(self, vocab_file=None):
        """
        Simple tokenizer for illustration purposes.
        In practice, you'd use a more sophisticated tokenizer like BPE.

        Args:
            vocab_file (str, optional): Path to vocabulary file
        """
        if vocab_file:
            with open(vocab_file, 'r', encoding='utf-8') as f:
                self.vocab = {token.strip(): i for i, token in enumerate(f)}
        else:
            # Create a basic vocabulary with special tokens
            self.vocab = {
                "<PAD>": 0,
                "<UNK>": 1,
                "<BOS>": 2,
                "<EOS>": 3,
            }
            # Add basic ASCII characters
            for i in range(32, 127):
                self.vocab[chr(i)] = len(self.vocab)

        self.id_to_token = {v: k for k, v in self.vocab.items()}

    def __call__(self, text, truncation=False, max_length=None, padding=None, return_tensors=None):
        """Tokenize text to indices."""
        if isinstance(text, str):
            tokens = list(text)  # Character-level tokenization for simplicity

            # Convert tokens to ids
            ids = [self.vocab.get(token, self.vocab["<UNK>"]) for token in tokens]

            # Truncate if needed
            if truncation and max_length is not None and len(ids) > max_length:
                ids = ids[:max_length]

            # Pad if needed
            if padding == "max_length" and max_length is not None:
                ids = ids + [self.vocab["<PAD>"]] * (max_length - len(ids))

            # Convert to tensor if requested
            if return_tensors == "pt":
                input_ids = torch.tensor(ids, dtype=torch.long).unsqueeze(0)
            else:
                input_ids = ids

            return {"input_ids": input_ids}

    def decode(self, ids):
        """Convert ids back to text."""
        if isinstance(ids, torch.Tensor):
            ids = ids.tolist()

        return "".join([self.id_to_token.get(id, "<UNK>") for id in ids])

    def get_vocab_size(self):
        """Return the size of the vocabulary."""
        return len(self.vocab)

In [10]:
def train_model(model, train_dataset, val_dataset=None, batch_size=4,
                epochs=3, learning_rate=5e-5, device="cuda" if torch.cuda.is_available() else "cpu"):
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size) if val_dataset else None

    # Set model to training mode
    model.to(device)
    model.train()

    # Initialize optimizer and scheduler
    optimizer = AdamW(model.parameters(), lr=learning_rate)

    # Calculate total training steps for scheduler
    num_training_steps = len(train_loader) * epochs
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=int(0.1 * num_training_steps),
        num_training_steps=num_training_steps
    )

    # Training loop
    for epoch in range(epochs):
        total_loss = 0

        for batch in train_loader:
            # Get inputs and targets
            input_ids = batch['input_ids'].to(device)
            targets = batch['targets'].to(device)

            # Forward pass
            _, loss = model(input_ids, targets)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            scheduler.step()

            total_loss += loss.item()

        # Calculate average loss for the epoch
        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_loss:.4f}")

        # Validation
        if val_loader:
            model.eval()
            val_loss = 0
            with torch.no_grad():
                for batch in val_loader:
                    input_ids = batch['input_ids'].to(device)
                    targets = batch['targets'].to(device)
                    _, loss = model(input_ids, targets)
                    val_loss += loss.item()

            avg_val_loss = val_loss / len(val_loader)
            print(f"Epoch {epoch+1}/{epochs}, Validation Loss: {avg_val_loss:.4f}")
            model.train()

    return model

In [11]:
def generate_text(model, tokenizer, prompt, max_length=50, temperature=1.0, top_k=50):
    model.eval()
    device = next(model.parameters()).device

    # Tokenize prompt
    prompt_tokens = tokenizer(prompt, return_tensors="pt")
    input_ids = prompt_tokens["input_ids"].to(device)

    # Track generated tokens
    generated = input_ids.clone()

    # Generate until max_length or until EOS token
    with torch.no_grad():
        for _ in range(max_length):
            # Get predictions from model
            logits, _ = model(generated)

            # Focus on last token's prediction
            next_token_logits = logits[:, -1, :] / temperature

            # Apply top-k sampling
            if top_k > 0:
                indices_to_remove = top_k_filtering(next_token_logits, top_k)
                next_token_logits[indices_to_remove] = -float('Inf')

            # Apply softmax to convert logits to probabilities
            probs = F.softmax(next_token_logits, dim=-1)

            # Sample from the distribution
            next_token = torch.multinomial(probs, num_samples=1)

            # Append new token to sequence
            generated = torch.cat([generated, next_token], dim=1)

            # Stop if EOS token is generated
            if next_token.item() == tokenizer.vocab.get("<EOS>", None):
                break

    # Decode the generated text
    output = tokenizer.decode(generated[0])

    return output

def top_k_filtering(logits, k):
    """Keep only the top k tokens with highest probability."""
    values, _ = torch.topk(logits, k)
    min_values = values[:, -1].unsqueeze(1).repeat(1, logits.shape[-1])
    return logits < min_values

In [None]:
def main():
    # Initialize tokenizer and model
    tokenizer = SimpleTokenizer()
    vocab_size = tokenizer.get_vocab_size()

    # Create model
    model = GPTModel(
        vocab_size=vocab_size,
        d_model=256,         # Smaller for demonstration
        num_heads=8,
        num_layers=6,
        max_seq_length=1024,
        dropout=0.1
    )

    # Prepare dataset
    train_dataset = PromptResponseDataset("/content/Conversation.csv", tokenizer)
    val_dataset = PromptResponseDataset("/content/Conversation.csv", tokenizer)

    # Train model
    model = train_model(
        model=model,
        train_dataset=train_dataset,
        val_dataset=val_dataset,
        batch_size=4,
        epochs=3,
        learning_rate=5e-5
    )

    # Save model
    torch.save(model.state_dict(), "gpt_model.pth")

    # Generate text with the trained model
    prompt = "What is machine learning?"
    generated_text = generate_text(model, tokenizer, prompt, max_length=100)
    print(f"Prompt: {prompt}")
    print(f"Generated: {generated_text}")

if __name__ == "__main__":
    main()

Epoch 1/3, Train Loss: 0.3823
Epoch 1/3, Validation Loss: 0.1469
Epoch 2/3, Train Loss: 0.1448
Epoch 2/3, Validation Loss: 0.1371
