In [None]:
#hybrid_search.py
import logging
from pydantic import BaseModel, Field
from typing import List

#Define Pydantic Model for Search Query
class SearchQuery(BaseModel):
    # Input model for a search request: the query string plus the number of
    # results wanted. HybridSearch.retrieve_documents reads both fields.

    # Required (no default); must be at least 3 characters long.
    query_text: str = Field(..., min_length=3, description="User query for searching compliance rules")
    # Optional; defaults to 5 and must be strictly greater than 0.
    top_n: int = Field(default=5, gt=0, description="Number of results to return")

#Define Pydantic Model for Search Results
class SearchResult(BaseModel):
    # One ranked hit as produced by HybridSearch.rank_results.

    # Identifier of the matched document.
    id: str
    # Weighted sum of the BM25 and vector scores for this document.
    score: float

class HybridSearch:
    """ Combines BM25 (Keyword Search) + FAISS (Vector Search) for optimized retrieval """

    # Share of the merged score contributed by each retriever.
    BM25_WEIGHT = 0.6
    VECTOR_WEIGHT = 0.4

    def __init__(self):
        """ Configures file logging and builds the keyword and vector search backends """
        logging.basicConfig(filename="logs/service_logs.log", level=logging.INFO, format="%(asctime)s - %(message)s")
        self.bm25_search = BM25Search()
        self.vector_search = VectorSearch()

    def retrieve_documents(self, query: SearchQuery) -> List[SearchResult]:
        """ Hybrid search combining BM25 keyword search and FAISS vector search

        Args:
            query: Search request; it is re-validated before use.

        Returns:
            At most ``query.top_n`` results ordered by descending merged score.
            Any failure (validation or backend) is logged and yields ``[]``.
        """
        try:
            # Re-validate: pydantic does not re-check fields mutated after construction.
            query = SearchQuery(**query.dict())

            # Run BM25 keyword search
            bm25_results = self.bm25_search.search(query.query_text, query.top_n)

            # Run Vector search (Semantic Search)
            vector_results = self.vector_search.search(query.query_text, query.top_n)

            # Combine & rank (BM25 + Vector). Each backend may contribute up to
            # top_n hits, so the merged list is cut back to top_n here.
            combined_results = self.rank_results(bm25_results, vector_results, top_n=query.top_n)
            logging.info(f"Hybrid Search Results: {combined_results}")

            return combined_results

        except Exception as e:
            logging.error(f"Hybrid Search Failed: {e}")
            return []

    @staticmethod
    def _id_and_score(doc):
        """ Returns ``(id, score)`` from a hit given either as a mapping or as an object """
        # VectorSearch.search yields plain dicts while BM25Search.search yields
        # model instances; both shapes must be accepted.
        if isinstance(doc, dict):
            return doc["id"], float(doc["score"])
        return doc.id, float(doc.score)

    def rank_results(self, bm25_results: List[SearchResult], vector_results: List[SearchResult],
                     top_n: "int | None" = None) -> List[SearchResult]:
        """ Merges BM25 & Vector Search results using a ranking function

        Args:
            bm25_results: Keyword hits (objects with ``id``/``score`` or dicts with those keys).
            vector_results: Semantic hits in either of the same two shapes.
            top_n: Maximum number of merged results to return; ``None`` returns all of them.

        Returns:
            One ``SearchResult`` per distinct document id, sorted by descending
            weighted score (``BM25_WEIGHT`` * bm25 + ``VECTOR_WEIGHT`` * vector).
        """
        # NOTE(review): the two score scales are summed as-is, without normalisation
        # - confirm that the backends produce comparable ranges.
        combined = {}

        weighted_sources = (
            (bm25_results, self.BM25_WEIGHT),
            (vector_results, self.VECTOR_WEIGHT),
        )
        for results, weight in weighted_sources:
            for doc in results:
                doc_id, score = self._id_and_score(doc)
                combined[doc_id] = combined.get(doc_id, 0.0) + score * weight

        # Sort by highest ranking score
        sorted_results = sorted(combined.items(), key=lambda item: item[1], reverse=True)
        if top_n is not None:
            sorted_results = sorted_results[:max(top_n, 0)]
        return [SearchResult(id=doc_id, score=score) for doc_id, score in sorted_results]


In [None]:
pip install rank_bm25



In [None]:
#keyword_search.py
import logging
from pydantic import BaseModel
from typing import List
import rank_bm25


# Define Pydantic Model for Compliance Documents
class ComplianceDocument(BaseModel):
    # One indexable document; BM25Search.load_documents builds an instance from
    # each item returned by get_all_documents().

    # Identifier copied into the search results.
    id: str
    # Document body; it is split on whitespace to build the BM25 corpus.
    text: str

# Define Pydantic Model for Search Results
class BM25SearchResult(BaseModel):
    # One keyword-search hit as returned by BM25Search.search.

    # Identifier of the matched ComplianceDocument.
    id: str
    # BM25 score of the document for the query tokens.
    score: float

class BM25Search:
    """ BM25 Keyword Search for Compliance Documents """

    def __init__(self):
        """ Configures file logging and builds the BM25 index from the stored documents """
        logging.basicConfig(filename="logs/service_logs.log", level=logging.INFO, format="%(asctime)s - %(message)s")
        self.documents, self.corpus, self.bm25 = self.load_documents()

    def load_documents(self) -> tuple:
        """ Loads all compliance documents into BM25 search model

        Returns:
            ``(documents, corpus, bm25)`` where ``documents`` is the list of
            ``ComplianceDocument`` objects, ``corpus`` the whitespace-tokenized
            texts in the same order, and ``bm25`` the ``BM25Okapi`` index.
            On any failure, or when there are no documents, the error is logged
            and ``([], [], None)`` is returned.
        """
        try:
            documents = [ComplianceDocument(**doc) for doc in get_all_documents()]
            if not documents:
                # BM25Okapi cannot be built from an empty corpus; report that
                # explicitly instead of surfacing its internal arithmetic error.
                logging.error("BM25 Index Load Failed: no documents available")
                return [], [], None

            corpus = [doc.text.split() for doc in documents]  # Tokenize text
            bm25 = rank_bm25.BM25Okapi(corpus)
            logging.info(f"BM25 Index Loaded with {len(documents)} Documents")
            return documents, corpus, bm25

        except Exception as e:
            logging.error(f"BM25 Index Load Failed: {e}")
            return [], [], None

    def search(self, query_text: str, top_n: int = 5) -> List[BM25SearchResult]:
        """ Searches BM25 index and returns top matching documents

        Args:
            query_text: Raw query; it is split on whitespace like the corpus.
            top_n: Maximum number of hits to return.

        Returns:
            Up to ``top_n`` results ordered by descending BM25 score. Returns
            ``[]`` when the index is not loaded, ``top_n`` is not positive, or
            the search raises (the error is logged).
        """
        if self.bm25 is None:
            logging.error("BM25 Search Failed: index is not loaded")
            return []
        if top_n <= 0:
            # A negative value would otherwise slice as "all but the last k".
            return []

        try:
            query_tokens = query_text.split()
            scores = self.bm25.get_scores(query_tokens)
            ranked_results = sorted(enumerate(scores), key=lambda pair: pair[1], reverse=True)[:top_n]

            # float(): hand plain Python floats to the result model rather than
            # whatever numeric type the scorer returns.
            return [
                BM25SearchResult(id=self.documents[position].id, score=float(score))
                for position, score in ranked_results
            ]

        except Exception as e:
            logging.error(f"BM25 Search Failed: {e}")
            return []


In [None]:
pip install faiss-cpu

Collecting faiss-cpu
  Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl (30.7 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 30.7/30.7 MB 20.6 MB/s eta 0:00:00
Installing collected packages: faiss-cpu
Successfully installed faiss-cpu-1.10.0


In [None]:
import logging
import faiss
import numpy as np

class VectorSearch:
    """ FAISS-based Semantic Search """

    def __init__(self):
        """ Configures file logging, loads the embedding model and builds the FAISS index """
        logging.basicConfig(filename="logs/service_logs.log", level=logging.INFO, format="%(asctime)s - %(message)s")
        # NOTE(review): SentenceTransformer is not imported in the visible source -
        # confirm it is brought into scope elsewhere.
        self.model = SentenceTransformer("all-MiniLM-L6-v2")  #Lightweight transformer model for embeddings
        self.index, self.document_map = self.load_faiss_index()

    def load_faiss_index(self):
        """ Loads FAISS Index with Pre-Encoded Compliance Rules

        Returns:
            ``(index, document_map)`` where ``index`` is a ``faiss.IndexFlatL2``
            holding every stored vector and ``document_map`` maps a row position
            in the index to the document id. On any failure, or when there are
            no vectors, the error is logged and ``(None, {})`` is returned.
        """
        try:
            # Each item is read through its "vector" and "id" keys.
            documents = get_all_vectors()
            if not documents:
                logging.error("FAISS Index Load Failed: no vectors available")
                return None, {}

            embeddings = np.asarray([doc["vector"] for doc in documents], dtype="float32")

            index = faiss.IndexFlatL2(embeddings.shape[1])  #L2 Distance for Nearest Neighbor Search
            index.add(embeddings)

            logging.info(f"FAISS Index Loaded with {len(documents)} Documents")
            return index, {position: doc["id"] for position, doc in enumerate(documents)}

        except Exception as e:
            logging.error(f"FAISS Index Load Failed: {e}")
            return None, {}

    def search(self, query_text, top_n=5):
        """ Searches FAISS for Semantic Matches

        Args:
            query_text: Text to embed and look up.
            top_n: Maximum number of neighbours to return.

        Returns:
            A list of ``{"id": ..., "score": ...}`` dicts in the order FAISS
            returned them, with ``score = 1 - distance`` as a plain ``float``.
            Returns ``[]`` when the index is not loaded, ``top_n`` is not
            positive, or the search raises (the error is logged).
        """
        if self.index is None:
            logging.error("FAISS Search Failed: index is not loaded")
            return []
        if top_n <= 0:
            return []

        try:
            # The index is queried with a 2-D float32 batch; this is a batch of one.
            query_vector = np.asarray(self.model.encode(query_text), dtype="float32").reshape(1, -1)
            distances, indices = self.index.search(query_vector, top_n)

            matches = []
            for distance, position in zip(distances[0], indices[0]):
                position = int(position)
                # FAISS pads with -1 when the index holds fewer than top_n
                # vectors; those slots are not real hits.
                if position < 0:
                    continue
                # NOTE(review): IndexFlatL2 reports squared L2 distances, so this
                # score is unbounded below and can be negative - confirm that
                # downstream ranking expects that.
                matches.append({"id": self.document_map[position], "score": float(1 - distance)})
            return matches

        except Exception as e:
            logging.error(f"FAISS Search Failed: {e}")
            return []
