<a href="https://colab.research.google.com/github/pmadhyastha/INM434/blob/main/RNN_language_models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Neural sequence models

In [None]:
__author__ = "Pranava Madhyastha" 
__version__ = "INM434/IN3045 City, University of London, Spring 2023"

## Simple character-level RNN language model from scratch

We will begin by examining the internals of a character-based RNN language model. The code below implements a minimal character-level vanilla RNN: a simple neural network architecture that is trained to predict the next character in a sequence of characters.


Let's begin with some preliminaries. We will first try a very simple sentence: `this is the NLP lab for this NLP module`.

We will first compute some statistics on the given text data.

### TODO: The text data can either be supplied as a string or read from a file - supply a toy text file and try running the code.

The code below only uses the numpy module. 

The code converts the text data to a list of characters, removes any duplicate characters, and assigns the resulting list to the variable chars. The number of unique characters in the text data is then computed using the len() function and assigned to the variable num_chars. Finally, the size of the text data (i.e., the total number of characters in the text data) is computed and assigned to the variable txt_data_size.



In [None]:
import numpy as np


# load text data
txt_data = "this is the NLP lab for this NLP module " 

# or open some file with text based data. 
# txt_data = open('some_file.txt', 'r').read() 

chars = list(set(txt_data)) 

num_chars = len(chars) 
txt_data_size = len(txt_data)

print("unique characters : ", num_chars) 
print("txt_data_size : ", txt_data_size)

The char_to_int and int_to_char dictionaries are created using dictionary comprehensions. These dictionaries map each character in chars to a unique integer, and vice versa.
The input data is encoded by replacing each character in txt_data with its corresponding integer using a list comprehension. The resulting integer_encoded list contains the encoded data.

In [None]:
# Build character <-> integer lookup tables (one-hot vectors are created later, in the forward pass)
char_to_int = {c: i for i, c in enumerate(chars)}
int_to_char = {i: c for i, c in enumerate(chars)}

print("Character to integer mapping:", char_to_int)
print("----------------------------------------------------")
print("Integer to character mapping:", int_to_char)
print("----------------------------------------------------")

# Integer encode input data
integer_encoded = [char_to_int[c] for c in txt_data]
print("Integer encoded input data:", integer_encoded)
print("----------------------------------------------------")
print("Data length:", len(integer_encoded))
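
The integer codes are turned into one-hot column vectors later, inside the forward pass. As a minimal sketch of that conversion, the snippet below uses a made-up three-character vocabulary and a hypothetical helper `one_hot`, neither of which is part of the notebook:

```python
import numpy as np

# Hypothetical toy vocabulary, sorted so that the mapping is reproducible.
toy_chars = sorted(set("abca"))            # ['a', 'b', 'c']
toy_char_to_int = {c: i for i, c in enumerate(toy_chars)}

def one_hot(index, size):
    """Return a (size, 1) column vector with a single 1 at `index`."""
    vec = np.zeros((size, 1))
    vec[index] = 1
    return vec

encoded = [toy_char_to_int[c] for c in "abca"]
vectors = [one_hot(i, len(toy_chars)) for i in encoded]

# Taking the argmax of each vector recovers the integer code.
decoded = [int(np.argmax(v)) for v in vectors]
print(encoded, decoded)
```

Each vector has the same length as the vocabulary, which is why the input weight matrix `W_xh` has `num_chars` columns.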

We will now set the hyperparameters of the network and initialize the model parameters (i.e., weights and biases): the weights with small random values and the biases with zeros.


In [None]:
# Define hyperparameters
iteration = 5000
sequence_length = 10
batch_size = int(np.ceil(txt_data_size / sequence_length)) # Number of sequence_length-sized chunks per pass over the data (rounded up)
hidden_size = 100 # Number of neurons in the hidden layer
learning_rate = 0.1 # Learning rate for optimization algorithm

# Initialize model parameters
W_xh = np.random.randn(hidden_size, num_chars) * 0.01 # Weight matrix for input to hidden layer
W_hh = np.random.randn(hidden_size, hidden_size) * 0.01 # Weight matrix for hidden to hidden layer
W_hy = np.random.randn(num_chars, hidden_size) * 0.01 # Weight matrix for hidden to output layer

b_h = np.zeros((hidden_size, 1)) # Bias vector for hidden layer
b_y = np.zeros((num_chars, 1)) # Bias vector for output layer

h_prev = np.zeros((hidden_size, 1)) # Previous hidden state, initialized as all zeros


We will now define the forward pass for the RNN. The function below implements forward propagation through the RNN. It takes in a sequence of inputs, a sequence of targets, and a previous hidden state, and outputs the loss, the probabilities for the next characters, the hidden states, and the one-hot encoded input vectors.

The function initializes empty dictionaries to store the input vectors, hidden states, unnormalized log probabilities, and probabilities for each time step. It then initializes the hidden state at time step -1 to the previous hidden state, and initializes the loss to 0.

For each time step, the function one-hot encodes the current input character, computes the hidden state using the previous hidden state and the current input character, computes the unnormalized log probabilities for the next characters using the hidden state and the output weights, and computes the probabilities for the next characters using softmax.

The function also computes the loss using the cross-entropy loss formula, which measures the difference between the predicted probabilities and the actual targets.

Finally, the function returns the loss, the probabilities for the next characters, the hidden states, and the one-hot encoded input vectors.
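
In symbols, the steps described above can be written as follows, where $x_t$ is the one-hot input, $h_t$ the hidden state, $y_t$ the unnormalized log probabilities and $p_t$ the softmax output. The symbol $c_t$ is introduced here for the index of the target character at time step $t$:

```latex
\begin{aligned}
h_t &= \tanh\left(W_{xh}\, x_t + W_{hh}\, h_{t-1} + b_h\right) \\
y_t &= W_{hy}\, h_t + b_y \\
p_{t,k} &= \frac{\exp(y_{t,k})}{\sum_{j} \exp(y_{t,j})} \\
\mathcal{L} &= -\sum_{t} \log p_{t,\,c_t}
\end{aligned}
```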

In [None]:
def forwardprop(inputs, targets, h_prev):
        
    # initialize variables
    xs, hs, ys, ps = {}, {}, {}, {} # create empty dictionaries to store values
    hs[-1] = np.copy(h_prev) # copy previous hidden state vector to -1 key value
    loss = 0 # initialize loss variable
    
    # loop through the sequence
    for t in range(len(inputs)): # t is a "time step" and is used as a key in the dictionaries
        
        xs[t] = np.zeros((num_chars,1)) # initialize input vector
        xs[t][inputs[t]] = 1 # set the index of the current character to 1, one-hot encoding
        hs[t] = np.tanh(np.dot(W_xh, xs[t]) + np.dot(W_hh, hs[t-1]) + b_h) # compute hidden state
        ys[t] = np.dot(W_hy, hs[t]) + b_y # compute unnormalized log probabilities for next characters
        ps[t] = np.exp(ys[t]) / np.sum(np.exp(ys[t])) # compute probabilities for next characters using softmax
        
        loss += -np.log(ps[t][targets[t],0]) # compute loss using cross-entropy loss formula

    return loss, ps, hs, xs
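
The softmax in `forwardprop` exponentiates the raw scores directly, which can overflow once the scores grow large. A common remedy is to subtract the maximum score first, which leaves the result mathematically unchanged. The sketch below uses a hypothetical helper `stable_softmax` that is not part of the notebook:

```python
import numpy as np

def stable_softmax(y):
    """Softmax over a (n, 1) column vector, shifted by max(y) for stability."""
    shifted = y - np.max(y)        # largest entry becomes 0, so exp() never exceeds 1
    exp_y = np.exp(shifted)
    return exp_y / np.sum(exp_y)

# For moderate scores the shifted and unshifted forms agree.
small = np.array([[1.0], [2.0], [3.0]])
naive = np.exp(small) / np.sum(np.exp(small))
print(np.allclose(stable_softmax(small), naive))

# For large scores the shifted form stays finite, where np.exp(1000.0) alone would overflow.
large = np.array([[1000.0], [1001.0], [1002.0]])
print(stable_softmax(large).ravel())
```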


We will now define the backprop function: the purpose of backpropagation is to compute the gradients of the loss function with respect to the parameters of the RNN, which can then be used to update those parameters via gradient descent.

In [None]:


def backprop(ps, inputs, hs, xs):

    dWxh, dWhh, dWhy = np.zeros_like(W_xh), np.zeros_like(W_hh), np.zeros_like(W_hy) # make all zero matrices.
    dbh, dby = np.zeros_like(b_h), np.zeros_like(b_y)
    dhnext = np.zeros_like(hs[0]) # (hidden_size,1) 

    # walk backwards through the time steps
    for t in reversed(range(len(inputs))):
        dy = np.copy(ps[t]) # shape (num_chars,1).  "dy" means "dloss/dy"
        dy[targets[t]] -= 1 # backprop into y: the softmax output minus 1 at the index of the correct label. Note: `targets` is read from the global scope (set in the training loop), not passed in as an argument.
        dWhy += np.dot(dy, hs[t].T)
        dby += dy 
        dh = np.dot(W_hy.T, dy) + dhnext # backprop into h. 
        dhraw = (1 - hs[t] * hs[t]) * dh # backprop through tanh nonlinearity #tanh'(x) = 1-tanh^2(x)
        dbh += dhraw
        dWxh += np.dot(dhraw, xs[t].T)
        dWhh += np.dot(dhraw, hs[t-1].T)
        dhnext = np.dot(W_hh.T, dhraw)
    for dparam in [dWxh, dWhh, dWhy, dbh, dby]: 
        np.clip(dparam, -5, 5, out=dparam) # clip to mitigate exploding gradients.  
    
    return dWxh, dWhh, dWhy, dbh, dby
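
The line `dy[targets[t]] -= 1` relies on the fact that the gradient of the cross-entropy loss with respect to the unnormalized scores is the softmax output minus the one-hot target. A quick finite-difference check of that identity, using made-up scores and a made-up target index (the helpers `softmax` and `loss_fn` are defined here for illustration only):

```python
import numpy as np

def softmax(y):
    e = np.exp(y - np.max(y))
    return e / np.sum(e)

def loss_fn(y, target):
    """Cross-entropy loss for a single time step."""
    return -np.log(softmax(y)[target, 0])

rng = np.random.default_rng(0)
y = rng.standard_normal((5, 1))    # made-up scores for a 5-character vocabulary
target = 2                         # made-up index of the correct character

# Analytic gradient: softmax output with 1 subtracted at the target index.
analytic = softmax(y)
analytic[target] -= 1

# Numerical gradient via central differences.
eps = 1e-5
numeric = np.zeros_like(y)
for k in range(y.shape[0]):
    y_plus, y_minus = y.copy(), y.copy()
    y_plus[k] += eps
    y_minus[k] -= eps
    numeric[k] = (loss_fn(y_plus, target) - loss_fn(y_minus, target)) / (2 * eps)

print(np.max(np.abs(analytic - numeric)))   # should be very small
```

The same idea, perturbing one entry at a time and comparing against the analytic value, can be used to check the gradients of the weight matrices.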



Let us now train the RNN. The outer loop runs for a fixed number of training iterations. Within each iteration, the text is split into consecutive chunks of `sequence_length` characters (the variable `batch_size` holds the number of such chunks), and the model is trained on each chunk in turn. The Adagrad optimisation algorithm updates the model parameters after each chunk. More about Adagrad: https://optimization.cbe.cornell.edu/index.php?title=AdaGrad. 

In [None]:
data_pointer = 0

# memory variables for optimiser
mWxh, mWhh, mWhy = np.zeros_like(W_xh), np.zeros_like(W_hh), np.zeros_like(W_hy)
mbh, mby = np.zeros_like(b_h), np.zeros_like(b_y) 


for iteration_index in range(iteration):
    h_prev = np.zeros((hidden_size,1)) # reset RNN memory
    data_pointer = 0 # go from start of data
    
    for batch_index in range(batch_size):
        
        inputs = [char_to_int[ch] for ch in txt_data[data_pointer:data_pointer+sequence_length]]
        targets = [char_to_int[ch] for ch in txt_data[data_pointer+1:data_pointer+sequence_length+1]] # t+1        
            
        if (data_pointer+sequence_length+1 >= len(txt_data) and batch_index == batch_size-1): # last chunk of the input data
            targets.append(char_to_int[" "])   # the final character has no successor, so pad the targets with a space (" ")


        # forward
        loss, ps, hs, xs = forwardprop(inputs, targets, h_prev)
        h_prev = hs[len(inputs) - 1] # carry the last hidden state over to the next chunk
    
        # backward
        dWxh, dWhh, dWhy, dbh, dby = backprop(ps, inputs, hs, xs) 
        
        
        # perform parameter update with optimiser (adagrad)
        for param, dparam, mem in zip([W_xh, W_hh, W_hy, b_h, b_y], 
                                      [dWxh, dWhh, dWhy, dbh, dby], 
                                      [mWxh, mWhh, mWhy, mbh, mby]):
            mem += dparam * dparam # elementwise
            param += -learning_rate * dparam / np.sqrt(mem + 1e-8)      
    
        data_pointer += sequence_length # move data pointer
        
    if iteration_index % 100 == 0:
        print(f'iteration {iteration_index}, loss: {loss}') # print progress
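
The update inside the `zip` loop is Adagrad: each parameter keeps a running sum of its squared gradients, and each step is divided by the square root of that sum. A minimal sketch on a made-up quadratic objective (the `target` vector, the learning rate and the step count are assumptions chosen for illustration):

```python
import numpy as np

target = np.array([3.0, -2.0])     # made-up minimiser of the toy objective
w = np.zeros(2)                    # parameters to optimise
mem = np.zeros_like(w)             # running sum of squared gradients
learning_rate = 0.5

for step in range(500):
    grad = 2 * (w - target)        # gradient of sum((w - target) ** 2)
    mem += grad * grad             # accumulate squared gradients elementwise
    w += -learning_rate * grad / np.sqrt(mem + 1e-8)

print(w)
```

Because `mem` only grows, the effective step size of each parameter shrinks over time, which is the reason a comparatively large `learning_rate` can be used.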


Now let us generate some samples from our model. We will use a predict function for doing this: the function generates new text by sampling from the trained RNN model. The function takes two arguments: test_char, which is the starting character for generating new text, and length, which is the length of the generated text.

`predict` initializes a zero vector `x` and sets the value at the index corresponding to `test_char` to 1, which gives the one-hot encoding of the input character. It also initializes an empty list `ixes` to store the indices of the predicted characters.

The function then uses a for loop to generate length number of characters. In each iteration, it computes the hidden state h and the output y of the RNN using the current input x and the previous hidden state h. It then calculates the probability distribution p over all the possible characters using the softmax function applied to the output y. The function then samples the next character index ix randomly from the probability distribution p, and sets the value at the corresponding index of x to 1. It appends ix to the ixes list, which stores the index of the predicted character.

Finally, the function converts the list of indices ixes to the corresponding characters using the int_to_char dictionary, concatenates them into a string txt, and prints the generated text.


In [None]:
def predict(test_char, length):
    x = np.zeros((num_chars, 1)) 
    x[char_to_int[test_char]] = 1
    ixes = []
    h = np.zeros((hidden_size, 1))

    for t in range(length):
        h = np.tanh(np.dot(W_xh, x) + np.dot(W_hh, h) + b_h) 
        y = np.dot(W_hy, h) + b_y
        p = np.exp(y) / np.sum(np.exp(y)) 
        ix = np.random.choice(range(num_chars), p=p.ravel())  # ravel flattens p from (num_chars, 1) to a 1-D array
        ixes.append(ix)  # list
        x = np.zeros((num_chars, 1))  # init
        x[ix] = 1 

    txt = ''.join(int_to_char[i] for i in ixes)
    print('----\n%s\n----' % txt)
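
To see what sampling from `p` does, compared with always taking the most likely character, here is a small sketch with a made-up distribution over three characters. It uses NumPy's seeded `default_rng` generator instead of the global `np.random.choice` so that repeated runs give the same draws:

```python
import numpy as np

rng = np.random.default_rng(0)     # seeded generator, so runs are repeatable

p = np.array([[0.1], [0.7], [0.2]])   # made-up distribution over 3 characters

# Sampling draws each index in proportion to its probability ...
samples = [int(rng.choice(len(p), p=p.ravel())) for _ in range(1000)]
counts = np.bincount(samples, minlength=len(p))
print(counts / counts.sum())       # empirical frequencies, roughly following p

# ... whereas greedy decoding would always pick the most likely index.
greedy = int(np.argmax(p))
print(greedy)
```

Sampling is what lets `predict` produce different text on each call; greedy decoding would repeat the same output for a given starting character.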


Let us sample now: 

In [None]:
predict('t',10) # (the input character, length of output)

### TODO: 
- Try a bigger length of output. 
- Try with a large input file (download the file locally using `!wget <url> -O some_file.txt`; include the exclamation mark). 
 

## Neural language model using LSTM with PyTorch 

This part of the code is much faster on a GPU, so change the runtime to a GPU backend. The code falls back to the CPU if no GPU is available. 

The code below implements a simple LSTM-based language model. The `RNN` class is a subclass of `nn.Module`, the base class for all neural network modules in PyTorch. The `__init__` method defines the structure of the model: an input embedding layer (`nn.Embedding`), an LSTM layer (`nn.LSTM`), and an output layer (`nn.Linear`).

The `forward` method takes an input sequence and a hidden state, applies the embedding layer to convert the input sequence into a sequence of embeddings, passes the embeddings through the LSTM layer to get the output and the updated hidden state, and applies the linear layer to the output to get the final scores. Finally, it detaches the hidden state so that gradients are not propagated back into earlier chunks (backpropagation through time is truncated at the chunk boundary), and returns both the output and the updated hidden state.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class RNN(nn.Module):
    def __init__(self, input_size, output_size, hidden_size, num_layers):
        super(RNN, self).__init__()
        self.embedding = nn.Embedding(input_size, input_size)
        self.rnn = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers)
        self.decoder = nn.Linear(hidden_size, output_size)
    
    def forward(self, input_seq, hidden_state):
        embedding = self.embedding(input_seq)
        output, hidden_state = self.rnn(embedding, hidden_state)
        output = self.decoder(output)
        hidden_state = (hidden_state[0].detach(), hidden_state[1].detach())
        return output, hidden_state
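
To make the tensor shapes in `forward` concrete, here is a small CPU-only walk-through that uses the same three layer types. The sizes (a vocabulary of 5, hidden size 8, 2 layers, sequence length 4) are arbitrary values chosen for illustration:

```python
import torch
import torch.nn as nn

vocab_size, hidden_size, num_layers, seq_len = 5, 8, 2, 4   # arbitrary toy sizes

embedding = nn.Embedding(vocab_size, vocab_size)
lstm = nn.LSTM(input_size=vocab_size, hidden_size=hidden_size, num_layers=num_layers)
decoder = nn.Linear(hidden_size, vocab_size)

# A sequence of character indices with shape (seq_len, batch=1).
input_seq = torch.randint(0, vocab_size, (seq_len, 1))

embedded = embedding(input_seq)              # (seq_len, 1, vocab_size)
output, (h_n, c_n) = lstm(embedded)          # output: (seq_len, 1, hidden_size)
logits = decoder(output)                     # (seq_len, 1, vocab_size)

# Detaching keeps the values but cuts the link to the computation graph,
# so gradients from the next chunk stop at this boundary.
h_detached = h_n.detach()

print(embedded.shape, output.shape, logits.shape)
print(h_n.shape, h_n.requires_grad, h_detached.requires_grad)
```

Passing no hidden state to the LSTM, as done here and via `hidden_state = None` in the training loop, makes PyTorch start from zeros.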


We will now fetch some Shakespearean text to get started, downloading it from the link below. Open the link in a browser if you would like to read the text first. 

In [None]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

We will now define the main training loop with Adagrad as the optimiser. The loop iterates over the epochs. For each epoch, it picks a random starting point within the first 10,000 characters of the data and trains the model on consecutive sequences of length `seq_len` until the end of the data. For each sequence the loss is computed and the optimizer updates the model parameters. After each epoch, the model's state is saved to the specified file, and a new sequence of text is generated by randomly selecting a starting character and iteratively sampling from the model output to obtain the next character, until the total number of characters in the generated sequence (`op_seq_len`) is reached. The generated text is printed to the console after each epoch.

In [None]:
def train():
    ########### Hyperparameters ###########
    hidden_size = 512   # size of hidden state
    seq_len = 100       # length of LSTM sequence
    num_layers = 3      # num of layers in LSTM layer stack
    lr = 0.002          # learning rate
    epochs = 100        # max number of epochs
    op_seq_len = 200    # total num of characters in output test sequence
    save_path = "charRNN_shakespeare.pth"
    data_path = "input.txt"
    #######################################

    # load the text file
    with open(data_path, 'r') as f:
        data = f.read()
    chars = sorted(list(set(data)))
    data_size, vocab_size = len(data), len(chars)
    print("----------------------------------------")
    print("Data has {} characters, {} unique".format(data_size, vocab_size))
    print("----------------------------------------")

    # char to index and index to char maps
    char_to_ix = { ch:i for i,ch in enumerate(chars) }
    ix_to_char = { i:ch for i,ch in enumerate(chars) }

    # convert data from chars to indices
    data = list(data)
    for i, ch in enumerate(data):
        data[i] = char_to_ix[ch]

    # data tensor on device
    data = torch.tensor(data).to(device)
    data = torch.unsqueeze(data, dim=1)

    # model instance
    rnn = RNN(vocab_size, vocab_size, hidden_size, num_layers).to(device)

    # loss function and optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adagrad(rnn.parameters(), lr=lr)

    # training loop
    for i_epoch in range(1, epochs+1):

        # random starting point (1st 10000 chars) from data to begin
        data_ptr = np.random.randint(10000)
        n = 0
        running_loss = 0
        hidden_state = None

        while True:
            input_seq = data[data_ptr : data_ptr+seq_len]
            target_seq = data[data_ptr+1 : data_ptr+seq_len+1]

            # forward pass
            output, hidden_state = rnn(input_seq, hidden_state)

            # compute loss
            loss = loss_fn(torch.squeeze(output), torch.squeeze(target_seq))
            running_loss += loss.item()

            # compute gradients and take optimizer step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # update the data pointer
            data_ptr += seq_len
            n +=1

            # if at end of data : break
            if data_ptr + seq_len + 1 > data_size:
                break

        # print loss and save weights after every epoch
        print("Epoch: {0} \t Loss: {1:.8f}".format(i_epoch, running_loss/n))
        torch.save(rnn.state_dict(), save_path)

        # sample / generate a text sequence after every epoch
        data_ptr = 0
        hidden_state = None

        # random character from data to begin (cloned, so the in-place update below does not overwrite the training data)
        rand_index = np.random.randint(data_size-1)
        input_seq = data[rand_index : rand_index+1].clone()

        print("----------------------------------------")
        while True:
            # forward pass
            output, hidden_state = rnn(input_seq, hidden_state)

            # construct categorical distribution and sample a character
            output = F.softmax(torch.squeeze(output), dim=0)
            dist = Categorical(output)
            index = dist.sample()

            # print the sampled character
            print(ix_to_char[index.item()], end='')

            # next input is
            input_seq[0][0] = index.item()
            data_ptr += 1
            
            if data_ptr >= op_seq_len: # stop after exactly op_seq_len characters
                break
            
        print("\n----------------------------------------")
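
The inner training loop walks over the data in windows of `seq_len` characters, and the targets are the same window shifted one position to the right. The sketch below mirrors that slicing on a short made-up string, using plain Python strings instead of tensors:

```python
text = "hello world"      # made-up stand-in for the training text
seq_len = 4               # toy window length

pairs = []
data_ptr = 0
# Stop once a full window plus its one-character lookahead no longer fits.
while data_ptr + seq_len + 1 <= len(text):
    input_seq = text[data_ptr : data_ptr + seq_len]
    target_seq = text[data_ptr + 1 : data_ptr + seq_len + 1]   # shifted by one
    pairs.append((input_seq, target_seq))
    data_ptr += seq_len

for input_seq, target_seq in pairs:
    print(repr(input_seq), "->", repr(target_seq))
```

At every position the target is simply the character that follows the input character, which is what makes this a next-character prediction task.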
        


Let us now begin training. The code is going to sample after every epoch - notice the evolution of the generated text. 

In [None]:
train() # call the training loop

## Neural sequence taggers

We will now explore an LSTM-based part-of-speech (POS) tagger model using PyTorch.

Notice that the structure is very similar to the previous example. However, here, the forward method takes a sentence as input and returns the predicted tag scores for each word in the sentence.



In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Define the LSTM-based POS tagger model
class LSTMTagger(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, tagset_size):
        super(LSTMTagger, self).__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = nn.functional.log_softmax(tag_space, dim=1)
        return tag_scores

# Define the training function
def train(model, optimizer, loss_function, sentences, tags, num_epochs):
    for epoch in range(num_epochs):
        for sentence, tag in zip(sentences, tags):
            model.zero_grad()
            sentence = torch.tensor(sentence, dtype=torch.long).to(device)
            tag = torch.tensor(tag, dtype=torch.long).to(device)
            tag_scores = model(sentence)
            loss = loss_function(tag_scores, tag)
            loss.backward()
            optimizer.step()

# Define the evaluation function
def evaluate(model, sentences, tags):
    correct = 0
    total = 0
    with torch.no_grad():
        for sentence, tag in zip(sentences, tags):
            sentence = torch.tensor(sentence, dtype=torch.long).to(device)
            tag = torch.tensor(tag, dtype=torch.long).to(device)
            tag_scores = model(sentence)
            _, predicted = torch.max(tag_scores.data, 1)
            total += tag.size(0)
            correct += (predicted == tag).sum().item()
    accuracy = 100 * correct / total
    return accuracy

# Example usage
# Define a sample corpus and its corresponding tags
corpus = [
    "The cat sat on the mat",
    "The dog chased the cat",
    "The mouse ran away from the cat",
    "The cat purred",
]
tags = [
    "DET NOUN VERB ADP DET NOUN",
    "DET NOUN VERB DET NOUN",
    "DET NOUN VERB ADV ADP DET NOUN",
    "DET NOUN VERB",
]
# Define the vocabulary and POS tagset
word_to_ix = {"<PAD>": 0, "<UNK>": 1, "The": 2, "cat": 3, "sat": 4, "on": 5, "the": 6,
              "mat": 7, "dog": 8, "chased": 9, "mouse": 10, "ran": 11, "away": 12, "from": 13, "purred": 14}
tag_to_ix = {"<PAD>": 0, "<UNK>": 1, "DET": 2, "NOUN": 3, "VERB": 4, "ADP": 5, "ADV": 6}

# Convert the corpus and tagset to indices
sentences = [[word_to_ix.get(word, word_to_ix["<UNK>"]) for word in sentence.split()] for sentence in corpus]
tags = [[tag_to_ix.get(tag, tag_to_ix["<UNK>"]) for tag in sentence.split()] for sentence in tags]

# Set hyperparameters and create model, optimizer, and loss function instances
vocab_size = len(word_to_ix)
embedding_dim = 16
hidden_dim = 16
tagset_size = len(tag_to_ix)
num_epochs = 10
learning_rate = 0.1
model = LSTMTagger(vocab_size, embedding_dim, hidden_dim, tagset_size).to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)
loss_function = nn.NLLLoss()
# Train the model
train(model, optimizer, loss_function, sentences, tags, num_epochs)

# Evaluate the model on the same corpus
accuracy = evaluate(model, sentences, tags)
print(f"Accuracy: {accuracy:.2f}%")
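
The dictionaries `word_to_ix` and `tag_to_ix` above are written out by hand, which does not scale to a larger dataset. One way to derive them from the data is sketched below with a hypothetical helper `build_index`; the `toy_` names are used so that the variables of the cell above are left untouched:

```python
def build_index(sequences, specials=("<PAD>", "<UNK>")):
    """Map each distinct token to an integer, reserving the first ids for specials."""
    index = {token: i for i, token in enumerate(specials)}
    for sequence in sequences:
        for token in sequence.split():
            if token not in index:
                index[token] = len(index)   # next free id, in order of first appearance
    return index

toy_corpus = ["The cat sat on the mat", "The dog chased the cat"]
toy_tags = ["DET NOUN VERB ADP DET NOUN", "DET NOUN VERB DET NOUN"]

toy_word_to_ix = build_index(toy_corpus)
toy_tag_to_ix = build_index(toy_tags)

# Unknown words fall back to the <UNK> id, as in the cell above.
encoded = [toy_word_to_ix.get(w, toy_word_to_ix["<UNK>"]) for w in "The bird sat".split()]
print(toy_word_to_ix)
print(toy_tag_to_ix)
print(encoded)
```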


## TODO: 
- Can you train this model on a standard POS dataset? One is available at https://github.com/dan-oak/pos/tree/master/data (look for the tagged files to use as the train and dev sets). 