# Homework 2 - Recurrent Neural Networks

In this part of the homework we are going to work with Recurrent Neural Networks, in particular GRU. One of the greatest things that Recurrent Neural Networks can do when working with sequences is retaining data from several timesteps in the past. We are going to explore that property by constructing an 'echo' Recurrent Neural Network.

The goal here is to make a model that given a sequence of letters or digits will output that same sequence, but with a certain delay. Let's say the input is a string 'abacaba', we want the model to not output anything for 3 steps (delay length), and then output the original string step by step, except the last 3 characters. So, target output is then 'XXXabac', where 'X' is empty output.

This is similar to [this notebook](https://github.com/Atcold/pytorch-Deep-Learning/blob/master/09-echo_data.ipynb) (which you should refer to when doing this assignment), except we're working not with a binary string, but with a sequence of integers between 0 and some N. In our case N is 26, which is the number of letters in the alphabet.

A **synchronized many-to-many task** is a type of machine learning problem where the input and output sequences have different lengths and the sequences need to be aligned in time. In this type of task, a sequence of inputs is mapped to a sequence of outputs, where the number of inputs and outputs can be different for each time step (eg machine translation).



## Dataset

Let's implement the dataset. In our case, the data is basically infinite, as we can always generate more examples on the fly, so don't need to load anything from disk.

In [1]:
import random
import string

import torch

# Max value of the generated integer. 26 is chosen becuase it's
# the number of letters in English alphabet.
N = 26


def idx_to_onehot(x, k=N+1):
  """ Converts the generated integers to one-hot vectors """
  ones = torch.sparse.torch.eye(k) # creates a sparse identity matrix of size k
  shape = x.shape
  #  reshape the x tensor into a 1D tensor, where each element represents an index
  #  pass to the index_select method of the identity matrix:
  #  selects the corresponding row of the identity matrix for each index in the x tensor.
  res = ones.index_select(0, x.view(-1).type(torch.int64))
  return res.view(*shape, res.shape[-1])



In [2]:
x_out = idx_to_onehot(x = torch.tensor([1, 2]))
print(x_out.shape)
x_out

torch.Size([2, 27])


tensor([[0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0., 0., 0.]])

In [3]:
# can take advantage of the one-hot encoding
# of the target and call argmax along its second dimension to create a tensor of shape
# (batch_size) containing the index of the class label that was hot for each sequence.
x_out.argmax(dim = 1)

tensor([1, 2])

In [4]:
class EchoDataset(torch.utils.data.IterableDataset):

  def __init__(self, delay=4, seq_length=15, size=1000):
    self.delay = delay
    self.seq_length = seq_length
    self.size = size

  def __len__(self):
    return self.size

  def __iter__(self):
    """ Iterable dataset doesn't have to implement __getitem__.
        Instead, we only need to implement __iter__ to return
        an iterator (or generator).

        The advantage of using __iter__ instead of __getitem__ is that
        it provides a more flexible and memory-efficient way to access the data,
        since it allows you to generate the samples on-the-fly
        instead of loading them all into memory.
        This can be especially useful when dealing with large datasets.
    """
    for _ in range(self.size):
      seq = torch.tensor([random.choice(range(1, N + 1)) for i in range(self.seq_length)], dtype=torch.int64)
      result = torch.cat((torch.zeros(self.delay), seq[:self.seq_length - self.delay])).type(torch.int64)
      yield seq, result

DELAY = 4
DATASET_SIZE = 200000
ds = EchoDataset(delay=DELAY, size=DATASET_SIZE)

In [5]:
ds_small = EchoDataset(delay=3, size=2, seq_length = 9)
for data, label in ds_small:
  print("Data:", data, ", label:", label)

Data: tensor([13, 17, 12,  4, 20,  2, 19,  3,  4]) , label: tensor([ 0,  0,  0, 13, 17, 12,  4, 20,  2])
Data: tensor([15,  4,  8, 19, 17,  1,  5, 25,  2]) , label: tensor([ 0,  0,  0, 15,  4,  8, 19, 17,  1])


In [6]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cuda', index=0)

In [7]:
for i, (data, label) in enumerate(ds_small):
        print("i:", i, "Data:", data, ", label:", label)

i: 0 Data: tensor([17, 14, 26, 17, 15, 26,  4, 21,  3]) , label: tensor([ 0,  0,  0, 17, 14, 26, 17, 15, 26])
i: 1 Data: tensor([ 9, 26, 21, 20,  2, 19, 20, 25, 19]) , label: tensor([ 0,  0,  0,  9, 26, 21, 20,  2, 19])


## Model

Now, we want to implement the model. For our purposes, we want to use GRU. The architecture consists of GRU and a decoder. Decoder is responsible for decoding the GRU hidden state to yield a predicting for the next output. The parts you are responsible for filling with your code are marked with `TODO`.

In [8]:
def map_letters_to_numbers(input_string):
  """
  In this function, the chr function is used to get the character
  corresponding to a given ASCII code.
  The ASCII code for a is 97, so adding 96 to a number gives us the ASCII code
  for the corresponding letter.
  """
  result = []
  for char in input_string:
    if char.isalpha():
      char = char.lower()
      result.append(ord(char) - 96)

  return result

In [9]:
map_letters_to_numbers('abcde')

[1, 2, 3, 4, 5]

In [10]:
def map_numbers_to_letters(input_numbers):
    result = []
    for num in input_numbers:
        if 1 <= num <= 26:
            result.append(chr(num + 96))
    return ''.join(result)

In [11]:
map_numbers_to_letters([1, 2, 3])

'abc'

Things tried :
* ReLU before linear layer (makes things worse)
* two linear layers with ReLU in between (gets stuck on single number prediction) or sigmoid (correctly predicts first four 0s but doesn't learn beyond)

In [12]:
class GRUMemory(torch.nn.Module):

  def __init__(self, hidden_size, n_categories, middle_size = 128):
    super().__init__()

    self.hidden_size = hidden_size

    self.linear_in = torch.nn.Linear(n_categories + 1, hidden_size)

    self.gru = torch.nn.GRU(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True
        )
    # only use last time step as input for linear layer so only need hidden size as input
    self.linear_out = torch.nn.Linear(hidden_size, n_categories + 1)

  def forward(self, x, h):
    # inputs: x - input tensor of shape (batch_size, seq_length, N+1)
    # returns:
    # logits (scores for softmax) of shape (batch size, seq_length, N + 1)
    x = self.linear_in(x)
    x, h = self.gru(x)
    x = self.linear_out(x)
    # x = torch.softmax(x, dim = -1)
    return x, h

  @torch.no_grad()
  def test_run(self, s):
    # This function accepts one string s containing lowercase characters a-z.
    # You need to map those characters to one-hot encodings,
    # then get the result from your network, and then convert the output
    # back to a string of the same length, with 0 mapped to ' ',
    # and 1-26 mapped to a-z.

    # function idx_to_onehot takes numerical indices
    s = map_letters_to_numbers(s)
    s_hot = idx_to_onehot(torch.tensor(s))

    h = None
    # model expects (batch, T, H_in)
    s_hot = s_hot.unsqueeze(0)

    logits, _ = self(s_hot, h = None)

    ### model is going to return softmax scores :
    ### get indices using argmax
    output = torch.argmax(logits, dim=-1)

    ## convert back to strings
    s_out = map_numbers_to_letters(output.squeeze().tolist())

    return s_out







The number of features in the hidden unit of a GRU is a design choice and can be different from the number of input features (n). It's common to use the same number of hidden units as input features, but this is not always the case. In general, the number of hidden units is a hyperparameter that can be tuned during the model training process. Another approach is to use rules of thumb based on the complexity of the data and task. For example, a simple rule of thumb is to choose the number of hidden units to be between the input and output size. However, this is just a rough guideline and the optimal number of hidden units can vary depending on the specific task and dataset.

In [13]:
model = GRUMemory(hidden_size = 64, n_categories = N)

In [14]:
for name, param in model.named_parameters():
    print(name, param.shape)

linear_in.weight torch.Size([64, 27])
linear_in.bias torch.Size([64])
gru.weight_ih_l0 torch.Size([192, 64])
gru.weight_hh_l0 torch.Size([192, 64])
gru.bias_ih_l0 torch.Size([192])
gru.bias_hh_l0 torch.Size([192])
linear_out.weight torch.Size([27, 64])
linear_out.bias torch.Size([27])


In [15]:
model.test_run("abcde")

'zpppp'

In [16]:
model

GRUMemory(
  (linear_in): Linear(in_features=27, out_features=64, bias=True)
  (gru): GRU(64, 64, batch_first=True)
  (linear_out): Linear(in_features=64, out_features=27, bias=True)
)

In [17]:
# define training and testing data

train_data = EchoDataset(delay=DELAY, size=DATASET_SIZE)



In [18]:
train_dataloader = torch.utils.data.DataLoader(train_data, batch_size = 50)

In [19]:
len(train_dataloader)

4000

In [20]:
for batch_idx in train_dataloader:
    data, label = batch_idx
    print("Data shape:", data.shape)
    print("Label shape:", label.shape)
    break

Data shape: torch.Size([50, 15])
Label shape: torch.Size([50, 15])


In [21]:
ds_test = EchoDataset(delay=3, size=6)

In [22]:
for seq, lab in train_data:
  print("Sequence shape", seq.shape)
  print("label shape", lab.shape)
  break

Sequence shape torch.Size([15])
label shape torch.Size([15])


In [23]:
len(train_data)

200000

In [24]:
for batch in ds_test:
  data, label = batch
  print("DATA")
  print(data)
  print("LABEL")
  print(label)
  break

DATA
tensor([15,  9,  5,  4, 15,  4, 18, 15, 16,  4,  9, 12, 13, 13,  1])
LABEL
tensor([ 0,  0,  0, 15,  9,  5,  4, 15,  4, 18, 15, 16,  4,  9, 12])


## Training
Below you need to implement the training of the model. We give you more freedom as for the implementation. The two limitations are that it has to execute within 10 minutes, and that error rate should be below 1%.

In [25]:
criterion = torch.nn.CrossEntropyLoss()
# criterion = torch.nn.MultiLabelMarginLoss()
# optimizer = torch.optim.RMSprop(model.parameters(), lr=0.01)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # works better than 0.1?

In [26]:
import numpy as np

In [27]:
def train(epoch):
    """
    copied from https://github.com/Atcold/pytorch-Deep-Learning/blob/master/09-echo_data.ipynb
    """

    model.train()

    # New epoch --> fresh hidden state
    hidden = None   # initialize hidden state with zeros
    correct = 0

    print_count = 0

    n_samples = 0

    for batch_idx in train_dataloader:
    # for batch_idx in ds_test: # mini dataset for comp speed
        data, target = batch_idx
        optimizer.zero_grad()
        # if hidden is not None: hidden.detach_()

        # add dimension for batch -- don't need if use dataloader
        # data = data.unsqueeze(0)

        # Model works when sequence is one hot encoded
        data = idx_to_onehot(data)
        data = data.to(device)

        target_OH = idx_to_onehot(target)
        target_OH = target_OH.to(device)
        target = target.to(device)

      # Do forward pass
        logits, _ = model(data, h = None)

        # cross entropy with logits
        # TORCH.NN.FUNCTIONAL.CROSS_ENTROPY takes
        # input : Predicted unnormalized logits
        # target : Ground truth class INDICES

#         if print_count < 2:
#             print("One hot encoded data (input) shape:", data.shape)
#             print("Logit shape :", logits.shape)
#             print("Target shape:", target.shape)
#             print("Data shape ",  data.shape[0])
#             print("OHE target shape:", target_OH.shape)
#             print("")


        # loss = criterion(logits.squeeze(), target) # RuntimeError: Expected target size [50, 27], got [50, 15]

        loss = criterion(logits.reshape(-1, N+1), target.reshape(-1))


        loss.backward()
        optimizer.step()

        # model returns softmax scores :
        # get indices using argmax
        pred = torch.argmax(logits, dim=-1).squeeze()

        if (epoch == 2) | (epoch == 8):
            print_count += 1
            if (print_count) < 2:
                print("PREDICTION:")
                print(pred)
                print("TARGET:")
                print(target)
                print("")


        correct += (pred == target).int().sum().item()/target.shape[1] # divide by sequence length here
        n_samples += target.shape[0]

    correct = correct/n_samples # will return number between 0 and 1

    return correct, loss.item()

In [39]:
def test_model(model, sequence_length=15, D = 4):
    """
    This is the test function that runs 100 different strings through your model,
    and checks the error rate.
    """
    total = 0
    correct = 0
    model.to('cpu')


    for i in range(1):
        printer = 0

        s = ''.join([random.choice(string.ascii_lowercase) for i in range(random.randint(15, 25))])
        result = model.test_run(s)

        # if printer < 1:
        #   print("Label:", s)
        #   print("Label length:", len(s))
        #   print("Prediction:", result)
        #   print("Label length:", len(result))
        #   printer += 1

        assert D > 0, 's[:-D] won\'t work for D=0'

        #for c1, c2 in zip(s[:-D], result[D:]):
        for c1, c2 in zip(s[:-D], result): #### something wrong with the test run : it's not returning the full sequence length?
            # print("c1:", c1)
            # print("c2:", c2)
            correct += int(c1 == c2)
        total += len(s) - D

    return correct / total

In [29]:
import time
start_time = time.time()

# TODO: initialize and train your model here.
end_time = time.time()
duration = end_time - start_time

# call training function
model.to(device)
n_epochs = 10

for epoch in range(1, n_epochs+1):
    correct, loss = train(epoch)
    train_accuracy = float(correct)*100
    print(f'Train Epoch: {epoch}/{n_epochs}, loss: {loss:.3f}, accuracy {train_accuracy:.1f}%')


Train Epoch: 1/10, loss: 0.004, accuracy 87.0%
PREDICTION:
tensor([[ 0,  0,  0,  0, 10, 14,  8, 10,  9, 26,  1, 21, 11, 26, 15],
        [ 0,  0,  0,  0, 24, 11,  9, 26, 24, 10,  6, 17, 19, 11, 15],
        [ 0,  0,  0,  0, 18, 24, 12, 21, 20, 22,  8,  5,  6, 15,  7],
        [ 0,  0,  0,  0,  3, 11, 23, 22,  4,  4,  5, 20,  5, 22, 20],
        [ 0,  0,  0,  0,  1, 21, 13,  9,  6, 24,  4,  2, 11, 15, 22],
        [ 0,  0,  0,  0, 19, 20, 21,  7, 10, 17,  5, 23,  5, 19, 12],
        [ 0,  0,  0,  0, 19,  6, 24,  9,  4,  9, 20,  4, 12, 14, 10],
        [ 0,  0,  0,  0,  3, 18, 18, 10,  3,  5, 11, 14, 18, 20, 17],
        [ 0,  0,  0,  0,  1, 12, 18, 22, 21,  3, 20, 15,  7, 15, 18],
        [ 0,  0,  0,  0, 23, 17, 18,  8,  3, 14, 18,  8, 15, 26,  6],
        [ 0,  0,  0,  0, 24, 20, 22,  1, 16, 15,  3,  1, 16, 17,  4],
        [ 0,  0,  0,  0, 16, 18, 21, 24, 19, 17, 14, 11, 21, 13, 16],
        [ 0,  0,  0,  0, 16, 20, 23, 19, 24,  1,  3,  7, 20, 19, 13],
        [ 0,  0,  0,  0, 10, 10

In [40]:
accuracy = test_model(model)
print("Accuracy:", accuracy)
print("Training time:", round(duration/60), "minutes")

assert duration < 600, 'execution took f{duration:.2f} seconds, which longer than 10 mins'
assert accuracy > 0.99, f'accuracy is too low, got {accuracy}, need 0.99'
print('tests passed')

Accuracy: 1.0
Training time: 0 minutes
tests passed


## Variable delay model

Now, to make this more complicated, we want to have varialbe delay. So, now, the goal is to transform a sequence of pairs (character, delay) into a character sequence with given delay. Delay stays constant within one sequence.

### Dataset
As before, we first implement the dataset:

In [None]:
class VariableDelayEchoDataset(torch.utils.data.IterableDataset):

  def __init__(self, max_delay=8, seq_length=20, size=1000):
    self.max_delay = max_delay
    self.seq_length = seq_length
    self.size = size

  def __len__(self):
    return self.size

  def __iter__(self):
    for _ in range(self.size):
      seq = torch.tensor([random.choice(range(1, N + 1)) for i in range(self.seq_length)], dtype=torch.int64)
      delay = random.randint(0, self.max_delay)
      result = torch.cat((torch.zeros(delay), seq[:self.seq_length - delay])).type(torch.int64)
      yield seq, delay, result

### Model

And the model.

In [None]:
class VariableDelayGRUMemory(torch.nn.Module):

  def __init__(self, hidden_size, max_delay):
    super().__init__()
    #TODO

  def forward(self, x, delays):
    # inputs:
    # x - tensor of shape (batch size, seq length, N + 1)
    # delays - tensor of shape (batch size)
    # returns:
    # logits (scores for softmax) of shape (batch size, seq_length, N + 1)

    # TODO
    pass

  @torch.no_grad()
  def test_run(self, s, delay):
    # This function accepts one string s containing lowercase characters a-z,
    # and a delay - the desired output delay.
    # You need to map those characters to one-hot encodings,
    # then get the result from your network, and then convert the output
    # back to a string of the same length, with 0 mapped to ' ',
    # and 1-26 mapped to a-z.

    # TODO
    pass


### Train

As before, you're free to do what you want, as long as training finishes within 10 minutes and accuracy is above 0.99 for delays between 0 and 8.

In [None]:
def test_variable_delay_model(model, seq_length=20):
  """
  This is the test function that runs 100 different strings through your model,
  and checks the error rate.
  """
  total = 0
  correct = 0
  for i in range(500):
    s = ''.join([random.choice(string.ascii_lowercase) for i in range(seq_length)])
    d = random.randint(0, model.max_delay)
    result = model.test_run(s, d)
    if d > 0:
      z = zip(s[:-d], result[d:])
    else:
      z = zip(s, result)
    for c1, c2 in z:
      correct += int(c1 == c2)
    total += len(s) - d

  return correct / total

In [None]:
import time
start_time = time.time()

MAX_DELAY = 8
SEQ_LENGTH = 20

# TODO: implement model training here.
model = None

end_time = time.time()
assert end_time - start_time < 600, 'executing took longer than 10 mins'
assert test_variable_delay_model(model) > 0.99, 'accuracy is too low'
print('tests passed')