In [3]:
#!/usr/bin/env python3
# coding: utf-8
import os
import re
import numpy as np
import gensim
import pandas as pd
from gensim.models import KeyedVectors
from gensim.test.utils import datapath
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
import tornado
import tornado.httpserver
import tornado.ioloop
from tornado import web
import yaml
import requests
import traceback
import json
import nltk
# nltk.download('stopwords')
# nltk.download('punkt')
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer
from spellchecker import SpellChecker

# Global vars (module-level singletons shared by the cells below).
# A `global` statement is a no-op at module scope, so plain assignments are enough here.
WV_EMBEDDINGS  = None    # = google embeddings : is a 5GB data structure, hence hold only 1 instance
QUESTION_BANK  = None    # = Question_Bank() : holds all the question bank data, hence hold only 1 instance

# for text cleaning
# Raw strings: `\[`, `\]` and `\|` are regex escapes, not valid Python string escapes.
REPLACE_BY_SPACE_RE = re.compile(r'[/(){}\[\]\|@,;]')   # separators that become a space
BAD_SYMBOLS_RE = re.compile(r'[^0-9a-z #+_]')           # everything outside this set is deleted
STOPWORDS = set(stopwords.words('english'))

# for word replacements
# NOTE(review): REPLACE is not referenced by any cell visible here — confirm where it is applied.
REPLACE = {
    'corona'              : 'virus',
    'coronavirus'         : 'virus',
    'covid'               : 'virus',
    'covid-19'            : 'virus',
}
DICT_FILE = 'EnglishWords.txt'
SPELL = SpellChecker()
SPELL.word_frequency.load_text_file(DICT_FILE)

# optional vocabulary: one stripped entry per line of DICT_FILE.
# The encoding is pinned to UTF-8 (the default load_text_file uses for the same file)
# instead of depending on the platform's locale.
with open(DICT_FILE, 'r', encoding='utf-8') as f:
    EMB_VOCAB_SET = {line.strip() for line in f}

# Embedding Transformer

In [4]:
def fix_word (word, embeddings=None, isStemming=True, isCorrection=True):
    """
    Normalise a single token so that it is more likely to be a known word.

    word         : token to normalise (coerced to str, lower-cased and stripped)
    embeddings   : optional container supporting `in` (e.g. WV_EMBEDDINGS). When it is truthy,
                   every step below is applied only while the word is still missing from it.
                   Otherwise the word is always lemmatized, optionally stemmed, and
                   spell-corrected only when it is not in EMB_VOCAB_SET.
    isStemming   : allow the Porter stemmer
    isCorrection : allow the spell checker
    return       : the normalised token (always a str)
    """

    word = str (word).lower ().strip ()
    lemmatizer = WordNetLemmatizer ()
    stemmer = PorterStemmer ()

    def _spell_fix (token):
        # SPELL.correction() may return None when it has no candidate; keep the
        # token as it is in that case so that callers never receive None.
        suggestion = SPELL.correction (token)
        return token if suggestion is None else suggestion

    if embeddings:
        if word not in embeddings:
            word = lemmatizer.lemmatize (word)
        if isStemming and word not in embeddings:
            word = stemmer.stem (word)
        if isCorrection and word not in embeddings:
            word = _spell_fix (word)
    else:
        word = lemmatizer.lemmatize (word)
        if isStemming:
            word = stemmer.stem (word)
        if isCorrection and word not in EMB_VOCAB_SET:
            word = _spell_fix (word)
    return word

In [5]:
def text_prepare (text, embeddings=None, isStemming=True, isCorrection=True):
    """
    Clean a raw string and split it into normalised word tokens.

    text         : raw input (coerced to str)
    embeddings   : forwarded unchanged to fix_word
    isStemming   : forwarded unchanged to fix_word
    isCorrection : forwarded unchanged to fix_word
    return       : list of strings - the non-stopword tokens, each passed through fix_word
    """

    cleaned = str(text).lower()
    cleaned = REPLACE_BY_SPACE_RE.sub(" ", cleaned)   # separator symbols become spaces
    cleaned = BAD_SYMBOLS_RE.sub("", cleaned)         # remaining disallowed symbols are deleted

    prepared = []
    for token in word_tokenize(cleaned):
        # stopwords are dropped before any normalisation takes place
        if token in STOPWORDS:
            continue
        prepared.append(fix_word(token, embeddings, isStemming, isCorrection))
    return prepared

In [6]:
def sent2vec_wv (sent, embeddings, dim, isStemming, isCorrection):
    """
    Embed one sentence as the mean of the vectors of its known words.

    sent         : one sentence (coerced to str)
    embeddings   : word -> vector lookup supporting `in` and `[]`, e.g. WV_EMBEDDINGS
    dim          : (int) length of a word vector (= 300 as in the google's pretrained file)
    isStemming   : forwarded to text_prepare
    isCorrection : forwarded to text_prepare
    return       : np.array of length dim; all zeros when no word of the sentence is in embeddings
    """

    sentence = str(sent)
    total = np.zeros(dim)
    if not sentence:
        return total

    hits = 0
    for token in text_prepare(sentence, embeddings, isStemming, isCorrection):
        if token not in embeddings:
            continue
        total += np.array(embeddings[token])
        hits += 1

    # average only over the words that actually had a vector
    if hits:
        total /= hits
    return total

In [7]:
# Use Pretrained Google Word2Vec Embeddings
class WVEmbeddingTransformer (BaseEstimator, TransformerMixin):
    """
    Sentence embedder: every sentence becomes the average of its word vectors (see sent2vec_wv).

    The word vectors live in the module-level WV_EMBEDDINGS, which is loaded at most once
    and is shared by all instances.
    """

    def __init__ (self, embFile=None, vocabSet=None, isStemming=True, isCorrection=True, isinit=True):
        """
        embFile      : str, path of a binary word2vec file; only read while WV_EMBEDDINGS is None
        vocabSet     : optional collection of words (strings). When given while the file is being
                       loaded, WV_EMBEDDINGS is reduced to a dict holding only these words (after
                       text_prepare) and self.isinit is forced to True.
        isStemming   : forwarded to text_prepare / sent2vec_wv
        isCorrection : forwarded to text_prepare / sent2vec_wv
        isinit       : if True then .fit() does nothing
        init:
            if WV_EMBEDDINGS is already init then it is not re-loaded and vocabSet is not applied.
            self.dim       : int, length of one word vector (0 for an empty dict)
            self.embedding : the shared WV_EMBEDDINGS
        raises TypeError when WV_EMBEDDINGS is not loaded yet and embFile is None.
        """

        global WV_EMBEDDINGS
        # BaseEstimator.get_params() (used by clone() and repr) reads every __init__
        # argument back from an attribute of the same name, so all of them are stored.
        self.embFile = embFile
        self.vocabSet = vocabSet
        self.isinit = isinit
        self.isStemming = isStemming
        self.isCorrection = isCorrection

        if WV_EMBEDDINGS is None:
            if embFile is None:
                raise TypeError ("embFile must be the path of a word2vec file when WV_EMBEDDINGS is not loaded yet")
            # NOTE(review): datapath() is imported from gensim.test.utils - presumably it resolves
            # relative names against gensim's own test-data folder; confirm embFile is absolute.
            WV_EMBEDDINGS = KeyedVectors.load_word2vec_format (datapath (embFile), binary=True)
            if vocabSet and len (vocabSet)>0:
                kept_words = text_prepare (' '.join (vocabSet), WV_EMBEDDINGS, isStemming, isCorrection)
                self.isinit = True
                WV_EMBEDDINGS = {w: WV_EMBEDDINGS[w] for w in kept_words if w in WV_EMBEDDINGS}

        # fix dimension
        if isinstance (WV_EMBEDDINGS, dict):
            first_vec = next (iter (WV_EMBEDDINGS.values ()), None)
            dim = 0 if first_vec is None else len (first_vec)
        else:
            # vector_size is used instead of iterating `.vocab`, which gensim 4 removed
            dim = WV_EMBEDDINGS.vector_size
        self.dim = dim
        self.embedding = WV_EMBEDDINGS


    def fit (self, Xstr, Y=None, **fit_params):
        """
        Xstr : list of raw sentences (strings)
        Unless self.isinit is already True, self.embedding is reduced to a dict holding only
        the words of Xstr that have a vector; afterwards self.isinit is True.
        return : self
        """

        if not self.isinit:
            # same cleaning flags as transform(), so that both agree on the vocabulary
            vocab = {w for s in Xstr for w in text_prepare (s, self.embedding, self.isStemming, self.isCorrection)}
            if vocab:
                self.embedding = {w: self.embedding[w] for w in vocab if w in self.embedding}
            self.isinit = True
        return self


    def transform (self, Xstr, Y=None, **fit_params):
        """
        Xstr   : list of sentences (strings)
        return : np.array(list of corresponding sentence embeddings (avg. of word embeddings of the sent))
        """

        # NOTE(review): this re-binds to the shared store, which discards any vocabulary
        # filtering done by fit() - confirm that this is intended.
        self.embedding = WV_EMBEDDINGS
        X = [sent2vec_wv (sent, self.embedding, self.dim, self.isStemming, self.isCorrection) for sent in Xstr]
        return np.array (X)

# Nearest Neighbour embedding

In [8]:
# Nearest Neighbour Question Bank
def getCosineNearestNeighbours (q_vec, cand_vecs, k=1, dim=300):
    """
    Rank candidate vectors by cosine similarity to a query vector.

    q_vec     : query vector, reshaped to 1 row
    cand_vecs : array of candidate vectors, reshaped to rows of length dim
    k         : maximum number of indices to return
    dim       : length of one candidate vector
    return    : list of row indices of the (at most) k candidates having the highest cosine
                similarity, best first; equal scores keep their original order
    """

    query = q_vec.reshape((1, -1))
    candidates = cand_vecs.reshape((-1, dim))
    scores = np.array(cosine_similarity(query, candidates)[0])

    # sorted() is stable even with reverse=True, so ties stay in ascending index order
    ranking = sorted(range(len(candidates)), key=lambda idx: scores[idx], reverse=True)
    return ranking[:k] if len(ranking) > k else ranking


class Question_Bank:
    """
    In-memory question bank searched by cosine similarity of sentence embeddings.

    self.df        : the bank, with columns QUESTIONS, ANSWERS and META
    self.N         : number of rows of self.df
    self.CAND_VECS : embeddings of all questions, then all answers, then all meta texts,
                     i.e. 3*N rows in that order
    """

    def __init__ (self, questionBank_filename):
        """
        questionBank_filename : Excel file whose columns are renamed, in order, to
                                QUESTIONS, ANSWERS, META
        """

        # vectorize the data and store it in mem nearest neighbor
        self.df = pd.read_excel (questionBank_filename)
        self.N  = self.df.shape[0]
        self.df.columns = ['QUESTIONS', 'ANSWERS', "META"]
        Xstr = list (self.df['QUESTIONS']) + list (self.df['ANSWERS']) + list (self.df['META'])
        embTransformer = WVEmbeddingTransformer ()
        self.CAND_VECS = embTransformer.transform (Xstr)

    def get_nearest_QA_indices (self, qn_str):
        """
        qn_str : the asked question (coerced to str)
        return : list of distinct row positions of self.df, best match first, derived from
                 the 4 candidate vectors closest to the question
        """

        qn_str = str (qn_str)
        embTransformer = WVEmbeddingTransformer ()
        q_vec = embTransformer.transform ([qn_str])
        # Use the real width of the stored vectors instead of the helper's default of 300,
        # so that the search also works with embeddings of another dimension.
        nearest_indices = getCosineNearestNeighbours (q_vec, self.CAND_VECS, k=4, dim=self.CAND_VECS.shape[-1])

        qn_idxs = []
        for i in nearest_indices:
            # CAND_VECS is questions | answers | meta, so fold i back onto its row
            if i >= 2*self.N:
                row = i - 2*self.N
            elif i >= self.N:
                row = i - self.N
            elif i >= 0:
                row = i
            else:
                continue
            if row not in qn_idxs:
                qn_idxs.append (row)
        return qn_idxs

    def ask (self, qn_str):
        """
        qn_str : the asked question (coerced to str)
        return : (answer, suggestions)
                 answer      : ANSWERS text of the best match, or 'NOT_FOUND'
                 suggestions : DataFrame (QUESTIONS, ANSWERS) of the remaining matches,
                               or an empty list when there are none
        """

        qn_str = str (qn_str)
        qn_idxs = self.get_nearest_QA_indices (qn_str)
        matches = self.df.iloc[qn_idxs].reset_index (drop=True)
        answer = 'NOT_FOUND'
        suggestions = []
        if matches.shape[0]>0:
            answer = str (matches["ANSWERS"][0])
        if matches.shape[0]>1:
            suggestions = matches.loc[1:, ["QUESTIONS", "ANSWERS"]].reset_index (drop=True)
        return answer, suggestions

# Bert Pretrained Word Embeddings

In [10]:
# Bert server start
# !bert-serving-start -cpu -max_batch_size 16 -model_dir D:/Software/uncased_L-12_H-768_A-12

In [11]:
# Bert client
# bc = BertClient ()
# bc.encode (['First do it', 'then do it right', 'then do it better'])
# bert_emb = bc.encode (['All payments of the indebtedness evidenced by this Note, other than regularly scheduled payments, shall be applied to such indebtedness in the order of their maturities.'])
# bert_emb.shape

In [None]:
class Text_cleaner_transformer (BaseEstimator, TransformerMixin):
    """
    Stateless transformer that replaces every sentence by its cleaned form:
    the tokens returned by text_prepare, joined with single spaces.
    """

    def __init__(self, isStemming=True, isCorrection=False, embedding=None):
        """
        isStemming   : forwarded to text_prepare
        isCorrection : forwarded to text_prepare
        embedding    : forwarded to text_prepare as its `embeddings` argument
        """
        self.isStemming   = isStemming
        self.isCorrection = isCorrection
        self.embedding    = embedding

    def fit (self, Xstr, y=None, **fit_params):
        """Nothing is learned; returns self."""
        return self

    def transform (self, Xstr, y=None, **fit_params):
        """
        Xstr   : iterable of sentences
        return : list of cleaned sentences (strings), in the same order
        """
        cleaned = []
        for sentence in Xstr:
            tokens = text_prepare (sentence, self.embedding, self.isStemming, self.isCorrection)
            cleaned.append (' '.join (tokens))
        return cleaned

In [None]:
def preProcess (rawfile, newFileName):
    """
    Clean the 'text' column of a CSV file and save the result as a new CSV file.

    rawfile     : path of the input CSV; it must contain a 'text' column
    newFileName : path the cleaned CSV is written to
    return      : None
    """

    df = pd.read_csv (rawfile)
    text_cleaner = Text_cleaner_transformer (isCorrection=True, embedding=WV_EMBEDDINGS)
    df['text'] = text_cleaner.transform (df['text'])
    # pandas has no module-level write_csv(); a frame is written with DataFrame.to_csv().
    # index=False keeps the output columns identical to the input's.
    df.to_csv (newFileName, index=False)
    return