In [1]:
import math
from collections import defaultdict, Counter
import ipywidgets as widgets
from IPython.display import display

class VSM:
    """In-memory vector space model over whitespace-tokenised, lower-cased text.

    Term weights use logarithmic term frequency, ``1 + log10(tf)``, and
    inverse document frequency, ``log10(N / df)``.

    Attributes:
        dictionary: term -> ``{'df': int, 'postings': [(doc_id, tf), ...]}``.
        doc_lengths: doc_id -> normalisation length of the document.
        doc_term_freqs: doc_id -> ``Counter`` of the document's terms.
        doc_names: doc_id -> display name of the document.
        N: number of documents in the current index.
    """

    def __init__(self):
        self.dictionary = {}
        self.doc_lengths = {}
        self.doc_term_freqs = defaultdict(dict)
        self.doc_names = {}
        self.N = 0

    @staticmethod
    def _tf_weight(tf):
        """Return the logarithmic term-frequency weight for a count ``tf >= 1``."""
        return 1 + math.log10(tf)

    def _idf(self, term):
        """Return ``log10(N / df)`` for a term that is present in the dictionary."""
        return math.log10(self.N / self.dictionary[term]['df'])

    def index_documents(self, documents):
        """Build the index from scratch for ``documents``.

        Any previously indexed state is discarded first: ``N`` is always set
        to ``len(documents)``, so keeping old postings around would inflate
        ``df`` (possibly beyond ``N``, giving negative IDF) and duplicate
        postings when the same corpus is indexed twice.

        Args:
            documents: mapping of doc_id -> ``(doc_name, content)``.
        """
        self.dictionary.clear()
        self.doc_lengths.clear()
        self.doc_term_freqs.clear()
        self.doc_names.clear()

        self.N = len(documents)
        for doc_id, (doc_name, content) in documents.items():
            self.doc_names[doc_id] = doc_name
            term_freqs = Counter(content.lower().split())
            self.doc_term_freqs[doc_id] = term_freqs
            for term, tf in term_freqs.items():
                entry = self.dictionary.setdefault(term, {'df': 0, 'postings': []})
                entry['df'] += 1
                entry['postings'].append((doc_id, tf))

        # Lengths depend on the final df of every term, so they can only be
        # computed once the whole corpus has been scanned.
        for doc_id in documents:
            self.doc_lengths[doc_id] = self.calculate_doc_length(doc_id)

    def calculate_doc_length(self, doc_id):
        """Return the Euclidean length of a document's tf-idf weight vector.

        Returns 1 when the length would be 0 (unknown or empty document, or
        every term has an IDF of 0) so callers can divide by it safely.
        """
        # .get() avoids creating an empty entry in the defaultdict for an
        # unknown doc_id.
        term_freqs = self.doc_term_freqs.get(doc_id, {})
        length = sum(
            (self._tf_weight(tf) * self._idf(term)) ** 2
            for term, tf in term_freqs.items()
            if term in self.dictionary
        )
        return math.sqrt(length) if length > 0 else 1

    def process_query(self, query):
        """Return ``{term: weight}`` for the query terms present in the index.

        The weight is ``(1 + log10(tf)) * idf``; terms that do not occur in
        any indexed document are dropped. The vector is not length-normalised.
        """
        term_freqs = Counter(query.lower().split())
        return {
            term: self._tf_weight(tf) * self._idf(term)
            for term, tf in term_freqs.items()
            if term in self.dictionary
        }

    def rank_documents(self, query, top_k=10):
        """Score documents against ``query`` and return the best matches.

        Args:
            query: free-text query string.
            top_k: maximum number of results to return (default 10);
                ``None`` returns every matching document.

        Returns:
            A list of ``(doc_name, score)`` tuples ordered by descending
            score, ties broken by ascending doc_id. Only documents sharing at
            least one indexed term with the query are included.

        Raises:
            ValueError: if ``top_k`` is negative.
        """
        if top_k is not None and top_k < 0:
            raise ValueError("top_k must be non-negative or None")

        query_vector = self.process_query(query)
        document_scores = defaultdict(float)
        # Every term in query_vector is guaranteed to be in the dictionary
        # (process_query filters on it), so postings can be read directly.
        for term, query_weight in query_vector.items():
            for doc_id, tf in self.dictionary[term]['postings']:
                document_scores[doc_id] += query_weight * self._tf_weight(tf)

        # NOTE(review): the document side of the dot product above uses the
        # log-tf weight only, while doc_lengths is computed from tf * idf
        # weights, and the query vector is not normalised. The score is
        # therefore not a strict cosine similarity. Left unchanged so existing
        # rankings are preserved; confirm the intended weighting scheme.
        for doc_id in document_scores:
            document_scores[doc_id] /= self.doc_lengths.get(doc_id, 1)

        ranked_docs = sorted(document_scores.items(), key=lambda x: (-x[1], x[0]))
        return [
            (self.doc_names.get(doc_id, 'Unknown'), score)
            for doc_id, score in ranked_docs[:top_k]
        ]

def load_corpus_from_widget(upload_widget):
    """Turn the files held by a ``FileUpload`` widget into an indexable corpus.

    Both layouts of ``upload_widget.value`` are accepted:

    * ipywidgets 7.x: a dict mapping file name to
      ``{'metadata': {'name': ...}, 'content': bytes}``;
    * ipywidgets 8.x: a tuple of dicts with ``'name'`` and a memoryview
      ``'content'``.

    Args:
        upload_widget: object whose ``value`` attribute holds the uploads.

    Returns:
        Dict mapping 1-based document ids (in upload order) to
        ``(doc_name, content)`` tuples, with content decoded as UTF-8.

    Raises:
        UnicodeDecodeError: if an uploaded file is not valid UTF-8.
    """
    uploaded = upload_widget.value
    entries = uploaded.values() if isinstance(uploaded, dict) else uploaded

    documents = {}
    for doc_id, file_info in enumerate(entries, start=1):
        if 'metadata' in file_info:
            doc_name = file_info['metadata']['name']
        else:
            doc_name = file_info['name']
        # bytes() is a no-op copy for bytes and materialises a memoryview,
        # which has no .decode() of its own.
        content = bytes(file_info['content']).decode('utf-8')
        documents[doc_id] = (doc_name, content)
    return documents

def on_button_click(b):
    """Index the uploaded files and print ranked results for three fixed queries.

    Reads the module-level ``upload_widget``; does nothing while it holds no
    files. The argument ``b`` is supplied by the caller but is not used.
    """
    if not upload_widget.value:
        return

    engine = VSM()
    engine.index_documents(load_corpus_from_widget(upload_widget))

    # (text printed before the query, heading printed before the hits, query)
    runs = (
        (
            "\n\nQuery1: ",
            "Top documents for Query 1:",
            "Developing your Zomato business account and profile is a great way to boost your restaurant’s online reputation",
        ),
        (
            "\n\nQuery: ",
            "\nTop documents for Query 2:",
            "Warwickshire, came from an ancient family and was the heiress to some land",
        ),
        (
            "\n\nQuery: ",
            "\nThis is Query to check correctness ( For my Own Use) \nTop documents for Query 3:",
            "Hewlett-Packard (HP) was founded in 1939",
        ),
    )

    for prefix, heading, text in runs:
        hits = engine.rank_documents(text)
        print(f"{prefix}{text}")
        print(heading)
        for name, score in hits:
            print(f"{name}: {score}")

# 10. Create the file-upload widget and run the demo whenever files are uploaded.
# The picker is restricted to .txt files and allows selecting several at once.
upload_widget = widgets.FileUpload(accept='.txt', multiple=True)
# Call on_button_click whenever the widget's `value` trait changes; the
# handler itself ignores the change while `value` is empty.
upload_widget.observe(on_button_click, names='value')
# Render the widget in the notebook output, then tell the user what to do.
display(upload_widget)
print("\nUpload all txt Files and Wait for few second code will automatically executed")


FileUpload(value={}, accept='.txt', description='Upload', multiple=True)


Upload all txt Files and Wait for few second code will automatically executed
