In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

# Set the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')



Using device: cuda


In [2]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        
        # Create a matrix of shape (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        
        # Position indices (0 to max_len - 1)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        
        # Compute the positional encodings once in log space
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)  # Apply sin to even indices
        pe[:, 1::2] = torch.cos(position * div_term)  # Apply cos to odd indices
        
        pe = pe.unsqueeze(0)  # Shape: (1, max_len, d_model)
        self.register_buffer('pe', pe)  # Not a parameter, but should persist with the model

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :].to(x.device)  # Add positional encoding
        return x


In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads  # Dimension of each head
        
        # Define linear layers for Q, K, V
        self.W_Q = nn.Linear(d_model, d_model)
        self.W_K = nn.Linear(d_model, d_model)
        self.W_V = nn.Linear(d_model, d_model)
        
        # Final linear layer after concatenation
        self.linear = nn.Linear(d_model, d_model)
        
    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        
        # Linear projections
        Q = self.W_Q(Q)  # Shape: (batch_size, seq_len_q, d_model)
        K = self.W_K(K)
        V = self.W_V(V)
        
        # Split into multiple heads
        Q = Q.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)  # Shape: (batch_size, num_heads, seq_len_q, depth)
        K = K.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        
        # Scaled Dot-Product Attention
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.depth)  # Shape: (batch_size, num_heads, seq_len_q, seq_len_k)
        
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        
        attention_weights = F.softmax(scores, dim=-1)  # Shape: (batch_size, num_heads, seq_len_q, seq_len_k)
        
        output = torch.matmul(attention_weights, V)  # Shape: (batch_size, num_heads, seq_len_q, depth)
        
        # Concatenate heads
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)  # Shape: (batch_size, seq_len_q, d_model)
        
        # Final linear layer
        output = self.linear(output)  # Shape: (batch_size, seq_len_q, d_model)
        
        return output, attention_weights


In [4]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        
        self.linear1 = nn.Linear(d_model, d_ff)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(d_ff, d_model)
        
    def forward(self, x):
        x = self.linear1(x)  # Shape: (batch_size, seq_len, d_ff)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)  # Shape: (batch_size, seq_len, d_model)
        return x


In [5]:
class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(TransformerBlock, self).__init__()
        
        self.multi_head_attention = MultiHeadAttention(d_model, num_heads)
        self.dropout1 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)
        
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff, dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.norm2 = nn.LayerNorm(d_model)
        
    def forward(self, x, mask=None):
        # Multi-Head Attention with Residual Connection
        attn_output, _ = self.multi_head_attention(x, x, x, mask)
        x = x + self.dropout1(attn_output)
        x = self.norm1(x)
        
        # Feed-Forward Network with Residual Connection
        ff_output = self.feed_forward(x)
        x = x + self.dropout2(ff_output)
        x = self.norm2(x)
        
        return x


In [6]:
class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, max_len=5000, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        
        self.d_model = d_model
        
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        
        self.layers = nn.ModuleList([
            TransformerBlock(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, src, mask=None):
        # Embedding and positional encoding
        x = self.embedding(src) * math.sqrt(self.d_model)
        x = self.positional_encoding(x)
        x = self.dropout(x)
        
        # Pass through transformer blocks
        for layer in self.layers:
            x = layer(x, mask)
        
        return x


In [7]:
class TransformerLanguageModel(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, max_len=5000, dropout=0.1):
        super(TransformerLanguageModel, self).__init__()
        
        self.encoder = TransformerEncoder(vocab_size, d_model, num_layers, num_heads, d_ff, max_len, dropout)
        self.output_layer = nn.Linear(d_model, vocab_size)
        
    def forward(self, src, mask=None):
        encoder_output = self.encoder(src, mask)
        logits = self.output_layer(encoder_output)  # Shape: (batch_size, seq_len, vocab_size)
        return logits


In [8]:
import string

# Define the character set (e.g., lowercase letters, digits, punctuation)
characters = string.ascii_lowercase + string.digits + string.punctuation + ' '

# Create mappings from characters to indices and vice versa
char2idx = {char: idx for idx, char in enumerate(characters)}
idx2char = {idx: char for idx, char in enumerate(characters)}

# Update the vocabulary size
vocab_size = len(characters)
print(f"Vocabulary Size: {vocab_size}")


Vocabulary Size: 69


In [9]:
# Hyperparameters for the character-level model
d_model = 256       # Embedding dimension
num_layers = 8      # Number of transformer blocks
num_heads = 4       # Number of attention heads
d_ff = 256          # Feed-forward network dimension
max_len = 500       # Maximum sequence length (longer sequences for character-level)
dropout = 0.1


In [10]:
with open("jokes.txt") as f:
    text_data = f.read()

In [11]:
text_data[:100]

'What did one pirate say to the other when he beat him at chess?<>Checkmatey.\nI burned 2000 calories '

In [12]:
# Convert text data to indices
data_indices = [char2idx[char] for char in text_data if char in char2idx]

print("Data Indices:", data_indices[:100])

Data Indices: [7, 0, 19, 68, 3, 8, 3, 68, 14, 13, 4, 68, 15, 8, 17, 0, 19, 4, 68, 18, 0, 24, 68, 19, 14, 68, 19, 7, 4, 68, 14, 19, 7, 4, 17, 68, 22, 7, 4, 13, 68, 7, 4, 68, 1, 4, 0, 19, 68, 7, 8, 12, 68, 0, 19, 68, 2, 7, 4, 18, 18, 56, 53, 55, 7, 4, 2, 10, 12, 0, 19, 4, 24, 49, 68, 1, 20, 17, 13, 4, 3, 68, 28, 26, 26, 26, 68, 2, 0, 11, 14, 17, 8, 4, 18, 68, 19, 14, 3, 0]


In [13]:
sequence_length = 256  # Adjust based on your data and memory constraints

# Create input and target sequences
def create_sequences(data, seq_length):
    inputs = []
    targets = []
    for i in range(len(data) - seq_length):
        input_seq = data[i:i+seq_length]
        target_seq = data[i+1:i+seq_length+1]
        inputs.append(input_seq)
        targets.append(target_seq)
    return inputs, targets

inputs, targets = create_sequences(data_indices, sequence_length)

print(f"Number of sequences: {len(inputs)}")


Number of sequences: 16677


In [14]:
len(inputs), len(inputs[0])

(16677, 256)

In [15]:
from torch.utils.data import Dataset, DataLoader

class CharDataset(Dataset):
    def __init__(self, inputs, targets):
        self.inputs = inputs
        self.targets = targets
    
    def __len__(self):
        return len(self.inputs)
    
    def __getitem__(self, idx):
        input_seq = torch.tensor(self.inputs[idx], dtype=torch.long)
        target_seq = torch.tensor(self.targets[idx], dtype=torch.long)
        return input_seq, target_seq

dataset = CharDataset(inputs, targets)
batch_size = 16

dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


In [16]:
# Instantiate the model
model = TransformerLanguageModel(vocab_size, d_model, num_layers, num_heads, d_ff, max_len, dropout).to(device)


In [17]:
# Loss function
criterion = nn.CrossEntropyLoss()

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


In [18]:
# Number of epochs
epochs = 10

for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch_inputs, batch_targets in dataloader:
        # Move data to device
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)
        
        optimizer.zero_grad()
        
        # Forward pass
        logits = model(batch_inputs)  # Shape: (batch_size, seq_length, vocab_size)
        
        # Compute loss
        loss = criterion(logits.view(-1, vocab_size), batch_targets.view(-1))
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    avg_loss = total_loss / len(dataloader)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}")




Epoch 1/10, Loss: 2.2032
Epoch 2/10, Loss: 1.9236
Epoch 3/10, Loss: 1.8821
Epoch 4/10, Loss: 1.8648
Epoch 5/10, Loss: 1.8566
Epoch 6/10, Loss: 1.8500
Epoch 7/10, Loss: 1.8455
Epoch 8/10, Loss: 1.8406
Epoch 9/10, Loss: 1.8385
Epoch 10/10, Loss: 1.8355


In [23]:
model.eval()

def generate_text(model, start_string, generation_length=100):
    model.eval()
    input_indices = [char2idx[char] for char in start_string if char in char2idx]
    input_seq = torch.tensor(input_indices, dtype=torch.long).unsqueeze(0).to(device)  # Shape: (1, seq_length)
    
    generated_text = start_string
    
    for _ in range(generation_length):
        # Get logits from the model
        logits = model(input_seq)  # Shape: (1, seq_length, vocab_size)
        next_token_logits = logits[:, -1, :]  # Get logits for the last character
        # Apply softmax to get probabilities
        probabilities = F.softmax(next_token_logits, dim=-1)
        # Sample from the distribution or take the argmax
        next_token = torch.argmax(probabilities, dim=-1).item()
        # Append the predicted character to the generated text
        next_char = idx2char[next_token]
        generated_text += next_char
        # Update the input sequence
        next_token_tensor = torch.tensor([[next_token]], dtype=torch.long).to(device)
        input_seq = torch.cat([input_seq, next_token_tensor], dim=1)
        # Optionally, keep only the last `sequence_length` tokens
        input_seq = input_seq[:, -sequence_length:]
    
    return generated_text


# Example usage
start_string = "Ok..."
generated_text = generate_text(model, start_string, generation_length=100)
print("Generated Text:")
print(generated_text)


Generated Text:
Ok.......................................................................................................
