<a href="https://colab.research.google.com/github/saivarshitnune/NLP_Projects/blob/Varshith/search_question_service.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import ast
from typing import Any
import numpy as np
from nltk.stem import WordNetLemmatizer, PorterStemmer
from api.main.utility.helpers.greetings_detection import detect_greeting
from api.main.utility.helpers.keywords_helper import extract_keywords
from sentence_transformers import util
from api.initializer import logger_instance, model, embeddings_load
from api.main.utility.helpers.profanity_filter import profanity_detection
from spellchecker import SpellChecker
from api.main.utility.helpers.spell_correction import extract_unique_words, extract_dictionary
import copy

# Marker of the embeddings snapshot cached in this module.  It is compared
# against ``embeddings_load.current_timestamp`` at query time to detect that
# a newer snapshot should be pulled in.
inference_data_timestamp = embeddings_load.current_timestamp

# Stays ``None`` until an embeddings dataframe has been copied into memory.
df_embeddings = None

logger = logger_instance.get_logger(__name__)

# NOTE: every name bound inside this ``if`` exists only when embeddings were
# available at import time.
if embeddings_load.exists:
    # Private deep copy, so later column writes do not touch the loader's frame.
    df_embeddings = copy.deepcopy(embeddings_load.dataframe)

    # All stored embeddings as one array, plus the question texts.
    df_list = np.array(df_embeddings["embedding"].tolist())
    list_of_all_questions = df_embeddings["questions"].tolist()

    # Spell-correction resources: the vocabulary taken from the stored
    # questions and the project's correction dictionary.
    words = extract_unique_words(list_of_all_questions)
    correction_dict = extract_dictionary()

    # Teach the spell checker the corpus vocabulary so that those words are
    # treated as known.
    spell = SpellChecker()
    spell.word_frequency.load_words(words)

class SearchQuestionService(object):
    """Match a free-text question against the cached question embeddings.

    The service works on the module-level inference state (``df_embeddings``,
    ``df_list``, ``list_of_all_questions``, ``words``, ``correction_dict``,
    ``spell``) and reloads that state from ``embeddings_load`` when needed.
    """

    # Similarity cut-offs, tried from strictest to loosest.  The next cut-off
    # is only consulted while the current one yields at most
    # ``_MIN_CANDIDATES`` rows.
    _SCORE_THRESHOLDS = (0.70, 0.65, 0.50)
    _MIN_CANDIDATES = 6
    # Rows whose shared-keyword count is within this distance of the best
    # count are kept in the final answer.
    _KEYWORD_COUNT_SLACK = 2

    def __init__(self) -> None:
        pass

    @staticmethod
    def _refresh_inference_state() -> None:
        """Reload the module-level inference state when it is missing or stale."""
        global df_embeddings
        global df_list
        global inference_data_timestamp
        global words
        global spell
        global correction_dict
        global list_of_all_questions

        if df_embeddings is None:
            # Nothing cached yet: load as soon as the loader has data, even if
            # its timestamp did not change since import.
            needs_reload = embeddings_load.exists
        else:
            needs_reload = inference_data_timestamp != embeddings_load.current_timestamp

        if not needs_reload:
            return

        logger.info('Loading Updated Dataframe in Memory')
        df_embeddings = copy.deepcopy(embeddings_load.dataframe)
        df_list = np.array(df_embeddings['embedding'].to_list())
        list_of_all_questions = df_embeddings['questions'].tolist()
        words = extract_unique_words(list_of_all_questions)
        correction_dict = extract_dictionary()
        spell = SpellChecker()
        spell.word_frequency.load_words(words)
        inference_data_timestamp = embeddings_load.current_timestamp

    @staticmethod
    def _split_trailing_punctuation(tokens):
        """Detach trailing ``?`` / ``.`` characters into tokens of their own.

        ``["what?", "now.."]`` becomes ``["what", "?", "now", ".", "."]``.
        A token that is a single ``?`` or ``.`` is left untouched.
        """
        separated = []
        for token in tokens:
            trailing = []
            while len(token) > 1 and token[-1] in ('?', '.'):
                trailing.append(token[-1])
                token = token[:-1]
            separated.append(token)
            # Characters were collected right-to-left; restore reading order.
            separated.extend(reversed(trailing))
        return separated

    @staticmethod
    def _correct_token(token):
        """Return the spelling-corrected form of one lower-cased token.

        Resolution order:
        1. ``'a'`` and ``'i'`` are kept as they are.
        2. If a value of ``correction_dict`` contains the token, the first
           such key is returned.  ``in`` is a containment test, so it is a
           substring test when the value is a string.
        3. Tokens found in ``words`` and single-character tokens are kept.
        4. Otherwise ``spell.correction`` is used; the token is kept when it
           returns ``None``.
        """
        if token in ('a', 'i'):
            return token

        for key, value in correction_dict.items():
            if token in value:
                return key

        if token in words or len(token) == 1:
            return token

        suggestion = spell.correction(token)
        return token if suggestion is None else suggestion

    @staticmethod
    async def find_question(input_question: str) -> Any:
        """Find the stored master questions most similar to ``input_question``.

        Steps: refresh the cached embeddings if needed, normalise and
        spell-correct the input, score it against every stored embedding with
        cosine similarity, keep the best row per ``mid``, and keep only the
        rows whose keyword overlap is close to the best overlap.

        Args:
            input_question: Raw user text.

        Returns:
            A ``(matches, flags)`` tuple.  ``matches`` is a list of dicts with
            the keys ``mid``, ``score``, ``relative_score``, ``keywords``,
            ``keywords_count``, ``question_digest``, ``master_question`` and
            ``uuids``, ordered best match first.  ``flags`` is a dict with the
            keys ``is_profane`` and ``is_greeting``; ``is_greeting`` is
            always ``False`` when ``matches`` is non-empty.
        """
        SearchQuestionService._refresh_inference_state()

        # Lower-case, split on whitespace and detach trailing '?' / '.'.
        tokens = SearchQuestionService._split_trailing_punctuation(
            input_question.lower().split()
        )

        if df_embeddings is None:
            # Without embeddings the spell-check resources do not exist
            # either, so matching is impossible.  Report no matches instead
            # of failing on unbound module globals, but still classify the
            # uncorrected text.
            logger.info('No question embeddings loaded; skipping similarity search')
            raw_text = ' '.join(tokens)
            if raw_text == '':
                return [], {"is_profane": False, "is_greeting": False}
            return [], {
                "is_profane": await profanity_detection(raw_text),
                "is_greeting": await detect_greeting(raw_text),
            }

        input_question = ' '.join(
            str(SearchQuestionService._correct_token(token)) for token in tokens
        )

        if input_question == '':
            return [], {"is_profane": False, "is_greeting": False}

        # Score the question against every stored embedding.  The scores are
        # written to the shared frame; there is no ``await`` between this
        # write and the filtering below.
        question_embedding = model.encode(input_question, convert_to_tensor=True)
        cosine_score = util.cos_sim(df_list, question_embedding)
        df_embeddings['score'] = [round(float(score), 2) for score in cosine_score]

        # Relax the similarity cut-off step by step while too few rows pass.
        for threshold in SearchQuestionService._SCORE_THRESHOLDS:
            result = df_embeddings[df_embeddings['score'] >= threshold].reset_index(drop=True)
            if len(result) > SearchQuestionService._MIN_CANDIDATES:
                break

        stemmer = PorterStemmer()

        question_keywords_new = await extract_keywords(input_question, method='Rake')
        lemmatize_question = await extract_keywords(question_keywords_new, method='Rake_ques')
        # Built once; every candidate row is intersected with this set.
        question_stems = {stemmer.stem(keyword) for keyword in lemmatize_question}

        is_profane = await profanity_detection(input_question)
        is_greeting = await detect_greeting(input_question)

        if len(result) == 0:
            return [], {"is_profane": is_profane, "is_greeting": is_greeting}

        lemmatizer = WordNetLemmatizer()

        def shared_keywords(raw_keywords):
            # ``keywords_rake`` is parsed from its literal text form, then
            # lemmatized and stemmed like the question keywords.
            row_tokens = ast.literal_eval(str(raw_keywords))
            row_stems = {stemmer.stem(lemmatizer.lemmatize(tok)) for tok in row_tokens}
            # Set intersection: the stemmed row keywords used to be a list,
            # which has no ``intersection`` method.
            return sorted(question_stems.intersection(row_stems))

        result["similar_keywords"] = result["keywords_rake"].apply(shared_keywords)
        result["keywords_count"] = result["similar_keywords"].apply(len)

        rank_columns = ["score", "keywords_count"]
        rank_descending = [False, False]

        # Pre-sort so that, among rows of one ``mid`` with equal score, the
        # row with the most shared keywords comes first and is the one
        # ``idxmax`` (first occurrence of the maximum) selects.
        result = result.sort_values(by=rank_columns, ascending=rank_descending)
        result = result.loc[result.groupby("mid")["score"].idxmax()]

        max_count = result["keywords_count"].max()
        slack = SearchQuestionService._KEYWORD_COUNT_SLACK
        result = result.loc[result['keywords_count'] >= (max_count - slack)]

        # Selecting by the group-by result orders rows by ``mid``; rank again
        # so the best match is returned first.  ``sort_values`` also yields a
        # fresh frame, so the column assignment below does not write to a
        # slice.
        result = result.sort_values(by=rank_columns, ascending=rank_descending)
        result = result.reset_index(drop=True)
        result["relative_score"] = result['score'] / result["score"].max()

        match = [
            {
                "mid": row["mid"],
                "score": row["score"],
                "relative_score": round(row["relative_score"], 3),
                "keywords": row["similar_keywords"],
                "keywords_count": row["keywords_count"],
                "question_digest": row["master_question_digest"],
                "master_question": row["master_question"],
                "uuids": row["master_question_uuids"],
            }
            for _, row in result.iterrows()
        ]

        if match:
            # A question that matched stored content is not reported as a
            # greeting.
            return match, {"is_profane": is_profane, "is_greeting": False}
        return [], {"is_profane": is_profane, "is_greeting": is_greeting}
