In [46]:
import warnings

warnings.filterwarnings("ignore")

In [47]:
import spacy
from spacy import displacy
import nltk
import random
import contractions
import langdetect
import pandas as pd
import numpy as np
import string
import re
from textblob import TextBlob
from selenium import webdriver
import unicodedata
import arrow
from pattern.en import number as num_string_to_digit
from langdetect.lang_detect_exception import LangDetectException
import time
import matplotlib.pyplot as plt
import networkx as nx
import pickle
import tensorflow as tf

<h2 style="color:red">List of Constants</h2>

In [48]:
class Constants:
    IGNORED_WORDS = ["what", "who", "when", "how", "which", "whom", "where"]
    LANGUAGE_SUPPORTED = ["en"]
        
    # nltk chunk regexp
    NOUN_PHRASE_REGX_PATTERN = {"label": "Noun Phrase", "include": "{<DT>*<NN.*|JJ.*>*<NN.*>}", "exclude": "}<VB.*|JJ.*|RB.*|IN|DT>+{", "tag": "NP"}
    WH_PRO_REGX_PATTERN = {"label": "Wh-pronoun", "example": ["who", "which", "whome", "whose", "what"], "include": "{<WP.*>}", "tag": "WH_PRO", "valid_words": ["what"]}
    WH_DET_REGX_PATTERN = {"label": "Wh-determiner", "example": ["what", "which", "whose"], "include": "{<WDT>}", "tag": "WH_DET", "valid_words": ["what"]}
    WH_ADV_REGX_PATTERN = {"label": "Wh-adverb", "example": ["how", "why", "where", "when"], "include": "{<WRB>}", "tag": "WH_ADV", "valid_words": ["how"]}
    PERIOD_REGX_PATTERN = {"label": "Time Periods", "include": "{<DT>*<JJ.>*<CD>*<NN.>}", "tag": "PERIOD"}
    
    # Revenue chunk extractors WDT
    KPI_EXACT_REGX_PATTERN = {"label": "KPI", "include": ["{<WP.*>?<VB.*>*<PRP.><NN.*>}", "{<WP.*>*<VB.*>*<PRP.><JJ.*>*<NN.*><NN.*>?}"], "tag": "KPI_EXACT"}
    
    # Google search noun subjects
    SEARCH_REGX_PATTERN = {"label": "Google Search", "include": ["{<WP.*|WDT|WRB><VB.*><JJ.*>*<NN.*><IN>?<NN.*>?}", "{<IN|UH>*<VB.*><RP>*<IN|UH>?<DT>?<PRP>?<JJ.*>*<IN>?<NN.*><IN>?<NN.*>?}"], "exclude": "}<WP.*|WRB|WDT|VB.*|JJ.*|RB.*|RP|PRP|IN|UH|DT>+{", "tag": "DEFN"}
    
    # Defination chunk extraction
    DEFINATION_REGX_PATTERN = {"label": "Definations or search", "include": []}
    
    CHUNK_TAGS = [NOUN_PHRASE_REGX_PATTERN.get("tag"), WH_PRO_REGX_PATTERN.get("tag"), WH_ADV_REGX_PATTERN.get("tag"), PERIOD_REGX_PATTERN.get("tag"), KPI_EXACT_REGX_PATTERN.get("tag")]
    
    ## Default Ramses Response
    DEFAULT_RESPONSE = ["sorry don't understand you. Please try saying this in another way."]
    
    # Valid KPI Types
    KPI_TYPES = ["revenue", "expense", "refund", "income", "refund", "earning"]
    KPI_TYPE_DICT = {"revenue": "REVENUE", "expense": "EXPENSE", "refund": "REFUNDS"}
    
    ## Error Responses
    LANGUAGE_ERROR = "Invalid language detected."
    
    ## Corpus language tenses
    TENSE = {"past": "past_tense", "future": "future_tense"}
    PAST_TENSE = ["past", "previous", "last", "ago"]
    
    ## Generals
    BASE_PERIODS = ["day", "week", "month", "year"]
    IMMEDIATE_PERIODS = ["yesterday", "today", "tomorrow"]
    BASE_PERIOD_ERROR = f"Invalid period. valid periods are: {BASE_PERIODS}"
    SERVER_ERROR = "Server Error"
    ANALYSIS_SCOPE = {"detail": "DETAIL", "exact": "EXACT"}
    SENTENCE_TYPES = {"simple": "SIMPLE_SENTENCE", "compound": "COMPOUND_SENTENCE", "complex": "COMPLEX_SENTENCE", "compound_complex": "COMPOUND_COMPLEX_SENTENCE", "phrase": "PHRASE"}
    
    # Context model constants
    CONTEXT_MODEL_PATH = "./data/kpi_identifier.hdf5"
    CONTEXT_MODEL_DATA_PATH = "./data/data.pickle"

<h2 style="color:red">Exceptions</h2>

In [3]:
class GlobalException(Exception):
    pass

<h2 style="color:red">Model initializers</h2>

In [4]:
class SpacyDoc:
    """ This is a singleton class implementation """
    
    __nlp_library = "en_core_web_lg"
    __nlp_instance = spacy.load(__nlp_library)

    @staticmethod 
    def getInstance(doc: str):
        """ Static access method. """
        
        if SpacyDoc.__nlp_instance == None:
            SpacyDoc()
            
        return SpacyDoc.__nlp_instance(doc)

    def __init__(self):
        """ Virtually private constructor. """
        
        if SpacyDoc.__nlp_instance != None:
            raise Exception("This class is a singleton!")
            
        else:
            SpacyDoc.__nlp_instance = self

<h2 style="color:red">Subject | Object & Predicate extraction</h2>

In [5]:
class SPO:
    """
        This class will extract the following: SPO => subject, predicate, object
    """
    
    def __init__(self, doc: str):
        self.__doc = SpacyDoc.getInstance(doc=doc)
    
    
    def __get_entities(self):
        ## chunk 1
        entity_1 = ""
        entity_2 = ""

        prv_tok_dep = ""    # dependency tag of previous token in the sentence
        prv_tok_text = ""   # previous token in the sentence

        prefix = ""
        modifier = ""
        
        for tok in self.__doc:
            ## chunk 2
            # if token is a punctuation mark then move on to the next token
            if tok.dep_ != "punct":
                # check: token is a compound word or not
                if tok.dep_ == "compound":
                    prefix = tok.text
                    # if the previous word was also a 'compound' then add the current word to it
                    if prv_tok_dep == "compound":
                        prefix = prv_tok_text + " "+ tok.text

                # check: token is a modifier or not
                if tok.dep_.endswith("mod") == True:
                    modifier = tok.text
                    # if the previous word was also a 'compound' then add the current word to it
                    if prv_tok_dep == "compound":
                        modifier = prv_tok_text + " "+ tok.text

                ## chunk 3
                if tok.dep_.find("subj") == True:
                    entity_1 = modifier +" "+ prefix + " "+ tok.text
                    prefix = ""
                    modifier = ""
                    prv_tok_dep = ""
                    prv_tok_text = ""      

                ## chunk 4
                if tok.dep_.find("obj") == True:
                    entity_2 = modifier +" "+ prefix +" "+ tok.text

                ## chunk 5  
                # update variables
                prv_tok_dep = tok.dep_
                prv_tok_text = tok.text
        
        
        if entity_1:
            entity_1.strip()
        else:
            entity_1 = None
            
        if entity_2:
            entity_2.strip()
        else:
            entity_2 = None
            
        return entity_1, entity_2
    
    
    def __get_predicate(self):
        matcher = spacy.matcher.Matcher(self.__doc.vocab)
        
        #define the pattern 
        pattern = [ {'DEP':'ROOT'},
                    {'DEP':'prep','OP':"?"},
                    {'DEP':'agent','OP':"?"},  
                    {'POS':'ADJ','OP':"?"}] 

        matcher.add("PREDICATE_MATCH", None, pattern) 
        
        matches = matcher(self.__doc)
        k = len(matches) - 1
        
        span = self.__doc[matches[k][1]:matches[k][2]]
        
        return span.text
    
    
    def main(self):
        predicate = self.__get_predicate()
        subj, obj = self.__get_entities()
        
        if subj is None and obj is None:
            try:
                subj = [noun.text for noun in self.__doc.noun_chunks][0]
                
            except IndexError:
                pass
            
        return subj, predicate, obj

In [6]:
SPO("revenue").main()

('revenue', 'revenue', None)

<h2 style="color:red">Sentence Classification</h2>

In [7]:
def doc_classifier(doc: str):
    doc = SpacyDoc.getInstance(doc=doc)
    subjects = []
    objects = []
    verbs = []
    dep_clause = []
    
    for token in doc:
        if token.dep_ != "punct":
            if token.dep_.find("subj") == True:
                subjects.append(token.text)
            
            if token.dep_.find("obj") == True:
                objects.append(token.text)
            
            if "VB" in token.tag_:
                verbs.append(token.text)
            
            if token.pos_ == "SCONJ" or token.dep_ == "mark":
                dep_clause.append(token.text)
    
    if len(subjects) == 1 and len(verbs) >= 1:
        return Constants.SENTENCE_TYPES.get("simple")
    
    elif len(subjects) >= 2 and not dep_clause and len(verbs) >= 2:
        return Constants.SENTENCE_TYPES.get("compound")
    
    elif dep_clause:
        return random.choice([Constants.SENTENCE_TYPES.get("complex"), Constants.SENTENCE_TYPES.get("compound_complex")])
    
    else:
        pass
    
    return Constants.SENTENCE_TYPES.get("phrase")

In [8]:
doc_classifier("Need some insights on revenue.")

'PHRASE'

<h2 style="color:red">Corpus Normalization</h2>

In [9]:
class CorpusNormalization:
    def __init__(self, corpus: str):
        self.corpus = corpus
    
    def normalize(self, contraction_expansion: bool = True,
                     accented_char_removal: bool = True, text_lower_case: bool = True, 
                     text_lemmatization: bool = False, special_char_removal: bool = True, 
                     stopword_removal: bool = True, remove_digits: bool = False) -> str:
        
        # validate corpus language
        CorpusNormalization.__language_validator(corpus=self.corpus)

        normalized_corpus = []
        corpus = CorpusNormalization.__nltk_sent_tokenize(corpus=self.corpus)

        # normalize each document in the corpus
        for doc in corpus:
            # strip spaces
            doc = doc.strip()

            # remove accented characters
            if accented_char_removal:
                doc = CorpusNormalization.__remove_accented_chars(doc=doc)

            # expand contractions    
            if contraction_expansion:
                doc = CorpusNormalization.__expand_contractions(doc=doc)

            # lowercase the text    
            if text_lower_case:
                doc = doc.lower()
            
            # remove extra newlines
            doc = re.sub(r'[\r|\n|\r\n]+', ' ', doc)
            
            # remove repeated characters and correcting words
            doc = CorpusNormalization.__remove_repeated_characters(doc=doc)
            doc = CorpusNormalization.__correct_text(doc=doc)

            # lemmatize text
            if text_lemmatization:
                doc = CorpusNormalization.__lemmatize_text(doc=doc)

            # remove special characters and\or digits    
            if special_char_removal:
                # insert spaces between special characters to isolate them    
                special_char_pattern = re.compile(r'([{.(-)!}])')
                doc = special_char_pattern.sub(" \\1 ", doc)
                doc = CorpusNormalization.__remove_special_characters(doc=doc, remove_digits=remove_digits)  

            # remove extra whitespace
            doc = re.sub(' +', ' ', doc)

            # remove stopwords
            if stopword_removal:
                doc = CorpusNormalization.__remove_stopwords(doc=doc)

            normalized_corpus.append(doc.strip())

        # combining results
        normalized_corpus = " ".join(doc for doc in normalized_corpus)
        normalized_corpus = normalized_corpus.strip()

        return normalized_corpus
    
    # spacy lemmatization
    @staticmethod
    def __lemmatize_text(doc: str) -> str:
        lemmatizer = nltk.stem.WordNetLemmatizer()
        doc = " ".join(lemmatizer.lemmatize(token) for token in doc.split())
        
        return doc
    
    # Correcting an english word
    @staticmethod
    def __correct_text(doc: str) -> str:
        doc = TextBlob(doc)
        
        return doc.correct().__str__()
    
    # Removing Accented Characters
    @staticmethod
    def __remove_accented_chars(doc: str):
        doc = unicodedata.normalize('NFKD', doc).encode('ascii', 'ignore').decode('utf-8', 'ignore')
        return doc
    
    # NLTK sentence tokenization
    @staticmethod
    def __nltk_sent_tokenize(corpus: str) -> list:
        corpus = nltk.tokenize.sent_tokenize(corpus)
        
        return corpus
    
    # Removing special characters
    @staticmethod
    def __remove_special_characters(doc: str, remove_digits=False) -> str:
        pattern = r'[^a-zA-z0-9\s]' if not remove_digits else r'[^a-zA-z\s]'
        doc = re.sub(pattern, '', doc)

        return doc
    
    # Removing repeating characters
    @staticmethod
    def __remove_repeated_characters(doc: str) -> str:
        repeat_pattern = re.compile(r'(\w*)(\w)\2(\w*)')
        match_substitution = r'\1\2\3'

        tokens = nltk.tokenize.word_tokenize(doc)

        def replace(old_word):
            if nltk.corpus.wordnet.synsets(old_word):
                return old_word
            new_word = repeat_pattern.sub(match_substitution, old_word)

            return replace(new_word) if new_word != old_word else new_word

        correct_tokens = [replace(word) for word in tokens]
        return ' '.join(correct_tokens)
    
    # Removing stopwords
    @staticmethod
    def __remove_stopwords(doc: str) -> str:
        stop_words = nltk.corpus.stopwords.words("english")
        response = []
    
        for token in nltk.tokenize.word_tokenize(doc):
            if token not in stop_words or token in Constants.IGNORED_WORDS:
                response.append(token)
        
        return " ".join(word for word in response)
    
    # expand contracted document
    @staticmethod
    def __expand_contractions(doc: str) -> str:
        response = []

        for word_token in doc.split():
            response.append(contractions.fix(word_token))

        return " ".join(word_token for word_token in response)
    
    # Convert number string to digit
    @staticmethod
    def __num_str_to_digit(string: str) -> int:
        return num_string_to_digit(string)
    
    # language validator
    @staticmethod
    def __language_validator(corpus: str, using_textblob: bool = True):
        try:
            language = TextBlob(corpus).detect_language()
            
        except Exception:
            language =  langdetect.detect(corpus)
            
        finally:
            if language not in Constants.LANGUAGE_SUPPORTED:
                raise GlobalException(Constants.LANGUAGE_ERROR)

<h2 style="color:red">Extracting start_date and stop_date from normalized document</h2>

In [10]:
class PeriodExtractor:
    """
        This is a period extractor class. it will go through the user query and identify the date ranges of our query.
    """
    
    def __init__(self, corpus: str):
        self.__immediate_period = False
        self.__tense = Constants.TENSE.get("future")
        self.__doc = SpacyDoc.getInstance(doc=corpus)
        self.__doc_entities = [ent.label_ for ent in self.__doc.ents]
        
    def __get_query_date(self, period: str, date: str, shift_range: int):
        date = arrow.get(date)
        response = date

        if period == Constants.BASE_PERIODS[0]:
            response = date.shift(days=shift_range)

        elif period == Constants.BASE_PERIODS[1]:
            response = date.shift(weeks=shift_range)

        elif period == Constants.BASE_PERIODS[2]:
            response = date.shift(months=shift_range)

        elif period == Constants.BASE_PERIODS[3]:
            response = date.shift(years=shift_range)

        else:
            pass

        return response
    
    
    def __period_shift_range(self, period: str, shift_range: int, start_date: str = None, stop_date: str = None):
        if not self.__immediate_period:
            if period not in Constants.BASE_PERIODS:
                return None, None
#                 raise GlobalException(Constants.BASE_PERIOD_ERROR)

            if start_date is None and stop_date is None:
                return None, None
#                 raise GlobalException("Date is required.")
            
            if start_date is not None and stop_date is None:
                stop_date = self.__get_query_date(period=period, date=start_date, shift_range=shift_range)

            elif  stop_date is not None and start_date is None:
                start_date = self.__get_query_date(period=period, date=stop_date, shift_range=-shift_range)

            else:
                return None, None
#                 raise GlobalException(Constants.SERVER_ERROR)

        elif self.__immediate_period:
            date = arrow.now()
            
            if period == Constants.IMMEDIATE_PERIODS[0]:
                start_date = date.shift(days=-1)
                stop_date = date

            elif period == Constants.IMMEDIATE_PERIODS[1]:
                start_date = date
                stop_date = date

            elif period == Constants.IMMEDIATE_PERIODS[2]:
                start_date = date
                stop_date = date.shift(days=1)

            else:
                return None, None
#                 raise GlobalException(Constants.SERVER_ERROR)

        else:
            return None, None
#             raise GlobalException(Constants.SERVER_ERROR)

        return arrow.get(start_date).date(), arrow.get(stop_date).date()
    
    
    def __get_period_date_range(self, doc) -> int:
        period_tags = []
        numbers = []
        period = None
        start_date = None
        stop_date = None
        
        for token in doc:
            period_tags.append(token.tag_)
            
            if token.lemma_ in Constants.IMMEDIATE_PERIODS:
                self.__immediate_period = True
                start_date, stop_date = self.__period_shift_range(period=token.lemma_, shift_range=None)

                return start_date, stop_date
            
            if token.lemma_ in Constants.BASE_PERIODS:
                period = token.lemma_
            
            if token.pos_ == "NUM":
                numbers.append(token.text)
        
        num_digit = num_string_to_digit(s = " ".join(num for num in numbers))
        num_digit = 1 if num_digit == 0 else num_digit

        if self.__tense == Constants.TENSE.get("past"):
            stop_date = arrow.now()
            start_date, stop_date = self.__period_shift_range(period=period, shift_range=num_digit, stop_date=stop_date, start_date=None)

        elif self.__tense == Constants.TENSE.get("future"):
            start_date = arrow.now()
            start_date, stop_date = self.__period_shift_range(period=period, shift_range=num_digit, stop_date=None, start_date=start_date)

        else:
            pass        

        return start_date, stop_date
    
    
    def has_period(self) -> bool:
        has_period = False
        doc_entity_labels = [ent.label_ for ent in self.__doc.ents]
        
        if "DATE" in self.__doc_entities or "TIME" in self.__doc_entities:
            has_period = True
        
        return has_period
    
    
    def __set_doc_tense(self):
        for token in self.__doc:
            if token.tag_ == "VBD" or token.tag_ == "VBN" or token.text in Constants.PAST_TENSE:
                self.__tense = Constants.TENSE.get("past")
                
                break
        
        if "DATE" not in self.__doc_entities and "TIME" in self.__doc_entities:
            self.__tense = Constants.TENSE.get("future")
        
        return None    
    
    
    def __get_doc_periods(self) -> list:
        available_periods = []
        available_periods = [(ent.as_doc(), ent.label_) for ent in self.__doc.ents if ent.label_ == "DATE"]
        available_periods = [(SpacyDoc.getInstance(doc="today"), "TIME")] if not available_periods else available_periods
        
        return available_periods
    
    
    def extract(self):
        """
            VBD -> past test
            VBN -> past participle
        """
        if not self.has_period():
            return None, None, None, None
        
        self.__set_doc_tense()
        periods = self.__get_doc_periods()
        
        period = periods[0][0]
        start_date, stop_date = self.__get_period_date_range(doc=period)
        
        return start_date, stop_date, period, self.__tense

In [11]:
start, stop, period, tense = PeriodExtractor(corpus="what was my revenue at 4pm").extract()
[start, stop, period, tense]

[datetime.date(2021, 2, 9), datetime.date(2021, 2, 9), today, 'future_tense']

<h2 style="color:red">Chunking & Chinking</h2>

In [12]:
class ChunkParser:
    def __init__(self, doc: str, subject: str = None):
        self.doc = doc
        self.subject = subject
        
        self.found_chunk_labels = []
    
    def __get_grammar_rules(self):
        DEFAULT_MATCHER = r"""  {np_tag}: {np_include}  # This is a noun phrase chunk
                                
                                {np_exclude}  # This is chinking regex
                                
                                {period_tag}: {period}
                                {wh_pro_tag}: {wh_pro} # ["who", "which", "what"]
                                {wh_det_tag}: {wh_det} # ["what", "which", "whose"]
                                {wh_adv_tag}: {wh_adv} # ["how", "why", "where", "when"]
                            """.format(np_include=Constants.NOUN_PHRASE_REGX_PATTERN.get("include"), np_tag=Constants.NOUN_PHRASE_REGX_PATTERN.get("tag"), np_exclude=Constants.NOUN_PHRASE_REGX_PATTERN.get("exclude"), period=Constants.PERIOD_REGX_PATTERN.get("include"), period_tag=Constants.PERIOD_REGX_PATTERN.get("tag"), wh_pro=Constants.WH_PRO_REGX_PATTERN.get("include"), wh_pro_tag=Constants.WH_PRO_REGX_PATTERN.get("tag"), wh_det=Constants.WH_DET_REGX_PATTERN.get("include"), wh_det_tag=Constants.WH_DET_REGX_PATTERN.get("tag"), wh_adv=Constants.WH_ADV_REGX_PATTERN.get("include"), wh_adv_tag=Constants.WH_ADV_REGX_PATTERN.get("tag"))
        
        search = r"""
                      {search_tag}: {search_1}
                      {search_tag}: {search_2}

                      {exclude}
                """.format(search_tag=Constants.SEARCH_REGX_PATTERN.get("tag"), search_1=Constants.SEARCH_REGX_PATTERN.get("include")[0], search_2=Constants.SEARCH_REGX_PATTERN.get("include")[1], exclude=Constants.SEARCH_REGX_PATTERN.get("exclude"))
        
        kpi = r""" 
                    {kpi_exact_tag}: {kpi_exact_1}
                    {kpi_exact_tag}: {kpi_exact_2}
                """.format(kpi_exact_tag=Constants.KPI_EXACT_REGX_PATTERN.get("tag"), kpi_exact_1=Constants.KPI_EXACT_REGX_PATTERN.get("include")[0], kpi_exact_2=Constants.KPI_EXACT_REGX_PATTERN.get("include")[1])
        
        return [search, kpi, DEFAULT_MATCHER]
    
    
    def __chunk_extractor(self) -> tuple:
        grammar_rules = self.__get_grammar_rules()
        chunks = []
        
        for grammar_rule in grammar_rules:
            chunks.append(self.__parser(grammar=grammar_rule))
        
        chunks = list(set([chunk for chunk_ in chunks for chunk in chunk_]))
        labels = [chunk[0] for chunk in chunks]
        
        return chunks, labels
    
    
    def __nltk_grammer_selector(self, subject: str = None) -> str:
        DEFAULT_MATCHER = r"""  {np_tag}: {np_include}  # This is a noun phrase chunk
                                
                                {np_exclude}  # This is chinking regex
                                
                                {period_tag}: {period}
                                {wh_pro_tag}: {wh_pro} # ["who", "which", "what"]
                                {wh_det_tag}: {wh_det} # ["what", "which", "whose"]
                                {wh_adv_tag}: {wh_adv} # ["how", "why", "where", "when"]
                            """.format(np_include=Constants.NOUN_PHRASE_REGX_PATTERN.get("include"), np_tag=Constants.NOUN_PHRASE_REGX_PATTERN.get("tag"), np_exclude=Constants.NOUN_PHRASE_REGX_PATTERN.get("exclude"), period=Constants.PERIOD_REGX_PATTERN.get("include"), period_tag=Constants.PERIOD_REGX_PATTERN.get("tag"), wh_pro=Constants.WH_PRO_REGX_PATTERN.get("include"), wh_pro_tag=Constants.WH_PRO_REGX_PATTERN.get("tag"), wh_det=Constants.WH_DET_REGX_PATTERN.get("include"), wh_det_tag=Constants.WH_DET_REGX_PATTERN.get("tag"), wh_adv=Constants.WH_ADV_REGX_PATTERN.get("include"), wh_adv_tag=Constants.WH_ADV_REGX_PATTERN.get("tag"))
        
        matchers = {
            "search": r"""
                          {search_tag}: {search_1}
                          {search_tag}: {search_2}
                          
                          {exclude}
                        """.format(search_tag=Constants.SEARCH_REGX_PATTERN.get("tag"), search_1=Constants.SEARCH_REGX_PATTERN.get("include")[0], search_2=Constants.SEARCH_REGX_PATTERN.get("include")[1], exclude=Constants.SEARCH_REGX_PATTERN.get("exclude")),
            "kpi": r""" 
                        {kpi_exact_tag}: {kpi_exact_1}
                        {kpi_exact_tag}: {kpi_exact_2}
                    """.format(kpi_exact_tag=Constants.KPI_EXACT_REGX_PATTERN.get("tag"), kpi_exact_1=Constants.KPI_EXACT_REGX_PATTERN.get("include")[0], kpi_exact_2=Constants.KPI_EXACT_REGX_PATTERN.get("include")[1]),
        }
        
        if subject is not None:
            return matchers.get(subject, DEFAULT_MATCHER)
        
        return matchers.get(self.subject, DEFAULT_MATCHER)
    
    @staticmethod
    def noun_extractor(doc: str) -> str:
        subj, pred, obj = SPO(doc=doc).main()
        entities = [subj, obj]
        entities = filter(lambda x: x if x is not None else 0, entities)
        entities = [entity for entity in entities]
        
        return entities
    
    def __parser(self, grammar: str):
        parser = nltk.RegexpParser(grammar=grammar)
        chunked = parser.parse(nltk.pos_tag(nltk.tokenize.word_tokenize(self.doc)))
        chunk_labels = []
        
        for chunk in chunked.subtrees():
            chunk_phrase = " ".join(word for word in [text[0] for text in chunk.leaves()])
            noun_subject = None
            
            try:
                noun_subject = self.noun_extractor(chunk_phrase)[0]
                noun_subject = noun_subject.strip()
                
            except IndexError:
                pass
                
            finally:
                
                chunk_labels.append((chunk.label(), chunk_phrase, noun_subject))
        
        return chunk_labels
    
    def main(self):
#         pos = nltk.pos_tag(nltk.tokenize.word_tokenize(self.doc))
#         print(pos)
#         displacy.render(SpacyDoc.getInstance(doc=self.doc), style="ent", jupyter=True)
#         grammar = self.__nltk_grammer_selector()
#         chunk_labels = self.__parser(grammar=grammar)
#         chunk_labels = chunk_labels if len(chunk_labels) > 1 else self.__parser(grammar=self.__nltk_grammer_selector(subject="default"))
        
        return self.__chunk_extractor()


In [13]:
SPO(doc="revenue").main()

('revenue', 'revenue', None)

In [27]:
labels = ChunkParser(doc="Need some help on income.", subject="search").main()
labels

([('NP', 'income', 'income'),
  ('DEFN', 'income', 'income'),
  ('DEFN', 'help', None),
  ('NP', 'help', None),
  ('S', 'Need some help on income .', 'income')],
 ['NP', 'DEFN', 'DEFN', 'NP', 'S'])

<h2 style="color:red">Google search</h2>

In [15]:
def google_search(query):
    user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.146 Safari/537.36"
    
    options = webdriver.ChromeOptions()
    options.headless = True
    options.add_argument(f'user-agent={user_agent}')
    options.add_argument("--window-size=1920,1080")
    options.add_argument('--ignore-certificate-errors')
    options.add_argument('--allow-running-insecure-content')
    options.add_argument("--disable-extensions")
    options.add_argument("--proxy-server='direct://'")
    options.add_argument("--proxy-bypass-list=*")
    options.add_argument("--start-maximized")
    options.add_argument('--disable-gpu')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--no-sandbox')
    options.add_argument('headless')
    
    driver = webdriver.Chrome("/usr/local/share/chromedriver",options=options)
    driver.get("https://www.google.com/search?q="+query.lower())
    
    results = None
    responses = ["From a google search on", "According to a google research on", "Here is what I found on"]
    response_head = random.choice(responses)
    
    try:
        result_obj = driver.find_element_by_xpath("//div[@data-attrid='description']").find_elements_by_tag_name("span")
        
        for obj in result_obj:
            if len(obj.text.split()) < 5:
                continue
            
            results = obj.text
        
        try:
            if results:
                results = TextBlob(u"{text}".format(text=results)).translate(to="en")
        
        except NotTranslated:
            pass
        
        finally:
            results = f'{response_head} "{query}"; {results}'
            return results
        
    except:
        return None

In [16]:
google_search("expenses")

'Here is what I found on "expenses"; Expenses are the outflow of money, or any form of wealth in general, to another person or group to pay for an item or service, or for a category of costs. For a tenant, rent is an expense. For students or parents, tuition is an expense.'

<h2 style="color:red">Bot Responses</h2>

In [17]:
class Responses:
    def __init__(self, subject: str, chunk_labels: list, chunk_tags: list, period: str, tense: str,  noun_subject: str = None, start_date: str = None, stop_date: str = None):
        self.subject = subject
        self.chunk_labels = chunk_labels
        self.chunk_tags = chunk_tags
        self.period = period
        self.tense = tense
        self.noun_subject = noun_subject
        
        self.start_date = start_date
        self.stop_date = stop_date
        self.suggested_periods = ["today", "this week", "this month"]
    
    def __db_get_kpi_value(self):
        """
            Still have to build this function using "self.start_date & self.stop_date"
        """
        
        return "$1000"
    
    def kpi(self, tag: str, kpi: str):
        tags = {
            "what_exactly_period": "What exactly do you want to know about your {kpi} for {period}?".format(period=self.period, kpi=kpi),
            "what_exactly_no_period": "What exactly do you want to know about your {kpi}?".format(kpi=kpi),
            "ask_period": "Do you mean your {kpi} for {period}? Or when exactly?".format(kpi=kpi, period=random.choice(self.suggested_periods) if not self.period else self.period),
            "success": "Your {kpi} for {period} {tense} {kpi_value}.".format(period=self.period, kpi=kpi, tense="was" if self.tense == Constants.TENSE.get("past") else "is", kpi_value=self.__db_get_kpi_value()),
        }
        
        return tags.get(tag)
    
    def search(self, query):
        response = google_search(query=query)
        
        return response
    
    def error(self, tag: str, kpi: str = None):
        tags = {
            "wh_pronoun_determiner_error": "Do you mean your {kpi} for {period}?".format(kpi=kpi, period=random.choice(self.suggested_periods) if not self.period else self.period),
        }
        
        return tags.get(tag)
    
    def response(self):
        response = None
        success = False
        done = True
        
        wh_valid_words = Constants.WH_PRO_REGX_PATTERN.get("valid_words").copy()
        wh_valid_words.extend(Constants.WH_DET_REGX_PATTERN.get("valid_words"))
        wh_valid_words = list(set(wh_valid_words))
        wh_tags = ["WDT", "WP", "WPR", "WP$", "WRB"]
                
        for chunk in self.chunk_labels:
            [label, phrase, noun_subject] = chunk
            phrase_pos = [tag for word, tag in nltk.pos_tag(nltk.tokenize.word_tokenize(phrase))]
            
            if label == Constants.SEARCH_REGX_PATTERN.get("tag"):
                response = self.search(query=noun_subject)
                success = True if response is not None else False
                done = True
        
        return response, success, done
    
#     def response(self):
#         response = None
#         success = False
#         done = True
        
#         wh_valid_words = Constants.WH_PRO_REGX_PATTERN.get("valid_words").copy()
#         wh_valid_words.extend(Constants.WH_DET_REGX_PATTERN.get("valid_words"))
#         wh_valid_words = list(set(wh_valid_words))
#         wh_tags = ["WDT", "WP", "WPR", "WP$", "WRB"]
        
#         for chunk in self.chunk_labels:
#             [label, phrase, noun_subject] = chunk
#             phrase_pos = [tag for word, tag in nltk.pos_tag(nltk.tokenize.word_tokenize(phrase))]
            
#             if label in [Constants.WH_PRO_REGX_PATTERN.get("tag"), Constants.WH_DET_REGX_PATTERN.get("tag")] and self.noun_subject in Constants.KPI_TYPES:
#                 response = self.kpi(tag="success", kpi=self.noun_subject)
#                 success = True
#                 done = True
#                 break

#             if label is Constants.KPI_EXACT_REGX_PATTERN.get("tag"):
#                 valid_wh_word = False
#                 success = True
                
#                 if self.period is not None:
#                     for wh_word in wh_valid_words:
#                         if wh_word in phrase:
#                             valid_wh_word = True
#                             break
                    
#                     if valid_wh_word:
#                         response = self.kpi(tag="success", kpi=noun_subject)
#                         done = True
#                         break
                    
#                     else:
#                         invalid_wh_tag = False
                        
#                         for pos_tag in phrase_pos:
#                             if pos_tag in wh_tags:
#                                 invalid_wh_tag = True
#                                 break
                        
#                         response = None if invalid_wh_tag else self.kpi(tag="success", kpi=noun_subject)
#                         success = False if invalid_wh_tag else True
#                         done = True
#                         break
                
#                 else:
# #                     print(self.period, "############")
#                     response = self.kpi(tag="ask_period", kpi=noun_subject)
#                     done = False
#                     break
            
#             elif label is not Constants.KPI_EXACT_REGX_PATTERN.get("tag"):
#                 if self.subject == "kpi":
#                     response  = self.kpi(tag="what_exactly_period", kpi=self.noun_subject) if self.period is not None else self.kpi(tag="what_exactly_no_period", kpi=self.noun_subject)
#                     success = True
#                     done = False

#                 else:
#                     response = None
#                     success = False
#                     done = True
            
#             elif label is Constants.SEARCH_REGX_PATTERN.get("tag"):
#                 response = self.search(query=noun_subject)
#                 success = True if response is not None else False
#                 done = True
            
#             else:
#                 pass
                
                
#         return response, success, done


<h2 style="color:red">Get Subject Context</h2>

In [55]:
class ContextExtractor:
    def __init__(self, data_path, model_path):
        try:
            with open(data_path, 'rb') as file:
                self.words, self.classes, self.documents = pickle.load(file)
        except Exception as e:
            print(e)
            raise FileNotFoundError(f"{data_path} doesn't exist")
            
        try:
            self.model = tf.keras.models.load_model(model_path)
        except Exception as e:
            print(e)
            raise FileNotFoundError(f"{model_path} doesn't exist")
           
    @property
    def ignore_words(self):
        return ["'s"] + [s for s in string.punctuation]
    
    def clean_up_sentence(self, sentence):
        stemmer_ = nltk.stem.lancaster.LancasterStemmer()
        
        s_words = nltk.word_tokenize(sentence)
        s_words = [stemmer_.stem(w.lower()) for w in s_words if w not in self.ignore_words]

        return s_words

    def bow(self, sentence, words, show_details=True):
        s_words = self.clean_up_sentence(sentence)

        bag = [0 for _ in range(len(words))]

        for s in s_words:
            for i, w in enumerate(words):
                if w == s:
                    bag[i] = 1

                    if show_details:
                        print(f"found in bag: {s}")

        return np.array(bag)
    
    def classify_local(self, sentence):
        ERROR_THRESHOLD = 0.6

        input_data = pd.DataFrame([self.bow(sentence, self.words, show_details=False)], dtype=float, index=['input'])
        input_data = input_data.values.reshape(-1, 1, input_data.shape[1])

        results = self.model.predict([input_data])[0]

        #filter out prediction below a threshold, and provide intent index
        results = [[i, r] for i, r in enumerate(results) if r > ERROR_THRESHOLD]

        # sort by strength of probability
        results.sort(key=lambda x: x[1], reverse=False)

        return_list = []
        for r in results:
            return_list.append((self.classes[r[0]], str(r[1])))

        return return_list

    def run(self, doc: str):
        try:
            results = self.classify_local(doc)[0]
            return results[0], doc
        
        except Exception as e:
            raise(e)
            return None


In [56]:
context_instance = ContextExtractor(data_path=Constants.CONTEXT_MODEL_DATA_PATH, model_path=Constants.CONTEXT_MODEL_PATH)
context_instance.run(doc="Insights about my earnings for this week, please.")



('revenue', 'Insights about my earnings for this week, please.')

<h2 style="color:red">Test Bot</h2>

In [58]:
class Bot:
    def __init__(self):
        pass
    
    @staticmethod
    def run(text_lemmatization: bool = False, stopword_removal: bool = False):
        
        while True:
            try:
                input_data = input("Enter query: ")
                                
                if input_data.lower() in ["end", "stop", "exit", "quit", "terminate"] or len(input_data) == 0:
                    break
                
                # start time tracker for performance check
                start_time = time.time()
                
                # Normalization of input corpus
                normalized_corpus = CorpusNormalization(corpus=input_data).normalize(stopword_removal=stopword_removal, text_lemmatization=text_lemmatization)
                
                # Doc classifier to know if doc is [sent & type or phrase & type]
                doc_type = doc_classifier(doc=normalized_corpus)
                
                # Context extractor -- This is to know what exactly we are dealing with.
                subject, _ = ContextExtractor(data_path=Constants.CONTEXT_MODEL_DATA_PATH, model_path=Constants.CONTEXT_MODEL_PATH).run(doc=normalized_corpus)
                print(f"Context: {subject}")
                
                # Pass our normalized doc through our grammar rules -- Returns (chunk_tag, chunk_phrase, noun_subject)
                chunk_labels, chunk_tags = ChunkParser(doc=normalized_corpus, subject=subject).main()
                print(chunk_labels)
                
                if subject is None:
                    raise GlobalException(None)
                
                # Period Extractor class
                start_date, stop_date, period, tense = PeriodExtractor(corpus=str(normalized_corpus)).extract()
                
                # Bot response
                response, success, done = Responses(subject=subject, chunk_labels=chunk_labels, chunk_tags=chunk_tags, period=period, noun_subject=subject, tense=tense).response()
                response = random.choice(Constants.DEFAULT_RESPONSE) if not success and done else response
                
                print(f"Bot: {response} \n")
                
                # start time tracker for performance check
                stop_time = time.time()
                
                print(f"success: {success} \t done: {done} \t Runtime: {round(stop_time - start_time, 2)} secs \n")
            
            except (GlobalException, LangDetectException):
                print(f"Bot: {random.choice(Constants.DEFAULT_RESPONSE)} \n")
                continue

# Run test bot
Bot.run()

Enter query:  tell me something about expenses


Context: expenses
[('S', 'tell me something about expenses', 'expenses'), ('NP', 'something', 'something'), ('NP', 'expenses', 'expenses'), ('DEFN', 'expenses', 'expenses'), ('DEFN', 'something', 'something')]
Bot: sorry don't understand you. Please try saying this in another way. 

success: False 	 done: True 	 Runtime: 38.54 secs 



Enter query:  exit


In [None]:
# look-up on "Need some help on income"

In [61]:
text = "John of revenue"

rule = r""" DEFN: {<WP.*|WDT|WRB><VB.*><JJ.*>*<NN.*><IN>?<NN.*>?}
            DEFN: {<IN|UH>*<VB.*><RP>*<IN|UH>?<DT>?<PRP>?<JJ.*>*<IN>?<NN.*><IN>?<NN.*>?}
            DEFN: {<NN.*><IN>*<PRP>?<JJ.*>*<NN.*><NN.*>?}
        """
parser  = nltk.RegexpParser(rule)

chunk = parser.parse(nltk.pos_tag(nltk.tokenize.word_tokenize(text)))
# for tree in chunk.subtrees():
#     print(tree.label())
# chunk.label()

print(chunk)

(S (DEFN John/NNP of/IN revenue/NN))


In [82]:
text = "Definations of revenue"
doc = SpacyDoc.getInstance(doc=text)

print(nltk.pos_tag(nltk.tokenize.word_tokenize(text)), "\n")

data = [(token.text, token.lemma_, token.tag_, token.pos_, token.dep_) for token in doc if not token.is_punct]
noun_phrases = [noun.text for noun in doc.noun_chunks]
df = pd.DataFrame(data, columns=["text", "lemma", "tag", "pos", "dep"])

print(noun_phrases)

# displacy.render(doc, style="ent", jupyter=True)
# displacy.render(doc, style="dep", jupyter=True)
df

[('Definations', 'NNS'), ('of', 'IN'), ('revenue', 'NN')] 

['Definations', 'revenue']


Unnamed: 0,text,lemma,tag,pos,dep
0,Definations,defination,NNS,NOUN,ROOT
1,of,of,IN,ADP,prep
2,revenue,revenue,NN,NOUN,pobj
