<a href="https://colab.research.google.com/github/pmadhyastha/.config/blob/master/RNN_playground.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

In [None]:
# load text data

#txt_data = "this is the NLP course at City, University of London "
txt_data = "this NLP module is a module at a  "
# txt_data = open('input.txt', 'r').read() # test external files

# Unique characters of the corpus. They are sorted because plain set iteration
# order for strings changes between interpreter runs (hash randomisation),
# which would silently change every char <-> index mapping built from `chars`.
chars = sorted(set(txt_data))

num_chars = len(chars) # the number of unique characters
txt_data_size = len(txt_data) # total number of characters in the corpus

print("unique characters : ", num_chars) # You can see the number of unique characters in your input data.
print("txt_data_size : ", txt_data_size)


In [None]:
# Integer-encode the corpus (the actual one-hot vectors are built later).
# Lookup tables in both directions: character -> index and index -> character.
char_to_int = {c: i for i, c in enumerate(chars)}
int_to_char = dict(enumerate(chars))
print(char_to_int)
print("----------------------------------------------------")
print(int_to_char)
print("----------------------------------------------------")
# The corpus as a list of integer ids, one id per character of txt_data.
integer_encoded = list(map(char_to_int.__getitem__, txt_data))
print(integer_encoded)
print("----------------------------------------------------")
print("data length : ", len(integer_encoded))

In [None]:
# Illustration only: the training code below builds its own one-hot vectors.

# One row per character of the corpus; each row has a single 1 in the column
# given by that character's integer id and 0 everywhere else.
onehot_encoded = np.array(
    [[1 if col == ix else 0 for col in range(len(chars))] for ix in integer_encoded]
)

print(onehot_encoded.shape)     #  = (len(data),len(chars))
print(onehot_encoded)

# Decode the first row again: the position of its largest entry is the char id.
first_id = np.argmax(onehot_encoded[0])
inverted = int_to_char[first_id]
print(inverted)


In [None]:
# hyperparameters

iteration = 5000
sequence_length = 10
# Number of chunks needed to cover the text = ceil(txt_data_size / sequence_length).
# Integer ceil-division is used instead of round(x + 0.5): Python's round() rounds
# halves to the nearest even number, so round(3.0 + 0.5) == 4 and a text whose
# length is an exact multiple of sequence_length got one (empty) chunk too many.
batch_size = (txt_data_size + sequence_length - 1) // sequence_length
hidden_size = 100  # size of hidden layer of neurons.
learning_rate = 1e-1


# model parameters (small random weights, zero biases)

W_xh = np.random.randn(hidden_size, num_chars)*0.01     # weight input -> hidden.
W_hh = np.random.randn(hidden_size, hidden_size)*0.01   # weight hidden -> hidden
W_hy = np.random.randn(num_chars, hidden_size)*0.01     # weight hidden -> output

b_h = np.zeros((hidden_size, 1)) # hidden bias
b_y = np.zeros((num_chars, 1)) # output bias

h_prev = np.zeros((hidden_size,1)) # h_(t-1)


In [None]:
def forwardprop(inputs, targets, h_prev):
    """Run the RNN forward over one chunk and accumulate the cross-entropy loss.

    Uses the module-level parameters W_xh, W_hh, W_hy, b_h, b_y and num_chars.

    Args:
        inputs: list of integer character ids fed to the network, one per time step.
        targets: list of integer character ids to predict; targets[t] is the
            expected output at time step t.
        h_prev: hidden state column vector from before the first time step.

    Returns:
        (loss, ps, hs, xs) where loss is the summed negative log-likelihood and
        ps, hs, xs are dicts keyed by time step holding the output probabilities,
        hidden states (hs[-1] is a copy of h_prev) and one-hot inputs.
    """
    # The weights are not updated inside a chunk; everything needed for
    # backprop is cached per time step in these dictionaries.
    xs, hs, ys, ps = {}, {}, {}, {}
    hs[-1] = np.copy(h_prev) # Copy previous hidden state vector to -1 key value.
    loss = 0 # loss initialization

    for t in range(len(inputs)): # t is a "time step" and is used as a key(dic).

        xs[t] = np.zeros((num_chars,1))
        xs[t][inputs[t]] = 1 # one-hot encoding of the input character
        hs[t] = np.tanh(np.dot(W_xh, xs[t]) + np.dot(W_hh, hs[t-1]) + b_h) # hidden state.
        ys[t] = np.dot(W_hy, hs[t]) + b_y # unnormalized log probabilities for next chars

        # Softmax. Subtracting the largest logit leaves the result unchanged
        # mathematically but keeps exp() from overflowing to inf (-> nan).
        exp_scores = np.exp(ys[t] - np.max(ys[t]))
        ps[t] = exp_scores / np.sum(exp_scores) # probabilities for next chars.

        # Cross-entropy against a one-hot target reduces to -log(p[target]).
        loss += -np.log(ps[t][targets[t],0])

    return loss, ps, hs, xs

In [None]:


def backprop(ps, inputs, hs, xs, targets=None):
    """Backpropagate through time over one chunk and return clipped gradients.

    Uses the module-level parameters W_xh, W_hh, W_hy, b_h and b_y.

    Args:
        ps: dict time step -> softmax output column vector (from forwardprop).
        inputs: list of input character ids; only its length is used here.
        hs: dict time step -> hidden state, including key -1 for the initial state.
        xs: dict time step -> one-hot input column vector.
        targets: list of target character ids, one per time step. When omitted,
            the module-level variable `targets` is used, which keeps existing
            four-argument calls working. Passing it explicitly is safer because
            other cells also assign a global named `targets`.

    Returns:
        (dWxh, dWhh, dWhy, dbh, dby): gradients of the summed loss with respect
        to the parameters, each clipped element-wise to [-5, 5].
    """
    if targets is None:
        # Backward-compatible fallback to the global the function used to read.
        targets = globals()["targets"]

    dWxh, dWhh, dWhy = np.zeros_like(W_xh), np.zeros_like(W_hh), np.zeros_like(W_hy) # make all zero matrices.
    dbh, dby = np.zeros_like(b_h), np.zeros_like(b_y)
    # Gradient flowing into the last hidden state from "the future" is zero.
    # hs[-1] always exists (even for an empty chunk), unlike hs[0].
    dhnext = np.zeros_like(hs[-1]) # (hidden_size,1)

    # walk the time steps backwards
    for t in reversed(range(len(inputs))):
        dy = np.copy(ps[t]) # shape (num_chars,1).  "dy" means "dloss/dy"
        dy[targets[t]] -= 1 # gradient of softmax + cross-entropy: probabilities minus one-hot target
        dWhy += np.dot(dy, hs[t].T)
        dby += dy
        dh = np.dot(W_hy.T, dy) + dhnext # backprop into h.
        dhraw = (1 - hs[t] * hs[t]) * dh # backprop through tanh nonlinearity #tanh'(x) = 1-tanh^2(x)
        dbh += dhraw
        dWxh += np.dot(dhraw, xs[t].T)
        dWhh += np.dot(dhraw, hs[t-1].T)
        dhnext = np.dot(W_hh.T, dhraw)
    for dparam in [dWxh, dWhh, dWhy, dbh, dby]:
        np.clip(dparam, -5, 5, out=dparam) # clip to mitigate exploding gradients.

    return dWxh, dWhh, dWhy, dbh, dby



In [None]:
data_pointer = 0

# memory variables for Adagrad (running sums of squared gradients)
mWxh, mWhh, mWhy = np.zeros_like(W_xh), np.zeros_like(W_hh), np.zeros_like(W_hy)
mbh, mby = np.zeros_like(b_h), np.zeros_like(b_y)


for i in range(iteration):
    h_prev = np.zeros((hidden_size,1)) # reset RNN memory at the start of each pass
    data_pointer = 0 # go from start of data

    for b in range(batch_size):

        inputs = [char_to_int[ch] for ch in txt_data[data_pointer:data_pointer+sequence_length]]
        targets = [char_to_int[ch] for ch in txt_data[data_pointer+1:data_pointer+sequence_length+1]] # t+1

        if not inputs:
            # Nothing left to read (can happen if batch_size overshoots the text).
            break

        if len(targets) < len(inputs):
            # The last chunk has no "next character" for its final input:
            # pad the targets with a space so both lists have the same length.
            targets.append(char_to_int[" "])

        # forward
        loss, ps, hs, xs = forwardprop(inputs, targets, h_prev)

        # Carry the final hidden state of this chunk into the next chunk;
        # without this every chunk would start from an all-zero state.
        h_prev = hs[len(inputs)-1]

        # backward (reads the module-level `targets` assigned above)
        dWxh, dWhh, dWhy, dbh, dby = backprop(ps, inputs, hs, xs)

        # perform parameter update with Adagrad (arrays are updated in place)
        for param, dparam, mem in zip([W_xh, W_hh, W_hy, b_h, b_y],
                                    [dWxh, dWhh, dWhy, dbh, dby],
                                    [mWxh, mWhh, mWhy, mbh, mby]):
            mem += dparam * dparam # elementwise
            param += -learning_rate * dparam / np.sqrt(mem + 1e-8) # adagrad update

        data_pointer += sequence_length # move data pointer

    if i % 100 == 0:
        # `loss` is the loss of the last chunk processed in this pass.
        print ('iter %d, loss: %f' % (i, loss)) # print progress

In [None]:
def predict(test_char, length):
    """Sample `length` characters from the trained RNN and print them.

    Uses the module-level parameters W_xh, W_hh, W_hy, b_h, b_y and the
    char_to_int / int_to_char lookup tables.

    Args:
        test_char: seed character; must be a key of char_to_int.
        length: number of characters to generate.
    """
    x = np.zeros((num_chars, 1))
    x[char_to_int[test_char]] = 1 # one-hot encoding of the seed character
    ixes = []
    h = np.zeros((hidden_size,1)) # generation starts from an all-zero hidden state

    for t in range(length):
        h = np.tanh(np.dot(W_xh, x) + np.dot(W_hh, h) + b_h)
        y = np.dot(W_hy, h) + b_y
        # Softmax with the largest logit subtracted first so exp() cannot overflow.
        exp_scores = np.exp(y - np.max(y))
        p = exp_scores / np.sum(exp_scores)
        # ravel() flattens the (num_chars, 1) column into a 1-D probability vector;
        # "ix" is a single character index drawn from that distribution.
        ix = np.random.choice(range(num_chars), p=p.ravel())
        # The sampled character becomes the next input.
        x = np.zeros((num_chars, 1))
        x[ix] = 1
        ixes.append(ix)
    txt = ''.join(int_to_char[i] for i in ixes)
    print ('----\n %s \n----' % (txt, ))

In [None]:
predict('a',30) # (char, len of output)

In [None]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical
import numpy as np

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class RNN(nn.Module):
    """Character model: embedding -> stacked LSTM -> linear decoder.

    The embedding maps each of `input_size` ids to a vector of the same width
    (`input_size`), which is then fed to an LSTM with `num_layers` layers of
    `hidden_size` units; the decoder projects every LSTM output to
    `output_size` scores.
    """

    def __init__(self, input_size, output_size, hidden_size, num_layers):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=input_size)
        self.rnn = nn.LSTM(input_size, hidden_size, num_layers=num_layers)
        self.decoder = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, input_seq, hidden_state):
        """Return (scores, (h, c)) for `input_seq`.

        `hidden_state` is passed straight to the LSTM. The returned h and c are
        detached, so gradients do not flow back into earlier calls.
        """
        embedded = self.embedding(input_seq)
        lstm_out, (h_n, c_n) = self.rnn(embedded, hidden_state)
        scores = self.decoder(lstm_out)
        return scores, (h_n.detach(), c_n.detach())

def train(hidden_size=512, seq_len=100, num_layers=3, lr=0.002, epochs=100,
          op_seq_len=200, load_chk=False, save_path="charRNN_shakespeare.pth",
          data_path="input.txt"):
    """Train the character-level LSTM on a text file and print samples.

    After every epoch the weights are saved to `save_path` and a text sample
    is generated and printed. Uses the module-level `device` and `RNN`.

    Args:
        hidden_size: size of the LSTM hidden state.
        seq_len: number of characters per training chunk.
        num_layers: number of stacked LSTM layers.
        lr: Adam learning rate.
        epochs: number of passes over the text.
        op_seq_len: number of characters generated for each sample.
        load_chk: if True, load weights from `save_path` before training.
        save_path: checkpoint file written after every epoch.
        data_path: path of the training text file.

    Raises:
        ValueError: if the text is too short to form a single chunk.
    """
    # load the text file; the context manager closes the handle again
    with open(data_path, 'r') as text_file:
        text = text_file.read()
    chars = sorted(set(text))
    data_size, vocab_size = len(text), len(chars)
    print("----------------------------------------")
    print("Data has {} characters, {} unique".format(data_size, vocab_size))
    print("----------------------------------------")

    # One chunk needs seq_len inputs plus one extra character for the last target.
    if data_size < seq_len + 1:
        raise ValueError(
            "text has {} characters but at least {} are needed for seq_len={}".format(
                data_size, seq_len + 1, seq_len))

    # char to index and index to char maps
    char_to_ix = { ch:i for i,ch in enumerate(chars) }
    ix_to_char = { i:ch for i,ch in enumerate(chars) }

    # whole text as a column of character ids on the target device: (data_size, 1)
    data = torch.tensor([char_to_ix[ch] for ch in text], device=device).unsqueeze(1)

    # model instance
    rnn = RNN(vocab_size, vocab_size, hidden_size, num_layers).to(device)

    # load checkpoint if True
    if load_chk:
        # map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine
        rnn.load_state_dict(torch.load(save_path, map_location=device))
        print("Model loaded successfully !!")
        print("----------------------------------------")

    # loss function and optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(rnn.parameters(), lr=lr)

    # Start offsets are drawn from the first 100 characters, but never so far
    # in that the first chunk would run past the end of a short text.
    max_start = min(100, data_size - seq_len)

    # training loop
    for i_epoch in range(1, epochs+1):

        # random starting point so chunk boundaries differ between epochs
        data_ptr = np.random.randint(max_start)
        n = 0
        running_loss = 0
        hidden_state = None

        # consume the text in consecutive full-length chunks
        while data_ptr + seq_len + 1 <= data_size:
            input_seq = data[data_ptr : data_ptr+seq_len]
            target_seq = data[data_ptr+1 : data_ptr+seq_len+1]

            # forward pass (the returned hidden state is already detached by RNN.forward)
            output, hidden_state = rnn(input_seq, hidden_state)

            # compute loss; only the batch dimension (size 1) is squeezed so the
            # shapes stay (seq_len, vocab) / (seq_len,) even for seq_len or vocab of 1
            loss = loss_fn(output.squeeze(1), target_seq.squeeze(1))
            running_loss += loss.item()

            # compute gradients and take optimizer step
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # update the data pointer
            data_ptr += seq_len
            n +=1

        # print loss and save weights after every epoch
        print("Epoch: {0} \t Loss: {1:.8f}".format(i_epoch, running_loss/n))
        torch.save(rnn.state_dict(), save_path)

        # sample / generate a text sequence after every epoch
        hidden_state = None

        # random character from data to begin. clone() is essential: a plain
        # slice is a view, and writing the sampled characters into it would
        # overwrite the training data itself.
        rand_index = np.random.randint(data_size-1)
        input_seq = data[rand_index : rand_index+1].clone()

        print("----------------------------------------")
        for _ in range(op_seq_len):  # exactly op_seq_len characters
            # forward pass
            output, hidden_state = rnn(input_seq, hidden_state)

            # construct categorical distribution and sample a character
            probs = F.softmax(output[0, 0], dim=0)
            index = Categorical(probs).sample()

            # print the sampled character
            print(ix_to_char[index.item()], end='')

            # next input is current output
            input_seq[0, 0] = index.item()

        print("\n----------------------------------------")

# Run the char-RNN training defined above with its built-in hyperparameters;
# it reads "input.txt" and saves a checkpoint after every epoch.
train()



In [None]:
import torch
import torch.nn as nn

class BiRNN(nn.Module):
    """Bidirectional RNN tagger: embedding -> bi-RNN -> linear -> log-softmax.

    The linear layer takes `hidden_dim * 2` features because the forward and
    backward RNN outputs are concatenated.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, tagset_size):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.birnn = nn.RNN(input_size=embedding_dim, hidden_size=hidden_dim, bidirectional=True)
        self.hidden2tag = nn.Linear(in_features=2 * hidden_dim, out_features=tagset_size)

    def forward(self, sentence):
        """Return log-probabilities over tags, one row per element of `sentence`."""
        n_tokens = len(sentence)
        # Reshape to (sequence length, batch of 1, features) for the RNN.
        rnn_input = self.word_embeddings(sentence).view(n_tokens, 1, -1)
        rnn_output, _ = self.birnn(rnn_input)
        # Drop the batch dimension again before scoring the tags.
        scores = self.hidden2tag(rnn_output.view(n_tokens, -1))
        return nn.functional.log_softmax(scores, dim=1)


In [None]:
# Two toy sentences, each stored as (list of words, list of POS tags).
training_data = [
    (
        "The cat sat on the mat".split(),
        "DET NOUN VERB ADP DET NOUN".split(),
    ),
    (
        "The dog ate my homework".split(),
        "DET NOUN VERB ADJ NOUN".split(),
    ),
]


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define the training data: ONE sentence given as a flat list of (word, tag) pairs
train_data = [("The", "DET"), ("dog", "NOUN"), ("chased", "VERB"), ("the", "DET"), ("cat", "NOUN")]

# Placeholder for any word that does not occur in the training data
UNK_TOKEN = "<UNK>"

# Define the vocabulary
vocab = set(word for word, tag in train_data)

# Define the tagset
tagset = set(tag for word, tag in train_data)

# Define the mapping between words and indices.
# Index 0 is reserved for UNK_TOKEN; the words are sorted because set iteration
# order for strings is not stable between interpreter runs.
word_to_idx = {UNK_TOKEN: 0}
word_to_idx.update((word, i) for i, word in enumerate(sorted(vocab), start=1))

# Define the mapping between tags and indices, and its inverse for decoding
tag_to_idx = {tag: i for i, tag in enumerate(sorted(tagset))}
idx_to_tag = {i: tag for tag, i in tag_to_idx.items()}

# Define the PyTorch model
class SimpleTagger(nn.Module):
    """LSTM tagger: embedding -> LSTM -> linear -> log-softmax over tags."""

    def __init__(self, vocab_size, tagset_size, embedding_dim, hidden_dim):
        super(SimpleTagger, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)

    def forward(self, sentence):
        """Return log-probabilities over tags, one row per element of `sentence`."""
        embeds = self.embeddings(sentence)
        # The LSTM expects (sequence length, batch, features); batch size is 1 here.
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = nn.functional.log_softmax(tag_space, dim=1)
        return tag_scores

# Instantiate the model (the embedding table has one extra row for UNK_TOKEN)
EMBEDDING_DIM = 10
HIDDEN_DIM = 10
model = SimpleTagger(len(word_to_idx), len(tagset), EMBEDDING_DIM, HIDDEN_DIM)

# Define the loss function and optimizer
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

# train_data holds (word, tag) pairs, not (sentence, tags) pairs: unzip it into
# one (words, tags) training example. Iterating train_data directly would
# treat each word as a sentence and look up its individual characters.
train_sentences = [tuple(zip(*train_data))]

# Train the model
for epoch in range(100):
    for sentence, tags in train_sentences:
        # Convert the sentence and tags to PyTorch tensors
        sentence_in = torch.tensor([word_to_idx[word] for word in sentence], dtype=torch.long)
        targets = torch.tensor([tag_to_idx[tag] for tag in tags], dtype=torch.long)

        # Clear out the gradients
        model.zero_grad()

        # Run the forward pass
        tag_scores = model(sentence_in)

        # Compute the loss, gradients, and update the parameters
        loss = loss_function(tag_scores, targets)
        loss.backward()
        optimizer.step()

# Test the model on a new sentence.
# Words never seen in training ("sat", "on", "mat") are mapped to UNK_TOKEN, whose
# embedding is never trained, so the tags predicted for them are arbitrary.
test_sentence = "The cat sat on the mat".split()
test_sentence_in = torch.tensor(
    [word_to_idx.get(word, word_to_idx[UNK_TOKEN]) for word in test_sentence],
    dtype=torch.long,
)
tag_scores = model(test_sentence_in)
_, predicted_tags = torch.max(tag_scores, dim=1)
predicted_tags = [idx_to_tag[idx] for idx in predicted_tags.tolist()]
print(test_sentence)
print(predicted_tags)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define the LSTM-based POS tagger model
class LSTMTagger(nn.Module):
    """LSTM part-of-speech tagger: embedding -> LSTM -> linear -> log-softmax."""

    def __init__(self, vocab_size, embedding_dim, hidden_dim, tagset_size):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim)
        self.hidden2tag = nn.Linear(in_features=hidden_dim, out_features=tagset_size)

    def forward(self, sentence):
        """Return log-probabilities over tags, one row per element of `sentence`."""
        n_tokens = len(sentence)
        # Reshape to (sequence length, batch of 1, features) for the LSTM.
        lstm_input = self.word_embeddings(sentence).view(n_tokens, 1, -1)
        lstm_output, _ = self.lstm(lstm_input)
        # Drop the batch dimension again before scoring the tags.
        scores = self.hidden2tag(lstm_output.view(n_tokens, -1))
        return nn.functional.log_softmax(scores, dim=1)

# Define the training function
def train(model, optimizer, loss_function, sentences, tags, num_epochs):
    """Fit `model` on paired index sequences, one optimizer step per sentence.

    NOTE: this definition replaces the zero-argument char-RNN `train()` from
    an earlier cell once this cell has been executed.

    Args:
        model: module mapping a 1-D long tensor of word ids to per-token scores.
        optimizer: optimizer whose step() updates the model parameters.
        loss_function: callable taking (scores, target tensor) and returning a loss.
        sentences: iterable of word-id sequences.
        tags: iterable of tag-id sequences, aligned with `sentences`.
        num_epochs: number of passes over the data.
    """
    for _ in range(num_epochs):
        for word_ids, tag_ids in zip(sentences, tags):
            inputs = torch.tensor(word_ids, dtype=torch.long)
            gold = torch.tensor(tag_ids, dtype=torch.long)
            # Gradients accumulate by default, so clear them before each step.
            model.zero_grad()
            loss = loss_function(model(inputs), gold)
            loss.backward()
            optimizer.step()

# Define the evaluation function
def evaluate(model, sentences, tags):
    """Return the token-level tagging accuracy of `model` in percent.

    Args:
        model: module mapping a 1-D long tensor of word ids to per-token scores.
        sentences: iterable of word-id sequences.
        tags: iterable of gold tag-id sequences, aligned with `sentences`.

    Returns:
        100 * correct / total over all tokens, or 0.0 when there are no tokens
        to score (instead of raising ZeroDivisionError).
    """
    correct = 0
    total = 0
    with torch.no_grad():  # inference only, no gradients needed
        for sentence, tag in zip(sentences, tags):
            sentence = torch.tensor(sentence, dtype=torch.long)
            tag = torch.tensor(tag, dtype=torch.long)
            tag_scores = model(sentence)
            # Highest-scoring tag for every token.
            _, predicted = torch.max(tag_scores, 1)
            total += tag.size(0)
            correct += (predicted == tag).sum().item()
    if total == 0:
        return 0.0
    accuracy = 100 * correct / total
    return accuracy

# Example usage
# Toy corpus; the tag strings are aligned line by line with the sentences.
corpus = [
    "The cat sat on the mat",
    "The dog chased the cat",
    "The mouse ran away from the cat",
    "The cat purred",
]
tags = [
    "DET NOUN VERB ADP DET NOUN",
    "DET NOUN VERB DET NOUN",
    "DET NOUN VERB ADV ADP DET NOUN",
    "DET NOUN VERB",
]
# Vocabulary and tagset: ids follow list position, with <PAD> = 0 and <UNK> = 1.
word_to_ix = {
    word: ix
    for ix, word in enumerate(
        ["<PAD>", "<UNK>", "The", "cat", "sat", "on", "the", "mat", "dog",
         "chased", "mouse", "ran", "away", "from", "purred"]
    )
}
tag_to_ix = {
    label: ix
    for ix, label in enumerate(["<PAD>", "<UNK>", "DET", "NOUN", "VERB", "ADP", "ADV"])
}

# Encode words and tags as ids; anything unknown falls back to the <UNK> id.
# Note that `tags` is rebound here from tag strings to lists of tag ids.
sentences = [
    [word_to_ix.get(token, word_to_ix["<UNK>"]) for token in line.split()]
    for line in corpus
]
tags = [
    [tag_to_ix.get(label, tag_to_ix["<UNK>"]) for label in line.split()]
    for line in tags
]

# Hyperparameters, then model, optimizer and loss
vocab_size, tagset_size = len(word_to_ix), len(tag_to_ix)
embedding_dim = hidden_dim = 16
num_epochs = 10
learning_rate = 0.1
model = LSTMTagger(vocab_size, embedding_dim, hidden_dim, tagset_size)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)
loss_function = nn.NLLLoss()
# Train the model
train(model, optimizer, loss_function, sentences, tags, num_epochs)

# Evaluate on the training corpus itself (there is no held-out data here)
accuracy = evaluate(model, sentences, tags)
print(f"Accuracy: {accuracy:.2f}%")
