In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
import torch.optim as optim


# Download data

In [None]:
# Quietly download the SSA baby-names archive and unpack it into the working directory
!wget -q https://www.ssa.gov/oact/babynames/names.zip
!unzip -q names.zip

In [None]:
# Path is relative to the working directory, which is where names.zip is unpacked,
# so the notebook does not depend on a Colab-specific absolute location.
names_path = "yob2023.txt"

# Each record has three comma-separated fields; only the first (the name) is used.
names = []
with open(names_path, encoding="utf-8") as file:
  for line in file:
    record = line.strip()
    if not record:
      continue  # tolerate blank / trailing lines instead of failing on the unpack
    name, _, _ = record.lower().split(',')
    names.append("$" + name + "$") # $ is used for start and end tokens

print(f"{len(names)} names retrieved")

31682 names retrieved


# Bigram Model

In [None]:
# Character inventory: the '$' boundary marker sits at index 0, followed by a-z
vocab = "$abcdefghijklmnopqrstuvwxyz"
vocab_size = len(vocab)

# Lookup tables: index -> character first, then its inverse character -> index
index_to_char = dict(enumerate(vocab))
char_to_index = {char: i for i, char in index_to_char.items()}

In [None]:
# Bigram table: cell [a, b] ends up holding the share of all observed adjacent
# character pairs that are "a followed by b" (e.g. ab, ac, ...).
# Note the division is by the grand total, so individual rows do not sum to 1.

bigram = torch.zeros((vocab_size, vocab_size))
total = 0
for name in names:
  # walk over every pair of neighbouring characters in the name
  for first, second in zip(name[:-1], name[1:]):
    row = char_to_index[first]
    col = char_to_index[second]
    bigram[row, col] += 1
    total += 1
bigram /= total


In [None]:
# Sample characters given probabilities
def gaussian_sampler(bigram_probs):
  """Generate one name by repeatedly sampling the next character from a bigram table.

  Despite the name, sampling is categorical (torch.multinomial), not Gaussian.

  Args:
    bigram_probs: 2D tensor indexed as [current_char_index, next_char_index];
      the row of the current character is passed to torch.multinomial as the
      sampling weights for the next character.

  Returns:
    The generated name as a string, without the '$' boundary markers.
  """
  generated = "$"
  while True:
    # Use the table that was passed in rather than the module-level `bigram`,
    # so the function actually honours its argument.
    next_char_weights = bigram_probs[char_to_index[generated[-1]]]
    sampled_char = index_to_char[
        torch.multinomial(next_char_weights, 1).item()
    ]
    if sampled_char == "$":
      break  # '$' doubles as the end-of-name marker
    generated += sampled_char
  return generated[1:]

# Print five names sampled from the bigram table, numbered from 1
for sample_number in range(1, 6):
  print(f"name {sample_number}: {gaussian_sampler(bigram)}")

name 1: can
name 2: laves
name 3: ly
name 4: bry
name 5: a


### Utility functions

In [None]:
def encode(word):
  """Translate each character of `word` into its vocabulary index and return the indices as a tensor."""
  return torch.tensor(list(map(char_to_index.__getitem__, word)))


def decode(indices_tensor):
  """Turn a tensor of vocabulary indices back into the string it spells."""
  chars = []
  for element in indices_tensor:
    # each element is a one-value tensor; .item() yields the plain Python int key
    chars.append(index_to_char[element.item()])
  return ''.join(chars)

# Round-trip sanity check: encoding then decoding must reproduce the input
example_name = "$ada$"
encoded_name = encode(example_name)
decoded_name = decode(encoded_name)
print(encoded_name, decoded_name, sep="\n")

assert decoded_name == example_name, "Encoder or Decoder implemented incorrectly"


tensor([0, 1, 4, 1, 0])
$ada$


### Padding

In [None]:
name_indices = [encode(name) for name in names] # $...$
target_indices = [name_index[1:] for name_index in name_indices] # ...$  (inputs shifted left by one)

max_name_length = max(len(name) for name in names)

# Inputs: right-pad every name with 0 up to the longest name.
# Index 0 is also the '$' token, so padded inputs look like repeated end markers.
X = pad_sequence(name_indices, batch_first=True, padding_value=0)

# Targets: each target is one token shorter than its input, so after pad_sequence
# Y is narrower than X. Pad the remaining columns explicitly with -1 so both share
# the same width; -1 is the label value the loss is told to ignore.
# (This avoids appending an uninitialised dummy row to target_indices.)
Y = pad_sequence(target_indices, batch_first=True, padding_value=-1)
Y = F.pad(Y, (0, X.size(1) - Y.size(1)), value=-1)

print(X[0])
print(Y[0])

tensor([ 0, 15, 12,  9, 22,  9,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0])
tensor([15, 12,  9, 22,  9,  1,  0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1])


In [None]:
def get_batch(batch_size=64):
  """Draw `batch_size` random rows (with replacement) from X together with the matching rows of Y."""
  rows = torch.randint(X.size(0), (batch_size,))
  return X[rows], Y[rows]

# Show a tiny batch: padded inputs first, then the shifted targets
inputs, labels = get_batch(batch_size=3)
print(inputs, labels, sep="\n")

tensor([[ 0, 11,  9, 18,  9,  5,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
        [ 0,  2, 18,  5, 24, 20, 15, 14,  0,  0,  0,  0,  0,  0,  0,  0,  0],
        [ 0,  1, 13,  1, 18,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0]])
tensor([[11,  9, 18,  9,  5,  0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1],
        [ 2, 18,  5, 24, 20, 15, 14,  0, -1, -1, -1, -1, -1, -1, -1, -1, -1],
        [ 1, 13,  1, 18,  0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1]])


### Embedding

In [None]:
# Toy embedding table: one learnable 3-dimensional vector per vocabulary entry
embedding_dim = 3
embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)

# Two sequences of four token indices each -> output shape (2, 4, embedding_dim);
# identical indices map to identical vectors
example_input = torch.tensor([[1, 1, 0, 2], [2, 1, 2, 4]])
input_embd = embedding(example_input)
print(input_embd.size())
input_embd

torch.Size([2, 4, 3])


tensor([[[ 0.7031,  0.0419, -0.0223],
         [ 0.7031,  0.0419, -0.0223],
         [ 0.1891,  0.8083,  1.4786],
         [-0.5859,  0.9281,  1.5602]],

        [[-0.5859,  0.9281,  1.5602],
         [ 0.7031,  0.0419, -0.0223],
         [-0.5859,  0.9281,  1.5602],
         [ 1.4030, -0.7524,  0.7533]]], grad_fn=<EmbeddingBackward0>)

# A Simple Deep Neural Network

In [None]:
class SequenceMLP(nn.Module):
  """Next-character MLP that sees the whole (zero-padded) prefix at each position.

  For position i the model embeds tokens 0..i, fills the remaining slots up to
  `max_sequence_length` with index 0, flattens the embeddings, and maps them
  through a stack of linear layers to logits over the vocabulary.

  Args:
    vocab_size: number of distinct tokens.
    max_sequence_length: longest sequence the model accepts.
    embedding_dim: size of each token embedding.
    hidden_dim: width of the hidden representation.
  """

  def __init__(self, vocab_size, max_sequence_length, embedding_dim, hidden_dim=32):
    super().__init__()
    self.vocab_size = vocab_size
    self.max_sequence_length = max_sequence_length
    self.embedding_dim = embedding_dim
    self.embedding = nn.Embedding(vocab_size, embedding_dim)
    self.linear = nn.Linear(embedding_dim * max_sequence_length, hidden_dim)
    # These two layers must be registered here (not built inside forward) so that
    # their weights persist between calls and are returned by model.parameters().
    # Sizes follow hidden_dim, giving 128 -> 256 -> 128 when hidden_dim=128.
    self.expand = nn.Linear(hidden_dim, 2 * hidden_dim)
    self.contract = nn.Linear(2 * hidden_dim, hidden_dim)
    self.relu = nn.ReLU()
    self.out = nn.Linear(hidden_dim, vocab_size)

  def forward(self, x):
    """Compute next-token logits for every position.

    Args:
      x: integer tensor of token indices with shape (batch_size, seq_len).

    Returns:
      Tensor of logits with shape (batch_size, seq_len, vocab_size).

    Raises:
      ValueError: if seq_len exceeds max_sequence_length.
    """
    batch_size, seq_len = x.shape
    if seq_len > self.max_sequence_length:
      raise ValueError(
          f"sequence length {seq_len} exceeds max_sequence_length {self.max_sequence_length}"
      )

    # Work tensors live on the input's device so the model also runs off-CPU.
    sequence_embeddings = torch.zeros(
        batch_size, seq_len, self.max_sequence_length * self.embedding_dim,
        dtype=self.embedding.weight.dtype, device=x.device,
    ) # batch_size x seq_len x (max_sequence_length * embedding_dim)
    for i in range(seq_len):
      # prefix x[:, :i+1] followed by zeros (index 0) in the not-yet-seen slots
      subsequence = torch.zeros(batch_size, self.max_sequence_length, dtype=torch.int, device=x.device)
      subsequence[:, :i+1] = x[:, :i+1]
      emb = self.embedding(subsequence)
      sequence_embeddings[:, i, :] = emb.view(batch_size, -1)

    # linear, expand and contract are applied back-to-back with no activation
    # in between; only the final hidden representation passes through ReLU.
    x = self.linear(sequence_embeddings)
    x = self.expand(x)
    x = self.contract(x)
    x = self.relu(x)
    x = self.out(x)
    return x


In [None]:
def train(model, optimizer, num_steps=10000, loss_report_interval=1000, batch_fn=None):
  """Run a simple training loop with cross-entropy loss on next-token prediction.

  Args:
    model: module mapping a batch of token indices to logits whose last
      dimension is the number of classes.
    optimizer: optimizer over the model's parameters.
    num_steps: number of optimisation steps to run (exactly this many).
    loss_report_interval: print the mean loss of the last
      `loss_report_interval` steps every time this many steps have completed.
    batch_fn: zero-argument callable returning `(inputs, labels)`; defaults to
      the notebook-level `get_batch`.

  Returns:
    None. Progress is reported through print.
  """
  if batch_fn is None:
    batch_fn = get_batch

  losses = []
  # Steps are numbered 1..num_steps so the printed step matches the number of
  # updates actually performed.
  for step in range(1, num_steps + 1):
    inputs, labels = batch_fn()
    optimizer.zero_grad()
    logits = model(inputs)
    # Flatten all positions into one axis; labels equal to -1 (target padding)
    # are excluded from the loss.
    loss = F.cross_entropy(
        logits.reshape(-1, logits.shape[-1]), labels.reshape(-1), ignore_index=-1
    )
    loss.backward()
    optimizer.step()
    losses.append(loss.item())

    if step % loss_report_interval == 0:
      window = losses[-loss_report_interval:]
      print(f"Average loss at step {step}: {sum(window) / len(window):.4f}")

In [None]:
# Build the MLP baseline sized to the padded input width and train it with plain SGD
embedding_dim = 3
max_sequence_length = X.size(1)
model = SequenceMLP(
    vocab_size,
    max_sequence_length,
    embedding_dim,
    hidden_dim=128,
)
optimizer = optim.SGD(model.parameters(), lr=0.01)
train(model, optimizer)

Average loss at step 1001: 3.0485
Average loss at step 2001: 2.8698
Average loss at step 3001: 2.8460
Average loss at step 4001: 2.8370
Average loss at step 5001: 2.8340
Average loss at step 6001: 2.8276
Average loss at step 7001: 2.8216
Average loss at step 8001: 2.8150
Average loss at step 9001: 2.8058


In [None]:
def generate_samples(model, num_samples=1, max_len=max_name_length):
  """Sample names autoregressively from `model` and print one per line.

  Every sequence starts with token 0 (the '$' marker). `max_len` tokens are
  then sampled for all sequences in parallel; each printed name is cut at the
  first sampled 0 (end marker), or kept in full if no end marker was sampled.

  Args:
    model: module mapping (num_samples, seq_len) token indices to logits of
      shape (num_samples, seq_len, vocab_size).
    num_samples: number of names to generate.
    max_len: number of tokens to sample per name.

  Returns:
    None. The decoded names are printed.
  """
  # Place the start tokens on the model's device (CPU if it has no parameters).
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device("cpu")
  sequences = torch.zeros((num_samples, 1), dtype=torch.long, device=device)

  # Sampling needs no gradients; skip building the autograd graph.
  with torch.no_grad():
    for _ in range(max_len):
      logits = model(sequences)[:, -1, :]  # logits for the next token only
      probs = F.softmax(logits, dim=-1)
      idx_next = torch.multinomial(probs, num_samples=1)
      sequences = torch.cat((sequences, idx_next), dim=1)

  for sequence in sequences:
    indices = torch.where(sequence == 0)[0]
    # indices[0] is always the start token; indices[1], if present, is the first
    # sampled end marker. Without one, keep every sampled token (the sequence
    # holds max_len + 1 entries, so slicing to max_len would drop the last one).
    end = indices[1] if len(indices) > 1 else len(sequence)
    print(decode(sequence[1:end]))

# Print ten names sampled from the current `model`
generate_samples(model, num_samples=10)

tkes
a

aainj
tlkary

llwynyy
kdpivora
niorslisawwe
ahebk


# Attention Namer

In [None]:
class AttentionMLP(nn.Module):
  """Single-head causal self-attention followed by an MLP and a vocabulary projection.

  Args:
    n_embd: embedding size, also used for the query/key/value projections.
    vocab_size: number of distinct tokens.
    block_size: side length of the stored causal mask.
    n_hidden: hidden width of the MLP.

  After each forward pass the attention probabilities are available as
  `self.attn_weights`.
  """

  def __init__(self, n_embd, vocab_size, block_size, n_hidden=64):
    super().__init__()
    self.tok_embd = nn.Embedding(vocab_size, n_embd)
    self.attn_weights = None  # filled in by forward

    self.query_proj = nn.Linear(n_embd, n_embd)
    self.key_proj = nn.Linear(n_embd, n_embd)
    self.value_proj = nn.Linear(n_embd, n_embd)

    # Lower-triangular matrix of ones: position i may look at positions <= i.
    # Stored as a buffer, so it is part of the module state but not a parameter.
    causal_mask = torch.ones((block_size, block_size)).tril()
    self.register_buffer("mask", causal_mask)

    self.mlp = nn.Sequential(
        nn.Linear(n_embd, n_hidden),
        nn.ReLU(),
        nn.Linear(n_hidden, n_embd),
    )

    self.output_proj = nn.Linear(n_embd, vocab_size)

  def forward(self, x):
    """Map token indices (batch, seq_len) to logits (batch, seq_len, vocab_size)."""
    tokens = self.tok_embd(x)
    _, seq_len, _ = tokens.shape

    queries = self.query_proj(tokens)
    keys = self.key_proj(tokens)
    values = self.value_proj(tokens)

    # Raw similarity scores, with future positions set to -inf so that they
    # receive zero probability after the softmax.
    scores = queries @ keys.transpose(1, 2)
    is_future = self.mask[:seq_len, :seq_len] == 0
    scores = scores.masked_fill(is_future, value=float('-inf'))
    scores = scores / torch.sqrt(torch.tensor(keys.shape[-1]).float())
    self.attn_weights = F.softmax(scores, dim=-1)

    hidden = self.mlp(self.attn_weights @ values)
    return self.output_proj(hidden)

In [None]:
# Train the single-head attention model: 32-dim embeddings, mask sized to the longest name
model = AttentionMLP(n_embd=32, vocab_size=vocab_size, block_size=max_name_length)
optimizer = optim.SGD(model.parameters(), lr=0.01)
train(
    model,
    optimizer,
    num_steps=10_001,
    loss_report_interval=1_000,
)

Average loss at step 1001: 2.9397
Average loss at step 2001: 2.6714
Average loss at step 3001: 2.6117
Average loss at step 4001: 2.5923
Average loss at step 5001: 2.5735
Average loss at step 6001: 2.5541
Average loss at step 7001: 2.5230
Average loss at step 8001: 2.4802
Average loss at step 9001: 2.4489
Average loss at step 10001: 2.4314


In [None]:
# Print ten names sampled from the current `model`
generate_samples(model, 10)

disauni
pdihyu
dacad
rsakuu
kaver
zanr
parria
ziie
umliinh
haerao


# Attention Transformer


In [None]:
class TransformerBlock(nn.Module):
  """Causal multi-head self-attention and an MLP, each wrapped in residual + LayerNorm.

  Args:
    n_embd: embedding size; must be divisible by `num_heads`.
    num_heads: number of attention heads.
    n_hidden: hidden width of the MLP.
  """

  def __init__(self, n_embd, num_heads=4, n_hidden=64):
    super().__init__()
    assert n_embd % num_heads == 0, "Embedding dimension must be divisible by the number of heads"

    self.num_heads = num_heads
    self.head_dim = n_embd // num_heads

    self.query_proj = nn.Linear(n_embd, n_embd)
    self.key_proj = nn.Linear(n_embd, n_embd)
    self.value_proj = nn.Linear(n_embd, n_embd)

    self.mlp = nn.Sequential(
        nn.Linear(n_embd, n_hidden),
        nn.ReLU(),
        nn.Linear(n_hidden, n_embd),
    )

    # One LayerNorm after the attention residual, one after the MLP residual
    self.norm_1 = nn.LayerNorm(n_embd)
    self.norm_2 = nn.LayerNorm(n_embd)

  def _split_heads(self, projected):
    """Reshape (batch, seq, n_embd) into (batch, num_heads, seq, head_dim)."""
    batch_size, sequence_length, _ = projected.shape
    per_head = projected.view(batch_size, sequence_length, self.num_heads, self.head_dim)
    return per_head.transpose(1, 2)

  def forward(self, x):
    """Apply the block to x of shape (batch, seq, n_embd); the output has the same shape."""
    batch_size, sequence_length, _ = x.shape

    q = self._split_heads(self.query_proj(x))
    k = self._split_heads(self.key_proj(x))
    v = self._split_heads(self.value_proj(x))

    # Causal attention per head; the result is the attention-weighted values,
    # not the attention weights themselves.
    attended_v = F.scaled_dot_product_attention(q, k, v, is_causal=True)

    # Bring the heads back next to each other and merge them into one feature axis
    merged = attended_v.transpose(1, 2).contiguous().view(batch_size, sequence_length, -1)

    # Residual connection followed by normalisation, for attention and for the MLP
    x = self.norm_1(x + merged)
    return self.norm_2(x + self.mlp(x))

In [None]:
class Transformer(nn.Module):
  """Character-level transformer: token + learned positional embeddings, a stack
  of TransformerBlock layers, and a projection to vocabulary logits.

  Args:
    n_embd: embedding size shared by all layers.
    vocab_size: number of distinct tokens.
    block_size: number of positions in the positional embedding table, i.e. the
      longest sequence the model accepts.
    num_blocks: number of stacked TransformerBlock layers.
  """

  def __init__(self, n_embd, vocab_size, block_size, num_blocks=6):
    super().__init__()
    self.block_size = block_size
    self.char_embedding = nn.Embedding(vocab_size, n_embd)
    self.positional_embedding = nn.Embedding(block_size, n_embd)

    self.transformer_blocks = nn.Sequential(
        *[TransformerBlock(n_embd) for _ in range(num_blocks)]
    )

    self.output_proj = nn.Linear(n_embd, vocab_size)

  def forward(self, x):
    """Map token indices (batch, seq_len) to logits (batch, seq_len, vocab_size).

    Raises:
      IndexError: if seq_len exceeds block_size (no positional embedding exists
        for the extra positions).
    """
    _, seq_len = x.shape
    if seq_len > self.block_size:
      raise IndexError(
          f"sequence length {seq_len} exceeds block_size {self.block_size}"
      )

    # Position indices must live on the same device as the input; otherwise the
    # embedding lookup fails once the model is moved off the CPU.
    positions = torch.arange(seq_len, device=x.device)
    pos_embd = self.positional_embedding(positions)
    char_embd = self.char_embedding(x)
    x = char_embd + pos_embd  # pos_embd broadcasts over the batch dimension
    x = self.transformer_blocks(x)
    x = self.output_proj(x)

    return x

In [None]:
# Train the transformer: 64-dim embeddings, one position slot per character of the longest name
n_embd = 64
model = Transformer(n_embd=n_embd, vocab_size=vocab_size, block_size=max_name_length)
optimizer = optim.SGD(model.parameters(), lr=0.1)
train(
    model,
    optimizer,
    num_steps=10_001,
    loss_report_interval=1_000,
)

Average loss at step 1001: 2.2994
Average loss at step 2001: 2.1621
Average loss at step 3001: 2.1104
Average loss at step 4001: 2.0712
Average loss at step 5001: 2.0407
Average loss at step 6001: 2.0196
Average loss at step 7001: 1.9930
Average loss at step 8001: 1.9708
Average loss at step 9001: 1.9527
Average loss at step 10001: 1.9338


In [None]:
# Print ten names sampled from the current `model`
generate_samples(model,num_samples=10)

elizara
ahmari
samaykei
ahnabett
hunton
seraya
machi
taulins
hode
wilmon
