<a href="https://colab.research.google.com/github/jonasmue/nlp-playground/blob/master/HIMYM_Predictor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports and Initial Config

In [0]:
import os
import torch

import numpy as np

from google.colab import drive
from torch import nn
from torch import optim
from datetime import datetime

In [0]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
dev = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data Loading and Preprocessing

### Load Data from Drive

In [3]:
# Mount Google Drive at /content/drive (Colab asks for an authorization code).
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [4]:
# List the contents of the mounted Drive root.
!ls drive/My\ Drive

'Colab Notebooks'   data  'Getting started.pdf'   results


In [0]:
# Read the whole transcript into memory. The path is relative, so it is resolved
# against the current working directory. The encoding is stated explicitly so the
# decoded text does not depend on the platform's default locale.
data_path = os.path.join("drive", "My Drive", "data", "himym.txt")
with open(data_path, "r", encoding="utf-8") as input_file:
  text = input_file.read()

In [6]:
# Preview the first 150 characters of the text
text[:150]

"\n\n\n01x01 - Pilot\n\n\nPilot\nScene One\n[Title: The Year 2030]\nNarrator: Kids, I'm going to tell you an incredible story. The story of how I met your mothe"

### Turn Data into Label Encodings

In [0]:
# Convenience dictionaries mapping between characters and integer ids.
# Ids are assigned in sorted character order. Iterating a set of strings directly
# yields an order that can differ between interpreter sessions (string hash
# randomization), which would silently change the encoding from run to run.
characters = set(text)
id2char = dict(enumerate(sorted(characters)))
char2id = {c: i for i, c in id2char.items()}

In [0]:
# Round-trip check over the whole vocabulary: every id must map to a character
# that maps back to the same id.
assert all(char2id[id2char[i]] == i for i in range(len(id2char)))

In [0]:
# The full text re-expressed as a list of integer character ids, and the vocabulary size.
text_labels = list(map(char2id.__getitem__, text))
num_characters = len(characters)

In [10]:
# Report the vocabulary size.
print(str.format("The text consists of {} distinct characters.", num_characters))

The text consists of 93 distinct characters.


### Batch Generator

In [0]:
def to_one_hot(text_labels, num_characters):
  """
  One-hot encodes a tensor of label encodings.

  Arguments:
    text_labels: integer tensor of character ids of any shape,
      e.g. [batch_size, seq_len]
    num_characters: the total number of characters, i.e. the size of the
      one-hot dimension

  Returns:
    A float tensor of shape [*text_labels.shape, num_characters] that holds
    a 1 at the position of each label and 0 everywhere else.
  """
  # Row i of the identity matrix is the one-hot vector of id i, so indexing the
  # matrix with the whole label tensor encodes every position in one step.
  return torch.eye(num_characters)[text_labels]

In [0]:
# Outputs tensor of X with shape [batch_size, seq_len, num_chars] and y with shape [batch_size, seq_len]
def get_next_training_batch(labels=text_labels, num_chars=num_characters, seq_len=128, batch_size=32):
  """
  Returns a training batch generator which itself returns batches with
  tuples of the following format

  X of shape [batch_size, seq_len, num_chars] (one-hot-encoded) and
  y of shape [batch_size, seq_len] (label-encoded)

  y holds the same characters as X shifted by one position, i.e. y[b][t] is
  the character that follows X[b][t] in the text. Both tensors are moved to
  the module-level device `dev`. Labels at the end of the text that do not
  fill a complete batch are dropped.

  Arguments:
    labels: label encodings of the text to create batches from
    num_chars: the total number of characters
    seq_len: the length of the character sequence of each batch
    batch_size: the number of character sequences per batch
  """
  # Every sequence consumes seq_len + 1 labels: seq_len inputs plus one extra
  # label so that the last input still has a next-character target.
  chunk_len = seq_len + 1
  batch_len = batch_size * chunk_len
  # Only offsets that can still supply a full batch are visited.
  for batch_offset in range(0, len(labels) - batch_len + 1, batch_len):
    batch = labels[batch_offset:batch_offset + batch_len]
    starts = range(0, batch_len, chunk_len)
    # Build integer tensors directly instead of going through a float tensor.
    X_text_labels = torch.tensor([batch[i:i + seq_len] for i in starts], dtype=torch.long)
    y_text_labels = torch.tensor([batch[i + 1:i + chunk_len] for i in starts], dtype=torch.long)
    # Use the num_chars argument (not the module-level vocabulary size) so that
    # callers passing their own labels get a matching one-hot width.
    X_one_hot = to_one_hot(X_text_labels, num_chars)
    yield X_one_hot.to(dev), y_text_labels.to(dev)

In [0]:
# Test the implementation to see if it generates valid outputs
X_sample, y_sample = next(get_next_training_batch(seq_len=8, batch_size=5))

In [0]:
# The sample batch must hold 5 sequences of 8 steps: one-hot inputs and label targets.
assert tuple(X_sample.shape) == (5, 8, num_characters)
assert tuple(y_sample.shape) == (5, 8)

In [0]:
# Targets are the inputs shifted by one character: the input at step t + 1 must
# decode to the target at step t. Spot-check three (sequence, step) positions.
assert all(
  X_sample[row, step + 1].argmax().item() == y_sample[row][step]
  for row, step in ((0, 0), (1, 1), (4, 6))
)

In [0]:
def tensor_to_text(tensor):
  """
  Converts a tensor representation back to a string representation.

  Arguments:
    tensor: a torch.Tensor object with one of the following shapes:
      3D: [batch_size, seq_len, num_chars] (decoded via argmax over num_chars)
      2D: [batch_size, seq_len] (label-encoded)
      1D: [seq_len] (label-encoded)
      0D: a single label encoding

  Returns:
    A list of strings for 3D and 2D input, a single string for 1D and 0D input.

  Raises:
    ValueError: if the tensor has more than three dimensions.
    KeyError: if a label is not a key of id2char.
  """
  rank = tensor.dim()
  if rank > 3:
    # Fail loudly instead of silently returning None for unsupported input.
    raise ValueError("tensor_to_text expects a tensor with at most 3 dimensions, got {}".format(rank))
  if rank == 3:
    # Reduce the character dimension to the highest-scoring label per position.
    return tensor_to_text(tensor.argmax(dim=2))
  if rank == 2:
    return [tensor_to_text(line) for line in tensor]
  if rank == 1:
    # tolist() converts all labels in one call instead of one .item() per character.
    return "".join(id2char[label] for label in tensor.tolist())
  return id2char[tensor.item()]

In [68]:
# Decode tensors of every supported rank back to text.
print("3D:", tensor_to_text(X_sample))
print("2D:", tensor_to_text(y_sample))
# torch.tensor keeps the ids as integers; torch.Tensor([...]) would turn them into floats.
print("1D:", tensor_to_text(torch.tensor([char2id[c] for c in "Jonas"])))
print("0D:", tensor_to_text(torch.tensor(char2id["J"])))

3D: ['\n\n\n01x01', '- Pilot\n', '\nPilot\nS', 'ene One\n', 'Title: T']
2D: ['\n\n01x01 ', ' Pilot\n\n', 'Pilot\nSc', 'ne One\n[', 'itle: Th']
1D: Jonas
0D: J


In [59]:
# Scratch check: the torch.Tensor constructor yields a float tensor even for an integer input.
torch.Tensor([15])

tensor([15.])

# Custom GRU Model

In [0]:
class GRUCell(nn.Module):
  """
  A single fully gated GRU cell that advances the hidden state by one time step.

  Arguments:
    input_size: number of features of each input vector
    hidden_size: number of features of the hidden state
  """
  def __init__(self, input_size, hidden_size):
    super().__init__()

    self.input_size = input_size
    self.hidden_size = hidden_size

    # Weights and Biases
    # See https://en.wikipedia.org/wiki/Gated_recurrent_unit#Fully_gated_unit
    
    ## z (update gate)
    self.W_xz = nn.Parameter(torch.zeros(self.input_size, self.hidden_size))
    self.U_hz = nn.Parameter(torch.zeros(self.hidden_size, self.hidden_size))
    self.b_z = nn.Parameter(torch.zeros(self.hidden_size))

    ## r (reset gate)
    self.W_xr = nn.Parameter(torch.zeros_like(self.W_xz))
    self.U_hr = nn.Parameter(torch.zeros_like(self.U_hz))
    self.b_r = nn.Parameter(torch.zeros_like(self.b_z))

    ## h (candidate state)
    self.W_xh = nn.Parameter(torch.zeros_like(self.W_xz))
    self.U_hh = nn.Parameter(torch.zeros_like(self.U_hz))
    self.b_h = nn.Parameter(torch.zeros_like(self.b_z))

    self.init_weights()

  def init_weights(self):
    """Initializes weight matrices with Xavier-normal noise and biases with zeros."""
    for weight in self.parameters():
      if len(weight.shape) > 1:
        # Init matrices with random noise
        nn.init.xavier_normal_(weight)
      else:
        # Init biases with zeros
        nn.init.zeros_(weight)

  def init_hidden(self, batch_size):
    """
    Returns an all-zero initial hidden state of shape [batch_size, hidden_size]
    on the same device and with the same dtype as the cell's weights.
    """
    # The initial state is a constant, not something to learn, so it is a plain
    # tensor rather than an nn.Parameter (which would request gradients). Taking
    # device and dtype from the weights keeps the cell independent of any
    # module-level device variable.
    return torch.zeros(
      (batch_size, self.hidden_size),
      dtype=self.W_xz.dtype,
      device=self.W_xz.device,
    )

  def forward(self, x, h=None):
    """
    Computes the next hidden state from the current input and hidden state.

    Argument shapes:
    x is of shape [batch_size, input_size]
    h is of shape [batch_size, hidden_size]; zeros are used if h is None

    Output shape:
    h is of shape [batch_size, hidden_size]
    """
    assert len(x.shape) == 2

    if h is None:
      h = self.init_hidden(x.shape[0])
    
    z = torch.sigmoid(x.mm(self.W_xz) + h.mm(self.U_hz) + self.b_z)
    r = torch.sigmoid(x.mm(self.W_xr) + h.mm(self.U_hr) + self.b_r)
    # In this formulation z weights the previous state and (1 - z) the candidate.
    h = z * h + (1 - z) * torch.tanh(x.mm(self.W_xh) + (r * h).mm(self.U_hh) + self.b_h)
    return h

In [0]:
class CharRNN(nn.Module):
  """
  Character-level recurrent network: a GRUCell unrolled over the sequence,
  followed by a linear layer that maps every hidden state to one score per
  character.

  Arguments:
    num_characters: size of the one-hot input and of the score output
    hidden_size: number of features of the hidden state
    batch_first: whether inputs are [batch_size, seq_len, num_chars] (True)
      or [seq_len, batch_size, num_chars] (False)
  """
  def __init__(self, num_characters=num_characters, hidden_size=512, batch_first=True):
    super().__init__()

    self.num_characters = num_characters
    self.hidden_size = hidden_size
    self.batch_first = batch_first

    self.cell = GRUCell(num_characters, hidden_size)
    self.dense = nn.Linear(self.hidden_size, self.num_characters)

  def forward(self, X, h_0=None):
    """
    Argument shapes:
    X is of shape [batch_size, seq_len, num_chars] if self.batch_first
    X is of shape [seq_len, batch_size, num_chars] if not self.batch_first 
    ---
    h_0 is of shape [batch_size, hidden_size]; if None, the cell starts from
    its own initial hidden state

    Output shapes:
    y_hat is of shape [batch_size * seq_len, num_chars], with rows ordered
      batch-major (all steps of sequence 0, then all steps of sequence 1, ...)
    h_t is of shape [batch_size, hidden_size] (hidden state after the last step)
    """
    assert len(X.shape) == 3

    # Put seq_len in the front
    if self.batch_first:
      X = X.permute(1, 0, 2)
      # X is now of shape [seq_len, batch_size, num_chars]

    h_t = h_0
    # Allocate the buffer next to the input (same device and dtype) instead of
    # on a module-level device, so the model works wherever its input lives.
    output = X.new_zeros((X.shape[0], X.shape[1], self.hidden_size))
    for t, x_t in enumerate(X):
      # Iterate over sequence
      h_t = self.cell(x_t, h_t)
      output[t] = h_t # [batch_size, hidden_size]
    
    # Back to [batch_size, seq_len, hidden_size]: flattening must be batch-major
    # so that the rows line up with a [batch_size, seq_len] target flattened the
    # same way.
    output = output.permute(1, 0, 2)

    output = output.reshape(-1, self.hidden_size) # [batch_size * seq_len, hidden_size]
    y_hat = self.dense(output) # [batch_size * seq_len, num_chars]
    return y_hat, h_t

# Training and Prediction

In [0]:
def predict_next_char(rnn, char, h):
  """
  Feeds a single character into the network and returns the most likely
  next character together with the updated hidden state.

  Arguments:
    rnn: the network; called as rnn(x, h) and expected to return (scores, hidden)
    char: the current character; must be a key of char2id
    h: the current hidden state, or None to start from the initial state

  Returns:
    A tuple (next_char, h) of the predicted character and the new hidden state.
  """
  x = to_one_hot(torch.LongTensor([[char2id[char]]]), num_characters).to(dev)
  # Inference only: without no_grad every generated character would extend the
  # autograd graph through the hidden state that is passed back in.
  with torch.no_grad():
    y_hat, h = rnn(x, h)
  # Softmax is monotonic, so the argmax of the raw scores is already the most
  # likely character; no normalization is needed.
  next_char = tensor_to_text(y_hat.argmax())[0]
  return next_char, h

In [0]:
def predict_text(rnn, h=None, seq_len=150, starting_character="\n"):
  """
  Generates text by repeatedly feeding the last predicted character back
  into the network.

  Arguments:
    rnn: the network passed on to predict_next_char
    h: the hidden state to start from, or None
    seq_len: the number of characters to generate after the starting character
    starting_character: the first character of the generated text

  Returns:
    The starting character followed by seq_len predicted characters.
  """
  generated = [starting_character]
  for _ in range(seq_len):
    next_char, h = predict_next_char(rnn, generated[-1], h)
    generated.append(next_char)
  return "".join(generated)

In [0]:
def train(rnn, n_epochs=50, learning_rate=2e-3, print_every=100, batch_size=64, seq_len=128, predict_len=150):
  """
  Trains the network on batches from get_next_training_batch and periodically
  logs the average loss together with a generated text sample.

  The log is printed and also appended to a timestamped text file in the
  "results" folder on Drive. The hidden state is carried from batch to batch
  within an epoch (detached from the graph after every step) and reset at the
  start of each epoch. The network is left in eval mode when training ends.

  Arguments:
    rnn: the network to train; must expose hidden_size and return
      (scores, hidden) when called as rnn(X, h)
    n_epochs: number of passes over the text
    learning_rate: learning rate of the Adam optimizer
    print_every: number of steps between two log entries
    batch_size: number of sequences per batch
    seq_len: number of characters per sequence
    predict_len: number of characters generated for each sample
  """
  results_dir = os.path.join("drive", "My Drive", "results")
  # Create the results folder if needed so that a missing directory does not
  # abort the run before training starts.
  os.makedirs(results_dir, exist_ok=True)
  outname = os.path.join(results_dir, str(datetime.now()) + ".txt")
  with open(outname, "w", encoding="utf-8") as f:
    f.write("Training {}, num layers: {}, hidden size: {}, batch size: {}, sequence length: {}".format(str(datetime.now()), 1, rnn.hidden_size, batch_size, seq_len))

  rnn.train()

  step = 0
  losses = []
  criterion = nn.CrossEntropyLoss()
  optimizer = optim.Adam(rnn.parameters(), lr=learning_rate)

  for epoch in range(n_epochs):
    h = None
    for X, y in get_next_training_batch(seq_len=seq_len, batch_size=batch_size):
      step += 1
      rnn.zero_grad()
      
      y_hat, h = rnn(X, h)
      # Cut the graph here so the next step does not backpropagate through the
      # entire history; detach() is the supported replacement for .data.
      h = h.detach()

      # Flatten the targets the same way the network flattens its scores.
      loss = criterion(y_hat, y.reshape(-1))
      losses.append(loss.item())
      loss.backward()

      # Apply gradient clipping
      nn.utils.clip_grad_norm_(rnn.parameters(), 5)
      optimizer.step()

      if not step % print_every:
        rnn.eval()
        running_loss = sum(losses) / len(losses)
        losses = []
        out_string = "\n-----------\n" \
          + "Epoch: {}".format(epoch + 1) + "/{}".format(n_epochs) \
          + " | Iteration: {}".format(step) \
          + " | Loss {:.5f}".format(running_loss) \
          + "\n-----------\n"
        # Sampling is inference only, so no autograd graph needs to be built.
        with torch.no_grad():
          pred_string = predict_text(rnn, seq_len=predict_len)
        print(out_string)
        print(pred_string)
        with open(outname, "a", encoding="utf-8") as f:
          f.write("\n" + str(datetime.now()))
          f.write(out_string)
          f.write(pred_string)
        
        rnn.train()
  rnn.eval()

In [167]:
# Build the model on the selected device and train it; each periodic sample is
# 256 characters long.
charRNN = CharRNN(hidden_size=512).to(dev)
train(charRNN, predict_len=256)

ValueError: ignored