In [4]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data

# Load the whole titles corpus into one string.
# The context manager closes the file handle as soon as the text is read,
# instead of leaving it open for the lifetime of the kernel.
with open('titles.txt', 'r', encoding='utf-8') as titles_file:
    titles = titles_file.read()
# Preview the first 999 characters as the cell output
titles[0:999]

In [5]:
# Vocabulary: every distinct character of the corpus, in sorted order
chars = sorted(set(titles))
# Each character's integer code is its position in the sorted vocabulary
char_to_int = {ch: idx for idx, ch in enumerate(chars)}

In [7]:
# Corpus length (in characters) and vocabulary size
n_chars, n_vocab = len(titles), len(chars)
print(f"Total Characters: {n_chars} ", f"Total Vocab: {n_vocab}", sep="\n")

Total Characters: 431047 
Total Vocab: 45


In [None]:
# Build (input window -> next character) training pairs as integer codes.
# Each input is `seq_length` consecutive characters; the target is the
# character that immediately follows the window.
seq_length = 100
# Encode the corpus once, then slice windows out of the encoded list
encoded_titles = [char_to_int[ch] for ch in titles]
window_starts = range(n_chars - seq_length)
dataX = [encoded_titles[start:start + seq_length] for start in window_starts]
dataY = [encoded_titles[start + seq_length] for start in window_starts]
n_patterns = len(dataX)
print("Total Patterns: ", n_patterns)

In [None]:
# Inputs as a float tensor shaped [samples, time steps, features], with the
# integer codes divided by the vocabulary size so they fall between 0 and 1
X = (
    torch.tensor(dataX, dtype=torch.float32)
    .reshape(n_patterns, seq_length, 1)
    .div(float(n_vocab))
)
# Targets stay as integer class indices
y = torch.tensor(dataY)
print(X.shape, y.shape)

In [None]:
# Define LSTM model
class CharModel(nn.Module):
    """Stacked-LSTM next-character classifier.

    The network reads a sequence with one feature per time step, keeps the
    LSTM output at the final time step, and projects it to one score (logit)
    per vocabulary entry.

    Args:
        vocab_size: Number of output classes. When ``None`` (the default) the
            notebook-level ``n_vocab`` is used, which matches the original
            zero-argument ``CharModel()`` behaviour.
        hidden_size: Width of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability, used both inside the LSTM and on the
            final hidden state before the linear projection.
    """

    def __init__(self, vocab_size=None, hidden_size=256, num_layers=2, dropout=0.2):
        super().__init__()
        if vocab_size is None:
            # Fall back to the vocabulary size computed earlier in the notebook
            vocab_size = n_vocab
        self.lstm = nn.LSTM(
            input_size=1,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
        )
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        """Return logits of shape (batch, vocab_size) for input (batch, seq, 1)."""
        x, _ = self.lstm(x)
        # Only the output at the last time step is used for the prediction
        x = x[:, -1, :]
        # Regularise, then project to per-character scores
        x = self.linear(self.dropout(x))
        return x

In [None]:
# Training hyperparameters
n_epochs, batch_size = 25, 128

# Network to be trained
model = CharModel()

# Optimiser, loss (summed over each batch rather than averaged) and a
# shuffling mini-batch loader over the full (X, y) dataset
optimizer = optim.Adam(model.parameters())
loss_fn = nn.CrossEntropyLoss(reduction="sum")
loader = data.DataLoader(
    data.TensorDataset(X, y),
    batch_size=batch_size,
    shuffle=True,
)

# Best-so-far bookkeeping, updated by the training loop
best_model, best_loss = None, np.inf

In [None]:
# Create save model/checkpoint function
def save_model(epoch, model, optimizer, loss):
    """Write a training checkpoint to ``model_epoch_{epoch}.pt`` in the working directory.

    Args:
        epoch: Epoch number the checkpoint belongs to; also used in the filename.
        model: Module whose ``state_dict()`` is saved.
        optimizer: Optimizer whose ``state_dict()`` is saved.
        loss: Loss value to record (a number or a one-element tensor).

    Returns:
        The name of the file that was written.
    """
    checkpoint = {
        'epoch': epoch,
        # State of the *instance* being trained, not of the CharModel class
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        # Stored as a plain float so the checkpoint does not hold a tensor
        'loss': float(loss),
    }
    filename = f"model_epoch_{epoch}.pt"
    torch.save(checkpoint, filename)
    return filename

In [None]:
# Create function to load latest model/checkpoint
def load_latest_model(model, optimizer=None):
    """Restore the newest ``model_epoch_<N>.pt`` checkpoint from the working directory.

    "Newest" means the largest numeric ``<N>`` in the filename. The checkpoint's
    state is loaded into ``model`` (and into ``optimizer`` when one is given)
    in place.

    Args:
        model: Module that receives the saved ``model_state_dict``.
        optimizer: Optional optimizer that receives the saved
            ``optimizer_state_dict``.

    Returns:
        A tuple ``(epoch, model, optimizer)``. ``epoch`` is the value stored in
        the checkpoint, or ``None`` when no checkpoint file exists. ``model``
        and ``optimizer`` are always the objects that were passed in, so a
        caller that unpacks the result never replaces them with ``None``.
    """
    prefix, suffix = 'model_epoch_', '.pt'

    # Map each well-formed checkpoint filename to its epoch number; files
    # whose epoch part is not a plain integer are ignored instead of crashing.
    epoch_by_file = {}
    for f in os.listdir():
        if f.startswith(prefix) and f.endswith(suffix):
            epoch_part = f[len(prefix):-len(suffix)]
            if epoch_part.isdecimal():
                epoch_by_file[f] = int(epoch_part)

    if not epoch_by_file:
        return None, model, optimizer

    latest_model = max(epoch_by_file, key=epoch_by_file.get)
    # NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
    checkpoint = torch.load(latest_model)

    model.load_state_dict(checkpoint['model_state_dict'])
    if optimizer is not None:
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    return checkpoint['epoch'], model, optimizer

In [None]:
# Resume from the newest checkpoint when one exists. load_latest_model loads
# the saved state into `model` / `optimizer` in place, so only the epoch number
# is taken from its result; `model` and `optimizer` are never rebound and stay
# usable when there is nothing to resume from.
last_saved_epoch = load_latest_model(model, optimizer)[0]
start_epoch = 0 if last_saved_epoch is None else last_saved_epoch + 1

# Training loop: continues after the last saved epoch and stops at n_epochs
# (it does not run at all if the checkpoint is already at or past n_epochs).
for current_epoch in range(start_epoch, n_epochs):
    # Training
    model.train()
    for X_batch, y_batch in loader:
        y_pred = model(X_batch)
        loss = loss_fn(y_pred, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Evaluation pass.
    # NOTE(review): this iterates the same `loader` used for training, so the
    # reported value is a training loss, not a held-out validation loss.
    model.eval()
    epoch_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in loader:
            epoch_loss += loss_fn(model(X_batch), y_batch).item()

    if epoch_loss < best_loss:
        best_loss = epoch_loss
        # state_dict() returns references to the live parameters; clone them so
        # later optimizer steps cannot overwrite the saved best weights.
        best_model = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
    print("Epoch: %d: Cross-entropy: %.4f" % (current_epoch, epoch_loss))

    # Save checkpoint
    save_model(current_epoch, model, optimizer, epoch_loss)