In [196]:
from __future__ import annotations

import difflib
import string
from functools import cached_property
from pathlib import Path

import docx
import nltk
import pandas as pd
import PyPDF2
from nltk.corpus import stopwords

# Fetch the NLTK data packages used by this notebook: tokenizer models ("punkt"),
# the WordNet database for the lemmatizer, and the stop-word lists.
# NOTE(review): no part-of-speech tagging call is visible in this notebook, so the
# "averaged_perceptron_tagger" download may be unnecessary — confirm before removing.
# quiet=True keeps the download log out of the cell output.
nltk.download("punkt", quiet=True)
nltk.download("wordnet", quiet=True)
nltk.download("averaged_perceptron_tagger", quiet=True)
nltk.download("stopwords", quiet=True)

True

In [197]:
class Document:
    """A preprocessed document: its tokens, an optional name, and a sequential ID.

    IDs are handed out from the class-level counter ``TOTAL_DOCUMENTS``, so every new
    instance receives the next integer (starting at 1).
    """

    # Number of IDs issued so far; also the most recently issued ID.
    TOTAL_DOCUMENTS = 0

    @classmethod
    def generate_id(cls) -> int:
        """Advance the class-level counter and return the new value.

        Returns:
            int: The next unused document ID.

        """
        next_id = cls.TOTAL_DOCUMENTS + 1
        cls.TOTAL_DOCUMENTS = next_id
        return next_id

    def __init__(self, tokens: list[str], name: str | None = None) -> None:
        """Store the tokens and name, and assign a freshly generated ID.

        Args:
            tokens (list[str]): Preprocessed tokens of the document, in order.
            name (str | None): Optional human-readable name of the document.

        """
        self.id_ = type(self).generate_id()
        self.name = name
        self.tokens = tokens


class Preprocessor:
    """Convert raw documents and query strings into lemmatized tokens.

    The pipeline is: lower-case -> delete punctuation characters -> tokenize ->
    drop stop words (the boolean keywords ``and``/``or``/``not`` are kept) -> lemmatize.
    """

    # Boolean query keywords that must survive stop-word removal so that queries
    # such as "a and not b" keep their operators after preprocessing.
    _QUERY_OPERATORS = frozenset({"and", "not", "or"})

    def __init__(self) -> None:
        """Load the English stop-word list and create the WordNet lemmatizer."""
        # Getting english stop words
        self._stop_words = {*stopwords.words(fileids="english")}
        # A lemmatizer is used because the corpus for this assignment is small.
        self._word_net_lemmatizer = nltk.stem.WordNetLemmatizer()

    def _tokenize(self, text: str) -> list[str]:
        """Split ``text`` into word tokens with NLTK's English tokenizer."""
        return nltk.tokenize.word_tokenize(text=text, language="english")

    def _lower_case(self, text: str) -> str:
        """Return ``text`` converted to lower case."""
        return text.lower()

    def _is_punctuation(self, token: str) -> bool:
        """Return True when ``token`` is non-empty and made only of ASCII punctuation.

        A per-character check is used on purpose: a plain ``token in string.punctuation``
        is a substring test, which accepts the empty string and rejects tokens like "...".
        """
        return bool(token) and all(character in string.punctuation for character in token)

    def _is_stop_word(self, token: str) -> bool:
        """Return True when ``token`` is a stop word other than a boolean query keyword."""
        # Two O(1) membership tests instead of building a set difference per token.
        return token not in self._QUERY_OPERATORS and token in self._stop_words

    def _lemmatize(self, token: str) -> str:
        """Return the WordNet lemma of ``token``."""
        return self._word_net_lemmatizer.lemmatize(token)

    def _read_text(self, document_path: Path) -> str:
        """Return the full content of a UTF-8 text file."""
        with document_path.open("r", encoding="utf-8") as fp:
            return fp.read()

    def _read_csv(self, document_path: Path) -> str:
        """Return the CSV's data rows as space-separated text, without the header row."""
        return pd.read_csv(document_path).to_csv(index=False, header=False, sep=" ")

    def _read_excel(self, document_path: Path) -> str:
        """Return the Excel sheet's data rows as space-separated text, without the header row."""
        return pd.read_excel(document_path).to_csv(index=False, header=False, sep=" ")

    def _read_pdf(self, document_path: Path) -> str:
        """Return the text of every PDF page, joined with newlines."""
        with document_path.open("rb") as file:
            reader = PyPDF2.PdfReader(file)
            # extract_text() may yield nothing for a page; fall back to an empty string.
            return "\n".join(page.extract_text() or "" for page in reader.pages)

    def _read_docx(self, document_path: Path) -> str:
        """Return the text of the document's paragraphs, joined with newlines."""
        doc = docx.Document(document_path)
        return "\n".join(paragraph.text for paragraph in doc.paragraphs)

    def _read_document(self, document_path: Path) -> str:
        """Read a document with the reader that matches its file extension.

        Args:
            document_path (Path): Path of the file to read.

        Returns:
            str: The raw text extracted from the file.

        Raises:
            TypeError: If the file extension is not one of the supported types.

        """
        _read_function_map = {
            ".txt": self._read_text,
            ".xlsx": self._read_excel,
            ".xls": self._read_excel,
            ".csv": self._read_csv,
            ".pdf": self._read_pdf,
            ".docx": self._read_docx,
        }
        document_extension = document_path.suffix

        # Match case-insensitively so that e.g. "REPORT.PDF" is handled like "report.pdf".
        read_function = _read_function_map.get(document_extension.lower())
        if read_function is not None:
            return read_function(document_path=document_path)
        msg = f"{document_extension} document type is not supported."
        raise TypeError(msg)

    def process_document(self, document_path: Path) -> Document:
        """Preprocess a document for indexing.

        Reads the file, runs the text through ``process_text`` and wraps the result in a
        ``Document`` named after the file stem.

        Args:
            document_path (Path): Path of the file to process.

        Returns:
            Document: The processed Document object containing lemmatized tokens.

        Raises:
            TypeError: If the file extension is not supported.

        """
        text = self._read_document(document_path=document_path)
        lemmatized_tokens = self.process_text(text=text)
        return Document(tokens=lemmatized_tokens, name=document_path.stem)

    def process_text(self, text: str) -> list[str]:
        """Process raw text by normalizing, tokenizing, removing stop words, and lemmatizing.

        Args:
            text (str): The raw text to process.

        Returns:
            list[str]: List of lemmatized tokens after preprocessing.

        """
        lower_case_text = self._lower_case(text=text)
        # Punctuation characters are deleted (not replaced by spaces), so "covid-19"
        # becomes the single token "covid19".
        without_punctuation = lower_case_text.translate(str.maketrans("", "", string.punctuation))
        tokens = self._tokenize(text=without_punctuation)

        # Drop stop words (boolean keywords are kept), then lemmatize what remains.
        return [self._lemmatize(token) for token in tokens if not self._is_stop_word(token=token)]


In [198]:
from collections import defaultdict


class PositionalInvertedIndex:
    """Positional inverted index: term -> {document ID -> positions of the term}.

    Read accessors never modify the index: looking up a term that was never indexed
    returns an empty result instead of silently creating an entry for it.
    """

    def __init__(self) -> None:
        """Create an empty index."""
        # term -> document ID -> list of token positions within that document
        self.__index = defaultdict(lambda: defaultdict(list))
        # term -> total number of occurrences across all indexed documents
        self.__term_frequencies = defaultdict(int)
        # document ID -> document name
        self.__documents = {}

    def add_document(self, document: Document) -> None:
        """Index every token of ``document`` together with its position.

        Args:
            document (Document): Object exposing ``id_``, ``tokens`` and ``name``.

        """
        for position, token in enumerate(document.tokens):
            self.__index[token][document.id_].append(position)
            self.__term_frequencies[token] += 1
        self.__documents[document.id_] = document.name

    def get_posting_list(self, term: str) -> dict[int, list[int]]:
        """Return the postings of ``term`` as a mapping of document ID to positions.

        An unknown term yields an empty mapping. ``dict.get`` is used instead of
        subscripting because subscripting a ``defaultdict`` would insert the term.
        """
        return self.__index.get(term, {})

    def get_term_frequency(self, term: str) -> int:
        """Return the total number of occurrences of ``term`` (0 when it is unknown)."""
        return self.__term_frequencies.get(term, 0)

    @property
    def documents(self) -> dict[int, str]:
        """Mapping of document ID to document name for every indexed document."""
        return self.__documents

    @property
    def terms(self) -> list[str]:
        """List of all indexed terms."""
        return list(self.__index)

    def __contains__(self, term: str) -> bool:
        """Return True when ``term`` has been indexed."""
        return term in self.__index

    def __getitem__(self, term: str) -> dict[int, list[int]]:
        """Return the postings of ``term``; same result as ``get_posting_list``."""
        return self.get_posting_list(term)


class MedicalSearchEngine:
    """Boolean (AND / OR / NOT) search over a positional inverted index, with spelling correction.

    Supported query shape (no parentheses): ``[not] term {and [not] term}`` clauses
    joined by ``or``.
    """

    # Keywords recognised as boolean operators in a query.
    _OPERATORS = ("and", "not", "or")
    # Minimum difflib similarity ratio for a spelling suggestion to be accepted.
    _SPELLING_CUTOFF = 0.6

    def __init__(self, positional_index: PositionalInvertedIndex, preprocessor: Preprocessor) -> None:
        """Store the index to search and the preprocessor used for documents and queries."""
        self.__positional_index = positional_index
        self.__preprocessor = preprocessor

    @cached_property
    def medical_terms(self) -> list[str]:
        """Lower-cased MeSH headings read from the local terms file (loaded once, on first use)."""
        # MeSH (Medical Subject Headings) is the NLM controlled
        # vocabulary thesaurus used for indexing articles for PubMed.
        # This term list is generated from the 2025 terms extract available on the site.
        heading_prefix = "MH = "
        with Path("./data/medical-terms-corpus.bin").open("r", encoding="utf-8") as f:
            return [line[len(heading_prefix):].strip().lower() for line in f if line.startswith(heading_prefix)]

    def add_document(self, document_path: Path) -> None:
        """Preprocess the file at ``document_path`` and add it to the index."""
        document = self.__preprocessor.process_document(document_path=document_path)
        self.__positional_index.add_document(document=document)

    def documents(self):
        """Return the index's mapping of document ID to document name."""
        return self.__positional_index.documents

    def and_operator(self, left_posting_list: list[int], right_posting_list: list[int]) -> list[int]:
        """Return the sorted document IDs present in both inputs.

        Any iterable of document IDs is accepted, including a posting mapping keyed by ID.
        """
        return sorted(set(left_posting_list).intersection(right_posting_list))

    def or_operator(self, left_posting_list: list[int], right_posting_list: list[int]) -> list[int]:
        """Return the sorted document IDs present in either input."""
        return sorted(set(left_posting_list).union(right_posting_list))

    def not_operator(self, posting_list: list[int]) -> list[int]:
        """Return the sorted IDs of all indexed documents that are not in ``posting_list``."""
        return sorted(set(self.__positional_index.documents) - set(posting_list))

    def optimize_query(self, terms: list[str]):
        """Return ``terms`` ordered by ascending term frequency (rarest term first)."""
        return sorted(
            terms,
            key=lambda term: self.__positional_index.get_term_frequency(term=term),
        )

    def _posting_list_for(self, term: str):
        """Return the postings of ``term``, or an empty mapping when it is not indexed.

        Membership is checked first so that a lookup for an unknown term can never
        create an entry in an index backed by a ``defaultdict``.
        """
        if term in self.__positional_index:
            return self.__positional_index.get_posting_list(term=term)
        return {}

    def execute_query(self, query: str, *, apply_spell_correction: bool = True) -> list[int]:
        """Run a boolean query and return the sorted IDs of the matching documents.

        Args:
            query (str): Query text, e.g. ``"covid and vaccine or not discharge"``.
            apply_spell_correction (bool): When True, every non-operator word is
                spell-corrected first and any change is printed.

        Returns:
            list[int]: Sorted IDs of the matching documents.

        """
        original_query = query

        if apply_spell_correction:
            query = self.spelling_correct_query(query)
            if query != original_query:
                print(f"Spelling correction applied: '{original_query}' -> '{query}'")

        terms = self.__preprocessor.process_text(query)
        operand_terms = [term for term in terms if term not in self._OPERATORS]
        # The frequency-based order is reported only; clauses are evaluated left to right.
        optimized_order_terms = self.optimize_query(operand_terms)
        print(f"Optimized processing order: {optimized_order_terms}")

        # Collapse whitespace runs so that "a  OR\tb" still splits on " or ".
        normalized_query = " ".join(query.lower().split())

        result: set[int] = set()
        for clause in normalized_query.split(" or "):
            result.update(self.execute_and_not_query(query=clause))

        return sorted(result)

    def execute_and_not_query(self, query: str) -> list[int]:
        """Evaluate one OR-free clause of the form ``[not] term {and [not] term}``.

        Assumptions carried over from the query language:
            * after a keyword AND / NOT there is a term;
            * NOT is either the first word or directly follows AND;
            * no nested queries in parentheses;
            * a term occurs only once in a query.

        Words that do not fit this shape (for example a term that directly follows
        another term) are ignored. An operator whose operand is missing — e.g. because
        the operand was removed as a stop word — is ignored instead of raising
        ``IndexError``; a clause consisting of a lone ``not`` matches nothing.

        Returns:
            list[int]: Sorted IDs of the documents matching the clause.

        """
        terms = self.__preprocessor.process_text(query)
        if not terms:
            return []

        term_count = len(terms)

        # First operand, optionally negated.
        if terms[0] == "not":
            result = set(self.not_operator(self._posting_list_for(terms[1]))) if term_count > 1 else set()
            cursor = 2
        else:
            result = set(self._posting_list_for(terms[0]))
            cursor = 1

        # Remaining "and [not] term" groups.
        while cursor < term_count:
            if terms[cursor] != "and":
                cursor += 1
                continue

            operand_position = cursor + 1
            negated = operand_position < term_count and terms[operand_position] == "not"
            if negated:
                operand_position += 1
            if operand_position >= term_count:
                # Dangling "and" / "and not" at the end of the clause: nothing to apply.
                break

            postings = self._posting_list_for(terms[operand_position])
            if negated:
                postings = self.not_operator(postings)
            result = set(self.and_operator(result, postings))
            cursor = operand_position + 1

        return sorted(result)

    def spelling_correction(self, term: str) -> str:
        """Return a corrected spelling for ``term``, or ``term`` itself when none is found.

        The first preprocessed token of ``term`` is looked up in this order:
            1. exact match in the index -> ``term`` is returned unchanged;
            2. closest indexed term;
            3. closest MeSH heading from ``medical_terms`` (loaded only when needed).
        """
        processed_terms = self.__preprocessor.process_text(term)
        if not processed_terms:
            # Nothing left after preprocessing (e.g. a stop word): keep the original.
            return term

        query_term = processed_terms[0]
        # If term exists in our vocabulary, return as is
        if query_term in self.__positional_index:
            return term

        # Find closest term in our index
        index_matches = difflib.get_close_matches(
            query_term,
            self.__positional_index.terms,
            n=1,
            cutoff=self._SPELLING_CUTOFF,
        )
        if index_matches:
            return index_matches[0]

        # Find closest medical term
        medical_matches = difflib.get_close_matches(
            query_term,
            self.medical_terms,
            n=1,
            cutoff=self._SPELLING_CUTOFF,
        )
        if medical_matches:
            return medical_matches[0]

        return term  # Return original if no correction found

    def spelling_correct_query(self, query: str) -> str:
        """Spell-correct every whitespace-separated word of ``query`` except AND / OR / NOT.

        Returns:
            str: The corrected words joined by single spaces.

        """
        return " ".join(
            token if token.lower() in self._OPERATORS else self.spelling_correction(token)
            for token in query.split()
        )


In [199]:
# Wire the search engine together: an empty positional index plus the text preprocessor
# that is applied to both documents and queries.
medical_index = PositionalInvertedIndex()
preprocessor = Preprocessor()

search_engine = MedicalSearchEngine(
    preprocessor=preprocessor,
    positional_index=medical_index,
)

In [200]:
# Index the corpus in sorted order: Path.iterdir() yields entries in arbitrary order, and
# document IDs are assigned in insertion order, so an unsorted walk makes the IDs (and
# every query result) differ between runs and machines.
# Sub-directories (e.g. Jupyter's .ipynb_checkpoints) are skipped; files with an
# unsupported extension still raise TypeError so they are not silently ignored.
for document_path in sorted(Path("./data/medical-documents/").iterdir()):
    if document_path.is_file():
        search_engine.add_document(document_path)

In [201]:
# Demo: a deliberately misspelled boolean query, run with spelling correction enabled.
search_engine.execute_query(
    query="cvid-19 and vacine or not dischare",
    apply_spell_correction=True,
)

Spelling correction applied: 'cvid-19 and vacine or not dischare' -> 'covid19 and vaccine or not discharge'
Optimized processing order: ['discharge', 'covid19', 'vaccine']


[1, 3, 4, 5, 6]

In [202]:
# Scratch check: set.update() fills a set in place from any iterable.
x = set()
x.update(range(1, 4))
x

{1, 2, 3}