# LSTM for NLP
- predict next word in Anna Karenina
- first with pytorch lstm
- then with numpy lstm

## read in text

In [13]:
import numpy as np
import os
import sys 
import regex as re
from typing import Tuple, Dict, List
from collections import Counter

cwd = os.getcwd()

In [None]:
# read in raw text
sys.path.append('../../data/')
with open('data/anna.txt' , 'r') as f:
    anna = f.read()
print(type(anna))

In [5]:
# clean up line breaks and add space around punctuation (for tokenization)
clean_text=anna.lower().replace("\n", " ") 
clean_text=clean_text.replace("-", " ") 
for x in ",.:;?!$()/_&%*@'`":
    clean_text=clean_text.replace(f"{x}", f" {x} ")
    clean_text=clean_text.replace('"', ' " ') 
text=clean_text.split()

## implement simplified word tokenizer

In [None]:
# build vocab
word_counts = Counter(text)
vocab = sorted(word_counts, key=word_counts.get,
reverse = True)
print(vocab[:10])

In [9]:
# definde tokenizer-encoder and tokenizer-decoder
encoder={v:k for k,v in enumerate(vocab)} 
decoder={k:v for k,v in enumerate(vocab)}

word_tokenizer = {'encoder': encoder, 'decoder': decoder}
# save the simple tokenizer
import joblib
joblib.dump(word_tokenizer, 'word_tokenizer.pkl')

In [None]:
# apply tokenizer
tokens = [encoder[x] for x in text]
print(text[:20])
print(tokens[:20])

In [None]:
# split into train, test sets
len_of_text = len(text)
print(len(tokens))
split_point = int(np.round(len_of_text*0.7))
train_x = tokens[:split_point]
print(len(train_x))
test_x = tokens[split_point:]
print(len(test_x))

## PyTorch LSTM

In [None]:
import torch
print(torch.__version__)

In [None]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")

In [14]:
# creat PyTorch dataset 
from torch.utils.data import Dataset

# 1. Subclass torch.utils.data.Dataset
class my_dataset(Dataset):
    
    # 2. Initialize with customized varargs 
    def __init__(self, tokenized_text: str, max_length: int, stride: int=1) -> None:
        
        # 3. Create class attributes
        # initialize input_tokens_x and target_tokens_y
        self.input_tokens_x = [] # alternative nested tensor 
        self.target_tokens_y = []
        # set y as stride number of tokens trailing x 
        for i in range(0, (len(tokenized_text)-max_length), stride):
            x_tmp = tokenized_text[i : (i+max_length)]
            y_tmp = tokenized_text[(i+1) : (i+max_length+1)]
            self.input_tokens_x.append(torch.tensor(x_tmp))
            self.target_tokens_y.append(torch.tensor(y_tmp))
    
    # 4. Overwrite the __len__() method to return number of rows in the dataset
    def __len__(self) -> int:
        "Returns the number of rows / pairs of x-y sequences in the dataset"
        return len(self.input_tokens_x)
    
    # 5. Overwrite the __getitem__() method (required for subclasses of torch.utils.data.Dataset)
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int]:
        "Returns one sample of data, data and label (X, y)."
        return self.input_tokens_x[idx], self.target_tokens_y[idx]

In [17]:
# create and save torch dataset for later use
context_length = 100
dataset_train = my_dataset(tokenized_text=train_x, max_length=context_length, stride=1)
torch.save(dataset_train, 'data/train.pt')
dataset_test = my_dataset(tokenized_text=test_x, max_length=context_length, stride=1)
torch.save(dataset_test, 'data/test.pt')

In [None]:
# load data with torch dataloader
from torch.utils.data import DataLoader
torch.manual_seed(42)

batch_size = 32
# num_workers = 3 
# num_workers=num_workers # number of cpu processes to use for preprocessing
# num_works makes next(iter()) stuck in a loop

loader_train = DataLoader(
    dataset_train,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True, # if True, # drops the last batch if it is shorter than the specified batch_size to prevent loss spikes during training  
)

print(type(loader_train))

In [None]:
# loading the next pair in dataset
x,y=next(iter(loader_train))
print(x.shape,y.shape)

In [None]:
vocab_size = len(encoder) # len(vocab) # 
print(vocab_size)

In [30]:
from torch import nn

class word_lstm(nn.Module):
    def __init__(self, 
                 vocab_size, 
                 emb_dim,
                 lstm_layers,
                 drop_out_rate):
        super().__init__()
        self.vocab_size = vocab_size
        self.emb_dim = emb_dim
        self.lstm_layers = lstm_layers
        self.drop_out_rate = drop_out_rate

        self.embedding = nn.Embedding(self.vocab_size, self.emb_dim)
        self.lstm = nn.LSTM(input_size=self.emb_dim,
                            hidden_size=self.emb_dim,
                            num_layers=self.lstm_layers,
                            dropout=self.drop_out_rate,
                            batch_first=True)
        self.fc = nn.Linear(self.emb_dim, self.vocab_size, bias=True)

    def forward(self, x, hc):
        embed = self.embedding(x)
        x, hc = self.lstm(embed, hc)
        x = self.fc(x)
        return x, hc
    
    def init_hidden(self, context_length):
        weight = next(self.parameters()).data
        return (weight.new(self.lstm_layers, context_length, self.emb_dim).zero_(),
                weight.new(self.lstm_layers, context_length, self.emb_dim).zero_())



In [None]:
model = word_lstm(vocab_size=vocab_size,
                  emb_dim=128,
                  lstm_layers=3,
                  drop_out_rate=0.2).to(device)
print(model)

In [32]:
lr=0.0001
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_func = nn.CrossEntropyLoss()

In [None]:
model.train()

for epoch in range(50):
    tloss=0
    sh,sc = model.init_hidden(batch_size)
    for i, (x,y) in enumerate(loader_train):    
        if x.shape[0]==batch_size:
            inputs, targets = x.to(device), y.to(device)
            optimizer.zero_grad()
            output, (sh,sc) = model(inputs, (sh,sc))
            loss = loss_func(output.transpose(1,2),targets)
            sh,sc=sh.detach(),sc.detach()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 5)
            optimizer.step()
            tloss+=loss.item()
        if (i+1)%100==0:
            print(f"at epoch {epoch} iteration {i+1}\
            average loss = {tloss/(i+1)}")

# at epoch 0 iteration 100            average loss = 6.056614637374878
# at epoch 0 iteration 500            average loss = 6.038397843360901
# at epoch 0 iteration 1200            average loss = 6.0286973142623905

In [None]:
torch.save(model.state_dict(),"model/word_lstm).pth")

In [None]:
# Inference - greedy

In [None]:
# Inference - top-k

## numpy LSTM
- 3 gates
    - input gate i = sigmoid(W_i[h_(t-1), x_t])
    - forget gate f = sigmoid(W_f[h_(t-1), x_t])
    - output gate o = sigmoid(W_o[h_(t-1), x_t])
- a candidate g = tahn(W_g[h_(t-1), x_t])
- a cell's memory c_t = element_wise_multiplication(c_(t-1), f) + element_wise_multiplication(g, i)
- current hidden state h_t = element_wise_multiplication(tahn(c_t), o)
- NOTE: c_t is not directly used to calculate current step's output but sent to next time step (memory used in the future)

In [1]:
import numpy as np

In [2]:
# initialize lstm 
class my_lstm:
    def __init__(self, d_hidden, vocab_size):
        self.d_hidden = d_hidden
        self.vocab_size = vocab_size
        self.d_hv = d_hidden + d_hidden
    
    def init_orthogonal(weight_mat):
        """
        orthogonalize (QR decomp) weight matrix
        see https://arxiv.org/abs/1312.6120
        """
        if weight_mat.ndim < 2:
            raise ValueError("Only parameters with 2 or more dimensions are supported.")

        rows, cols = weight_mat.shape
        
        new_param = np.random.randn(rows, cols)
        
        if rows < cols:
            new_param = new_param.T
        
        # Compute QR factorization
        q, r = np.linalg.qr(new_param)
        
        # Make Q uniform according to https://arxiv.org/pdf/math-ph/0609050.pdf
        d = np.diag(r, 0)
        ph = np.sign(d)
        q *= ph

        if rows < cols:
            q = q.T
        
        new_param = q
        
        return new_param
    
    def params(self, init_orthogonal=init_orthogonal):
        # initialize weights
        # input gate
        W_i = np.random.randn(self.d_hidden, self.d_hv)
        # forget gate
        W_f = np.random.randn(self.d_hidden, self.d_hv)
        # output gate
        W_o = np.random.randn(self.d_hidden, self.d_hv)
        # candidate
        W_g = np.random.randn(self.d_hidden, self.d_hv)
        # outout (for the entire model)
        W_v = np.random.randn(self.vocab_size, self.d_hidden)

        # orthoganalize (see https://arxiv.org/abs/1312.6120)
        W_f = init_orthogonal(W_f)
        W_i = init_orthogonal(W_i)
        W_g = init_orthogonal(W_g)
        W_o = init_orthogonal(W_o)
        W_v = init_orthogonal(W_v)
        

        # initialize biases
        # input gate
        b_i = np.zeros((self.d_hidden, 1))
        # forget gate
        b_f = np.zeros((self.d_hidden, 1))
        # output gate
        b_o = np.zeros((self.d_hidden, 1))
        # candidate
        b_g = np.zeros((self.d_hidden, 1))
        # outout (for the entire model)
        b_v = np.zeros((self.vocab_size, 1))
        
        return W_f, W_i, W_g, W_o, W_v, b_f, b_i, b_g, b_o, b_v


In [None]:

d_hidden = 128
init_lstm = my_lstm(d_hidden=d_hidden, vocab_size=vocab_size)
param = init_lstm.params()

In [None]:
def sigmoid(x, derivative=False):
    """
    Computes the element-wise sigmoid activation function for an array x.

    Args:
     `x`: the array where the function is applied
     `derivative`: if set to True will return the derivative instead of the forward pass
    """
    x_safe = x + 1e-12
    f = 1 / (1 + np.exp(-x_safe))
    
    if derivative: # Return the derivative of the function evaluated at x
        return f * (1 - f)
    else: # Return the forward pass of the function at x
        return f
    
def softmax(x, derivative=False):
    """
    Computes the softmax for an array x.
    
    Args:
     `x`: the array where the function is applied
     `derivative`: if set to True will return the derivative instead of the forward pass
    """
    x_safe = x + 1e-12
    f = np.exp(x_safe) / np.sum(np.exp(x_safe))
    
    if derivative: # Return the derivative of the function evaluated at x
        pass # We will not need this one
    else: # Return the forward pass of the function at x
        return f
    
def tanh(x, derivative=False):
    """
    Computes the element-wise tanh activation function for an array x.

    Args:
     `x`: the array where the function is applied
     `derivative`: if set to True will return the derivative instead of the forward pass
    """
    x_safe = x + 1e-12
    f = (np.exp(x_safe)-np.exp(-x_safe))/(np.exp(x_safe)+np.exp(-x_safe))
    
    if derivative: # Return the derivative of the function evaluated at x
        return 1-f**2
    else: # Return the forward pass of the function at x
        return f
    
def clip_gradient_norm(grads, max_norm=0.25):
    """
    Clips gradients to have a maximum norm of `max_norm`.
    This is to prevent the exploding gradients problem.
    """ 
    # Set the maximum of the norm to be of type float
    max_norm = float(max_norm)
    total_norm = 0
    
    # Calculate the L2 norm squared for each gradient and add them to the total norm
    for grad in grads:
        grad_norm = np.sum(np.power(grad, 2))
        total_norm += grad_norm
    
    total_norm = np.sqrt(total_norm)
    
    # Calculate clipping coeficient
    clip_coef = max_norm / (total_norm + 1e-6)
    
    # If the total norm is larger than the maximum allowable norm, then clip the gradient
    if clip_coef < 1:
        for grad in grads:
            grad *= clip_coef
    
    return grads

In [None]:
# forward pass
def forward(inputs, d_hidden, h_prev, C_prev, p=param):
    """
    args in:
    inputs: x_t of shape (n_x, m).
    h_prev: h_(t-1) of shape (n_a, m)
    C_prev: c_(t-1) of shape (n_a, m)
    p: list of params from my_lstm:
        W_f: (n_a, n_a + n_x)
        b_f: n_a, 1)
        W_i: (n_a, n_a + n_x)
        b_i: (n_a, 1)
        W_g: (n_a, n_a + n_x)
        b_g: (n_a, 1)
        W_o: (n_a, n_a + n_x)
        b_o: (n_a, 1)
        W_v: (n_v, n_a)
        b_v: n_v, 1)
    Returns:
    z_s, f_s, i_s, g_s, C_s, o_s, h_s, v_s -- lists of size m containing the computations in each forward pass
    outputs -- prediction at timestep "t", numpy array of shape (n_v, m)
    """
    assert h_prev.shape == (d_hidden, 1)
    assert C_prev.shape == (d_hidden, 1)

    # unpack params
    W_f, W_i, W_g, W_o, W_v, b_f, b_i, b_g, b_o, b_v = p
    
    # Save a list of computations for each of the components in the LSTM
    z_s, f_s, i_s,  = [], [] ,[], []
    g_s, C_s, o_s, h_s = [], [] ,[], []
    v_s, output_s =  [], [] 
    
    # Append the initial cell and hidden state to their respective lists
    h_s.append(h_prev)
    C_s.append(C_prev)
    
    for x in inputs:
        
        # stack input and hidden state
        z = np.row_stack((h_prev, x))
        z_s.append(z)
        
        # forget gate
        f = sigmoid(np.dot(W_f, z) + b_f)
        f_s.append(f)
        
        # input gate
        i = sigmoid(np.dot(W_i, z) + b_i)
        i_s.append(i)
        
        # candidate
        g = tanh(np.dot(W_g, z) + b_g)
        g_s.append(g)
        
        # memory state
        C_prev = f * C_prev + i * g 
        C_s.append(C_prev)
        
        # output gate
        o = sigmoid(np.dot(W_o, z) + b_o)
        o_s.append(o)
        
        # hidden state
        h_prev = o * tanh(C_prev)
        h_s.append(h_prev)

        # logits / model output layer
        v = np.dot(W_v, h_prev) + b_v
        v_s.append(v)
        
        # softmax the output logits
        output = softmax(v)
        output_s.append(output)

    return z_s, f_s, i_s, g_s, C_s, o_s, h_s, v_s, output_s



In [None]:
# Initialize hidden state as zeros
h = np.zeros((d_hidden, 1))
c = np.zeros((d_hidden, 1))

# Forward pass
z_s, f_s, i_s, g_s, C_s, o_s, h_s, v_s, outputs = forward(param, h, c, param)

output_sentence = [word_tokenizer['decoder'][np.argmax(output)] for output in outputs]
print('Input sentence:')
print(inputs)

print('\nTarget sequence:')
print(targets)

print('\nPredicted sequence:')
print([word_tokenizer['decoder'][np.argmax(output)] for output in outputs])

In [None]:
# backward pass
def backward(z, f, i, g, C, o, h, v, outputs, targets, p=param):
    """
    Arguments:
    z: list of np.row_stack((h_prev, x)) of size m.
    f: list of forget gate computations of size m.
    i: list of input gate computations of size m.
    g: list of candidate computations of size m.
    C: list of Cell states of size m+1.
    o: list of output gate computations of size m.
    h: list of Hidden state computations of size m+1.
    v: list of logit computations of size m.
    outputs: list ofoutputs of size m.
    targets: list oftargets of size m.
    p: tuple of params
    Returns:
    loss -- crossentropy loss for all elements in output
    grads -- lists of gradients of every element in p
    """

    # unpack parameters
    W_f, W_i, W_g, W_o, W_v, b_f, b_i, b_g, b_o, b_v = p

    # Initialize gradients as zero
    W_f_d = np.zeros_like(W_f)
    b_f_d = np.zeros_like(b_f)

    W_i_d = np.zeros_like(W_i)
    b_i_d = np.zeros_like(b_i)

    W_g_d = np.zeros_like(W_g)
    b_g_d = np.zeros_like(b_g)

    W_o_d = np.zeros_like(W_o)
    b_o_d = np.zeros_like(b_o)

    W_v_d = np.zeros_like(W_v)
    b_v_d = np.zeros_like(b_v)
    
    # Set the next cell and hidden state equal to zero
    dh_next = np.zeros_like(h[0])
    dC_next = np.zeros_like(C[0])
        
    # Track loss
    loss = 0
    
    for t in reversed(range(len(outputs))):
        
        # Compute the cross entropy
        loss += -np.mean(np.log(outputs[t]) * targets[t])
        # Get the previous hidden cell state
        C_prev= C[t-1]
        
        # Compute the derivative of the relation of the hidden state to the output layer
        dv = np.copy(outputs[t])
        dv[np.argmax(targets[t])] -= 1

        # Update the gradient of the relation of the hiddenstate to the output layer
        W_v_d += np.dot(dv, h[t].T)
        b_v_d += dv

        # Compute the derivative of the hidden state and output gate
        dh = np.dot(W_v.T, dv)        
        dh += dh_next
        do = dh * tanh(C[t])
        do = sigmoid(o[t], derivative=True)*do
        
        # Update the gradients with respect to the output gate
        W_o_d += np.dot(do, z[t].T)
        b_o_d += do

        # Compute the derivative of the cell state and candidate g
        dC = np.copy(dC_next)
        dC += dh * o[t] * tanh(tanh(C[t]), derivative=True)
        dg = dC * i[t]
        dg = tanh(g[t], derivative=True) * dg
        
        # Update the gradients with respect to the candidate
        W_g_d += np.dot(dg, z[t].T)
        b_g_d += dg

        # Compute the derivative of the input gate and update its gradients
        di = dC * g[t]
        di = sigmoid(i[t], True) * di
        W_i_d += np.dot(di, z[t].T)
        b_i_d += di

        # Compute the derivative of the forget gate and update its gradients
        df = dC * C_prev
        df = sigmoid(f[t]) * df
        W_f_d += np.dot(df, z[t].T)
        b_f_d += df

        # Compute the derivative of the input and update the gradients of the previous hidden and cell state
        dz = (np.dot(W_f.T, df)
             + np.dot(W_i.T, di)
             + np.dot(W_g.T, dg)
             + np.dot(W_o.T, do))
        dh_prev = dz[:d_hidden, :]
        dC_prev = f[t] * dC
        
    grads= W_f_d, W_i_d, W_g_d, W_o_d, W_v_d, b_f_d, b_i_d, b_g_d, b_o_d, b_v_d
    
    # Clip gradients
    grads = clip_gradient_norm(grads)
    
    return loss, grads



In [None]:
# Perform a backward pass
loss, grads = backward(z_s, f_s, i_s, g_s, C_s, o_s, h_s, v_s, outputs, targets)

print('We get a loss of:')
print(loss)

In [None]:
def update_parameters(params, grads, lr=1e-3):
    # Take a step
    for param, grad in zip(params, grads):
        param -= lr * grad
    
    return params

In [None]:
# Training loop

# id data
training_set = zip(train_x, train_y)
validation_set = zip(test_x, test_y)

# hyper-parameters
num_epochs = 50

# initialize
d_hidden = 128
init_lstm = my_lstm(d_hidden=d_hidden, vocab_size=vocab_size)
param = init_lstm.params()

# Track loss
training_loss, validation_loss = [], []

# For each epoch
for i in range(num_epochs):
    
    # Track loss
    epoch_training_loss = 0
    
    # For each sentence in training set
    for inputs, targets in training_set:

        # Initialize hidden state and cell state as zeros
        h = np.zeros((d_hidden, 1))
        c = np.zeros((d_hidden, 1))

        # Forward pass
        z_s, f_s, i_s, g_s, C_s, o_s, h_s, v_s, outputs = forward(inputs, h, c, params)
        
        # Backward pass
        loss, grads = backward(z_s, f_s, i_s, g_s, C_s, o_s, h_s, v_s, outputs, targets, params)
        
        # Update parameters
        params = update_parameters(params, grads, lr=1e-1)
        
        # Update loss
        epoch_training_loss += loss
                
    # Save loss for plot
    training_loss.append(epoch_training_loss/len(training_set))

    # Print loss every 5 epochs
    if i % 5 == 0:
        print(f'Epoch {i}, training loss: {training_loss[-1]}}')


In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# Plot training loss
epoch = np.arange(len(training_loss))
plt.figure()
plt.plot(epoch, training_loss, 'r', label='Training loss')
plt.legend()
plt.xlabel('Epoch'), plt.ylabel('NLL')
plt.show()

In [None]:
# For each sentence and target pair

for inputs, targets in validation_set:

    # Initialize hidden state and cell state as zeros
    h = np.zeros((d_hidden, 1))
    c = np.zeros((d_hidden, 1))

    # Forward pass
    z_s, f_s, i_s, g_s, C_s, o_s, h_s, v_s, outputs = forward(inputs, h, c, params)

    # Print example
    print('Input sentence:')
    print(inputs)

    print('\nTarget sequence:')
    print(targets)

    print('\nPredicted sequence:')
    print([decoder[np.argmax(output)] for output in outputs])


