In [1]:
import torch

# Pick the best available accelerator instead of assuming Apple's Metal (MPS)
# backend exists: MPS first (unchanged behavior on Apple silicon), then CUDA,
# and finally the CPU so the notebook still runs on any machine.
if torch.backends.mps.is_available():
    device = "mps"
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"


In [2]:
from typing import List
import torch
import torch.nn as nn
import tqdm.notebook as tqdm

In [3]:
# Load the whole corpus into memory as a single string. The encoding is given
# explicitly so the decoded text does not depend on the platform's default.
with open("data/tiny-shakespeare.txt", encoding="utf-8") as f:
    corpus = f.read()

In [4]:
# Hold out the tail of the corpus for evaluation: the first `split` fraction
# of the characters is used for training, the remainder for testing.
split = 0.8
split_idx = int(len(corpus) * split)
train_text, test_text = corpus[:split_idx], corpus[split_idx:]

for label, text in (("Training", train_text), ("Test", test_text)):
    print(f"{label} Size: {len(text):,} chars")

Training Size: 892,314 chars
Test Size: 223,079 chars


In [5]:
def encode(list: List[str]) -> torch.Tensor:
    """Convert strings to a tensor of Unicode code points.

    Each string becomes one row and each character is mapped to ``ord(c)``.
    The strings are expected to have equal length so the rows form a
    rectangular tensor.

    Args:
        list: The strings to encode (the name shadows the builtin but is kept
            so existing keyword callers continue to work).

    Returns:
        A tensor with one row per input string and one column per character.
    """
    code_points = [[ord(ch) for ch in text] for text in list]
    return torch.tensor(code_points)

def decode(arr: torch.Tensor) -> List[str]:
    """Convert a 2-D tensor of code points back into strings.

    This is the inverse of ``encode``: every row of ``arr`` is turned into one
    string by mapping each value through ``chr``.

    Args:
        arr: Tensor whose rows each hold the integer code points of one string.

    Returns:
        A list with one decoded string per row of ``arr``.
    """
    return ["".join(chr(code) for code in row) for row in arr.tolist()]

def get_batch(batch_size, seq_length, use_train=True):
    """Sample a batch of random fixed-length windows from the corpus.

    Args:
        batch_size: Number of windows to draw.
        seq_length: Number of characters in each window.
        use_train: Draw from ``train_text`` when True, otherwise ``test_text``.

    Returns:
        A ``(batch_size, seq_length)`` tensor of character code points.

    Raises:
        ValueError: If ``seq_length`` is longer than the selected text.
    """
    data = train_text if use_train else test_text

    # Valid start offsets are 0 .. len(data) - seq_length inclusive.
    num_starts = len(data) - seq_length + 1
    if num_starts <= 0:
        raise ValueError(
            f"seq_length ({seq_length}) exceeds the available text length ({len(data)})"
        )

    # torch.randint's upper bound is exclusive, so passing num_starts (rather
    # than len(data) - seq_length) keeps the final window reachable.
    start_idx = torch.randint(0, num_starts, (batch_size,)).tolist()

    # Encode all windows in one call; each window becomes one row.
    batch = encode([data[i:i + seq_length] for i in start_idx])
    return batch.view(batch_size, seq_length)
    

# Smoke test: encode two words, round-trip them through decode, and draw a
# small random batch from the held-out text.
sample_words = ["hello", "world"]
sample_encoded = encode(sample_words)
print(sample_encoded)
print(decode(sample_encoded))
print(get_batch(2, 5, use_train=False))

tensor([[104, 101, 108, 108, 111],
        [119, 111, 114, 108, 100]])
['hello', 'world']
tensor([[114,  44,  10,  76, 111],
        [ 39, 100,  59,  32,  97]])


In [26]:
class Model(nn.Module):
    """Character-level language model.

    Pipeline: embedding lookup -> stacked ``nn.RNN`` (batch-first) -> linear
    projection onto vocabulary logits.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers):
        """Build the layers.

        Args:
            vocab_size: Number of distinct token ids (embedding rows and
                output logits).
            embed_dim: Width of each token embedding.
            hidden_dim: Size of the RNN hidden state.
            num_layers: Number of stacked RNN layers.
        """
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Token ids -> dense vectors
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # Recurrent core; inputs are laid out as (batch, seq, feature)
        self.model = nn.RNN(embed_dim, hidden_dim, num_layers, batch_first=True)

        # Per-timestep projection from hidden state to vocabulary logits
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden=None):
        """Run the model over a batch of token ids.

        Args:
            x: Integer token ids laid out as (batch, seq).
            hidden: Optional initial hidden state of shape
                (num_layers, batch, hidden_dim); zeros are used when omitted.

        Returns:
            A tuple ``(logits, hidden)`` where ``logits`` has shape
            (batch * seq, vocab_size) and ``hidden`` is the final RNN state.
        """
        embedded = self.embedding(x)

        # Start from an all-zero state when the caller did not supply one
        if hidden is None:
            batch_size = embedded.size(0)
            hidden = torch.zeros(
                self.num_layers, batch_size, self.hidden_dim, device=embedded.device
            )

        rnn_out, hidden = self.model(embedded, hidden)

        # Collapse batch and time into one axis so the linear layer scores
        # every timestep independently
        logits = self.fc(rnn_out.reshape(-1, self.hidden_dim))
        return logits, hidden

# Hyperparameters for the character-level RNN
vocab_size = 256  # size of the embedding table / number of output logits
embed_dim = 64    # width of each character embedding
hidden_dim = 128  # size of the RNN hidden state
num_layers = 6    # number of stacked RNN layers

model = Model(
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    hidden_dim=hidden_dim,
    num_layers=num_layers,
).to(device)
num_train_steps = 0  # step counter shown in the training progress bar
total_params = sum(p.numel() for p in model.parameters())
print(f"Model created with {total_params:,} parameters")

Model created with 239,360 parameters


In [30]:
# Adam over every model parameter, with cross-entropy as the training and
# validation loss. The step-decay scheduler below is currently disabled.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()
# scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1000, gamma=0.9)


In [31]:
# Validation
@torch.no_grad()
def evaluate(batch_size=128, seq_length=256):
    """Estimate next-character loss on one random batch of held-out text.

    The batch is split into inputs (all but the last character) and targets
    (all but the first), so the model is scored on predicting each following
    character.

    Note: this switches ``model`` to eval mode and leaves it there; callers
    that continue training must call ``model.train()`` again.

    Args:
        batch_size: Number of windows drawn from the test text.
        seq_length: Length of each window, including the final target character.

    Returns:
        The cross-entropy loss as a Python float.
    """
    model.eval()
    batch = get_batch(batch_size, seq_length, use_train=False).to(device)
    x = batch[:, :-1]
    y = batch[:, 1:]

    output, _ = model(x)
    # The model returns logits flattened over (batch, time); flatten targets to match
    loss = criterion(output, y.flatten())
    return loss.item()

evaluate()

5.42617654800415

In [32]:
# Train for 100 optimizer steps on random batches of the training text.
pbar = tqdm.tnrange(100)
for step in pbar:
    # evaluate() puts the model in eval mode, so re-enable training mode each step
    model.train()
    # Sample a batch; inputs are all but the last character, targets all but the first
    batch = get_batch(64, 256).to(device)
    x = batch[:, :-1]
    y = batch[:, 1:]

    # Forward pass
    output, _ = model(x)
    loss = criterion(output, y.flatten())

    # Backward pass
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    # Keep the running step count in sync with the optimizer; it is displayed
    # below and accumulates across re-runs of this cell.
    num_train_steps += 1

    # scheduler.step()

    # Refresh the validation loss periodically (always on the first step, so
    # val_loss is defined before it is displayed)
    if step % 100 == 0:
        val_loss = evaluate()

    pbar.set_postfix_str(f"Loss: {loss.item():.4f}, Val Loss: {val_loss:.4f}")
    pbar.set_description_str(f"num_train_steps: {num_train_steps:,}")

  0%|          | 0/100 [00:00<?, ?it/s]

In [33]:
import torch.nn.functional as F

def temperature_sampling(logits, temperature=0.8):
    """Sample one index per row from temperature-scaled logits.

    Args:
        logits: Unnormalized scores with the candidates along the last
            dimension (1-D, or 2-D with one row per distribution).
        temperature: Values below 1 sharpen the distribution and values above
            1 flatten it. ``0`` selects the highest-scoring index
            deterministically (greedy decoding).

    Returns:
        An integer tensor of sampled indices with a trailing dimension of 1.

    Raises:
        ValueError: If ``temperature`` is negative.
    """
    if temperature < 0:
        raise ValueError(f"temperature must be non-negative, got {temperature}")
    if temperature == 0:
        # Dividing by zero would yield inf/nan logits; the limit of sampling
        # as temperature -> 0 is simply the argmax.
        return torch.argmax(logits, dim=-1, keepdim=True)

    # Scale logits by temperature, then convert to probabilities
    probs = F.softmax(logits / temperature, dim=-1)
    # Sample from the probabilities
    return torch.multinomial(probs, num_samples=1)

# Free-running text generation with temperature sampling: feed the prompt
# once, then repeatedly feed back the character that was just sampled while
# carrying the RNN hidden state across steps.
prompt = 'First Citizen:'
model.eval()
step_input = encode([prompt]).to(device)
sampled_chars = []
state = None  # None lets the model start from its default initial state

with torch.no_grad():
    for _ in range(512):  # Generate 512 characters
        logits, state = model(step_input, state)
        # The model flattens (batch, time); with a batch of one, the last row
        # holds the logits for the most recent position
        next_char = temperature_sampling(logits[-1], temperature=1)
        sampled_chars.append(next_char)
        step_input = next_char.unsqueeze(0)

print(prompt+decode(torch.stack(sampled_chars).view(1, -1))[0])


First Citizen:helyefEednr
v,Se rwAh 
md
ts
w  pbrtrhn Ao Iyamr
f 
n: nvth w sei thsi hNtdI?te
t a  pe,rsutlmr dh yuhpvs sdVueauifanw Ee
baCdso yeou n,.Dios,Cs wac hsIlmnwmscrnet  oar.avdou rmdOtfaurwtHs rsO,emyt  .ihsfia ed:LuN3hnw 
i ;trRrrf
t, r saan h'anlg tasmy luala,Inrterv
,rtw eonT a  yhy e erwoiwEhe,  h w  naa
cweetgrautsf:u nMe
tb!Waehsu ,or?c srsea ÒwiaueheC,cee eulWiwfc:ssd
aloHvoyybee.fw seah  ne:ycba vodahgCtce l E:l eFd,a ehBfyta Iovmuf;
uyag
ey TtgRY
dsrua elwdi, assTTalmwo .Qtmhrrsx,ago t he.HrSt neaoTeOU
