In [169]:
! pip install scikit-learn pandas tabulate -q

In [170]:
from sklearn.datasets import fetch_20newsgroups
# Load the 20 Newsgroups dataset (subset='all') through scikit-learn's fetcher.
newsgroups = fetch_20newsgroups(subset='all')
# Label names; a later cell indexes this list with the integer entries of
# newsgroups['target'] to get a readable label per post.
target_names = newsgroups['target_names']

In [171]:
import pandas as pd

# One row per post: the raw text and the name of the newsgroup it belongs to
# (integer labels are translated through target_names).
df = pd.DataFrame(
    {
        'documents': newsgroups['data'],
        'target': [target_names[label_id] for label_id in newsgroups['target']],
    }
)

In [172]:
import random
# Seed the global RNG so the sample below is the same on every run.
random.seed(20)

emails = df['documents'].to_list()
# Permute the whole list of posts, then keep the first 20 as a small working set.
shuffled_emails = random.sample(emails, k=len(emails))[:20]

In [173]:
from math import sqrt
from typing import List


def cosine(vector1: List[float], vector2: List[float]) -> float:
    """Return the cosine similarity of two equal-length vectors.

    Args:
        vector1: First vector.
        vector2: Second vector; must have the same length as ``vector1``.

    Returns:
        The dot product of the vectors divided by the product of their
        Euclidean norms. ``0.0`` is returned when that product is zero
        (either vector is all zeros, or both are empty), because the angle
        is undefined there and dividing would raise ``ZeroDivisionError``.

    Raises:
        ValueError: If the vectors differ in length.
    """
    if len(vector1) != len(vector2):
        raise ValueError("Both vector length should be same")

    dot_product = sum(a * b for a, b in zip(vector1, vector2))

    vector1_abs = sqrt(sum(val ** 2 for val in vector1))
    vector2_abs = sqrt(sum(val ** 2 for val in vector2))

    denominator = vector1_abs * vector2_abs
    if denominator == 0:
        # A zero-magnitude vector shares no direction with anything.
        return 0.0

    return dot_product / denominator

In [187]:
from math import log
import re
from typing import Dict, List
from collections import Counter
from tabulate import tabulate

# Matches any single character that is not a lowercase ASCII letter, an
# apostrophe or a space. Corpus replaces each match with a space (on
# lower-cased text) before splitting a document or query into words.
NON_WORDS = re.compile("[^a-z' ]")

class Corpus:
    """A small in-memory TF-IDF index over a list of raw text documents.

    On construction each document is lower-cased, every character matched by
    the module-level ``NON_WORDS`` pattern is replaced with a space, and the
    result is split on whitespace. A vocabulary, per-word document
    frequencies and one dense TF-IDF vector per document are then built so
    that free-text queries can be ranked with the module-level ``cosine``
    function.

    Attributes:
        raw_docs: The documents exactly as passed in.
        corpus: One list of word tokens per document.
        document_set: One set of distinct tokens per document.
        document_frequency_: Word -> number of documents containing it.
        vocabulary_: Word -> column index in the vector space.
        vector_space: One TF-IDF vector (list of floats) per document.
    """

    def __init__(self, corpus: List[str]):
        """Tokenise ``corpus``, fit the vocabulary and vectorise every document.

        Args:
            corpus: Raw document strings to index.
        """
        self.raw_docs = corpus
        self.corpus = [NON_WORDS.sub(' ', document.lower()).split() for document in corpus]
        self.document_set = [set(document) for document in self.corpus]
        # _fit() must run before _transform(): it builds vocabulary_.
        self._fit()
        self.vector_space = [self._transform(document) for document in self.corpus]

    def get_scores(self, words: List[str]):
        """Compute frequency, TF, IDF, TF-IDF and DF for each distinct word.

        Args:
            words: Tokens of one document or query (duplicates allowed).

        Returns:
            A dict of parallel lists keyed ``'words'``, ``'freq'``, ``'tf'``,
            ``'idf'``, ``'tf-idf'`` and ``'df'``; position ``i`` in every list
            refers to the same word. TF is the word's count divided by the
            total number of tokens in ``words``; IDF is
            ``log((1 + number_of_documents) / (1 + df))``.
        """
        if not hasattr(self, 'document_frequency_'):
            # Allow calling on an instance that has not been fitted yet.
            self._fit()

        counts = Counter(words)
        total_terms = len(words)
        corpus_size = len(self.corpus)

        word_list = []
        tf = []
        df = []
        idf = []
        freq = []
        tf_idf = []

        for word, count in counts.items():
            # Normalise by the total token count, not the number of distinct words.
            term_frequency = count / total_terms
            doc_frequency = self.document_frequency_.get(word, 0)
            inverse_doc_frequency = log((1 + corpus_size) / (1 + doc_frequency))

            word_list.append(word)
            freq.append(count)
            tf.append(term_frequency)
            idf.append(inverse_doc_frequency)
            tf_idf.append(term_frequency * inverse_doc_frequency)
            df.append(doc_frequency)

        return {
            'words': word_list,
            'freq': freq,
            'tf': tf,
            'idf': idf,
            'tf-idf': tf_idf,
            'df': df
        }

    def get_document_scores(self, document_id: int):
        """Return ``get_scores`` for the indexed document at ``document_id``."""
        return self.get_scores(self.corpus[document_id])

    def show_df(self, table: Dict[str, List[float]], limit: int = 3):
        """Print the ``limit`` rows of ``table`` with the highest TF-IDF.

        Args:
            table: A score dict as returned by ``get_scores``.
            limit: Maximum number of rows to print.
        """
        rows = list(zip(table['words'], table['freq'], table['tf'], table['idf'], table['tf-idf'], table['df']))

        # Column 4 is TF*IDF; highest scores first.
        rows.sort(key=lambda row: row[4], reverse=True)

        print(tabulate(rows[:limit], headers=["Word", "Frequency", "TF", "IDF", "TF*IDF", "DF"], tablefmt="pretty"))

    def _fit(self):
        """Build ``document_frequency_`` and ``vocabulary_`` from ``document_set``."""
        # Each document contributes a word at most once because document_set
        # holds sets, so this counts documents rather than occurrences.
        self.document_frequency_ = Counter(
            word for document in self.document_set for word in document
        )
        # Sort so column indices are stable across runs; set iteration order
        # for strings varies with hash randomisation.
        self.vocabulary_ = {
            word: index for index, word in enumerate(sorted(self.document_frequency_))
        }

    def _transform(self, query_words: List[str]) -> List[float]:
        """Turn a token list into a dense TF-IDF vector over the vocabulary.

        Args:
            query_words: Tokens of a document or query.

        Returns:
            A list with one float per vocabulary word; words absent from the
            vocabulary are ignored and absent vocabulary words score ``0.0``.

        Raises:
            Exception: If the vocabulary has not been built yet.
        """
        if not hasattr(self, 'vocabulary_'):
            raise Exception("Please call _fit() method first")

        query_scores = self.get_scores(query_words)
        query_vector = [0.0] * len(self.vocabulary_)

        for word, score in zip(query_scores['words'], query_scores['tf-idf']):
            index = self.vocabulary_.get(word)

            # Compare with None explicitly: index 0 is a valid column and
            # must not be treated as "word not found".
            if index is None:
                continue

            query_vector[index] = score

        return query_vector

    def transform(self, query: str) -> List[float]:
        """Tokenise ``query`` like the indexed documents and vectorise it."""
        query_words = NON_WORDS.sub(' ', query.lower()).split()
        return self._transform(query_words)

    def search(self, query: str, limit: int = 3) -> List[tuple[str, float]]:
        """Rank the indexed documents by cosine similarity to ``query``.

        Args:
            query: Free-text query.
            limit: Maximum number of results to return.

        Returns:
            Up to ``limit`` ``(raw_document, score)`` pairs, highest score first.
        """
        query_vector = self.transform(query)

        rankings = []
        for raw_doc, doc_vector in zip(self.raw_docs, self.vector_space):
            try:
                score = cosine(query_vector, doc_vector)
            except ZeroDivisionError:
                # A zero-magnitude vector (a query with no known words, or an
                # empty document) has no direction; score it as no similarity.
                score = 0.0
            rankings.append((raw_doc, score))

        rankings.sort(key=lambda pair: pair[1], reverse=True)

        return rankings[:limit]


# Index the 20 sampled emails: builds the vocabulary and one TF-IDF vector per document.
corpus = Corpus(shuffled_emails)
# Per-word score table (freq, tf, idf, tf-idf, df) for the sampled document at index 4.
table = corpus.get_document_scores(4)
# Uncomment to print the five rows of the table with the highest TF*IDF:
# corpus.show_df(table, 5)

In [189]:
# Rank the indexed emails against a free-text query; with the default limit,
# search() returns the 3 best (raw document, cosine score) pairs.
results = corpus.search("University Karlsruhe spot from organization")
results

[("From: franjion@spot.Colorado.EDU (John Franjione)\nSubject: Re: Jack Morris\nNntp-Posting-Host: spot.colorado.edu\nOrganization: University of Colorado, Boulder\nLines: 15\n\ntedward@cs.cornell.edu (Edward [Ted] Fischer) writes:\n\n>-Valentine\n>(No, I'm not going to be cordial.  Roger Maynard is a complete and\n>total dickhead.  Send me e-mail if you insist on details.)\n\nIn fact, he's a complete and total dickhead on at least 2 newsgroups\n(this one and rec.sport.hockey).  Since hockey season is almost over,\nhe's back to being a dickhead in r.s.bb.\n\n-- \nJohn Franjione\nDepartment of Chemical Engineering\nUniversity of Colorado, Boulder\nfranjion@spot.colorado.edu\n",
  0.2277065800916448),
 ("From: S_BRAUN@IRAV19.ira.uka.de (Thomas Braun)\nSubject: sources for shading wanted\nOrganization: University of Karlsruhe, FRG\nLines: 22\nDistribution: world\nNNTP-Posting-Host: irav19.ira.uka.de\nX-News-Reader: VMS NEWS 1.25\n\nI'm looking for shading methods and algorithms.\nPlease l