<a href="https://colab.research.google.com/github/shazzad-hasan/practice-deep-learning-with-pytorch/blob/main/seq_to_seq/char_level_lstm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# upload kaggle API key from your local machine
# (the next cell copies a file named kaggle.json, so upload it under that name)
from google.colab import files
files.upload()

In [None]:
# make a kaggle dir, copy the API key to it
# and make sure the file in only readable by yourself (chmod 600)
!mkdir ~/.kaggle 
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# use API command to download the dataset
# (the following cell expects the archive as anna-karenina-book.zip in the working directory)
!kaggle datasets download -d wanderdust/anna-karenina-book

In [None]:
# uncompress the dataset
!unzip -qq anna-karenina-book.zip

In [None]:
# import required libraries
import torch
import numpy as np

In [None]:
# pick the compute device: GPU when CUDA is usable, otherwise CPU
train_on_gpu = torch.cuda.is_available()

print("CUDA is available" if train_on_gpu else "CUDA is not available")

device = torch.device('cuda' if train_on_gpu else 'cpu')

In [None]:
# open text file and read dataset
# decode explicitly as UTF-8 so the result does not depend on the machine's locale
with open("/content/anna.txt", "r", encoding="utf-8") as f:
  text = f.read()

# peek at the first 100 characters
text[:100]

### Pre-process the dataset

In [None]:
# tokenization

# sort the vocabulary: the iteration order of a set of strings changes between
# interpreter runs, which would silently change the char <-> int mapping
chars = tuple(sorted(set(text)))
# map each int to char
int_to_char = dict(enumerate(chars))
# map each char to int
char_to_int = {ch: idx for idx, ch in enumerate(chars)}

# encode the whole text as an array of integer ids
encoded = np.array([char_to_int[ch] for ch in text])

encoded[:100]

In [None]:
def one_hot_encode(arr, n_labels):
  """One-hot encode an integer array.

  Args:
    arr: numpy array of integer class ids (any shape).
    n_labels: number of classes, i.e. the size of the new trailing axis.

  Returns:
    float32 array of shape (*arr.shape, n_labels) with a single 1 per
    position along the last axis.
  """
  flat_ids = arr.reshape(-1)
  n_items = flat_ids.shape[0]

  # one row per element, all zeros except the column of its class id
  encoded_rows = np.zeros((n_items, n_labels), dtype=np.float32)
  encoded_rows[np.arange(n_items), flat_ids] = 1.

  # restore the input layout, with the one-hot axis appended
  return encoded_rows.reshape(arr.shape + (n_labels,))

In [None]:
def get_batches(arr, batch_size, seq_length):
  """Yield (x, y) mini-batches, each of shape (batch_size, seq_length).

  The array is truncated to a whole number of batches and laid out as
  batch_size rows; successive windows of seq_length columns are the inputs.
  The targets are the inputs shifted left by one position; the last target of
  the final window wraps around to the first column of the rows.
  """
  chars_per_batch = batch_size * seq_length
  # drop the tail that does not fill a complete batch
  usable = (len(arr) // chars_per_batch) * chars_per_batch
  rows = arr[:usable].reshape((batch_size, -1))
  width = rows.shape[1]

  for start in range(0, width, seq_length):
    stop = start + seq_length
    # features
    x = rows[:, start:stop]
    # targets, shifted by one
    y = np.zeros_like(x)
    y[:, :-1] = x[:, 1:]
    # past the final window there is no "next" column, so wrap to column 0
    y[:, -1] = rows[:, stop] if stop < width else rows[:, 0]
    yield x, y

In [None]:
# hold out the tail of the encoded text for validation
valid_size = 0.2

# index of the first validation character
valid_idx = int((1 - valid_size) * len(encoded))
train_data = encoded[:valid_idx]
valid_data = encoded[valid_idx:]

### Model

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class RNN(nn.Module):
  """Character-level LSTM language model.

  A stacked LSTM over one-hot encoded characters, followed by dropout and a
  linear layer that produces one score per character of the vocabulary.

  Args:
    tokens: sequence of the distinct characters (the vocabulary); its order
      defines the integer id of each character.
    n_hidden: size of the LSTM hidden state.
    n_layers: number of stacked LSTM layers.
    drop_prob: dropout probability, used between LSTM layers and on the LSTM
      output.
  """

  def __init__(self, tokens, n_hidden, n_layers, drop_prob=0.5):
    super().__init__()
    self.n_layers = n_layers
    self.n_hidden = n_hidden
    self.drop_prob = drop_prob

    # create character dictionaries
    self.chars = tokens
    self.int_to_char = dict(enumerate(self.chars))
    self.char_to_int = {ch: idx for idx, ch in self.int_to_char.items()}

    # nn.LSTM applies dropout only between stacked layers; a non-zero value
    # with a single layer has no effect and only triggers a warning
    lstm_dropout = drop_prob if n_layers > 1 else 0.0
    self.lstm = nn.LSTM(len(self.chars), n_hidden, n_layers, dropout=lstm_dropout, batch_first=True)
    self.dropout = nn.Dropout(drop_prob)
    self.fc = nn.Linear(n_hidden, len(self.chars))

  def forward(self, x, hidden):
    """Run one forward pass.

    Args:
      x: one-hot inputs of shape (batch, seq, len(chars)) (batch_first=True).
      hidden: tuple (h, c) as returned by `initialize_hidden` or a previous call.

    Returns:
      (out, hidden): `out` holds the per-character scores flattened to
      (batch * seq, len(chars)); `hidden` is the updated LSTM state.
    """
    out, hidden = self.lstm(x, hidden)
    out = self.dropout(out)
    # stack all time steps so the linear layer sees one row per character
    out = out.contiguous().view(-1, self.n_hidden)
    out = self.fc(out)
    return out, hidden

  def initialize_hidden(self, batch_size):
    """Return zeroed (hidden state, cell state), each (n_layers, batch_size, n_hidden).

    The tensors are created with the dtype and device of the model's own
    parameters, so they always match the weights regardless of where the
    model currently lives.
    """
    weight = next(self.parameters())
    shape = (self.n_layers, batch_size, self.n_hidden)

    hidden = (weight.new_zeros(shape), weight.new_zeros(shape))

    return hidden

In [None]:
# model hyperparameters
n_hidden, n_layers = 512, 2
drop_prob = 0.5
lr = 0.001

# build the network over the dataset vocabulary and show its layers
model = RNN(tokens=chars, n_hidden=n_hidden, n_layers=n_layers, drop_prob=drop_prob)
print(model)
# move the parameters to the selected device
model.to(device)

### Train

In [None]:
def train(model, data, epochs, batch_size, seq_length, criterion, optimizer, clip, print_every=10, val_data=None):
  """Train `model` on `data`, printing training/validation loss periodically.

  Args:
    model: network exposing `chars`, `initialize_hidden(batch_size)` and a
      forward pass taking (inputs, hidden).
    data: 1-D array of integer-encoded characters used for training.
    epochs: number of passes over `data`.
    batch_size: number of sequences per mini-batch.
    seq_length: number of characters per sequence.
    criterion: loss function called as criterion(output, targets).
    optimizer: optimizer over the model's parameters.
    clip: maximum gradient norm used for gradient clipping.
    print_every: run validation and print the losses every this many steps.
    val_data: 1-D array of integer-encoded characters used for validation.
      When None, the notebook-level `valid_data` split is used.
  """
  if val_data is None:
    # fall back to the validation split defined at notebook level
    val_data = valid_data

  model.train()

  counter = 0
  n_chars = len(model.chars)
  for epoch in range(epochs):

    # initialize the hidden state
    h = model.initialize_hidden(batch_size)

    for inputs, targets in get_batches(data, batch_size, seq_length):
      counter += 1
      # one-hot encode the data
      inputs = one_hot_encode(inputs, n_chars)
      # make torch tensor
      inputs, targets = torch.from_numpy(inputs), torch.from_numpy(targets)
      # move the tensors to the right device
      inputs, targets = inputs.to(device), targets.to(device)

      # detach the hidden state so backprop stops at the batch boundary
      # instead of running through the entire training history
      h = tuple(each.detach() for each in h)

      # clear the gradients of all optimized variables
      model.zero_grad()
      # forward pass
      output, h = model(inputs, h)
      # calculate the loss (targets flattened to match the flattened output)
      loss = criterion(output, targets.reshape(-1).long())
      # backprop
      loss.backward()
      # prevent exploding gradients problem in rnn/lstm
      nn.utils.clip_grad_norm_(model.parameters(), clip)
      # update parameters
      optimizer.step()

      # ------------ validate the model -----------------
      if counter % print_every == 0:
        # initialize the hidden state
        valid_h = model.initialize_hidden(batch_size)

        valid_losses = []

        # set the model to evaluation mode
        model.eval()
        # no gradients are needed for validation; without this the autograd
        # graph would keep growing through valid_h across all validation batches
        with torch.no_grad():
          for val_inputs, val_targets in get_batches(val_data, batch_size, seq_length):
            # one-hot encode the inputs
            val_inputs = one_hot_encode(val_inputs, n_chars)
            # make torch tensor
            val_inputs, val_targets = torch.from_numpy(val_inputs), torch.from_numpy(val_targets)
            # move the tensor to the right device
            val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
            # forward pass
            output, valid_h = model(val_inputs, valid_h)
            # calculate the batch loss
            valid_loss = criterion(output, val_targets.reshape(-1).long())

            valid_losses.append(valid_loss.item())

        # reset to train mode
        model.train()

        # nan when the validation data is too short to form a single batch
        mean_valid_loss = np.mean(valid_losses) if valid_losses else float("nan")

        print("Epochs: {} \tStep: {} \tTraining loss: {:.6f} \tValidation loss: {:.6f}".format(epoch+1,
                                                                                               counter,
                                                                                               loss.item(),
                                                                                               mean_valid_loss))

In [None]:
epochs = 50
batch_size = 128
seq_length = 200
lr=0.001
clip = 5
print_every=10

# define an optimizer
optimizer = optim.Adam(model.parameters(), lr = lr)
# define a loss function
criterion = nn.CrossEntropyLoss()

# train the model on the training split only: passing the full `encoded`
# array would include the validation characters in the training data and
# make the reported validation loss meaningless
train(model, train_data, epochs, batch_size, seq_length, criterion, optimizer, clip, print_every)

### Test

In [None]:
def predict(model, char, h=None, top_k=None):
  """Given an input character, returns the predicted next character and hidden state.

  Args:
    model: trained network exposing `chars`, `char_to_int`, `int_to_char` and
      `initialize_hidden`.
    char: the current character; must be in the model's vocabulary
      (otherwise the `char_to_int` lookup raises KeyError).
    h: hidden state tuple from a previous call, or None to start from a
      fresh zero state.
    top_k: when given, sample only among the `top_k` most likely characters;
      when None, sample over the whole vocabulary.

  Returns:
    (next_char, h): the sampled next character and the updated hidden state.
  """
  x = np.array([[model.char_to_int[char]]])
  x = one_hot_encode(x, len(model.chars))
  # feed the input on whichever device the model's weights live on
  inputs = torch.from_numpy(x).to(next(model.parameters()).device)

  # start from a zero state when the caller did not supply one
  if h is None:
    h = model.initialize_hidden(1)
  # detach hidden state from history
  h = tuple(each.detach() for each in h)

  # inference only: no autograd graph needed
  with torch.no_grad():
    # output of the model
    out, h = model(inputs, h)
    # character probabilities
    p = F.softmax(out, dim=1)

  # numpy conversion needs a CPU tensor (no-op when already on the CPU)
  p = p.cpu()

  # get top characters
  if top_k is None:
    top_ch = np.arange(len(model.chars))
  else:
    p, top_ch = p.topk(top_k)
    # reshape(-1) keeps a 1-D array even for top_k == 1 (squeeze would give 0-d)
    top_ch = top_ch.numpy().reshape(-1)

  # randomly select the probable next characters
  # (float64 so the renormalised probabilities sum to 1 within numpy's tolerance)
  p = p.numpy().reshape(-1).astype(np.float64)
  char = np.random.choice(top_ch, p = p/p.sum())

  return model.int_to_char[int(char)], h

In [None]:
def sample(model, size, prime="The", top_k=None):
  """Generate text from `model`, starting from the string `prime`.

  The prime characters are fed through the model to build up the hidden
  state; the prediction following the last prime character is the first
  generated character, after which `size` further characters are sampled.
  The model is moved to `device` and switched to evaluation mode.

  Args:
    model: trained network accepted by `predict`.
    size: number of characters to sample after the first generated one.
    prime: non-empty seed text; every character must be in the model's
      vocabulary.
    top_k: forwarded to `predict` to restrict sampling to the most likely
      characters.

  Returns:
    The prime followed by the generated characters, as one string.

  Raises:
    ValueError: if `prime` is empty (there would be nothing to predict from).
  """
  if not prime:
    raise ValueError("prime must contain at least one character")

  model.to(device)

  model.eval()
  # run through the prime characters
  chars = list(prime)
  h = model.initialize_hidden(1)
  for ch in prime:
    char, h = predict(model, ch, h, top_k=top_k)

  # only the prediction after the final prime character is kept
  chars.append(char)

  for _ in range(size):
    char, h = predict(model, chars[-1], h, top_k=top_k)
    chars.append(char)

  return "".join(chars)

In [None]:
# generate text with the in-memory trained model, sampling each step among the 10 most likely characters
print(sample(model, 2000, prime="Anna Levin", top_k=10))

In [None]:
# save the trained model
model_name = "char_rnn.model"

# everything needed to rebuild the network later: layer sizes, weights and vocabulary
checkpoint = dict(
    n_hidden=model.n_hidden,
    n_layers=model.n_layers,
    state_dict=model.state_dict(),
    tokens=model.chars,
)

with open(model_name, "wb") as f:
  torch.save(checkpoint, f)

In [None]:
# load saved model
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU runtime can still be loaded on a CPU-only one
with open("char_rnn.model", "rb") as f:
  checkpoint = torch.load(f, map_location=device)

# rebuild the network from the stored sizes and vocabulary, then restore its weights
loaded = RNN(checkpoint['tokens'], n_hidden=checkpoint["n_hidden"], n_layers=checkpoint["n_layers"])
loaded.load_state_dict(checkpoint["state_dict"])

In [None]:
# sample from the model restored from the checkpoint, primed with a character name
print(sample(loaded, size = 2000, prime="Stepan Arkadyevitch", top_k=10))

In [None]:
# same restored model, different prime text
print(sample(loaded, size = 2000, prime="Alexey Alexandrovitch", top_k=10))