In [13]:
import numpy as np
from torch import nn

In [8]:
text = "this is a test text!"

# Vocabulary: the distinct characters of the text, each mapped to its position
# in `chars`. A set has no defined order, so the concrete character -> index
# assignment may differ from one kernel session to the next.
chars = list(set(text))
indexer = dict(zip(chars, range(len(chars))))

# Encode the whole text as a list of integer indices.
indexed_data = [indexer[symbol] for symbol in text]

# Creating batches: fold the flat sequence into two rows (the column count is
# inferred), then walk the columns in windows of 5 characters so that every
# printed batch holds one 5-character subsequence per row.
x = np.reshape(indexed_data, (2, -1))
for b in range(0, x.shape[1], 5):
    batch = x[:, b:b + 5]
    print(batch)

# One-hot encoding of a hand-written example batch of indices.
batch = np.array([[2, 4, 7, 6, 5],
                  [2, 1, 6, 2, 5]])
batch_flatten = batch.flatten()  # 1-D copy, rows laid out one after another
# Row i of the identity matrix is the one-hot vector for index i, so indexing
# it with the flat indices yields one indicator row per batch element.
onehot_flat = np.eye(len(indexer))[batch_flatten]
# Restore the (rows, columns) layout of the batch; the last axis is the vocabulary.
oneht = onehot_flat.reshape(batch.shape + (-1,))

[[5 3 0 4 1]
 [5 7 4 5 1]]
[[0 4 1 6 1]
 [5 7 2 5 8]]


_Preprocessing the input data and creating a one-hot matrix_

In [4]:
text = "Hello World!"

# Distinct characters of the text; their position in `chars` becomes their index.
# Set iteration order is arbitrary, so the mapping can change between sessions.
chars = list(set(text))
indexer = dict(zip(chars, range(len(chars))))
print(indexer)

{'l': 0, 'H': 1, ' ': 2, 'r': 3, 'e': 4, 'W': 5, 'o': 6, 'd': 7, '!': 8}


In [5]:
# Translate every character of `text` into its vocabulary index, then fold the
# flat sequence into two equally long rows (the column count is inferred).
encoded = np.array([indexer[symbol] for symbol in text]).reshape(2, -1)
print(encoded)

[[1 4 0 0 6 2]
 [5 6 3 0 7 8]]


In [7]:
def index2onehot(batch: np.ndarray, num_classes: "int | None" = None) -> np.ndarray:
    """One-hot encode an array of integer class indices.

    Args:
        batch: Integer array of indices, typically shaped
            (num_samples, sequence_size). Any rank is accepted.
        num_classes: Length of the one-hot axis. When omitted, the size of the
            notebook-level ``indexer`` vocabulary is used, which is the
            behaviour this function had before the parameter existed.

    Returns:
        A float array shaped ``batch.shape + (num_classes,)`` in which, for every
        position of ``batch``, exactly the entry selected by the index is 1.

    Raises:
        IndexError: If an index in ``batch`` is outside ``[-num_classes, num_classes)``.
    """
    if num_classes is None:
        # Fall back to the global vocabulary so existing one-argument calls keep working.
        num_classes = len(indexer)

    batch_flatten = batch.flatten()  # 1-D copy, elements in row-major order
    # One row per element of the batch, one column per class.
    onehot_flat = np.zeros((batch_flatten.size, num_classes))
    # For row i, switch on the column named by the i-th index.
    onehot_flat[np.arange(batch_flatten.size), batch_flatten] = 1
    # Restore the original layout, with the class axis appended last. The explicit
    # width (rather than -1) also keeps empty batches well-defined.
    return onehot_flat.reshape((*batch.shape, num_classes))

In [11]:
# One-hot encode the (2, 6) index matrix `encoded`. In the printed result each
# inner row stands for one character and has a single 1 in the column given by
# that character's index.
one_hot = index2onehot(encoded)
print(one_hot)

[[[0. 1. 0. 0. 0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 1. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 0. 1. 0. 0. 0. 0. 0. 0.]]

 [[0. 0. 0. 0. 0. 1. 0. 0. 0.]
  [0. 0. 0. 0. 0. 0. 1. 0. 0.]
  [0. 0. 0. 1. 0. 0. 0. 0. 0.]
  [1. 0. 0. 0. 0. 0. 0. 0. 0.]
  [0. 0. 0. 0. 0. 0. 0. 1. 0.]
  [0. 0. 0. 0. 0. 0. 0. 0. 1.]]]


_Building the Architecture_

In [14]:
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear layer that scores every character.

    The recurrent part consumes one-hot encoded characters; the linear layer
    acts as the classifier, mapping each hidden vector back to one score per
    character of the vocabulary.
    """

    def __init__(self, char_length, hidden_size, n_layers):
        """Build the recurrent stack and the classifier.

        Args:
            char_length: Number of features of each input step (the size of
                the one-hot vector); also the number of output scores.
            hidden_size: Number of features in the hidden state.
            n_layers: Number of stacked recurrent layers; each layer after the
                first takes the outputs of the previous one as its input.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        # batch_first=True: inputs and outputs are laid out as (batch, sequence, features).
        self.lstm = nn.LSTM(
            input_size=char_length,
            hidden_size=hidden_size,
            num_layers=n_layers,
            batch_first=True,
        )
        # Classifier applied to the hidden vector of every step of every
        # sequence in the batch (each sequence being independent of the others).
        self.output = nn.Linear(in_features=hidden_size, out_features=char_length)

    def forward(self, x, states):
        """Run the batch through the LSTM and score every time step.

        Args:
            x: Input batch passed to the LSTM.
            states: ``(hidden, cell)`` pair as produced by ``init_states`` or
                by a previous call to this method.

        Returns:
            A pair ``(scores, states)``. ``scores`` has one row per
            (sequence, step) combination and ``char_length`` columns;
            ``states`` is the updated ``(hidden, cell)`` pair.
        """
        step_outputs, next_states = self.lstm(x, states)
        # Collapse the batch and sequence axes so the linear layer sees a plain
        # 2-D matrix of hidden vectors, one row per predicted character. The
        # per-step outputs are used (not the final hidden state) because a
        # prediction is wanted for every position, not only for the last one.
        flattened = step_outputs.reshape(-1, self.hidden_size)
        scores = self.output(flattened)
        return scores, next_states

    def init_states(self, batch_size):
        """Create all-zero hidden and cell states for ``batch_size`` sequences.

        Every layer keeps one state vector per sequence in the batch, hence
        the ``(n_layers, batch_size, hidden_size)`` shape. The tensors are
        created from the first model parameter so they share its dtype and
        device.

        Returns:
            The ``(hidden, cell)`` tuple of zero tensors.
        """
        reference = next(self.parameters())
        shape = (self.n_layers, batch_size, self.hidden_size)
        hidden = reference.new_zeros(shape)
        cell = reference.new_zeros(shape)
        return (hidden, cell)

_Training the Model_

In [None]:
# Training loop for next-character prediction over the index matrix `x`.
# NOTE(review): `epochs`, `model`, `n_seq`, `seq_length`, `loss_function` and
# `optimizer` are not defined in the cells visible here, and `torch` itself is not
# imported by the import cell (only `from torch import nn`) - confirm they are set up
# before this cell runs.
# NOTE(review): no `optimizer.step()` is visible after `loss.backward(...)`; confirm
# the weight update follows further down, otherwise the model never learns.

# Step 1: one pass over the whole data per epoch.
for e in range(1, epochs + 1):
    # Step 2: start every epoch from fresh (zeroed) hidden and cell states,
    # one state vector per sequence (row) of the batch.
    states = model.init_states(n_seq)
    
    # Step 3: walk the columns of x in windows of seq_length characters.
    for b in range(0, x.shape[1], seq_length):
        x_batch = x[:, b:b + seq_length]
        
        # Targets are the inputs shifted one character to the right.
        # NOTE(review): the equality below only fires when x.shape[1] is a multiple of
        # seq_length; otherwise the final window is shorter and the `.view(...)` further
        # down presumably fails on the size mismatch - confirm the data is sized accordingly.
        if b == x.shape[1] - seq_length: # last window: no character exists after the final column
            y_batch = x[:, b+1:b+seq_length]
            # The shifted slice is one column short, so a column holding the index of "."
            # is appended as the target of the final character. np.ones is float, which
            # makes y_batch float; it is cast back to integers via y.long() below.
            # NOTE(review): indexer["."] raises KeyError unless "." is in the vocabulary;
            # neither sample text defined in this notebook contains one.
            y_batch = np.hstack((y_batch, indexer["."]*np.ones((y_batch.shape[0], 1))))
        else: # earlier windows: the next character is available in x
            y_batch = x[:, b+1: b+seq_length+1]
        
        # Step 4: inputs become a one-hot float tensor; targets are flattened to one
        # class index per row of the model output.
        
        x_onehot = torch.Tensor(index2onehot(x_batch))
        y = torch.Tensor(y_batch).view(n_seq * seq_length)
        
        # Step 5: get a prediction and perform the backward propagation.
        
        # The returned states are fed into the next window without being detached, so
        # each loss's graph reaches back through the earlier windows of the epoch;
        # retain_graph=True keeps those graphs alive for the following backward calls.
        pred, states = model(x_onehot, states)
        loss = loss_function(pred, y.long())
        optimizer.zero_grad()
        loss.backward(retain_graph = True)