In [None]:
# Importing Libraries

import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 

import time
import os
import re

from tqdm.notebook import tqdm

In [None]:
##### Configuration

class CFG:
    """Single namespace for every tunable constant used in this notebook.

    A VOCAB_SIZE attribute is attached later, once the vocabulary has been built.
    """

    # --- Data ---
    PATH = '../input/harry-potter/'     # directory that is listed to find the book files
    MAX_LEN = 100                       # number of characters in one input/target sequence
    TRAIN_SIZE_FRACTION = 0.9           # leading share of the sequences used for training

    # --- Model ---
    HIDDEN_SIZE = 256                   # width of the recurrent hidden state

    # --- Optimisation ---
    EPOCHS = 5
    LR = 3e-4
    USE_ADAM = True                     # False falls back to plain SGD
    BETA1 = 0.9                         # Adam: decay of the first-moment estimate
    BETA2 = 0.999                       # Adam: decay of the second-moment estimate
    EPS = 1e-8                          # Adam: denominator stabiliser
    GRAD_MIN = -5                       # lower bound for element-wise gradient clipping
    GRAD_MAX = 5                        # upper bound for element-wise gradient clipping

    # --- Sampling / reproducibility ---
    TEMPERATURE = 0.5                   # logits are divided by this before sampling text
    SEED = 42

In [None]:
# For Reproducing results

def seed_everything(seed):
    '''
        Seeds the random number generators so that runs are repeatable.

        Both NumPy's legacy global generator (used for weight initialisation
        and sampling in this notebook) and Python's built-in `random` module
        are seeded.

        seed: integer seed value
    '''
    # Local import: `random` is not among the notebook's top-level imports
    import random

    random.seed(seed)
    np.random.seed(seed)

# Fix the seed once, before any random weights or samples are drawn
seed_everything(seed=CFG.SEED)

In [None]:
# Function to load book at 'path'

def load_book(path):
    '''
        Reads the file at 'path' as latin1 text and returns its full content
        as one string.
    '''
    with open(path, encoding='latin1') as handle:
        return handle.read()

In [None]:
# Loading books

# os.listdir returns names in arbitrary order. Sorting keeps the book order stable
# between runs, which matters because both the vocabulary indices and the
# train/validation split below depend on the order of the books.
book_names = sorted(os.listdir(CFG.PATH))
books = [load_book(os.path.join(CFG.PATH, book_name)) for book_name in book_names]
print("Number of books: ",len(books))

In [None]:
# Preliminary Analysis for preprocessing

# Union of every character occurring in any book. A single union call needs no
# special case for the first book and yields an empty set when no books were loaded.
s = set().union(*books)

print('Set of unique characters: ', end='\n\n')
# Sorted so the listing is stable between runs (set iteration order is not)
print(sorted(s))

In [None]:
# Preprocess text

def preprocess(text):
    '''
        Cleans raw book text before it is turned into training sequences.

        The substitutions are applied strictly in the order listed below:
          1. drop markup-like symbols (braces, backslashes, brackets, ...)
          2. rewrite / drop escape residue such as '92t -> 't
             (presumably left over from the source file format - not verified)
          3. make sure '.', '!' and '?' are followed by a space
          4. drop digits and the remaining unwanted symbols
          5. collapse runs of spaces into one space; this runs last so that
             gaps opened up by step 4 are collapsed as well

        text: raw text of one book
        Returns the cleaned text.
    '''
    # Patterns that contain regex escapes are raw strings so that Python does
    # not try (and warn about) interpreting sequences like '\.' itself.
    rules = (
        (r'[{}@_*>()\\#%+=\[\]]', ''),
        (r'a0', ''),
        (r"'92t", "'t"),
        (r"'92s", "'s"),
        (r"'92m", "'m"),
        (r"'92ll", "'ll"),
        (r"'91", ''),
        (r"'92", ''),
        ('\x93', ''),
        ('\x96', ''),
        (r"'93", ''),
        (r"'94", ''),
        (r'\.', '. '),
        (r'!', '! '),
        (r'\?', '? '),
        (r'[`^~\x90\x91\x92\x95\xad\x1f¦$«»éü0123456789/]', ''),
        (r' +', ' '),
    )

    for pattern, replacement in rules:
        text = re.sub(pattern, replacement, text)

    return text

In [None]:
# Preprocessing books

# One cleaned text per raw book, in the same order as `books`
cleaned_books = [preprocess(raw_text) for raw_text in books]


In [None]:
# Doing a quick check on preprocessing

# Raw excerpt, a separator line, then the cleaned excerpt - each set apart by a blank line
print(books[0][:1000], '###########', cleaned_books[0][:1000], sep='\n\n')

In [None]:
# Building Vocabulary

# Characters are numbered 0, 1, 2, ... in order of first appearance
char_to_int = {}

for book in cleaned_books:
    for char in book:
        # len() is evaluated before the insert, so it is the next free index
        char_to_int.setdefault(char, len(char_to_int))

# Reverse lookup: index -> character
int_to_char = {index: symbol for symbol, index in char_to_int.items()}

In [None]:
# Some variables to use later

# The vocabulary size is also stored on the config so later cells can read it from there
CFG.VOCAB_SIZE = vocab_size = len(int_to_char)

all_unique_chars = sorted(char_to_int)

print('Set of unique characters after preprocessing:', end='\n\n')
print(all_unique_chars)

In [None]:
# Defining inputs and targets for our language model

inputs, targets = [], []

for book in cleaned_books:
    # Non-overlapping windows of MAX_LEN characters. The upper bound of the range
    # keeps one spare character after every window, so the target (the window
    # shifted right by one character) always has the full length as well.
    for start in range(0, len(book) - CFG.MAX_LEN, CFG.MAX_LEN):
        stop = start + CFG.MAX_LEN
        inputs.append(book[start : stop])
        targets.append(book[start + 1 : stop + 1])

print('Total Number of Data Points:')
print(len(inputs), '\n')
print("First 5 input/target pairs", '\n')
print(inputs[:5], targets[:5], sep='\n\n')

In [None]:
# Tokenizing inputs and targets to feed to model

int_inputs = []
int_targets = []

print('Tokenization and Integer representation...')
# Inputs and targets are walked in lock-step. The loop variables are deliberately
# not called `input`, which would shadow the builtin for the rest of the notebook.
for input_text, target_text in tqdm(zip(inputs, targets), total=len(inputs)):
    int_inputs.append([char_to_int[char] for char in input_text])
    int_targets.append([char_to_int[char] for char in target_text])


In [None]:
# Sanity check for inputs/targets

# First ten token ids of the first input and of its target, one per line
print(int_inputs[0][:10], int_targets[0][:10], sep='\n')

In [None]:
# Train-Validation split

# Index of the first validation sequence; everything before it is training data
split_at = int(CFG.TRAIN_SIZE_FRACTION * len(int_inputs))

train_inputs, valid_inputs = int_inputs[:split_at], int_inputs[split_at:]
train_targets, valid_targets = int_targets[:split_at], int_targets[split_at:]

In [None]:
# Batch generator

def get_batch(inputs, targets):
    '''
        Generator that walks over 'inputs' and yields one
        (input sequence, target sequence) pair per position.
    '''
    for position, sequence in enumerate(inputs):
        yield sequence, targets[position]

In [None]:
class RNN():
    '''
        Character-level recurrent network written with NumPy only.

        Recurrence:   h(t) = tanh(W h(t-1) + U x(t) + b)
        Output:       o(t) = V h(t) + c,   p(t) = softmax(o(t))

        Training settings (learning rate, Adam constants, clipping range,
        optimizer choice) are read from the notebook-level CFG class.
    '''

    def __init__(self, vocab_size, hidden_size, max_len):
        '''
            DESCRIPTION:
                Initializing weights and optimizer state
            INPUTS:
                vocab_size: Size of vocabulary
                hidden_size: Size of hidden nodes( RNN cells )
                max_len: Length of input sequence
        '''
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.max_len = max_len

        # Each matrix is drawn uniformly from +-1/sqrt(n), n being its number of columns
        # (the original author cites Glorot & Bengio, "Understanding the difficulty of
        # training deep feedforward neural networks", as the motivation).
        self.U = np.random.uniform(- np.sqrt(1. / vocab_size), np.sqrt(1. / vocab_size),
                                  size = (hidden_size, vocab_size))

        self.W = np.random.uniform(- np.sqrt(1. / hidden_size), np.sqrt(1. / hidden_size),
                                  size = (hidden_size, hidden_size))

        self.V = np.random.uniform(- np.sqrt(1. / hidden_size), np.sqrt(1. / hidden_size),
                                  size = (vocab_size, hidden_size))

        self.b = np.zeros(shape = (hidden_size, 1))
        self.c = np.zeros(shape = (vocab_size, 1))

        # Adam state: first (m*) and second (v*) moment estimates, one pair per parameter
        self.mU, self.vU = np.zeros_like(self.U), np.zeros_like(self.U)
        self.mW, self.vW = np.zeros_like(self.W), np.zeros_like(self.W)
        self.mV, self.vV = np.zeros_like(self.V), np.zeros_like(self.V)
        self.mb, self.vb = np.zeros_like(self.b), np.zeros_like(self.b)
        self.mc, self.vc = np.zeros_like(self.c), np.zeros_like(self.c)


    def forward(self, input, h_prev):
        '''
            Forward prop. over every time-step of 'input'

            input  : sequence of character indices
            h_prev : hidden state to start from, shape (hidden_size, 1)

            Returns four dicts keyed by time-step: one-hot inputs (xs), hidden
            states (hs, which also holds a copy of h_prev under key -1),
            unnormalized outputs (os) and softmax probabilities (ypreds).
        '''
        xs, hs, os, ypreds = {}, {}, {}, {}
        hs[-1] = np.copy(h_prev)

        for t in range(len(input)):

            xs[t] = np.zeros((self.vocab_size, 1))
            xs[t][input[t]] = 1                                                          # one-hot representation of input char
            hs[t] = np.tanh(np.dot(self.W, hs[t-1]) + np.dot(self.U, xs[t]) + self.b)    # hidden state: h(t) = tanh(W*h(t-1) + U*x(t) + b)
            os[t] = np.dot(self.V, hs[t]) + self.c                                       # output: o(t) = V*h(t) + c
            ypreds[t] = self.softmax(os[t])                                              # probabilities over the vocabulary

        return xs, hs, os, ypreds


    def softmax(self, x):
        '''
            Softmax taken over all elements of x.
            The maximum is subtracted first so that exp() cannot overflow.
        '''
        p = np.exp(x - np.max(x))
        return p / np.sum(p)


    def backward(self, xs, hs, ps, ys):
        '''
            Backpropagation through time for one sequence.
            Arguments are the values produced by the forward pass:

            xs : one-hot inputs for each time step
            hs : hidden states for each time step (key -1 holds the initial state)
            ps : normalized probabilities (softmax) for each time step
            ys : True targets for each time step

            Returns the clipped gradients in the order dW, dV, dU, db, dc.
        '''
        dU = np.zeros_like(self.U)
        dW = np.zeros_like(self.W)
        dV = np.zeros_like(self.V)
        db = np.zeros_like(self.b)
        dc = np.zeros_like(self.c)

        # Nothing flows back into the last time step from the future
        dhnext = np.zeros_like(hs[-1])

        # Walk back over the steps that were actually run in the forward pass,
        # so sequences shorter than max_len are handled as well
        for t in reversed(range(len(xs))):

            # Derivative of the cross-entropy loss wrt output o: p - one_hot(y)
            dy = np.copy(ps[t])
            dy[ys[t]] -= 1

            dc += dy
            dV += np.dot(dy, hs[t].T)
            dh = np.dot(self.V.T, dy) + dhnext

            # Back through tanh: d tanh(z)/dz = 1 - tanh(z)^2
            dh_inter = (1 - hs[t] * hs[t]) * dh

            db += dh_inter
            dW += np.dot(dh_inter, hs[t-1].T)
            dU += np.dot(dh_inter, xs[t].T)

            # Gradient handed to the previous time step
            dhnext = np.dot(self.W.T, dh_inter)

        # Clipping off large gradient values (element-wise, in place)
        for param in [dW, dV, dU, db, dc]:
            np.clip(param, CFG.GRAD_MIN, CFG.GRAD_MAX, out = param)

        return dW, dV, dU, db, dc


    def update_weights(self, t, dW, dV, dU, db, dc):
        '''
            Updating weights using:    SGD / ADAM

            t : 1-based update counter (needed for Adam's bias correction)
            dW, dV, dU, db, dc : gradients as returned by backward()
        '''
        if CFG.USE_ADAM:
            # ADAM optimizer used
            for param, grad, m, v in zip( [self.W, self.V, self.U, self.b, self.c],
                                          [dW,      dV,         dU,      db,   dc],
                                          [self.mW, self.mV, self.mU, self.mb, self.mc],
                                          [self.vW, self.vV, self.vU, self.vb, self.vc]):

                # The moment buffers must be modified IN PLACE. Assigning to the
                # loop variables would only rebind local names and the running
                # averages stored on the instance would stay at zero forever.
                m *= CFG.BETA1
                m += (1. - CFG.BETA1) * grad
                v *= CFG.BETA2
                v += (1. - CFG.BETA2) * (grad ** 2)

                # Bias-corrected estimates
                m_hat = m / (1. - CFG.BETA1 ** t)
                v_hat = v / (1. - CFG.BETA2 ** t)

                # Adam update
                param -= CFG.LR * m_hat / (np.sqrt(v_hat) + CFG.EPS)

        else:
            # SGD optimizer
            for param, grad in zip( [self.W, self.V, self.U, self.b, self.c],
                                   [dW,      dV,         dU,      db,   dc]):

                param -= CFG.LR * grad

In [None]:
# Softmax loss/ Multiclass cross-entropy loss

def criterion(ps, ys):
    '''
        Summed negative log-likelihood of the true characters over the
        first CFG.MAX_LEN time steps.

        ps: normalized probabilities (softmax) for each time step
        ys: True targets for each time step
    '''
    total = 0
    for step in range(CFG.MAX_LEN):
        # probability the model assigned to the correct character at this step
        prob_true = ps[step][ys[step], 0]
        total = total + (-np.log(prob_true))
    return total
    

In [None]:
# Initializing model object

# All sizes come straight from the configuration
model = RNN(
    vocab_size = CFG.VOCAB_SIZE,
    hidden_size = CFG.HIDDEN_SIZE,
    max_len = CFG.MAX_LEN,
)



def accuracy(ypreds, target):
    '''
        Percentage of time steps at which a character SAMPLED from the
        predicted distribution equals the target character.

        ypreds: predicted distributions, indexable by time step 0..len-1
        target: true character indices for each time step
    '''
    steps = len(ypreds)

    # Draw one character index per time step from its predicted distribution
    sampled = [np.random.choice(range(CFG.VOCAB_SIZE), p = ypreds[t].ravel())
               for t in range(steps)]

    hits = sum(guess == target[pos] for pos, guess in enumerate(sampled))

    return hits*100. / steps



def get_sample(model, h, start_idx, n):
    '''
        Samples 'n' characters from the model and returns them as text.

        model     : object exposing the weights W, U, V, b, c
        h         : hidden state to start from
        start_idx : index of the character fed in at the first step
        n         : number of characters to generate
    '''
    def one_hot(index):
        vec = np.zeros(shape = (CFG.VOCAB_SIZE, 1))
        vec[index] = 1
        return vec

    x = one_hot(start_idx)
    sampled = []

    for _ in range(n):

        h = np.tanh(np.dot(model.W, h) + np.dot(model.U, x) + model.b)

        # Temperature scaling changes how confident the distribution is
        logits = (np.dot(model.V, h) + model.c) / CFG.TEMPERATURE

        # Shift by the max before exponentiating, to prevent NaNs in softmax
        exps = np.exp(logits - np.max(logits))
        p = exps / np.sum(exps)

        # Draw the next character and feed it back in as the next input
        idx = np.random.choice(range(CFG.VOCAB_SIZE), p = p.ravel())
        sampled.append(idx)
        x = one_hot(idx)

    return ''.join(int_to_char[i] for i in sampled)



def train():
    '''
        Trains the global 'model' on train_inputs / train_targets for
        CFG.EPOCHS epochs, one sequence per weight update.

        Every 500 updates a sample generated by the model is printed together
        with the current input and target sequence.

        Returns:
            train_loss : mean sequence loss, one entry per epoch
            train_acc  : mean sequence accuracy (in %), one entry per epoch
            hprev      : hidden state after the last processed sequence
    '''
    train_loss = []
    train_acc = []

    iter_ = 1                      # 1-based update counter, also used for Adam's bias correction
    start = time.time()
    n_sequences = len(train_inputs)

    # Defined up front so that the return value exists even when CFG.EPOCHS is 0
    hprev = np.zeros(shape = (CFG.HIDDEN_SIZE, 1))

    for epoch in range(CFG.EPOCHS):

        epoch_loss = 0
        epoch_acc = 0

        # Initialize prev hidden state with 0 before every epoch
        hprev = np.zeros(shape = (CFG.HIDDEN_SIZE, 1))

        # Passing the total lets the progress bar show how far the epoch is
        batches = get_batch(train_inputs, train_targets)

        for inp_seq, target in tqdm(batches, total = n_sequences):

            xs, hs, os, ypreds = model.forward(inp_seq, hprev)
            loss = criterion(ypreds, target)
            acc = accuracy(ypreds, target)
            epoch_loss += loss / n_sequences
            epoch_acc += acc / n_sequences

            dW, dV, dU, db, dc = model.backward(xs, hs, ypreds, target)
            model.update_weights(iter_, dW, dV, dU, db, dc)

            # The next sequence starts from the last hidden state of this one
            hprev = hs[CFG.MAX_LEN-1]

            # Report before incrementing so that the printed number is the
            # update that has just been applied
            if iter_%500 == 0:
                sample = get_sample(model, hprev, inp_seq[0], CFG.MAX_LEN)

                print()
                print('Input:')
                inp = ''.join([int_to_char[idx] for idx in inp_seq])
                print(inp)
                print()
                print("SAMPLE:")
                print(sample)
                print()
                print("TARGET")
                true = ''.join([int_to_char[idx] for idx in target])
                print(true)
                print()
                print(f"ITER: {iter_}\t\tLoss: {loss}")
                print()
                print()

            iter_ += 1


        print()
        print('###############')
        print(f"EPOCH: {epoch+1}\tTRAIN_LOSS: {epoch_loss}\tTRAIN_ACC: {epoch_acc}")
        print('#############')
        print()
        train_loss.append(epoch_loss)
        train_acc.append(epoch_acc)

    print(f'TOTAL TIME TAKEN for {CFG.EPOCHS} epochs: {time.time()-start}')
    return train_loss, train_acc, hprev
        
        

In [None]:
# train() returns exactly three values: the per-epoch losses, the per-epoch
# accuracies and the final hidden state
train_loss, train_acc, final_hidden = train()

In [None]:
# One figure per metric, each shown on its own
for history, heading in ((train_loss, 'Training loss'), (train_acc, 'Training acc')):
    plt.plot(history)
    plt.title(heading)
    plt.show()

In [None]:
def generate(model, hprev, initial_text, n = 1000, temperature = 1.0):
    '''
        Continues 'initial_text' with 'n' characters sampled from the model.

        model        : trained RNN (its forward(), softmax() and weights are used)
        hprev        : hidden state to start from
        initial_text : prompt; must be non-empty and consist only of
                       characters present in char_to_int
        n            : number of characters to generate
        temperature  : the outputs are divided by this value before the softmax;
                       the default of 1.0 leaves the distribution unchanged

        Returns the prompt followed by the generated characters.
        Raises ValueError for an empty prompt or a non-positive temperature.
    '''
    if not initial_text:
        raise ValueError("initial_text must contain at least one character")
    if temperature <= 0:
        raise ValueError("temperature must be positive")

    prompt = [char_to_int[char] for char in initial_text]

    # Run the prompt through the network; only its final hidden state is needed
    _, hs, _, _ = model.forward(prompt, hprev)
    h = hs[len(prompt) - 1]

    # Copy, so that extending the result never touches the prompt list
    result = list(prompt)

    # Only the current hidden state is kept: memory use does not grow with n
    for _ in range(n):

        o = np.dot(model.V, h) + model.c
        p = model.softmax(o / temperature)
        idx = np.random.choice(range(CFG.VOCAB_SIZE), p = p.ravel())
        result.append(idx)

        # Feed the sampled character back in as the next input
        x = np.zeros(shape = (CFG.VOCAB_SIZE, 1))
        x[idx] = 1
        h = np.tanh(np.dot(model.W, h) + np.dot(model.U, x) + model.b)

    txt = ''.join(int_to_char[idx] for idx in result)

    return txt

In [None]:
prompts = ['"alohomora" opened', 'After all this time ?', 'Wingardium Leviosa', 'Prakashit Bhavah', 'Voldemort killed']

for short_text in prompts:

    # Every prompt starts from a fresh, all-zero hidden state
    hprev = np.zeros(shape = (CFG.HIDDEN_SIZE, 1))
    result = generate(model, hprev, short_text)

    print('INPUT STRING: \t', short_text, end='\n\n')
    print('GENERATED: \n')
    print(result, end='\n\n')
    print('#####'*30, end='\n\n')