In [3]:
import torch

alphabet = {chr(i): i - 97 for i in range(97, 123)}
alphabet["S"] = 26; alphabet["E"] = 27
rev_alphabet = {v: k for k, v in alphabet.items()}

def get_bigram_counts_with_delay(delay: int, names: list[torch.Tensor]):
    counts = torch.zeros(len(alphabet), len(alphabet), dtype=torch.float)
    for name in names:
        for bigram in zip(name, name[delay:]):
            counts[bigram[0], bigram[1]] += 1
    counts = counts / counts.sum(dim=1, keepdim=True)
    return counts

def build_dataset(max_delay: int):
    names = ["S" * max_delay + name.strip().lower() + "E" for name in open("data/names.txt").readlines()]
    names = list(filter(lambda x: "-" not in x and " " not in x, names))

    # Build dataset
    names = ["S" + name.strip().lower() + "E" for name in open("data/names.txt", "r").readlines()]
    names = list(filter(lambda x: "-" not in x, names))

    # Convert names to tensors
    names = [torch.tensor([alphabet[char] for char in name]) for name in names]
    
    # Create the counts matrix for each delay
    counts_per_delay = [get_bigram_counts_with_delay(delay, names) for delay in range(1, max_delay + 1)]

    return names, counts_per_delay

def display_name(name: list[int]):
    return "".join([rev_alphabet[i] for i in name]).replace("S", "").replace("E", "")

bigram_names, bigram_counts = build_dataset(1)
delay_names, delay_counts = build_dataset(3)

In [4]:
# A series of models that give a probability distribution over the next character
from functools import reduce
from torch import nn

class BigramModel(nn.Module):
    def __init__(self, counts):
        super().__init__()
        self.counts = counts

    def forward(self, name: list[int]):
        probs = self.counts[0][name[-1], :]
        return nn.functional.normalize(probs, p=1.0, dim=0)

class DelayModel(nn.Module):
    def __init__(self, counts):
        super().__init__()
        self.counts = counts
    
    def forward(self, name: list[int]):
        rows = [
            self.counts[i][name[-(i + 1)], :]
            for i in range(len(self.counts))
        ]

        probs = reduce(lambda a, b: a * b, rows)
        return nn.functional.normalize(probs, p=1.0, dim=0)

class NNDelayModel(nn.Module):
    def __init__(self, counts):
        super().__init__()
        self.counts = counts
    
    def forward(self, name: list[int]):
        rows = [
            self.counts[i][name[-(i + 1)], :]
            for i in range(len(self.counts))
        ]

        probs = reduce(lambda a, b: a * b, rows)
        return nn.functional.normalize(probs, p=1.0, dim=0)


In [30]:
# Given a model, do some testing of it
def generate_name(model: torch.nn.Module, initial: list[int]):
    name = initial
    i = 0
    while name[-1] != alphabet["E"] and i < 25:
        probs = model(name)
        next_letter_idx = torch.multinomial(probs, 1).item()
        name.append(next_letter_idx)
        i += 1
    return name[1:-1]

def name_nll(model: torch.nn.Module, name: list[int], s_buffer: list[int]):
    nll = 0
    name += [alphabet["E"]]
    full_name = s_buffer + name
    s_buffer_len = len(full_name) - len(name)
    for i in range(len(name)):
        probs = model(full_name[:s_buffer_len + i])
        nll += -torch.log(0.00001 + probs[name[i]])
        # nll += torch.nn.CrossEntropyLoss()(probs.unsqueeze(0), torch.tensor([name[i]]))
    return nll.item()

In [6]:
bigram_model = BigramModel(bigram_counts)
for i in range(5):
    name = generate_name(bigram_model, [alphabet["S"]])
    print(display_name(name))
    print(name_nll(bigram_model, name, [alphabet["S"]]))

nck
10.174710273742676
ct
9.338656425476074
cene
10.919326782226562
jerunthie
22.907596588134766
hymax
18.223127365112305


In [7]:
delay_model = DelayModel(delay_counts)
for i in range(5):
    name = generate_name(delay_model, 3*[alphabet["S"]])
    print(display_name(name))
    print(name_nll(delay_model, name, 3*[alphabet["S"]]))

rrin
29.41181755065918
rel
27.647615432739258
ram
29.513378143310547
antr
33.9973258972168
eee
31.983089447021484


In [31]:
# RNN
def one_hot_encode_letter(letter: int):
    one_hot = torch.zeros(len(alphabet))
    one_hot[letter] = 1
    return one_hot
    
class RNN(nn.Module):
    def __init__(self, param_size: int):
        super().__init__()
        self.U = nn.Parameter(torch.randn(param_size, param_size))
        self.W = nn.Parameter(torch.randn(param_size, param_size))
        self.V = nn.Parameter(torch.randn(param_size, param_size))
        self.h = torch.randn(param_size)
        self.bias_h = nn.Parameter(torch.randn(param_size))
        self.bias_o = nn.Parameter(torch.randn(param_size))
        self.softmax = nn.Softmax(dim=0)

    # x should be a one-hot vector
    def forward(self, x):
        x = one_hot_encode_letter(x)
        self.h = torch.tanh(self.U @ x + self.W @ self.h + self.bias_h)
        return self.softmax(self.bias_o + self.V @ self.h)

    def detach_h(self):
        self.h = self.h.detach()

rnn_model = RNN(len(alphabet))
for i in range(5):
    name = generate_name(rnn_model, [alphabet["S"]])
    print(display_name(name))
    print(name_nll(rnn_model, name, [alphabet["S"]]))

nckcqkrajcbe
87.90904235839844
hrcrzrwrqqccbucahmsqjiqq
158.36312866210938
qqcrhgvkjzxpccqqc
75.4369888305664
ceppczer
76.15601348876953
wgcqcqvssgqcqcbjcccccqqq
112.4327621459961


In [35]:
# As expected, the untrained RNN outputs trash. Let's train.

opt = torch.optim.SGD(rnn_model.parameters(), lr=0.01)
for name in bigram_names:
    rnn_model.zero_grad()
    rnn_model.detach_h()
    loss = 0
    for bigram in zip(name, name[1:]):
        probs = rnn_model.forward(bigram[0])
        loss += -torch.log(probs[bigram[1]])
    loss.backward()
    opt.step()

for i in range(5):
    name = generate_name(rnn_model, [alphabet["S"]])
    print(display_name(name))
    print(name_nll(bigram_model, name, [alphabet["S"]]))

yuzruuezfaylcrlxd
109.2209701538086
lzkkaclrelzuleruwfyllmpe
119.24027252197266

11.512925148010254
zaz
12.60515022277832
ze
7.403119087219238


In [36]:
# Now begins ATTENTION MECHANISM

class MultilayerPerceptron(nn.Module):
    def __init__(self, inputs: int, outputs: int, hidden_shapes: list[int]):
        super().__init__()
        if len(hidden_shapes == 0):
            self.layers = nn.Sequential(
                nn.Linear(inputs, outputs)
            )
        else:
            self.layers = nn.Sequential(
                nn.Linear(inputs, hidden_shapes[0]),
                nn.Tanh(),
            )
            for i in range(len(hidden_shapes)):
                if i == len(hidden_shapes) - 1:
                    self.layers.add_module(f"Linear_{i}", nn.Linear(hidden_shapes[i], outputs))
                else:
                    self.layers.add_module(f"Linear_{i}", nn.Linear(hidden_shapes[i], hidden_shapes[i + 1]))
                    self.layers.add_module(f"Tanh_{i}", nn.Tanh())
    
    def forward(self, x):
        return self.layers(x)


class AttentionMechanism(nn.Module):
    def __init__(self):
        super().__init__()
        self.g = MultilayerPerceptron(len(alphabet), len(alphabet), [len(alphabet)] * 2)
        self.a = MultilayerPerceptron(len(alphabet), len(alphabet), [len(alphabet)] * 2)
        self.s_prev = torch.ones(len(alphabet))
        self.c_prev = torch.ones(len(alphabet)) # Is this how to init?

        self.forward_rnn = RNN(len(alphabet))
        self.backward_rnn = RNN(len(alphabet))

    def forward(self, name: list[int]):
        self.y_prev = one_hot_encode_letter(name[-1])

        h = ....

        e_i = self.a(self.s_prev, h)
        alpha_i = torch.softmax(e_i, dim=1)
        c_i = alpha_i @ h
        s_i = self.f(self.s_prev, self.y_prev, c_i)

        probs = self.g(self.y_prev, s_i, c_i)

        self.s_prev = s_i
        self.c_prev = c_i

        return nn.functional.normalize(probs, p=1.0, dim=0)


class Attention(nn.Module):
    def __init__(self, hidden_size: int, key_size: int, value_size: int):
        super().__init__()
        self.key_size = key_size
        self.value_size = value_size
        self.hidden_size = hidden_size

        self.key = nn.Linear(hidden_size, key_size)
        self.value = nn.Linear(hidden_size, value_size)
        self.query = nn.Linear(hidden_size, key_size)

    def forward(self, hidden: torch.Tensor, encoder_outputs: torch.Tensor):
        # hidden: [batch_size, hidden_size]
        # encoder_outputs: [batch_size, seq_len, hidden_size]

        # Calculate the keys and values
        keys = self.key(encoder_outputs)
        values = self.value(encoder_outputs)

        # Calculate the query
        query = self.query(hidden)

        # Calculate the attention weights
        weights = torch.bmm(query.unsqueeze(1), keys.transpose(1, 2))
        weights = nn.functional.softmax(weights, dim=2)

        # Calculate the attention output
        output = torch.bmm(weights, values)

        return output