### Build and train a simple Recurrent Neural Network
We are going to use writings of shakespear as our data.

Steps:
- Download the data
- Preprocessing the data
- Create character tokenizer
- Tokenize data
- Split data (train / test)
- Create a batches of data to be fed to the model
- Define parameters for the model
- Initialize the model
- Overfit a single batch
- Train the model
- Generate text

In [1]:
# Imports
import torch
import torch.nn as nn
import torch.nn.functional as F
from dataclasses import dataclass
import matplotlib.pyplot as plt

In [2]:
# device (cpu or gpu) depending on what is available
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [3]:
# Download the data
# !wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2024-04-28 21:07:20--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2024-04-28 21:07:20 (32.1 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [4]:
with open('input.txt', 'r', encoding='utf-8') as file:
    text = file.read()

In [5]:
print(f'Total number of characters: {len(text)}')

Total number of characters: 1115394


In [6]:
# All unique characters in the text
chars = sorted(list(set(text)))
vocab_size = len(chars)
print(f'Number of unique characters: {vocab_size}')
''.join(chars)

Number of unique characters: 65


"\n !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

In [7]:
# Character level tokenizer
stoi = {char: idx for idx, char in enumerate(chars)}
itos = {idx: char for char, idx in stoi.items()}

encode = lambda s: [stoi[char] for char in s]
decode = lambda i: ''.join([itos[idx] for idx in i])

print(encode('hello there!'))
print(decode(encode('hello there!')))

[46, 43, 50, 50, 53, 1, 58, 46, 43, 56, 43, 2]
hello there!


In [8]:
# Tokenize all text
data = torch.tensor(encode(text), dtype=torch.long)

In [9]:
# Looking at some raw text
print(text[:1000])

First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You are all resolved rather to die than to famish?

All:
Resolved. resolved.

First Citizen:
First, you know Caius Marcius is chief enemy to the people.

All:
We know't, we know't.

First Citizen:
Let us kill him, and we'll have corn at our own price.
Is't a verdict?

All:
No more talking on't; let it be done: away, away!

Second Citizen:
One word, good citizens.

First Citizen:
We are accounted poor citizens, the patricians good.
What authority surfeits on would relieve us: if they
would yield us but the superfluity, while it were
wholesome, we might guess they relieved us humanely;
but they think we are too dear: the leanness that
afflicts us, the object of our misery, is as an
inventory to particularise their abundance; our
sufferance is a gain to them Let us revenge this with
our pikes, ere we become rakes: for the gods know I
speak this in hunger for bread, not in thirst for revenge.



In [9]:
# Looking at tokenized text
data[:100]

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43, 44,
        53, 56, 43,  1, 61, 43,  1, 54, 56, 53, 41, 43, 43, 42,  1, 39, 52, 63,
         1, 44, 59, 56, 58, 46, 43, 56,  6,  1, 46, 43, 39, 56,  1, 51, 43,  1,
        57, 54, 43, 39, 49,  8,  0,  0, 13, 50, 50, 10,  0, 31, 54, 43, 39, 49,
         6,  1, 57, 54, 43, 39, 49,  8,  0,  0, 18, 47, 56, 57, 58,  1, 15, 47,
        58, 47, 64, 43, 52, 10,  0, 37, 53, 59])

In [10]:
def train_test_split(data, test_pct=.1):
    data_size = len(data)
    num_test_samples = int(data_size * test_pct)
    test_data = data[-num_test_samples:]
    train_data = data[:data_size - num_test_samples]
    return train_data, test_data

In [11]:
train_data, test_data = train_test_split(data)
print(f'size of training set: {len(train_data)}')
print(f'size of test set: {len(test_data)}')

assert len(train_data) + len(test_data) == len(data), "Lost data during train test split"

size of training set: 1003855
size of test set: 111539


In [12]:
# Function to get a batch
def get_random_batch(data, batch_size=4, block_size=8):
    # Get  batch_size of random indices which will be the start of sequence
    # Ensure that indices start before the length of data - block_size to prevent out of bounds error
    idx = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i: i + block_size] for i in idx])
    y = torch.stack([data[i + 1: i + block_size + 1] for i in idx])
    return x, y

# Grab a sample batch
x, y = get_random_batch(train_data,)
print('features:')
print(x)
print('labels')
print(y)
print(x.shape, y.shape)

features:
tensor([[45, 46, 58,  8,  0,  0, 35, 13],
        [25, 53, 56, 43,  1, 58, 46, 39],
        [43, 56,  1, 44, 53, 59, 52, 58],
        [ 1, 57, 43, 52, 58,  1, 58, 53]])
labels
tensor([[46, 58,  8,  0,  0, 35, 13, 30],
        [53, 56, 43,  1, 58, 46, 39, 52],
        [56,  1, 44, 53, 59, 52, 58, 39],
        [57, 43, 52, 58,  1, 58, 53,  1]])
torch.Size([4, 8]) torch.Size([4, 8])


In [13]:
def get_batch_sequentially(data, batch_size=4, block_size=8):
    size_of_data = len(data) - 1 # subtracting one for the label
    for idx in range(0, size_of_data, block_size*batch_size):
        x = torch.stack([data[idx + i: idx + block_size + i] for i in range(batch_size)])
        y = torch.stack([data[idx + i + 1: idx + block_size + i + 1] for i in range(batch_size)])
        yield x, y

In [14]:
# rnn = RNN(vocab_size, emb_dim, n_hidden)
@torch.no_grad()
def evaluate(model, batch_size=4, block_size=8):
    train_loader = get_batch_sequentially(train_data, batch_size=batch_size, block_size=block_size)
    test_loader = get_batch_sequentially(test_data,  batch_size=batch_size, block_size=block_size)

    loss_i = []
    for loader in (train_loader, test_loader):
            for x, y in loader:
                x, y = x.to(device), y.to(device)
                loss, _ = rnn(x, y)
                loss_i.append(loss)
            if loader == train_loader:
                print(f'average training loss: {sum(loss_i)/len(loss_i):.4f}', end=' | ')
            else:
                print(f'average test loss: {sum(loss_i)/len(loss_i):.4f}')
# evaluate(rnn)

In [15]:
@torch.no_grad()
def generate(model, block_size=8, max_output_size=50):
    # Handle prompts shorter than block_size
    # if len(prompt) < block_size:
    #     count_missing = block_size - len(prompt)
    #     prompt = (count_missing * ' ') + prompt
    prompt = 's'
    # print(prompt, end='')
    # trim text longer than block size
    for i in range(max_output_size):
        prompt_tok = encode(prompt)
        prompt_trimmed = prompt_tok[-block_size:]
        # print(f'input seq to model: {"".join(decode([i for i in prompt_trimmed]))}')
        prompt_trimmed_tnsr = torch.tensor(prompt_tok).unsqueeze(0).to(device) # Add batch dimension
        loss, logits = model(prompt_trimmed_tnsr)
        # import pdb; pdb.set_trace()
        probs = F.softmax(logits[:, -1], dim=1)
        idx = torch.multinomial(probs, 1, replacement=True)
        prompt += decode([idx.item()])
        # print(f'{prompt=}')
        # prompt_trimmed.pop()
        # prompt_trimmed.append(idx.item())
    return prompt
# print(generate(rnn))

In [None]:
# generate(rnn, block_size=200, max_output_size=200)

"shoud.\n\nTMREK\nARTG:\nSo you me hacet ment elof thol'd roudls heost may, theat nent mere haigcay winentless for of lisand sthy lomd;\nTharght mais be wim. I be beard caghiendedt?\n\nPENRDDIIO:\nI rlaven deas"

In [21]:
foo = torch.randn((2,2))
foo = torch.stack((foo, foo))
foo.shape

torch.Size([2, 2, 2])

In [22]:
class RNN(nn.Module):
    def __init__(self, vocab_size, emb_dim, n_hidden, cell_type):
        super().__init__()
        self.vocab_size = vocab_size
        self.emb_dim = emb_dim
        self.n_hidden = n_hidden
        self.emb = nn.Embedding(self.vocab_size, self.emb_dim)
        self.rnn = nn.GRU(self.emb_dim, self.n_hidden, 2, batch_first=True, dropout=0.5)
        self.tanh = nn.Tanh()
        self.hidden_state = torch.zeros((1, self.n_hidden), requires_grad=True)
        self.linear = nn.Linear(self.n_hidden, self.vocab_size)

    # Impleting with rnn from pytorch
    def forward(self, x, targets=None):
        x = self.emb(x)
        b, t, e = x.shape

        h_prev = self.hidden_state.expand((b, -1)) # shape: batch x n_hidden
        h_prev = torch.stack((h_prev, h_prev)) # shape: num_layers x batch x n_hidden
        output, hidden = self.rnn(x, h_prev.to(device)) # output shape: b x t x n_hidden; hidden shape: see previous line
        logits = self.linear(output) # shape b x t x h_hidden --> b x t x vocab_size
        loss = None
        # import pdb; pdb.set_trace()
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.shape[-1]), targets.view(-1))
        return loss, logits

In [None]:
### Overfitting single batch

num_iters = 200
vocab_size = 65
emb_dim = 64
n_hidden = 100
block_size = 200
batch_size = 4
max_output_size = 150

rnn = RNN(vocab_size, emb_dim, n_hidden, cell_type='rnn')
optim = torch.optim.AdamW(rnn.parameters(), lr=0.005)#, betas=(0.9, 0.99), eps=1e-8)

x, y = get_random_batch(train_data, batch_size=batch_size, block_size=block_size)
for i in range(num_iters):
    # x, y = get_batch(train_data, batch_size=batch_size, block_size=block_size)
    loss, _ = rnn(x, y)
    loss.backward()
    if not (i % 10):
        print(f'{loss.item()=:.4f}')

    optim.step()
    optim.zero_grad()

loss.item()=4.2010
loss.item()=2.7721
loss.item()=2.1375
loss.item()=1.6474
loss.item()=1.1980
loss.item()=0.7883
loss.item()=0.4686
loss.item()=0.2564
loss.item()=0.1356
loss.item()=0.0762
loss.item()=0.0484
loss.item()=0.0331
loss.item()=0.0251
loss.item()=0.0204
loss.item()=0.0174
loss.item()=0.0153
loss.item()=0.0134
loss.item()=0.0113
loss.item()=0.0102
loss.item()=0.0092


Overfitting single batch with rnn implementation from scratch gets loss of 0.0027 in 200 iterations.

In [None]:
# Hyperparameters
num_iters = 100
vocab_size = 65
emb_dim = 64
n_hidden = 100
block_size = 200
batch_size = 4
max_output_size = 150

# Initialize the model
rnn = RNN(vocab_size, emb_dim, n_hidden, batch_size)

# Training at a higher learning rate
optim = torch.optim.AdamW(rnn.parameters(), lr=0.005)

# Storing loss
lossi = []
stepi = []

for i in range(num_iters):
    x, y = get_random_batch(train_data, batch_size=batch_size, block_size=block_size)
    loss, _ = rnn(x, y)
    loss.backward()

    if not (i % 10):
        print('--------------------------------')
        print(f'step: {i}', end=' | ')
        evaluate(rnn, batch_size=batch_size, block_size=block_size)
        print("Generating a sample: ")
        print(generate(rnn, block_size=block_size, max_output_size=100))
        print("\n==============================")
    lossi.append(loss)
    stepi.append(i)

    optim.step()
    optim.zero_grad()

--------------------------------
step: 0 | average training loss: 4.1930 | average test loss: 4.1932
Generating a sample: 
sJgGNSW'Q!CmQx!MEcPl$.HjGA-G?CpK &tM.X$$oCaecJz-kmGKyEtkYAnx.p.EtXV.Ym$fE-zmksK;iuHECiZRaBXOSjPYj

zL

--------------------------------
step: 10 | average training loss: 2.9437 | average test loss: 2.9437
Generating a sample: 
se te 
nm tohou
hiiav
wfrih osemn oml'C$pr ite en
Io.o
hta edne

athistp 
nZofchcahn tb rdtobseoO
Iad

--------------------------------
step: 20 | average training loss: 2.6479 | average test loss: 2.6471
Generating a sample: 
s wh?n to honsd:
O
epead k and cor motefsthafedre uyou heensokd bo fge a shl
I-;ecanorelcocoCre iangd

--------------------------------
step: 30 | average training loss: 2.4984 | average test loss: 2.4980
Generating a sample: 
s?, cedt houut kiwld urd rete for herut p nhe ed yeear,
lon
lored
Wane welin; as sovelAn
Woingith,
I 

--------------------------------
step: 40 | average training loss: 2.4062 | average test loss

In [21]:
# Hyperparameters
num_iters = 10000
vocab_size = 65
emb_dim = 64
n_hidden = 100
block_size = 32
batch_size = 4
max_output_size = 150

# Initialize the model
rnn = RNN(vocab_size, emb_dim, n_hidden, batch_size).to(device)
# optim = torch.optim.AdamW(rnn.parameters(), .001)
# Training at a higher learning rate
optim = torch.optim.AdamW(rnn.parameters(), lr=0.005)

# Storing loss
lossi = []
stepi = []
# Storing hidden_state
hidden_states = []

for i in range(num_iters):
    x, y = get_random_batch(train_data, batch_size=batch_size, block_size=block_size)
    x, y = x.to(device), y.to(device)
    loss, _ = rnn(x, y)
    loss.backward()

    if not (i % 1000):
        print(f'step: {i}', end=' | ')
        evaluate(rnn, batch_size=batch_size, block_size=block_size)
        print(f'iter: {i}: train loss: {loss:.4f}')
        print('Generating a sample: ')
        print(generate(rnn, block_size=block_size, max_output_size=max_output_size))
    lossi.append(loss)
    stepi.append(i)
    hidden_states.append(rnn.hidden_state)

    optim.step()
    optim.zero_grad()


step: 0 | average training loss: 4.1671 | average test loss: 4.1667
iter: 0: train loss: 4.1578
Generating a sample: 
sRXHkjp?bcEcaBJE&pzpPSN,MtEth$aWbC-; xkJuGHDT!fCgr'cp DJZb-rftYl;NwZIoayL;V
y'UMqUtJ3 bal$zlzoo:p?B,MM:LAyvm.Ux QShO&Xfpl.IjUxBAip-HfWRoKlFTyJTu!vbG3ha
step: 1000 | average training loss: 1.9404 | average test loss: 1.9491
iter: 1000: train loss: 1.6392
Generating a sample: 
swert it, where to prock', in himg?

ONDY: Ond to is and out bid?
DUKY:
Hard than he eed revourrest porth your this laing in his sporth of his wood:
Hu
step: 2000 | average training loss: 1.8593 | average test loss: 1.8703
iter: 2000: train loss: 1.8941
Generating a sample: 
slof yet.

PRICHARD II:
I may him are gis a why, him like that to the wick nput for as und him give of remosclems I'll for revences thow how dost that 
step: 3000 | average training loss: 1.8281 | average test loss: 1.8419
iter: 3000: train loss: 1.9971
Generating a sample: 
sais
Oft trough by himsels.

Sell.
Comourns their cord

In [22]:
# Hyperparameters
num_iters = 100_000
vocab_size = 65
emb_dim = 64
n_hidden = 100
block_size = 200
batch_size = 8
max_output_size = 150

# device (cpu or gpu)
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Initialize the model
rnn = RNN(vocab_size, emb_dim, n_hidden, batch_size).to(device)
# optim = torch.optim.AdamW(rnn.parameters(), .001)
# Training at a higher learning rate
optim = torch.optim.AdamW(rnn.parameters(), lr=0.005)

# Storing loss
lossi = []
stepi = []
# Storing hidden_state
hidden_states = []

for i in range(num_iters):
    x, y = get_random_batch(train_data, batch_size=batch_size, block_size=block_size)
    x, y = x.to(device), y.to(device)
    loss, _ = rnn(x, y)
    loss.backward()

    if not (i % 20000):
        print(f'step: {i}', end=' | ')
        evaluate(rnn, batch_size=batch_size, block_size=block_size)
        # print(f'iter: {i}: train loss: {loss:.4f}')
        print('Generating a sample: ')
        print(generate(rnn, block_size=block_size, max_output_size=max_output_size))
        print('\n===============================================')
    lossi.append(loss)
    stepi.append(i)
    hidden_states.append(rnn.hidden_state)

    optim.step()
    optim.zero_grad()

step: 0 | average training loss: 4.1745 | average test loss: 4.1745
Generating a sample: 
sAlQDK'qi?XWRc:cGydQ:VCzBDS3rpVvWRXKHExaN!JSCGtodG  H-'UcArzaZgnV;np;-ObKm!-Y-zv'WLGZsfwg vUoTjlLSKK'qkmZddgx3NRv!Lhkylzk.!duR& ,I3qfyzRzH?JTF
ijG?ojA,

step: 10000 | average training loss: 1.4540 | average test loss: 1.4756
Generating a sample: 
s.
Come, we to well: much of an ingignation!
Breat worthy gradies! muse or taken elding:
That house, who it be wild holy nor day
His king, being it die

step: 20000 | average training loss: 1.4308 | average test loss: 1.4519
Generating a sample: 
s dishen ales to than?

Volzarcemity,
Eargies; than our gnave God's not would do
Destruy, do put our Marcius: than mean that tragispiedy baning he thy 

step: 30000 | average training loss: 1.4245 | average test loss: 1.4458
Generating a sample: 
s unput thee, she will so yet.

Clown:
Yet my frimits ever dear himself, it is,
Who is mispoints his merely short,
His love, meath, that thy hearing ap

step: 40000 | av

In [23]:
# Hyperparameters
num_iters = 200_000
vocab_size = 65
emb_dim = 64
n_hidden = 100
block_size = 200
batch_size = 8
max_output_size = 150

# device (cpu or gpu)
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Initialize the model
rnn = RNN(vocab_size, emb_dim, n_hidden, batch_size).to(device)
# optim = torch.optim.AdamW(rnn.parameters(), .001)
# Training at a higher learning rate
optim = torch.optim.AdamW(rnn.parameters(), lr=0.005)

# Storing loss
lossi = []
stepi = []
# Storing hidden_state
hidden_states = []

for i in range(num_iters):
    x, y = get_random_batch(train_data, batch_size=batch_size, block_size=block_size)
    x, y = x.to(device), y.to(device)
    loss, _ = rnn(x, y)
    loss.backward()

    if not (i % 20000):
        print(f'step: {i}', end=' | ')
        evaluate(rnn, batch_size=batch_size, block_size=block_size)
        # print(f'iter: {i}: train loss: {loss:.4f}')
        print('Generating a sample: ')
        print(generate(rnn, block_size=block_size, max_output_size=max_output_size))
        print('\n===============================================')
    lossi.append(loss)
    stepi.append(i)
    hidden_states.append(rnn.hidden_state)

    optim.step()
    optim.zero_grad()

step: 0 | average training loss: 4.2053 | average test loss: 4.2052
Generating a sample: 
sTmxroxubvUU,LaduF,P !&-:&;!mjsPD$,lzSDNtV VZRm$QX3w?N!zBToRzJgpz 
!.ThF!S?uH,t!QU
dMfpPsy
gA:v3 obyRFETByVKwkPVQYpGWukdbxP hhvZNRE&jC3A?jYBfRzwm3uF;u'

step: 20000 | average training loss: 1.6093 | average test loss: 1.6280
Generating a sample: 
s,
Mespic nighan. Can bear gentle look'd me:
Trukeme oft yow thint! this heards the king.

Sicurerain:
To you'll look-seasim? I
that yet thus: if it tr

step: 40000 | average training loss: 1.5836 | average test loss: 1.6017
Generating a sample: 
ssement of lay.
You, lieking frant-maven one have but bend them,
bucked to be it me,
I'ly he's stelp I thee had for fiocrong aste hisgety.

CLARENCE:
D

step: 60000 | average training loss: 1.5676 | average test loss: 1.5866
Generating a sample: 
se from my braashed, and I king,
Comeing my farth with a comprabe, becomed's our blood:
His upon substaal strige, and King, 'Twixt both this?
Be kend t

step: 80000 | av

In [25]:
# torch.save(rnn, 'model.pth')

In [27]:
print(generate(rnn, max_output_size=500))

sew must a Banionable.

CARULIO:
Tent, beheed thin water, on Wirie'd?

LARTIUS:
Caminmed, thou may away, to-more on haste,
Ind ored oullronsh'd beal those sir,
Lord I cannot's, for i' out-to should bies.

QUEEN ELIZARET:
Sir, renpest that him 'Go will right bit you recrane,
Andee--Goors disbe lewe as arring.
I sweetn he of her: as my off quemen that
Singer it as vwinous house,
No eagles,
and for the cortioninkly, that a
correased she for you have all the fnesord a bate
With speak al one, my trent


In [None]:
# Load saved model
# loaded_model = torch.load('model.pth', weights_only=False)