In [1]:
import torch
import torch.nn as nn
import re

In [2]:
class TextReader:
    def __init__(self,filename,code_file=False,lower_case=False):
        self.filename = filename
        self.code_file = code_file
        self.lower_case = lower_case
        
    def get_all_words(self,regex_params=None):
        '''
        regex_params = {
            "uppercase": True,
            "digits":False,
            "punctuation_list":None
        }
        '''
        if regex_params:
            regex_string = "a-z"
            if regex_params["uppercase"]: regex_string += "A-Z"
            if regex_params["digits"]: regex_string += "0-9"
            if regex_params["punctuation_list"]: regex_string += "".join(regex_params["punctuation_list"])
            regex_string = "[" + regex_string +"]"
            with open(self.filename,"r") as f: text = f.read()
            words = re.findall(regex_string,text)
            return words
        else:
            with open(self.filename,"r") as f: words = f.read().split(" ")
            return words + [" "]
        
    def get_unique_words(self,regex_params=None, distinguish_casing=False):
        '''
        regex_params = {
            "uppercase": True,
            "digits":False,
            "punctuation_list":None
        }
        '''
        all_words = self.get_all_words(regex_params)
        if not distinguish_casing: return list(set([word.lower() for word in all_words]))
        else: return list(set(all_words))
    
    
    def get_X_and_Y(self,window_size=10):
        if self.code_file:
            X,Y = [],[]
            with open(self.filename,"r",errors="ignore") as f:
                for line in f:
                    words = line.strip().split() + ["\n"]
                    if len(words) <= 2: continue
                    x_w, y_w = [],[]
                    if len(words) > window_size:
                        for i in range(0,len(words)-window_size):
                            x_w.append(words[i:i+window_size])
                            y_w.append(words[i+1:i+1+window_size])
                    else:
                        x_w.append(words[:-1])
                        y_w.append(words[1:])
                    X += x_w
                    Y += y_w
                return X,Y
        else:
            with open(self.filename,"r",errors="ignore") as f:
                if self.lower_case: text = f.read().lower().split()
                else: text = f.read().split()
                X,Y = [],[]
                for i in range(len(text)-window_size):
                    X.append(text[i:i+window_size])
                    Y.append(text[i+1:window_size+i+1])
                return X,Y

In [4]:
text_reader = TextReader("data1.txt",code_file=True,lower_case=False)
X,Y = text_reader.get_X_and_Y(window_size=5)
print(len(X),len(Y),X[:4],Y[:4],X[-4:],Y[-4:])

437 437 [['import', 'torch'], ['import', 'torch.nn', 'as', 'nn'], ['from', 'sklearn.metrics', 'import', 'f1_score,', 'classification_report'], ['class', 'DataLoader:']] [['torch', '\n'], ['torch.nn', 'as', 'nn', '\n'], ['sklearn.metrics', 'import', 'f1_score,', 'classification_report', '\n'], ['DataLoader:', '\n']] [['"Jim', 'Prakash', 'is', 'talking', 'at'], ['Prakash', 'is', 'talking', 'at', 'Delhi".split("'], ['is', 'talking', 'at', 'Delhi".split("', '")'], ['print(predict(model,sentences_for_predictions_1,', 'max_length=10))']] [['Prakash', 'is', 'talking', 'at', 'Delhi".split("'], ['is', 'talking', 'at', 'Delhi".split("', '")'], ['talking', 'at', 'Delhi".split("', '")', '\n'], ['max_length=10))', '\n']]


In [5]:
class VocabBuilder:
    def __init__(self, X,Y, unknown_token="<UNK>",pad_token="<PAD>"):
        self.X = X
        self.Y = Y
        self.unknown_token = unknown_token
        self.pad_token = pad_token
    
    def get_word_vocab(self,for_X=False, for_Y=False, for_both=False):
        word_to_index, index_to_word = {},{}
        if for_X: all_words = list(set([word for el in self.X for word in el]))
        if for_Y: all_words = list(set([word for el in self.Y for word in el]))
        if for_both: all_words = list(set([word for el in self.Y for word in el] + [word for el in self.X for word in el]))
        for i,word in enumerate(all_words):
            word_to_index[word] = i
            index_to_word[i] = word
        len_vocab = len(word_to_index)
        word_to_index[self.unknown_token] = len_vocab
        index_to_word[len_vocab] = self.unknown_token
        word_to_index[self.pad_token] = len_vocab+1
        index_to_word[len_vocab+1] = self.pad_token
        return word_to_index, index_to_word

In [7]:
vocab_builder = VocabBuilder(X,Y)
X_w, X_i = vocab_builder.get_word_vocab(for_X=True)
Y_w, Y_i = vocab_builder.get_word_vocab(for_Y=True)
A_w, A_i = vocab_builder.get_word_vocab(for_both=True)
print(len(X_w),len(Y_w),len(A_w))
print(A_w['<PAD>'])

548 452 549
548


In [8]:
class GenerateEncoding:
    def __init__(self,data_x,vocab_x,data_y,vocab_y,unknown_token):
        self.data_x = data_x
        self.vocab_x = vocab_x
        self.data_y = data_y
        self.vocab_y = vocab_y
        self.unknown_token = unknown_token
        self.pure_vocab_x = self.remove_unknown_token_from_voab(vocab_x)
        self.pure_vocab_y = self.remove_unknown_token_from_voab(vocab_y)
    
    def remove_unknown_token_from_voab(self,vocab):
        return {k:v for k,v in vocab.items() if k != self.unknown_token}
        
    def get_encoding_X(self,raw_text=None):
        if raw_text: data_to_encode = raw_text
        else: data_to_encode = self.data_x
        encoded_X = []
        for word_list in data_to_encode:
            word_encoding = []
            for word in word_list: 
                if word not in self.pure_vocab_x: word_encoding.append(self.vocab_x[self.unknown_token])
                else: word_encoding.append(self.vocab_x[word])
            encoded_X.append(word_encoding)
        return encoded_X
    
    def get_encoding_Y(self,raw_text=None):
        if raw_text: data_to_encode = raw_text
        else: data_to_encode = self.data_y
        encoded_Y = []
        for word_list in data_to_encode:
            word_encoding = []
            for word in word_list: 
                if word not in self.pure_vocab_y: word_encoding.append(self.vocab_y[self.unknown_token])
                else: word_encoding.append(self.vocab_y[word])
            encoded_Y.append(word_encoding)
        return encoded_Y

In [9]:
encoding_generator = GenerateEncoding(data_x=X,data_y=Y,vocab_x=A_w,vocab_y=A_w, unknown_token="<UNK>")
X_enc = encoding_generator.get_encoding_X()
Y_enc = encoding_generator.get_encoding_Y()
print(len(X_enc),len(Y_enc),X_enc[:4],Y_enc[:4],X_enc[-4:],Y_enc[-4:])

437 437 [[357, 262], [357, 166, 217, 36], [322, 474, 357, 511, 276], [359, 29]] [[262, 397], [166, 217, 36, 397], [474, 357, 511, 276, 397], [29, 397]] [[285, 508, 139, 400, 52], [508, 139, 400, 52, 152], [139, 400, 52, 152, 501], [96, 148]] [[508, 139, 400, 52, 152], [139, 400, 52, 152, 501], [400, 52, 152, 501, 397], [148, 397]]


In [None]:
class BatchGenerator:
    def __init__(self,X,Y,batch_size):
        self.X = X
        self.Y = Y
        self.batch_size = batch_size
    
    def get_batch(self,batch_index,make_tensor=False):
        Xb = self.X[batch_index*self.batch_size:(batch_index+1)*self.batch_size]
        Yb = self.Y[batch_index*self.batch_size:(batch_index+1)*self.batch_size]
        if make_tensor: return torch.tensor(Xb),torch.tensor(Yb)
        return Xb,Yb

In [None]:
class MyWordLevelRNNModel(nn.Module):
    def __init__(self,vocab_size, embedding_dim, lstm_neurons, num_lstm_layers, num_classes,
                 make_birectional=False, debug_mode=False):
        super().__init__()
        self.debug_mode = debug_mode
        self.bidirectional = make_birectional
        self.lstm_neurons = lstm_neurons
        self.num_lstm_layers = num_lstm_layers
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=lstm_neurons, 
                            num_layers=num_lstm_layers, bidirectional=make_birectional, batch_first=True)
        
        in_features = lstm_neurons
        if self.bidirectional: in_features = 2*lstm_neurons
        self.linear1 = nn.Linear(in_features=in_features, out_features=100)
        self.relu = nn.LeakyReLU()
        self.linear2 = nn.Linear(in_features=100, out_features=num_classes)
        self.log_softmax = nn.LogSoftmax(dim=1)
    
    def forward(self,x,ht,ct):
        if self.debug_mode: print("Before embedding layer:",x.shape)
        x = self.embedding(x)
        if self.debug_mode: print("After embedding layer:",x.shape)
        x, (ht, ct) = self.lstm(x,(ht,ct))
        if self.debug_mode: print("After lstm layer:",x.shape,ht.shape,ct.shape)
        x = x.reshape(-1, x.shape[2])
        if self.debug_mode: print("After reshaping:",x.shape)
        x = self.linear1(x)
        x = self.relu(x)
        if self.debug_mode: print("After 1st linear layer:",x.shape)
        x = self.linear2(x)
        x = self.log_softmax(x)
        if self.debug_mode: print("After 2nd linear layer:",x.shape)
        return x, ht,ct
    
    def init_state_of_lstm(self,batch_size):
        if self.bidirectional: first_param = 2*self.num_lstm_layers
        else: first_param = self.num_lstm_layers
        return (
            torch.randn(first_param, batch_size, self.lstm_neurons),
            torch.randn(first_param, batch_size, self.lstm_neurons),
        )

In [None]:
# text_reader = TextReader("data1.txt")
text_reader = TextReader("data2.txt",code_file=False,lower_case=True)
window_size = 10
X,Y = text_reader.get_X_and_Y(window_size=window_size)
unknown_token = "<UNK>"
pad_token = "<PAD>"
vocab_builder = VocabBuilder(X,Y,unknown_token=unknown_token,pad_token=pad_token)
word_to_index, index_to_word = vocab_builder.get_word_vocab(for_both=True)
print(len(word_to_index),len(index_to_word))

encoding_generator = GenerateEncoding(
    data_x=X,data_y=Y,vocab_x=word_to_index,vocab_y=word_to_index, unknown_token=unknown_token
)
X_enc = encoding_generator.get_encoding_X()
Y_enc = encoding_generator.get_encoding_Y()
print(len(X_enc),len(Y_enc),X_enc[:4],Y_enc[:4],X_enc[-4:],Y_enc[-4:])

batch_size = 5
batch_generator = BatchGenerator(X_enc,Y_enc,batch_size)
Xb,Yb = batch_generator.get_batch(batch_index=1)
print(len(Xb),len(Xb[0]),len(Yb),len(Yb[0]))

In [None]:
epochs = 100
batch_size = 8
batch_generator = BatchGenerator(X_enc,Y_enc,batch_size)
num_batches = len(X_enc)//batch_size
embedding_dim = 50
vocab_size = len(index_to_word)
num_classes = len(index_to_word)
num_lstm_layers = 4
lstm_neurons = 100
make_bidirectional = False

In [None]:
model = MyWordLevelRNNModel(vocab_size=vocab_size, embedding_dim=embedding_dim, lstm_neurons=lstm_neurons, 
                   num_lstm_layers=num_lstm_layers, num_classes = num_classes,
                   make_birectional=make_bidirectional, debug_mode=True)
optimizer = torch.optim.Adam(model.parameters(),lr=0.1)
loss_function = nn.NLLLoss()
(ht,ct) = model.init_state_of_lstm(batch_size)
Y_actual, Y_pred = [], []

optimizer.zero_grad()
Xb, Yb = batch_generator.get_batch(2,make_tensor=True)

op, ht,ct = model(Xb,ht,ct)
print(op.shape)
# print(op[0])
Yb = Yb.reshape(-1)
print(op.shape, Yb.shape)
loss = loss_function(op, Yb)
print(loss)
ht = ht.detach()
ct = ct.detach()
loss.backward()
optimizer.step()
Y_pred += [int(el) for el in torch.argmax(op,axis=1)]
Y_actual += [int(el) for el in Yb]

optimizer.zero_grad()
Xb, Yb = batch_generator.get_batch(3,make_tensor=True)
op, ht,ct = model(Xb,ht,ct)
print(op.shape)
# print(op[0])
Yb = Yb.reshape(-1)
print(op.shape, Yb.shape)
loss = loss_function(op, Yb)
print(loss)
ht = ht.detach()
ct = ct.detach()
loss.backward()
optimizer.step()
Y_pred += [int(el) for el in torch.argmax(op,axis=1)]
Y_actual += [int(el) for el in Yb]

In [None]:
model = MyWordLevelRNNModel(vocab_size=vocab_size, embedding_dim=embedding_dim, lstm_neurons=lstm_neurons, 
                   num_lstm_layers=num_lstm_layers, num_classes = num_classes,
                   make_birectional=make_bidirectional, debug_mode=False)
optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
loss_function = nn.NLLLoss()

In [None]:
for e in range(epochs):
    model.train()
    (ht,ct) = model.init_state_of_lstm(batch_size)
    epoch_loss = 0
    Y_actual, Y_pred = [], []
    for i in range(num_batches):
        if i%20 == 0: print(i, end=' ')
        optimizer.zero_grad()
        Xb, Yb = batch_generator.get_batch(i,make_tensor=True)
        op, ht,ct = model(Xb,ht,ct)
        Yb = Yb.reshape(-1)
        loss = loss_function(op, Yb)
        epoch_loss += loss.item()
        ht = ht.detach()
        ct = ct.detach()
        loss.backward()
        optimizer.step()
    print("\nEpoch: {}, Loss: {}".format(e+1,epoch_loss))

In [None]:
from copy import deepcopy

In [None]:
test_string = ["we believe in the power of books".split(" ")]
test_string_enc = encoding_generator.get_encoding_X(raw_text=test_string)
pred_op = deepcopy(test_string_enc)
print(pred_op)

model.eval()
if make_bidirectional: first_param = 2*num_lstm_layers
else: first_param = num_lstm_layers
ht_pred = torch.randn(first_param, 1, lstm_neurons)
ct_pred = torch.randn(first_param, 1, lstm_neurons)

unigram = True # unigram will work, becuase it is a statefulRNN (ht and ct is getting updated for every character)
window_size = 5
num_chars = len(test_string[0])+500

with torch.no_grad():
    for i in range(num_chars):
        input_vec = torch.tensor([pred_op[0][i:i+1]])
        op,ht_pred,ct_pred = model(input_vec,ht_pred,ct_pred)
        op = torch.argmax(op,axis=1).tolist()
        if i >= len(test_string_enc[0])-1: pred_op[0].append(op[0])
    pred_word = " ".join([index_to_word[el] for el in pred_op[0]])
    print(pred_word)

In [None]:
test_string = ["reading about ai from a book should be good way to learn".split(" ")]
test_string_enc = encoding_generator.get_encoding_X(raw_text=test_string)
pred_op = deepcopy(test_string_enc)
print(pred_op)

model.eval()
if make_bidirectional: first_param = 2*num_lstm_layers
else: first_param = num_lstm_layers
ht_pred = torch.randn(first_param, 1, lstm_neurons)
ct_pred = torch.randn(first_param, 1, lstm_neurons)

unigram = True # unigram will work, becuase it is a statefulRNN (ht and ct is getting updated for every character)
window_size = 5
num_chars = len(test_string[0])+500

with torch.no_grad():
    for i in range(num_chars):
        input_vec = torch.tensor([pred_op[0][i:i+1]])
        op,ht_pred,ct_pred = model(input_vec,ht_pred,ct_pred)
        op = torch.argmax(op,axis=1).tolist()
        if i >= len(test_string_enc[0])-1: pred_op[0].append(op[0])
    pred_word = " ".join([index_to_word[el] for el in pred_op[0]])
    print(pred_word)