In [None]:
import time
import random
import string
import re

import matplotlib.pyplot as plt
import torch

import platform
# Report the platform string so results can be tied to the machine they were run on.
print(platform.platform())

# Report the installed PyTorch version.
print(torch.__version__)

# Check PyTorch has access to MPS (Metal Performance Shaders, Apple's GPU backend);
# fall back to the CPU otherwise.
# NOTE(review): `device` is reassigned to 'cpu' in the "Character RNN/LSTM"
# configuration cell further down, so this detection only affects the printout here.
device = "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using device: {device}")

Taken from the [Introduction to Deep Learning](https://sebastianraschka.com/blog/2021/dl-course.html) course by [Sebastian Raschka](https://sebastianraschka.com/). Videos:
- [L19.1 Sequence Generation with Word and Character RNNs](https://www.youtube.com/watch?v=fSBw6TrePPg&list=PLTKMiZHVd_2KJtIXOW0zFhFfBaJJilH51&index=155). Introduces the character RNN with a simple toy example.
- [L19.2.1 Implementing a Character RNN in PyTorch (Concepts)](https://www.youtube.com/watch?v=PFcWQkGP4lU&list=PLTKMiZHVd_2KJtIXOW0zFhFfBaJJilH51&index=156). Discusses the LSTM class.
- []

## LSTM in pytorch
See [the video](https://www.youtube.com/watch?v=PFcWQkGP4lU&list=PLTKMiZHVd_2KJtIXOW0zFhFfBaJJilH51&index=156) and the [slides](https://sebastianraschka.com/pdf/lecture-notes/stat453ss21/L19_seq2seq_rnn-transformers__slides.pdf).

In [None]:
# Example: toy dimensions and random data shared by the LSTM and LSTMCell demos below.
input_size = 10      # Dimension of each input vector x_t (number of features per time step)
hidden_size = 20                        # Dimension of the hidden state h (and of the cell state c)
num_lstm_layers = 2                     # Number of stacked recurrent layers (this is NOT the number of time steps)
sequence_length = 5  # Number of time steps in each input sequence
batch_size = 3  # Number of sequences processed together

# Random input laid out as (sequence_length, batch_size, input_size).
input_tensor = torch.randn(sequence_length, batch_size, input_size)
h0 = torch.zeros(num_lstm_layers, batch_size, hidden_size)    # Initial hidden state (one per layer), usually initialized with zeros
c0 = torch.zeros(num_lstm_layers, batch_size, hidden_size)    # Initial cell state (one per layer), usually initialized with zeros

print(f'Input size: {input_size}, hidden size: {hidden_size}, number of recurrent layers: {num_lstm_layers}, batch size: {batch_size}')

###  LSTM class
See the [PyTorch documentation](https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html). This class implements an entire (possibly multi-layer) LSTM network and evaluates it over a whole input sequence, of any number of time steps, in a single call.

In [None]:
# Build a stacked LSTM with `num_lstm_layers` layers. The input passed below is laid out
# as (sequence_length, batch_size, input_size).
lstm = torch.nn.LSTM(input_size, hidden_size, num_lstm_layers)

# A single call runs over every time step of the input sequence.
# The call returns the output tensor plus the final (hidden, cell) state pair; the prints
# below compare their shapes with the expected ones.
output_tensor_lstm, (hn, cn) = lstm(input_tensor, (h0,c0))
print(f'Shape of output tensor [{sequence_length} X {batch_size} X {hidden_size}]: {output_tensor_lstm.shape}')
print(f'Shape of hidden-cell-states [{h0.shape}]: {hn.shape}')

### LSTMCell class
See the [PyTorch documentation](https://pytorch.org/docs/stable/generated/torch.nn.LSTMCell.html). This class encapsulates just a single LSTM cell, so to compute the forward pass over a whole sequence we need to iterate over the time steps ourselves. The example below is for a single hidden layer.

In [None]:
# A single LSTM cell: it advances the recurrence by exactly one time step per call.
lstm_cell = torch.nn.LSTMCell(input_size, hidden_size)

hx = torch.zeros(batch_size, hidden_size)    # Initial hidden state, usually initialized with zeros
cx = torch.zeros(batch_size, hidden_size)    # Initial cell state, usually initialized with zeros

# Number of time steps (first dimension of the input tensor).
print(input_tensor.size()[0])

# Unroll the recurrence manually. After the loop, hx and cx hold the hidden and
# cell states at the last time step.
output_list = []
for i in range(input_tensor.size()[0]):
    # go over the input sequence, one time step at a time
    hx, cx = lstm_cell(input_tensor[i], (hx, cx))
    output_list.append(hx)
# Stack the per-step hidden states along a new leading (time) dimension.
output_tensor_lstm_cell = torch.stack(output_list, dim=0)

print(f'Shape of output tensor [{sequence_length} X {batch_size} X {hidden_size}]: {output_tensor_lstm_cell.shape}')
print(f'Shape of hidden-cell-states [{batch_size} X {hidden_size}]: {hx.shape}')

# Character RNN/LSTM

In [None]:
# Configuration for the character-level model: seeds, device, and hyperparameters.
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)

# NOTE(review): this overrides the MPS/CPU detection from the first cell and pins
# everything below to the CPU — presumably intentional; confirm.
device = 'cpu'
DEVICE = torch.device(device)

TEXT_PORTION_SIZE = 200   # Number of characters (time steps) in each training sample
EMBEDDING_DIM = 120       # dimension of embedding layer
HIDDEN_DIM = 130          # dimension of hidden layer
TEXT_VOCAB_SIZE = len(string.printable)   # vocabulary = every character in string.printable

NUM_ITER = 5000           # number of training iterations (one random sample per iteration)
LEARNING_RATE = 0.005     # learning rate passed to the optimizer
NUM_HIDDEN_LAYERS = 1     # NOTE(review): not referenced by any of the visible cells

print('Device:', DEVICE)
print(f'Size of character vocabulary: {TEXT_VOCAB_SIZE}')

In [None]:
import unidecode

# Divide the text into small portions (see random_portion below).
# Seed Python's `random` module first so that the sampled portions are reproducible.
random.seed(RANDOM_SEED)
def random_portion(textfile, portion_size=None):
    """Return a random contiguous slice of ``textfile``.

    The slice is always exactly ``portion_size + 1`` characters long, so that
    dropping the last character (inputs) or the first character (targets)
    leaves ``portion_size`` characters each.

    Args:
        textfile (str): the text to sample from.
        portion_size (int, optional): number of time steps wanted per sample.
            Defaults to the notebook-level ``TEXT_PORTION_SIZE``.

    Returns:
        str: a substring of ``textfile`` of length ``portion_size + 1``.

    Raises:
        ValueError: if ``portion_size`` is negative or ``textfile`` has fewer
            than ``portion_size + 1`` characters.
    """
    if portion_size is None:
        portion_size = TEXT_PORTION_SIZE
    if portion_size < 0:
        raise ValueError(f'portion_size must be non-negative, got {portion_size}')

    window = portion_size + 1
    last_start = len(textfile) - window
    if last_start < 0:
        raise ValueError(
            f'text has {len(textfile)} characters but at least {window} are required'
        )

    # random.randint includes BOTH end points, so the upper bound must be the
    # last start index that still leaves room for a full window; otherwise the
    # slice is silently truncated by one character.
    start_index = random.randint(0, last_start)
    return textfile[start_index:start_index + window]

# Encode text as integer indices: each character is replaced by its position
# in string.printable, and the result is returned as a tensor of type long.
def char_to_tensor(text):
    """Map every character of ``text`` to its index in ``string.printable``.

    Args:
        text (str): characters to encode; each must occur in ``string.printable``.

    Returns:
        torch.Tensor: 1-D tensor of dtype long with one index per character.
    """
    positions = list(map(string.printable.index, text))
    return torch.tensor(positions).long()

# Draw one random (inputs, targets) training pair from the text.
def draw_random_sample(textfile):
    """Sample a random portion of ``textfile`` and split it for next-character prediction.

    The portion is encoded with ``char_to_tensor``. The inputs run from the
    first character to the one before last; the targets run from the second
    character to the last, i.e. the inputs shifted by one position.

    Args:
        textfile (str): the full training text.

    Returns:
        tuple: ``(inputs, targets)`` index tensors of equal length.
    """
    encoded = char_to_tensor(random_portion(textfile))
    return encoded[:-1], encoded[1:]


# Read the whole training corpus into memory as a single string.
# NOTE(review): no explicit encoding is given, so the platform default is used — confirm
# that it matches the file's encoding.
with open('covid19-faq.txt', 'r') as f:
    textfile = f.read()

# convert special characters
# (presumably transliterates non-ASCII characters to ASCII so that every character can be
# found in string.printable — verify against the unidecode documentation)
textfile = unidecode.unidecode(textfile)

# collapse every run of consecutive spaces into a single space
# (only the space character is matched; tabs and newlines are left untouched)
textfile = re.sub(' +',' ', textfile)

# Length of the cleaned text; read as a global by random_portion.
TEXT_LENGTH = len(textfile)

# Sanity checks of the vocabulary and of the helper functions defined above.
print(f'Printable characters: {string.printable} of size {len(string.printable)}')
print(f'Number of characters in text: {TEXT_LENGTH}')
print(f'Convert characters to tensor: {char_to_tensor("abcDEF")}')
print(f'A random portion of the textfile: {random_portion(textfile)}')

# NOTE(review): `input` shadows the Python builtin of the same name for the rest of the
# kernel session; consider renaming it if nothing else depends on the name.
input, target = draw_random_sample(textfile)
print(f'NUmber of time steps in a training example={len(input)} [{input[0:5]}], target size={len(target)} [{target[0:5]}]')


## RNN model

In [None]:
class RNN(torch.nn.Module):
    """Character-level recurrent model: embedding -> single LSTM cell -> linear read-out.

    The model advances by ONE time step per call to ``forward``; the caller
    loops over the sequence and threads the hidden and cell states through
    successive calls.
    """

    def __init__(self, vocabulary_size, embed_size,
                 hidden_size, output_size):
        """Create the embedding table, the LSTM cell and the output layer.

        Args:
            vocabulary_size (int): number of distinct input tokens, i.e. the
                number of rows of the embedding table.
            embed_size (int): dimension of each embedding vector.
            hidden_size (int): dimension of the LSTM hidden and cell states.
            output_size (int): number of output logits produced per time step.
        """
        super().__init__()

        self.hidden_size = hidden_size

        # A simple lookup table that stores embeddings of a fixed dictionary and size.
        #  num_embeddings : size of the dictionary of embeddings,
        #  embedding_dim  : the size of each embedding vector
        # The weights are trainable.
        self.embed = torch.nn.Embedding(num_embeddings=vocabulary_size, embedding_dim=embed_size)

        # One LSTM cell, applied once per time step by forward().
        self.rnn = torch.nn.LSTMCell(input_size=embed_size, hidden_size=hidden_size)

        # Projects the hidden state onto one logit per output class.
        self.fc = torch.nn.Linear(hidden_size, output_size)

    def forward(self, character, hidden, cell_state):
        """Advance the model by one time step.

        Args:
            character (torch.Tensor): long tensor of token indices, shape [batch size].
            hidden (torch.Tensor): previous hidden state, shape [batch size, hidden dim].
            cell_state (torch.Tensor): previous cell state, shape [batch size, hidden dim].

        Returns:
            tuple: ``(output, hidden, cell_state)`` where ``output`` holds the
            unnormalised logits of shape [batch size, output_size] and the two
            states have shape [batch size, hidden dim].
        """
        # [batch size] -> [batch size, embedding dim]
        embedded = self.embed(character)

        # Both returned states have shape [batch size, hidden dim].
        (hidden, cell_state) = self.rnn(embedded, (hidden, cell_state))

        # [batch size, hidden dim] -> [batch size, output_size]
        output = self.fc(hidden)

        return output, hidden, cell_state

    def init_zero_state(self, batch_size=1):
        """Return all-zero initial ``(hidden, cell)`` states.

        The states are created on the same device and with the same dtype as
        the model's own parameters, so they stay compatible with the model
        wherever it has been moved.

        Args:
            batch_size (int, optional): number of sequences processed in
                parallel. Defaults to 1.

        Returns:
            tuple: ``(init_hidden, init_cell)``, each of shape
            [batch_size, hidden dim].
        """
        reference = self.fc.weight
        init_hidden = torch.zeros(batch_size, self.hidden_size,
                                  device=reference.device, dtype=reference.dtype)
        init_cell = torch.zeros(batch_size, self.hidden_size,
                                device=reference.device, dtype=reference.dtype)
        return (init_hidden, init_cell)


In [None]:
# Re-seed so that the weight initialisation is reproducible regardless of how much
# randomness earlier cells consumed.
torch.manual_seed(RANDOM_SEED)

# Inputs and outputs share one vocabulary: the model reads a character index and
# produces one logit per character.
vocabulary_size = TEXT_VOCAB_SIZE
output_size = TEXT_VOCAB_SIZE

model = RNN(vocabulary_size, EMBEDDING_DIM, HIDDEN_DIM, output_size)
model = model.to(DEVICE)
# Adam optimizer over all model parameters.
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

## Train

In [None]:
def evaluate(model, prime_str='A', predict_len=100, temperature=0.8):
    """Generate text from ``model`` by sampling one character at a time.

    The characters of ``prime_str`` are first fed through the model to build
    up its hidden state. Then ``predict_len`` characters are sampled, each one
    being fed back in as the next input.

    Args:
        model: a model exposing ``init_zero_state()`` and, when called as
            ``model(character, hidden, cell_state)``, returning
            ``(logits, hidden, cell_state)``.
        prime_str (str): non-empty seed text; every character must be in
            ``string.printable``.
        predict_len (int): number of characters to generate.
        temperature (float): positive sampling temperature. The higher the
            temperature, the more diverse the generated text.

    Returns:
        str: ``prime_str`` followed by the ``predict_len`` sampled characters.

    Raises:
        ValueError: if ``prime_str`` is empty or ``temperature`` is not positive.
    """
    ## based on https://github.com/spro/practical-pytorch/
    ## blob/master/char-rnn-generation/char-rnn-generation.ipynb
    if not prime_str:
        raise ValueError('prime_str must contain at least one character')
    if temperature <= 0:
        raise ValueError(f'temperature must be positive, got {temperature}')

    predicted = prime_str

    # Generation needs no gradients; this avoids building an autograd graph.
    with torch.no_grad():
        (hidden, cell_state) = model.init_zero_state()
        # Keep the inputs on whatever device the model states live on.
        prime_input = char_to_tensor(prime_str).to(hidden.device)

        # Use the priming string to "build up" the hidden state: feed every
        # character except the last; the last one is the first sampling input.
        for p in range(len(prime_str) - 1):
            inp = prime_input[p].unsqueeze(0)   # tensor of size 1 holding the character's vocabulary index
            _, hidden, cell_state = model(inp, hidden, cell_state)
        inp = prime_input[-1].unsqueeze(0)

        for _ in range(predict_len):
            outputs, hidden, cell_state = model(inp, hidden, cell_state)

            # Sample the next character from softmax(logits / T). softmax is
            # used instead of a bare exp() because it cannot overflow to inf,
            # which torch.multinomial rejects.
            output_dist = torch.softmax(outputs.view(-1) / temperature, dim=0)
            top_i = torch.multinomial(output_dist, 1).item()

            # Add the predicted character to the string and use it as the next input.
            predicted_char = string.printable[top_i]
            predicted += predicted_char
            inp = char_to_tensor(predicted_char).to(hidden.device)

    return predicted


In [None]:
# Generate text primed with 'The' using the model in its current state. No training cell
# precedes this one in the notebook, so the output is a baseline for the samples printed
# during training.
ppp = evaluate(model, prime_str='The', temperature=0.8)
print(ppp)

In [None]:
# Number of iterations between progress reports; also the spacing between
# consecutive entries of loss_list.
LOG_INTERVAL = 200

start_time = time.time()

loss_list = []

for iteration in range(NUM_ITER):

    # Every sample is an independent sequence, so start from fresh zero states.
    hidden, cell_state = model.init_zero_state()
    optimizer.zero_grad()

    inputs, targets = draw_random_sample(textfile)
    inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)
    # Use the real sample length rather than the configured one, so the loop
    # bound and the loss normalisation always agree with the data.
    num_steps = len(inputs)

    loss = 0.
    for c in range(num_steps):
        # Run over all characters in the random sample. unsqueeze adds the batch dimension.
        outputs, hidden, cell_state = model(inputs[c].unsqueeze(0), hidden, cell_state)
        loss += torch.nn.functional.cross_entropy(outputs, targets[c].view(1))

    # Average the per-character losses before back-propagating.
    loss /= num_steps
    loss.backward()

    ### UPDATE MODEL PARAMETERS
    optimizer.step()

    ### LOGGING
    if iteration % LOG_INTERVAL == 0:
        with torch.no_grad():
            print(f'Time elapsed: {(time.time() - start_time)/60:.2f} min')
            print(f'Iteration {iteration} | Loss {loss.item():.2f}\n\n')
            print(evaluate(model, 'Th', 200), '\n')
            print(50*'=')

            # One point is recorded per LOG_INTERVAL iterations, and the
            # x-axis label reflects that spacing.
            loss_list.append(loss.item())
            plt.clf()
            plt.plot(range(len(loss_list)), loss_list)
            plt.ylabel('Loss')
            plt.xlabel(f'Iteration x {LOG_INTERVAL}')
            plt.savefig('loss1.pdf')

# Final loss curve, shown inline.
plt.clf()
plt.ylabel('Loss')
plt.xlabel(f'Iteration x {LOG_INTERVAL}')
plt.plot(range(len(loss_list)), loss_list)
plt.show()
