In [50]:
f = open("./TinyShakespeare/input.txt")
text = f.read()
print(text[:100])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You


In [51]:
chars = sorted(list(set(text)))
vocab_size = len(chars)
vocab_size

65

In [52]:
char_to_idx = {ch:i for i, ch in enumerate(chars)}
idx_to_char = {i:ch for i, ch in enumerate(chars)}

def encode(text):
    return [char_to_idx[ch] for ch in text]

def decode(idx):
    return [idx_to_char[i] for i in idx]

In [54]:
import torch
data = torch.tensor(encode(text), dtype=torch.long)
data.shape

torch.Size([1115394])

In [55]:
n = int(0.9 * len(data))
train_data = data[:n]
test_data = data[n:]

In [61]:
sequence_length = 8
x = train_data[:sequence_length]
y = train_data[1:sequence_length + 1]
for t in range(sequence_length):
    context = x[:t + 1]
    target = y[t]
    print(context, target)
x.shape

tensor([18]) tensor(47)
tensor([18, 47]) tensor(56)
tensor([18, 47, 56]) tensor(57)
tensor([18, 47, 56, 57]) tensor(58)
tensor([18, 47, 56, 57, 58]) tensor(1)
tensor([18, 47, 56, 57, 58,  1]) tensor(15)
tensor([18, 47, 56, 57, 58,  1, 15]) tensor(47)
tensor([18, 47, 56, 57, 58,  1, 15, 47]) tensor(58)


torch.Size([8])

In [81]:
import torch
from torch.utils.data import Dataset
import random

class ContextTargetDataset(Dataset):
    def __init__(self, data, window_size):
        self.data = data
        self.window_size = window_size

    def __len__(self):
        return len(self.data) - self.window_size

    def __getitem__(self, idx):
        start_idx = random.randint(0, len(self.data) - self.window_size)
        x = self.data[start_idx : start_idx + self.window_size]
        y = self.data[start_idx + 1 : start_idx + self.window_size + 1]
        
        return x, y

    def collate_fn(batch):
        xs, ys = zip(*batch)
        return torch.stack(xs), torch.stack(ys)



In [103]:
from torch.utils.data import DataLoader
train_dataset = ContextTargetDataset(train_data, window_size=8)
train_loader = DataLoader(train_dataset, batch_size=2)

test_dataset = ContextTargetDataset(test_data, window_size=8)
test_loader = DataLoader(test_dataset, batch_size=2)
for batch_x, batch_y in test_loader:
    print("X: ", batch_x)
    print("Y: ", batch_y)
    break

X:  tensor([[60, 43,  1, 40, 43, 43, 52,  1],
        [41, 53, 59, 56, 58,  1, 51, 63]])
Y:  tensor([[43,  1, 40, 43, 43, 52,  1, 57],
        [53, 59, 56, 58,  1, 51, 63,  1]])


# RNN

In [23]:
from collections import defaultdict

def sequences_to_dicts(sequences):
    flatten = lambda l: [item for sublist in l for item in sublist]
    
    all_words = flatten(sequences)
    
    word_count = defaultdict(int)
    for word in flatten(sequences):
        word_count[word] += 1

    word_count = sorted(list(word_count.items()), key=lambda l: -l[1])

    unique_words = [item[0] for item in word_count]
    
    unique_words.append('UNK')

    num_sentences, vocab_size = len(sequences), len(unique_words)

    word_to_idx = defaultdict(lambda: vocab_size-1)
    idx_to_word = defaultdict(lambda: 'UNK')

    for idx, word in enumerate(unique_words):
        word_to_idx[word] = idx
        idx_to_word[idx] = word

    return word_to_idx, idx_to_word, num_sentences, vocab_size


word_to_idx, idx_to_word, num_sequences, vocab_size = sequences_to_dicts(sequences)


In [24]:
from torch.utils import data

class Dataset(data.Dataset):
    def __init__(self, inputs, targets):
        self.inputs = inputs
        self.targets = targets

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, index):
        X = self.inputs[index]
        y = self.targets[index]

        return X, y

def create_datasets(sequences, dataset_class, p_train=0.8):
    num_train = int(len(sequences)*p_train)
    num_test = int(len(sequences)*(1 - p_train))

    sequences_train = sequences[:num_train]
    sequences_test = sequences[-num_test:]

    def get_inputs_targets_from_sequences(sequences):
        inputs, targets = [], []
        
        for sequence in sequences[2:]:
            inputs.append(sequence[:-1])
            targets.append(sequence[1:])
            
        return inputs, targets

    inputs_train, targets_train = get_inputs_targets_from_sequences(sequences_train)
    inputs_test, targets_test = get_inputs_targets_from_sequences(sequences_test)

    training_set = dataset_class(inputs_train, targets_train)
    test_set = dataset_class(inputs_test, targets_test)

    return training_set, test_set
    

training_set, test_set = create_datasets(sequences, Dataset)

In [25]:
def one_hot_encode(idx, vocab_size):
    one_hot = np.zeros(vocab_size)
    one_hot[idx] = 1.0

    return one_hot


def one_hot_encode_sequence(sequence, vocab_size):
    encoding = np.array([one_hot_encode(word_to_idx[word], vocab_size) for word in sequence])

    encoding = encoding.reshape(encoding.shape[0], encoding.shape[1], 1)
    
    return encoding

one_hot_encode_sequence(['a', 'b'], vocab_size)

array([[[1.],
        [0.],
        [0.],
        [0.]],

       [[0.],
        [1.],
        [0.],
        [0.]]])

In [26]:
import torch
input_size = vocab_size
hidden_size = 64
output_size = vocab_size

def initialize_params():
    W_xh = torch.nn.Parameter(torch.randn(hidden_size, input_size) * 0.01)
    W_hh = torch.nn.Parameter(torch.randn(hidden_size, hidden_size) * 0.01)
    b_h  = torch.nn.Parameter(torch.zeros(hidden_size))
    
    W_hy = torch.nn.Parameter(torch.randn(output_size, hidden_size) * 0.01)
    b_y  = torch.nn.Parameter(torch.zeros(output_size))
    
    return W_xh, W_hh, b_h, W_hy, b_y


In [31]:
def rnn_forward(inputs, W_xh, W_hh, W_hy, b_h, b_y, hidden_size):
    h_t = torch.zeros(hidden_size, dtype=torch.float32)
    
    outputs = []

    for x in inputs:
        # x = x.squeeze(-1)
        h_t = torch.tanh(x @ W_xh + h_t @ W_hh + b_h).to(torch.float32)
        y = W_hy @ h_t + b_y
        outputs.append(y)

    return outputs, h_t

In [32]:
import torch.nn as nn
import torch.optim as optim
from torch import tensor

num_epochs = 500

W_xh, W_hh, b_h, W_hy, b_y = initialize_params()

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD([W_xh, W_hh, b_h, W_hy, b_y], lr=0.1)

for epoch in range(num_epochs):
    total_loss = 0
    for input_tokens, target_tokens in training_set:
        inputs = tensor(one_hot_encode_sequence(input_tokens, vocab_size), dtype=torch.float32)
        targets = tensor([word_to_idx[tok] for tok in target_tokens])

        outputs, _ = rnn_forward(inputs, W_xh, W_hh, W_hy, b_h, b_y, hidden_size)
        logits = torch.stack(outputs)

        loss = criterion(logits, targets)
       
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        
    
    if epoch % 10 == 0:
        print(f"Epoch {epoch}: Loss = {total_loss:.4f}")

RuntimeError: mat1 and mat2 shapes cannot be multiplied (4x1 and 64x4)