In [1]:
import json
import numpy as np
import re
import spacy

from collections import Counter
from sklearn.metrics.pairwise import cosine_similarity

spacy.prefer_gpu()


class DataLoader:
    """Read and write JSON documents on disk.

    The loader remembers one input path (``self.path``); the output path is
    supplied on each ``save_data`` call.
    """

    def __init__(self, path):
        """Store ``path``, the JSON file that ``load_data`` will read."""
        self.path = path

    def load_data(self):
        """Parse the JSON file at ``self.path`` and return the decoded object.

        Raises:
            OSError: if the file cannot be opened.
            json.JSONDecodeError: if the content is not valid JSON.
        """
        # The encoding is pinned so the result does not depend on the
        # platform's locale default (which is not UTF-8 everywhere).
        with open(self.path, 'r', encoding='utf-8') as file:
            return json.load(file)

    def save_data(self, data, save_path):
        """Serialise ``data`` as JSON into ``save_path``, replacing any file there.

        Raises:
            OSError: if the file cannot be opened for writing.
            TypeError: if ``data`` contains objects ``json`` cannot serialise.
        """
        with open(save_path, 'w', encoding='utf-8') as file:
            json.dump(data, file)


class TextProcessor:
    """Turn a raw string into a list of lemmas using a spaCy pipeline.

    The pipeline (``en_core_web_sm``) is loaded once per instance and stored
    on ``self.nlp``.
    """

    def __init__(self):
        """Load the ``en_core_web_sm`` spaCy model."""
        self.nlp = spacy.load('en_core_web_sm')

    def _lowercase(self, text):
        """Lowercase the string ``text`` and run it through ``self.nlp``."""
        return self.nlp(text.lower())

    def _rm_stop_punct(self, text):
        """Return the tokens of ``text`` that are neither punctuation nor stop words."""
        return [t for t in text if not t.is_punct and not t.is_stop]

    def _lemmatizer(self, text):
        """Return ``lemma_`` for every token whose ``dep_`` label is non-empty."""
        return [t.lemma_ for t in text if t.dep_]

    def preprocess_text(self, text):
        """Lowercase, parse, filter and lemmatise ``text``.

        Args:
            text: the raw string to process.

        Returns:
            A list of lemma strings, in token order, with punctuation and
            stop-word tokens removed.
        """
        # The pipeline runs exactly once, on the lowercased text. The previous
        # version parsed the raw text first only to read ``.text`` back from
        # the result, which doubled the cost of every call.
        # NOTE(review): relies on spaCy's ``Doc.text`` reproducing the input
        # string unchanged, so skipping that first pass yields the same tokens.
        lowered_doc = self._lowercase(text)
        content_tokens = self._rm_stop_punct(lowered_doc)
        return self._lemmatizer(content_tokens)


class TFIDFVectFromScratch:

    """
    Hand-rolled TF-IDF vectoriser over a list of document dicts.

    ``text_data`` is a list of dicts; the token list of each document is read
    from the key passed as ``term_text`` / ``text_var``. Document vectors are
    written back into each dict under the ``'tf_idf'`` key.

    This class is based on the solution provided for the liveproject milestone
    """

    def __init__(self, text_data):
        """Keep a reference to ``text_data``; it is mutated by ``_generate_tfidf``."""
        self.data = text_data

    def _build_flatten_vocab(self, text_var):
        """Return the distinct tokens across all documents, in first-seen order.

        ``dict.fromkeys`` is used instead of ``set`` so the order is the same
        on every run; set iteration order for strings changes between
        interpreter processes, which would misalign stored vectors.
        """
        return list(dict.fromkeys(
            token for doc in self.data for token in doc[text_var]
        ))

    def _token_counter_within(self, text_var):
        """Return one ``Counter`` of token occurrences per document."""
        return [Counter(doc[text_var]) for doc in self.data]

    def _token_counter_across(self, token_counts, vocab):
        """Return ``{token: number of documents containing it}`` for ``vocab``.

        Each document is visited once, rather than once per vocabulary token.
        """
        doc_freq = Counter()
        for doc in token_counts:
            # set() so a token counts at most once per document.
            doc_freq.update(set(doc))
        return {token: doc_freq[token] for token in vocab}

    def _idf(self, doc_frequency):
        """Return log(number of documents / ``doc_frequency``), or 0.0 if it is zero."""
        if not doc_frequency:
            return 0.0
        return np.log(len(self.data) / doc_frequency)

    def _get_text_processor(self):
        """Return a ``TextProcessor``, creating it on first use and reusing it after."""
        # getattr keeps this safe for instances created without __init__.
        processor = getattr(self, '_text_processor', None)
        if processor is None:
            processor = self._text_processor = TextProcessor()
        return processor

    def _generate_tfidf(self, tokens_within, vocabulary, term_text, tokens_across):
        """Compute a TF-IDF vector per document and store it under ``'tf_idf'``.

        Function based on solution provided via LiveProject.

        Args:
            tokens_within: per-document token counters, aligned with ``self.data``.
            vocabulary: ordered list of tokens; defines the vector layout.
            term_text: key holding each document's token list.
            tokens_across: mapping token -> document frequency.

        Returns:
            ``self.data`` (the same list object), with ``'tf_idf'`` set on
            every document. A document without tokens gets an all-zero vector.
        """
        # IDF depends only on the token, so compute it once per token.
        idf_values = [self._idf(tokens_across[token]) for token in vocabulary]

        for idx, doc in enumerate(tokens_within):
            doc_length = len(self.data[idx][term_text])
            if doc_length:
                # TF -> count per term per doc / doc length
                vector = [
                    doc[token] / doc_length * idf
                    for token, idf in zip(vocabulary, idf_values)
                ]
            else:
                # No tokens: avoid dividing by zero.
                vector = [0.0] * len(vocabulary)
            self.data[idx]['tf_idf'] = vector
        return self.data

    def _generate_tfidf_query(self, vocabulary, tokens_across, q_tokenized):
        """Return the TF-IDF vector of a query string over ``vocabulary``.

        Function based on solution provided via LiveProject.

        Args:
            vocabulary: ordered list of tokens; defines the vector layout.
            tokens_across: mapping token -> document frequency.
            q_tokenized: the raw query string (it is preprocessed here).

        Returns:
            A list with one weight per vocabulary token; all zeros when the
            query yields no tokens.
        """
        q_terms = self._get_text_processor().preprocess_text(q_tokenized)
        # TF is normalised by the number of query tokens, not by the number
        # of characters in the raw query string.
        q_length = len(q_terms)
        if not q_length:
            return [0.0] * len(vocabulary)

        q_counted = Counter(q_terms)
        return [
            q_counted[token] / q_length * self._idf(tokens_across[token])
            for token in vocabulary
        ]

    def tfidf_generator(self, term_text, q_tokens=None, is_query=False):
        """Build TF-IDF vectors for the documents or for a single query.

        Args:
            term_text: key holding each document's token list.
            q_tokens: raw query string; only needed when ``is_query`` is true.
            is_query: when false, vectorise the documents in place and return
                ``self.data``; when true, return the query vector.

        Raises:
            ValueError: if ``is_query`` is true and ``q_tokens`` is ``None``.
        """
        if is_query and q_tokens is None:
            raise ValueError("q_tokens is required when is_query=True")

        vocab = self._build_flatten_vocab(term_text)
        within = self._token_counter_within(term_text)
        across = self._token_counter_across(within, vocab)
        if is_query:
            return self._generate_tfidf_query(vocab, across, q_tokens)
        return self._generate_tfidf(within, vocab, term_text, across)


class SimilaritySearch:

    """
    Rank documents against a query vector by cosine similarity.

    This class is based on the solution provided for the liveproject milestone
    """

    def __init__(self, text_data):
        """Keep a reference to the list of document dicts to search."""
        self.data = text_data

    def similarity_rankings(self, term_text, query_tfidf):
        """Score every document against ``query_tfidf`` and return the matches.

        Function based on solution provided.

        Args:
            term_text: key under which each document stores its vector.
            query_tfidf: the query vector, same length as the document vectors.

        Returns:
            A list of ``{'title': ..., 'ranking': ...}`` dicts for documents
            whose similarity is greater than zero, highest similarity first.
        """
        # cosine_similarity is called with 2-D inputs, so the query becomes a single row.
        query_row = np.array(query_tfidf).reshape(1, -1)

        matches = []
        for entry in self.data:
            entry_row = np.array(entry[term_text]).reshape(1, -1)
            score = cosine_similarity(query_row, entry_row)[0][0]
            if not score > 0:
                # Documents with no positive similarity are left out.
                continue
            matches.append({'title': entry['title'], 'ranking': score})

        matches.sort(key=lambda match: match['ranking'], reverse=True)
        return matches



if __name__ == '__main__':
    # Demo: load the preprocessed corpus, vectorise it, and rank the
    # documents against a fixed query.
    loader = DataLoader('../data_hub/processed_data.json')
    data = loader.load_data()

    # Both helpers share the same ``data`` list, so the 'tf_idf' vectors the
    # vectoriser writes into each document are what the search reads.
    # (No TextProcessor is built here: the vectoriser creates its own for
    # the query, and loading the spaCy model twice was wasted work.)
    tf_vect = TFIDFVectFromScratch(data)
    fs = SimilaritySearch(data)

    query = "pandemic prevention organizations"

    # First call annotates the documents in place; its return value is the
    # same list as ``data``, so it is not kept.
    tf_vect.tfidf_generator('tokenized_text', q_tokens=query)
    q_tfidf = tf_vect.tfidf_generator('tokenized_text', q_tokens=query, is_query=True)
    results = fs.similarity_rankings('tf_idf', q_tfidf)
    print(results)
    

[{'title': 'Pandemic prevention', 'ranking': 0.30402262201155605}, {'title': 'HIV/AIDS', 'ranking': 0.09659164974175692}, {'title': 'Event 201', 'ranking': 0.06555233534603662}, {'title': 'Pandemic Severity Assessment Framework', 'ranking': 0.05943509640611115}, {'title': 'Crimson Contagion', 'ranking': 0.046156414441719223}, {'title': 'HIV/AIDS in Yunnan', 'ranking': 0.04607708829458858}, {'title': 'Disease X', 'ranking': 0.03269277376102613}, {'title': 'Science diplomacy and pandemics', 'ranking': 0.026768805724627844}, {'title': 'Pandemic', 'ranking': 0.022854684339078644}, {'title': 'Swine influenza', 'ranking': 0.022307599487677646}, {'title': 'Spanish flu', 'ranking': 0.015809196450560063}, {'title': 'Pandemic severity index', 'ranking': 0.01265838869003447}, {'title': 'COVID-19 pandemic', 'ranking': 0.008168664115353613}, {'title': 'Plague of Cyprian', 'ranking': 0.006858360288458347}, {'title': 'PREDICT (USAID)', 'ranking': 0.0067443391786392365}, {'title': 'Antonine Plague', '