In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import urllib.request
import os


- This notebook uses a Project Gutenberg text (*Alice's Adventures in Wonderland*)
- Try replacing that with the Epicurious Recipes Dataset
```bash
bash scripts/download_kaggle_data.sh hugodarwood epirecipes
```

In [None]:

def download_corpus(url, filename):
    """Download ``url`` to ``filename`` unless that file already exists.

    The data is first written to ``filename + '.part'`` and only renamed to
    ``filename`` once the transfer has finished. An interrupted download
    therefore never leaves a truncated file behind that a later run would
    mistake for a complete, cached corpus.

    Args:
        url: Address of the resource to fetch.
        filename: Local path used both as the cache key and the destination.

    Raises:
        urllib.error.URLError / OSError: propagated from the download or rename.
    """
    if os.path.exists(filename):
        return  # already cached

    partial = filename + '.part'
    try:
        urllib.request.urlretrieve(url, partial)
        os.replace(partial, filename)
    finally:
        # Only still present when the download or the rename failed
        if os.path.exists(partial):
            os.remove(partial)

# Plain-text corpus from Project Gutenberg (ebook #11), cached next to the notebook
corpus_url = "https://gutenberg.org/ebooks/11.txt.utf-8"
corpus_filename = "alice_in_wonderland.txt"
download_corpus(corpus_url, corpus_filename)

# The URL serves UTF-8 text, so decode it explicitly instead of relying on the
# platform's default encoding (which is not UTF-8 everywhere).
with open(corpus_filename, 'r', encoding='utf-8') as f:
    text = f.read()

In [None]:
# Drop every character outside the ASCII range
ascii_bytes = text.encode('ascii', errors='ignore')
text = ascii_bytes.decode('utf-8')

def split(text):
  """Lower-case ``text`` and surround each punctuation mark with spaces.

  After this step a plain whitespace split yields words and punctuation
  marks as separate tokens. Only the marks ``, . ! ? ' " - ; :`` are handled.
  """
  # Should be smarter but I am impatient for this demo
  padded = {ord(mark): ' ' + mark + ' ' for mark in ',.!?\'"-;:'}
  return text.lower().translate(padded)

text = split(text)

# Tokenize the text (whitespace boundaries, not BPE)
tokens = text.split()

# Special tokens come first so that their ids are fixed (<BOS>=0, <UNK>=1, ...).
# The corpus words are sorted: iterating a set of strings yields a different
# order on every kernel restart, which would silently change every token id
# (and invalidate any saved model) from run to run.
special_tokens = ['<BOS>', '<UNK>', '<PAD>', '<EOS>']
vocab = special_tokens + sorted(set(tokens))
vocab_size = len(vocab)

# Map tokens to indices, and back
word_to_ix = {word: i for i, word in enumerate(vocab)}
ix_to_word = {i: word for word, i in word_to_ix.items()}

In [None]:
# Vocabulary index assigned to the token 'alice'
word_to_ix['alice']

In [None]:
# Round trip: mapping the index back must return the original token.
# The lookup is spelled out instead of using IPython's `_`, which only holds
# the intended value when the previous cell was the last thing executed.
ix_to_word[word_to_ix['alice']]

In [None]:
# Integer ids of the four special tokens
BOS, PAD, UNK, EOS = (
    word_to_ix[special] for special in ('<BOS>', '<PAD>', '<UNK>', '<EOS>')
)

In [None]:
# Define sequence length
seq_length = 50

# Encode the corpus once; every training window is then a plain slice of ids
token_ids = [word_to_ix[word] for word in tokens]
bos_id = word_to_ix['<BOS>']

# Essentially the ngrams, but with a much larger context
window_starts = range(len(tokens) - seq_length - 1)

# Input: <BOS> followed by the first seq_length - 1 tokens of the window
sequences = [[bos_id] + token_ids[i:i + seq_length - 1] for i in window_starts]
# Target: the window itself, i.e. the input shifted left by one token
targets = [token_ids[i:i + seq_length] for i in window_starts]

In [None]:
# One (input, target) pair as raw token ids
sample_ix = 4000
print('Context: ', sequences[sample_ix])
print('Target: ', targets[sample_ix])

In [None]:
def encode(text):
  """Turn raw text into a list of vocabulary ids.

  The text is normalised with ``split`` and cut on whitespace. Words that are
  not in the vocabulary map to the ``<UNK>`` id (not to id 0, which is
  ``<BOS>``).
  """
  unk = word_to_ix['<UNK>']
  return [word_to_ix.get(wd, unk) for wd in split(text).split()]

def decode(sequence):
  """Turn an iterable of vocabulary ids back into a space-joined string.

  Ids may be Python ints, NumPy integers or 0-d tensors; each one is
  converted with ``int`` before the lookup.
  """
  return ' '.join(ix_to_word[int(ix)] for ix in sequence)

In [None]:
# Punctuation becomes separate tokens; out-of-vocabulary words get encode()'s fallback id
encode("well, children, what's up?")

In [None]:
# Encode and decode in one cell instead of reading IPython's `_`, which only
# holds the encoded list when the previous cell was the last thing executed.
decode(encode("well, children, what's up?"))

In [None]:
# Human-readable view of training window 4000
decode(sequences[4000])

In [None]:
# Stack the nested Python lists into integer tensors of shape (num_windows, seq_length)
sequences = torch.tensor(sequences, dtype=torch.long)
targets = torch.tensor(targets, dtype=torch.long)

In [None]:
# Model sizes used throughout the notebook
embedding_dim = 128  # length of each token vector
hidden_dim = 256     # width of the LSTM hidden state

# Lookup table holding one learnable vector per vocabulary entry
embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)

In [None]:
# Look the id up once and reuse it below
alice_ix = word_to_ix['alice']

print(embedding.weight.shape)
print("Words:", len(vocab))
print("Embedding of 'alice':")
print("   Index: ", alice_ix)
print("   Vector:", embedding(torch.tensor([[alice_ix]])))

In [None]:
# Import pyplot directly: `matplotlib.pylab` is a legacy namespace and only
# exposes `plt` as a side effect of its own imports.
import matplotlib.pyplot as plt

# Embed one training window: one row per token, one column per embedding feature
sample_embedding = embedding(sequences[4000])
print('Embedding shape: ', sample_embedding.shape)

plt.imshow(sample_embedding.detach(), cmap='gray')
plt.ylabel('token index')
plt.xlabel('embedding')
plt.show()

In [None]:
# Two stacked LSTM layers consuming batch-first (N, L, embedding_dim) input
lstm = nn.LSTM(
    input_size=embedding_dim,
    hidden_size=hidden_dim,
    num_layers=2,
    batch_first=True,
)

In [None]:
# Push one training window through the untrained LSTM as a batch of size 1
sample_batch = sequences[4000].unsqueeze(0)
outputs, (h, c) = lstm(embedding(sample_batch))

In [None]:
# One hidden vector per input position (taken from the top LSTM layer)
print(outputs.shape)  # N, L, hidden_dim

In [None]:
# Final hidden state of every layer after the last position
print(h.shape) # num_layers, N, hidden_dim

In [None]:
# Output head: one score per vocabulary entry for each hidden state (hidden -> word)
fc = nn.Linear(in_features=hidden_dim, out_features=vocab_size)

In [None]:
# Unnormalised scores per position: (N, L, hidden_dim) -> (N, L, vocab_size)
lg_probs = fc(outputs)
# The vocabulary is the LAST axis. argmax(1) would reduce over the sequence
# axis instead and return positions (0..L-1), not word ids.
wds = lg_probs.argmax(dim=-1)  # (N, L)
wds

In [None]:
# Words the untrained model currently picks
decode(wds.squeeze(0).tolist())

In [None]:
# lg_probs.shape - N, L, C
# targets.shape - N, L
# cross_entropy expects:   N, C, L  for logits.

class_first_scores = lg_probs.transpose(1, 2)   # (N, L, C) -> (N, C, L)
sample_target = targets[4000].unsqueeze(0)      # (L,) -> (1, L)
loss = torch.nn.functional.cross_entropy(class_first_scores, sample_target)
loss

In [None]:
# Putting it together

class LSTMModel(nn.Module):
    """Next-token language model: embedding -> single-layer LSTM -> linear head.

    ``forward`` takes a batch-first ``(N, L)`` tensor of token ids and returns
    ``(N, L, vocab_size)`` unnormalised scores, one row per input position.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        embedded = self.embedding(x)               # (N, L, embedding_dim)
        hidden_states, _state = self.lstm(embedded)  # (N, L, hidden_dim)
        return self.fc(hidden_states)              # (N, L, vocab_size)

lstm_model = LSTMModel(
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
)

In [None]:
# Most likely word id at every position. The model returns (N, L, vocab_size),
# so the arg-max is taken over the last axis (argmax(1) would reduce over positions).
lstm_model(sequences[4000].unsqueeze(0)).argmax(dim=-1)

In [None]:
# Token-level cross entropy, minimised with Adam
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=lstm_model.parameters(), lr=1e-3)

In [None]:
num_epochs = 5
batch_size = 32

for epoch in range(num_epochs):
    # Walk through the windows in their original order, batch_size at a time
    for inputs, labels in zip(sequences.split(batch_size), targets.split(batch_size)):
        optimizer.zero_grad()
        outputs = lstm_model(inputs)
        # CrossEntropyLoss wants the class axis second: (N, L, C) -> (N, C, L)
        loss = loss_fn(outputs.transpose(1, 2), labels)
        loss.backward()
        optimizer.step()

    # Reports the loss of the epoch's final batch only
    print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")

In [None]:
def generate_text(model, start_text='', length=100, temperature=1.0):
    """Sample ``length`` tokens from ``model``, yielding them one at a time.

    Args:
        model: Module mapping a ``(1, T)`` tensor of token ids to
            ``(1, T, vocab_size)`` scores. It is switched to eval mode.
        start_text: Optional prompt; it is encoded and placed after ``<BOS>``.
        length: Number of tokens to generate.
        temperature: Divides the scores before the softmax. Values below 1
            sharpen the distribution, values above 1 flatten it.

    Yields:
        ``(word, probabilities)``: the sampled token and the distribution over
        the vocabulary it was drawn from.

    Raises:
        ValueError: if ``temperature`` is not positive (raised when the
            generator is first advanced).
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    model.eval()
    input_seq = [BOS]+encode(start_text)
    input_seq = torch.tensor(input_seq, dtype=torch.long).unsqueeze(0)

    for _ in range(length):
        # Only the forward pass runs under no_grad. Yielding from inside the
        # `with` block would leave gradient tracking switched off in the
        # caller's code for as long as the generator is suspended.
        with torch.no_grad():
            output = model(input_seq)[:, -1, :]  # Take last output of the sequence
            output = output / temperature  # Apply temperature
            probabilities = torch.nn.functional.softmax(output, dim=-1).squeeze()

        next_word_id = torch.multinomial(probabilities, 1).item()
        # Feed the sampled token back in as context for the next step
        input_seq = torch.cat((input_seq, torch.tensor([[next_word_id]])), dim=1)

        yield ix_to_word[next_word_id], probabilities



In [None]:
# Print the generated words wrapped at roughly 60 characters per line
line = ''
for w, p in generate_text(lstm_model):
  line += ' ' + w
  if len(line) > 60:
    print(line)
    line = ''
# Flush the words still buffered when generation stops; otherwise they are lost
print(line)

(It's okay that we still get nonsense: it _is_ Alice in Wonderland. More importantly, it takes a LOT more training to get good results.)

# Substituting a transformer

In [None]:
# Repeating the embedding snippet from above -- to remind ourselves
embedding_dim = 128
embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)

In [None]:
# We build a GPT-style, decoder-only model, yet use PyTorch's _encoder_ classes:
# - the PyTorch decoder includes cross-attention, which GPT-style decoders don't use
# - a decoder is essentially an encoder with masked attention

def generate_causal_mask(size):
    """Return a ``(size, size)`` additive mask that blocks future tokens.

    Entries above the main diagonal are ``-inf``; the diagonal and everything
    below it are ``0``.
    """
    blocked = torch.full((size, size), float('-inf'))
    return blocked.triu(diagonal=1)

# exp() maps the additive mask to 1 (visible) / 0 (blocked) for display
fig, ax = plt.subplots(facecolor='w')
ax.imshow(torch.exp(generate_causal_mask(10)), cmap='gray', vmin=0, vmax=1)
ax.set_xticks([])
ax.set_yticks([])
ax.set_title('Causal Mask (w=1, k=0)')
plt.show()

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding for SEQUENCE-FIRST input.

    ``forward`` expects ``x`` shaped ``(L, N, embedding_dim)``: dimension 0 is
    the position in the sequence. The table ``pe`` has shape
    ``(max_len, 1, embedding_dim)`` and is broadcast over the batch axis.
    Batch-first tensors must be transposed before being passed in, otherwise
    the encoding is indexed by batch item instead of by position.

    Args:
        embedding_dim: Size of each token vector (odd sizes are supported).
        max_len: Longest sequence the table covers; longer inputs fail to
            broadcast against ``pe``.
    """
    def __init__(self, embedding_dim, max_len=200):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, embedding_dim)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-np.log(10000.0) / embedding_dim))
        angles = position * div_term  # (max_len, ceil(embedding_dim / 2))
        pe[:, 0::2] = torch.sin(angles)
        # For an odd embedding_dim there is one cosine column fewer than sine columns
        pe[:, 1::2] = torch.cos(angles[:, :embedding_dim // 2])
        # Buffer: saved and moved with the module, but not trained
        self.register_buffer('pe', pe.unsqueeze(1))  # Shape (max_len, 1, embedding_dim)

    def forward(self, x):
        # Ensure `pe` matches the input length in sequence dimension
        return x + self.pe[:x.size(0), :]

# Stand-alone instance (default max_len) used for the visualisation
pos_encoder = PositionalEncoding(embedding_dim=embedding_dim)

In [None]:
pe_table = pos_encoder.pe.detach().squeeze()  # (max_len, embedding_dim)

plt.imshow(pe_table)  # This is what is added to each embedding
# Tick positions come from the table itself rather than a hard-coded max_len
plt.yticks([0, pe_table.shape[0]])
plt.xticks([0, pe_table.shape[1]])
plt.ylabel('Position (index) in input')
plt.xlabel('Embedding')
plt.show()

In [None]:
# batch_first=True because the tensors in this notebook are (batch, sequence, feature).
# Without it PyTorch reads dimension 0 as the sequence axis, so a (1, L, E)
# input would be treated as L independent sequences of length 1.
encoder_layer = nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=8, batch_first=True)
encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)

In [None]:
# Sanity check on one window: the encoder output is expected to have the input's shape
encoder(embedding(sequences[4000].unsqueeze(0))).shape

In [None]:
class TransformerEncoderModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_layers=2, nhead=8, max_len=200):
        super(TransformerEncoderModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.pos_encoder = PositionalEncoding(embedding_dim, max_len)
        encoder_layer = nn.TransformerEncoderLayer(d_model=embedding_dim, nhead=nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embedding_dim, vocab_size)  # Output layer to predict next token

    def forward(self, x, mask=None):
        x = self.embedding(x) * np.sqrt(x.size(-1))
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x, mask=mask)
        x = self.fc(x)
        return x

transformer_model = TransformerEncoderModel(vocab_size, embedding_dim)

In [None]:
lg_probs = transformer_model(sequences[4000].unsqueeze(0))

lg_probs.argmax(1)

In [None]:
decode(lg_probs.squeeze(0).argmax(1).detach().numpy())

In [None]:
num_epochs = 10
optimizer = optim.Adam(transformer_model.parameters(), lr=0.001)
loss_fn = nn.CrossEntropyLoss()

for epoch in range(num_epochs):  # Number of epochs
    for i in range(0, len(sequences), 32):  # Batch size of 32
        inputs = sequences[i:i+32]
        labels = targets[i:i+32]

        optimizer.zero_grad()
        causal_mask = generate_causal_mask(inputs.size(0)).to(inputs.device)
        outputs = transformer_model(inputs, mask=causal_mask)

        loss = loss_fn(outputs.view(-1, vocab_size), labels.view(-1))
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch + 1}, Loss: {loss.item():.4f}")

In [None]:
# Print the generated words wrapped at roughly 60 characters per line
line = ''
for w, p in generate_text(transformer_model):
  line += ' ' + w
  if len(line) > 60:
    print(line)
    line = ''
# Flush the words still buffered when generation stops; otherwise they are lost
print(line)