In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import logging

In [2]:
# Mount Google Drive at /content/drive; later cells read the training text
# from under /content/drive/MyDrive and write the model weights there too.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Setup Logging
# basicConfig() is documented to do nothing when the root logger already has
# handlers (for example ones installed by the notebook runtime), which would
# silently drop the INFO level and the format below. force=True replaces any
# existing root handlers, so the configuration always applies and re-running
# this cell is idempotent.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)
# Root logger, shared by the model classes and the training loop below.
logger = logging.getLogger()

In [4]:
# Generate Look-ahead Mask
def generate_lookahead_mask(sequence_length, device=None):
    """Build an additive causal (look-ahead) attention mask.

    Args:
        sequence_length: Number of positions L in the sequence.
        device: Optional device to allocate the mask on. The default (None)
            uses PyTorch's default device, which is what this function did
            before the argument existed. Passing the target device avoids
            building the mask on the CPU and copying it afterwards.

    Returns:
        A float tensor of shape (L, L) holding 0.0 on and below the main
        diagonal and -inf strictly above it, so that when it is added to
        attention scores position i cannot attend to any position j > i.
    """
    # 1.0 marks the "future" entries (strictly above the diagonal), 0.0 the rest.
    future = torch.triu(
        torch.ones(sequence_length, sequence_length, device=device),
        diagonal=1,
    )
    # Turn the 1.0 markers into -inf; the 0.0 entries are left untouched.
    return future.masked_fill(future == 1, float('-inf'))

In [5]:
def count_parameters(model):
    """Count the scalar elements of every trainable parameter in ``model``.

    Parameters whose ``requires_grad`` flag is False (frozen weights) are
    skipped.

    Args:
        model: Object exposing ``parameters()`` that yields tensors.

    Returns:
        int: Sum of ``numel()`` over the parameters that require gradients.
    """
    trainable_total = 0
    for param in model.parameters():
        if not param.requires_grad:
            continue
        trainable_total += param.numel()
    return trainable_total

In [6]:
class TransformerBlock(nn.Module):
    """One attention + feed-forward layer with residual connections.

    Inputs and outputs are batch-first, i.e. shaped (N, L, embed_size). The
    wrapped ``nn.MultiheadAttention`` is constructed with its default
    sequence-first layout, so ``forward`` swaps the first two dimensions on
    the way in and swaps the attention output back.

    Args:
        embed_size: Size of the last dimension of every input tensor.
        heads: Number of attention heads.
        dropout: Probability for the dropout applied after each LayerNorm.
        forward_expansion: Hidden width of the feed-forward network, as a
            multiple of ``embed_size``.
    """

    def __init__(self, embed_size, heads, dropout, forward_expansion):
        super().__init__()
        hidden_size = forward_expansion * embed_size
        self.embed_size = embed_size
        self.attention = nn.MultiheadAttention(embed_size, heads)
        self.norm1 = nn.LayerNorm(embed_size)
        self.norm2 = nn.LayerNorm(embed_size)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, embed_size),
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, value, key, query, mask):
        """Run attention and the feed-forward network over one batch.

        Args:
            value, key, query: Batch-first tensors of shape (N, L, embed_size).
            mask: Passed straight through as ``attn_mask``.

        Returns:
            Tensor with the same batch-first layout as ``query``.

        Raises:
            ValueError: If the last dimension of any input differs from
                ``embed_size``.
        """
        # nn.MultiheadAttention (default layout) expects (L, N, E).
        value_lne = value.transpose(0, 1)
        key_lne = key.transpose(0, 1)
        query_lne = query.transpose(0, 1)

        widths_ok = all(
            tensor.size(2) == self.embed_size
            for tensor in (value_lne, key_lne, query_lne)
        )
        if not widths_ok:
            logger.error("Input tensors have incorrect dimension!")
            raise ValueError("The dimensions of value, key, and query tensors must match the specified embed_size.")

        try:
            attended, _ = self.attention(
                query_lne, key_lne, value_lne, attn_mask=mask, need_weights=False
            )
            # Back to batch-first, then residual connection around attention.
            residual = attended.transpose(0, 1) + query
            x = self.dropout(self.norm1(residual))
            # Residual connection around the position-wise feed-forward net.
            expanded = self.feed_forward(x)
            out = self.dropout(self.norm2(expanded + x))
        except Exception as e:
            # Log for context, then let the caller see the original error.
            logger.error(f"Error occurred in TransformerBlock: {str(e)}")
            raise

        return out

class NanoGPT(nn.Module):
    """Small GPT-style language model over integer token ids.

    Token embeddings and learned position embeddings are summed, passed
    through ``num_layers`` ``TransformerBlock`` layers (each used as
    self-attention), and projected to vocabulary logits.

    Args:
        vocab_size: Number of distinct token ids; also the logit width.
        embed_size: Embedding / hidden width.
        num_layers: Number of stacked ``TransformerBlock`` layers.
        heads: Attention heads per block.
        device: Stored on ``self.device`` for callers that read it. The
            forward pass no longer depends on it (see ``forward``).
        forward_expansion: Feed-forward width multiplier for each block.
        dropout: Dropout probability for each block.
        max_length: Number of learned positions, i.e. the longest sequence
            the model accepts.
    """

    def __init__(self, vocab_size, embed_size, num_layers, heads, device, forward_expansion=4, dropout=0.5, max_length=512):
        super(NanoGPT, self).__init__()
        self.embed_size = embed_size
        self.device = device
        self.word_embedding = nn.Embedding(vocab_size, embed_size)
        self.position_embedding = nn.Embedding(max_length, embed_size)

        self.transformer_blocks = nn.ModuleList(
            [
                TransformerBlock(embed_size, heads, dropout, forward_expansion)
                for _ in range(num_layers)
            ]
        )
        self.fc_out = nn.Linear(embed_size, vocab_size)

    def forward(self, x, mask):
        """Compute next-token logits for a batch of token-id sequences.

        Args:
            x: Integer tensor of shape (N, seq_length).
            mask: Attention mask handed unchanged to every block.

        Returns:
            Tensor of shape (N, seq_length, vocab_size).

        Raises:
            ValueError: If ``seq_length`` exceeds the number of learned
                positions.
        """
        N, seq_length = x.shape
        if seq_length > self.position_embedding.num_embeddings:
            logger.error("Input sequence length exceeds maximum allowed length!")
            raise ValueError("Input sequence length exceeds the maximum allowed length for positional embeddings.")

        try:
            # Build the position ids directly on the input's device. The
            # previous code created them on the CPU and moved them to the
            # constructor-time `self.device`, which goes stale (and causes a
            # device-mismatch error) once the module is moved with `.to()`.
            positions = torch.arange(seq_length, device=x.device)
            # (seq_length, E) position embeddings broadcast over the batch
            # dimension of the (N, seq_length, E) token embeddings.
            out = self.word_embedding(x) + self.position_embedding(positions)
            for layer in self.transformer_blocks:
                # Self-attention: the same tensor is value, key and query.
                out = layer(out, out, out, mask)
            out = self.fc_out(out)
        except Exception as e:
            # Log for context, then let the caller see the original error.
            logger.error(f"Error occurred in NanoGPT: {str(e)}")
            raise

        return out



In [7]:
class BookDataset(Dataset):
    """Character-level next-token dataset over a single UTF-8 text file.

    Newlines in the file are replaced with spaces. Item ``i`` is the window
    of ``sequence_length`` characters starting at offset ``i``, returned as
    an (input, target) pair where the target is the input shifted one
    character to the right; each therefore has ``sequence_length - 1`` ids.

    Args:
        file_path: Path of the text file to read.
        vocab: Mapping from character to integer id. Every character of the
            normalized text must be a key.
        sequence_length: Number of characters per window.

    Raises:
        KeyError: From ``__init__`` if the text contains a character that is
            not in ``vocab``.
    """

    def __init__(self, file_path, vocab, sequence_length):
        with open(file_path, 'r', encoding='utf-8') as f:
            self.text = f.read()
        self.text = self.text.replace('\n', ' ')
        self.vocab = vocab
        self.sequence_length = sequence_length
        # Encode the whole text once instead of re-running a per-character
        # dict lookup for every item of every epoch.
        self.encoded = torch.tensor([vocab[char] for char in self.text], dtype=torch.long)

    def __len__(self):
        # A text of T characters has T - L + 1 full windows of length L.
        # Clamp at zero so a text shorter than one window yields an empty
        # dataset rather than an invalid negative length.
        return max(0, len(self.text) - self.sequence_length + 1)

    def __getitem__(self, idx):
        """Return the (input, target) id tensors for window ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
                Without this, plain iteration over the dataset would never
                stop, because slicing past the end just returns shorter
                (eventually empty) windows.
        """
        size = len(self)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            raise IndexError(f"BookDataset index out of range for dataset of size {size}")
        window = self.encoded[idx:idx + self.sequence_length]
        # Clone so callers get independent tensors, not views of the corpus.
        input_sequence = window[:-1].clone()
        target_sequence = window[1:].clone()
        return input_sequence, target_sequence

# Create Vocabulary from the Book Text
with open("/content/drive/MyDrive/Harry Potter.txt", 'r', encoding='utf-8') as f:
    text = f.read()
unique_chars = set(text)
# Number the characters in sorted order. Iteration order of a set of strings
# changes between interpreter sessions (string hashing is randomized), so
# enumerating the raw set would give every kernel restart a different
# char -> id mapping and make previously saved model weights decode to garbage.
vocab = {char: idx for idx, char in enumerate(sorted(unique_chars))}


In [None]:
NUM_EPOCHS = 10
LEARNING_RATE = 0.001
BATCH_SIZE = 64

# Model, Loss, Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NanoGPT(vocab_size=len(vocab), embed_size=512, num_layers=2, heads=2, device=device).to(device)
print(f"The model has {count_parameters(model):,} trainable parameters.")
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# DataLoader
dataset = BookDataset("/content/drive/MyDrive/Harry Potter.txt", vocab, sequence_length=100)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

# Training Loop
# Make the mode explicit so dropout is active even if this cell is re-run
# after the model was switched to eval mode elsewhere.
model.train()
# A 2-D (L, L) attention mask is broadcast by nn.MultiheadAttention across the
# batch and all heads, so it does not need to be expanded to
# (heads * batch, L, L) or tied to a hard-coded head count. It is built once
# and only rebuilt if a batch arrives with a different sequence length.
mask = None
for epoch in range(NUM_EPOCHS):
    for batch_idx, (data, targets) in enumerate(dataloader):
        data = data.to(device)
        targets = targets.to(device)
        if mask is None or mask.shape[0] != data.shape[1]:
            mask = generate_lookahead_mask(data.shape[1]).to(device)

        # Forward
        # Best effort: a failing batch is logged and skipped so one bad batch
        # does not abort the whole run.
        try:
            outputs = model(data, mask)
            # Flatten (N, L, vocab) logits and (N, L) targets for the loss.
            loss = loss_fn(outputs.view(-1, outputs.shape[2]), targets.view(-1))
        except Exception as e:
            logger.error(f"Error during forward pass at epoch {epoch}, batch {batch_idx}: {str(e)}")
            continue

        # Backward
        try:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        except Exception as e:
            logger.error(f"Error during backward pass at epoch {epoch}, batch {batch_idx}: {str(e)}")
            continue

        # Print loss occasionally
        if batch_idx % 100 == 0:
            logger.info(f"Epoch [{epoch}/{NUM_EPOCHS}] Batch {batch_idx}/{len(dataloader)} Loss: {loss.item()}")

# torch.save needs a file to write to; the bare Drive folder used previously
# is a directory and cannot be opened for writing.
model_path = "/content/drive/MyDrive/nanogpt_state_dict.pt"  # Adjust path as needed
torch.save(model.state_dict(), model_path)




The model has 6,649,937 trainable parameters.


In [None]:
def generate_text(model, initial_text, max_length, vocab, device):
    """Greedily extend ``initial_text`` one character at a time.

    At each step the model is run on the whole sequence so far and the
    highest-scoring id at the last position is appended.

    Args:
        model: Module called as ``model(token_ids, mask)`` returning logits
            of shape (1, L, vocab_size).
        initial_text: Non-empty prompt; every character must be a key of
            ``vocab``.
        max_length: Total length of the returned text, prompt included. If
            it does not exceed the prompt length, the prompt is returned
            unchanged.
        vocab: Mapping from character to integer id.
        device: Device on which the id tensor and mask are placed.

    Returns:
        str: The prompt followed by the generated characters.

    Raises:
        ValueError: If ``initial_text`` is empty, or if the model emits an
            id that has no character in ``vocab``.
        KeyError: If ``initial_text`` contains a character not in ``vocab``.
    """
    if len(initial_text) == 0:
        # An empty prompt would produce a (1, 0) tensor with nothing to
        # condition on and fail with an opaque error inside the model.
        raise ValueError("initial_text must contain at least one character.")

    # Convert the initial text to tensor
    prompt_ids = [vocab[char] for char in initial_text]
    sequence = torch.tensor(prompt_ids, dtype=torch.long).unsqueeze(0).to(device)

    # Build the id -> char table once; a per-character linear scan over the
    # vocab is unnecessary. setdefault keeps the first character for an id,
    # matching a first-match search if ids were ever duplicated.
    index_to_char = {}
    for char, idx in vocab.items():
        index_to_char.setdefault(idx, char)

    # Switch to evaluation mode for generation, then restore whatever mode
    # the caller had, so sampling mid-training does not silently leave
    # dropout disabled for the rest of the run.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for _ in range(max_length - len(prompt_ids)):
                # Generate a mask for the current sequence
                mask = generate_lookahead_mask(sequence.shape[1]).to(device)

                # Only the prediction at the last position is needed
                outputs = model(sequence, mask)
                predictions = outputs[:, -1, :]

                # Choose the highest-scoring character and append it
                _, next_char_idx = predictions.topk(1, dim=-1)
                sequence = torch.cat([sequence, next_char_idx], dim=1)
    finally:
        model.train(was_training)

    # Convert the tensor of character indices back to string
    try:
        generated_text = ''.join(index_to_char[idx] for idx in sequence[0].tolist())
    except KeyError as missing:
        raise ValueError(f"{missing.args[0]} is not in vocab") from None

    return generated_text


In [None]:
# Assuming you've defined the NanoGPT class in your script
# Resolve the device here too, so this cell does not rely on a `device`
# variable left behind by the training cell in the current kernel session.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NanoGPT(vocab_size=len(vocab), embed_size=512, num_layers=2, heads=2, device=device)  # Adjust parameters as needed

# Load the state dictionary
# model_path must name the checkpoint file itself; torch.load cannot read a
# directory such as the bare Drive folder.
model_path = "/content/drive/MyDrive/nanogpt_state_dict.pt"  # Adjust path as needed
# map_location lets weights saved on a GPU runtime load on a CPU-only runtime.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)  # Move model to the desired device (e.g., CUDA)
# Inference only from here on: disable dropout. As the last expression this
# still displays the model summary, like the previous `model.to(device)` did.
model.eval()


In [None]:
# Greedy sample from the model, seeded with a short prompt.
# Every character of the prompt must be a key of `vocab`, because
# generate_text looks each one up directly.
initial_text = "once upon a"  # Starting text
# Total output length including the prompt: generate_text appends
# (generated_length - len(initial_text)) characters.
generated_length = 100  # The total length of the generated text

generated = generate_text(model, initial_text, generated_length, vocab, device)
print(generated)

In [None]:
# Feed a question-style prompt through the same greedy character generator.
# NOTE(review): the model in this notebook is trained only on next-character
# prediction over the book text, so the output is a continuation of the
# prompt string; whether it reads as an answer is not guaranteed — confirm.
question = "Who is the main protagonist in the book?"
prompt = f"Answering a question about the book: {question}"
# 150 is the total length of the returned text, prompt characters included.
answer = generate_text(model, prompt, 150, vocab, device)  # 150 is just a chosen length
print(answer)
