In [11]:
import sys, os
SRC_PATH = os.path.abspath(os.path.join(os.getcwd(), "..",".."))
if SRC_PATH not in sys.path:
    sys.path.insert(0, SRC_PATH)
print(SRC_PATH)

/home/prashant-agrawal/projects/netflix_talk2data/src


In [10]:
# üöÄ Import your utility loaders
from utils.qdrant_client_loader import get_qdrant_collection_name
from utils.path_config import get_base_dir, get_data_path, get_qdrant_store_path, get_schema_path

# %% üìÅ Paths
BASE_DIR = get_base_dir()
DATA_PATH = get_data_path()
SCHEMA_OUTPUT_PATH = get_schema_path()
qdrant_store_path = get_qdrant_store_path()
COLLECTION_NAME = get_qdrant_collection_name()

print(f"üìå Base Dir: {BASE_DIR}")
print(f"üìå CSV Path: {DATA_PATH}")
print(f"üìå Qdrant Local Path: {qdrant_store_path}")
print(f"üìå Collection Name: {COLLECTION_NAME}")
print(f"üìå Schema Path: {SCHEMA_OUTPUT_PATH}")

üìå Base Dir: /home/prashant-agrawal/projects/netflix_talk2data/src
üìå CSV Path: /home/prashant-agrawal/projects/netflix_talk2data/src/Data/Enriched_Indian_Startup_Dataset.csv
üìå Qdrant Local Path: /home/prashant-agrawal/projects/netflix_talk2data/src/database/qdrant_store_local_db/collection
üìå Collection Name: indian_startups
üìå Schema Path: /home/prashant-agrawal/projects/netflix_talk2data/src/schema/payload_schema.json


In [12]:
# --- Utility: Normalization ---
def normalize_field_name(field: str) -> str:
    return (
        field.strip().lower()
        .replace(" ", "_").replace("(", "").replace(")", "")
        .replace("/", "_")
    )

def normalize_field_value(value) -> str:
    return str(value).strip().lower()

In [13]:
# src/tools/qdrant_tool.py

import re
from typing import List, Dict, Any, Union

from qdrant_client import QdrantClient
from qdrant_client.http.models import FieldCondition, MatchValue, Range, Filter
from langchain_openai import OpenAIEmbeddings


class QdrantSearchTool:
    """
    Tool for performing hybrid semantic + metadata searches against a Qdrant collection.
    """

    def __init__(
        self,
        host: str,
        port: int,
        collection_name: str,
        embedding_model: OpenAIEmbeddings,
    ):
        self.client = QdrantClient(host=host, port=port)
        self.collection = collection_name
        self.embedding_model = embedding_model

    @staticmethod
    def _normalize_field_name(field: str) -> str:
        f = field.strip().lower()
        f = re.sub(r"[ ()/]", "_", f)
        return re.sub(r"[^a-z0-9_]", "", f)

    @staticmethod
    def _normalize_field_value(value: Any) -> str:
        return str(value).strip().lower()

    def _build_filter(self, filters: Dict[str, Union[str, int, float, Dict[str, Any]]]) -> Filter:
        """
        Convert a user-provided dict of filters into a Qdrant Filter object.
        Supports:
          - exact match: {"state": "delhi"}
          - range match: {"year_founded": {"gte": 2000, "lte": 2010}}
        """
        conditions = []
        for raw_field, cond in filters.items():
            key = self._normalize_field_name(raw_field)

            if isinstance(cond, dict) and ("gte" in cond or "lte" in cond):
                conditions.append(
                    FieldCondition(
                        key=key,
                        range=Range(gte=cond.get("gte"), lte=cond.get("lte")),
                    )
                )
            else:
                val = self._normalize_field_value(cond)
                conditions.append(
                    FieldCondition(key=key, match=MatchValue(value=val))
                )

        return Filter(must=conditions)

    def search(
        self,
        query: str,
        filters: Dict[str, Union[str, int, float, Dict[str, Any]]] = None,
        k: int = 5,
    ) -> List[Dict[str, Any]]:
        """
        Perform a similarity search with optional metadata filtering.
        Returns a list of dicts: { "id", "score", "payload" }.
        """
        # 1. Embed the query
        vector = self.embedding_model.embed_query(query)

        # 2. Build Qdrant filter if provided
        q_filter = self._build_filter(filters) if filters else None

        # 3. Execute search
        results = self.client.search(
            collection_name=self.collection,
            query_vector=vector,
            query_filter=q_filter,
            limit=k,
            with_payload=True,
        )

        # 4. Format output
        output = []
        for pt in results:
            output.append({
                "id": pt.id,
                "score": pt.score,
                "payload": pt.payload,
            })
        return output


In [15]:
# 4Ô∏è‚É£ Instantiate once
embedding_model = OpenAIEmbeddings()
tool = QdrantSearchTool(
    host="localhost",
    port=6333,
    collection_name=COLLECTION_NAME,
    embedding_model=embedding_model
)

# 5Ô∏è‚É£ Test functions
def test_semantic():
    print("üîç Test: pure semantic (no filters)")
    for r in tool.search(query="emerging fintech startups", k=3):
        print(f" ‚Ä¢ [{r['score']:.4f}] {r['payload'].get('company_name')}")

def test_metadata():
    print("üîç Test: metadata-only filter state=delhi")
    for r in tool.search(query="", filters={"state": "delhi"}, k=5):
        print(f" ‚Ä¢ {r['payload']['company_name']} (state={r['payload']['state']})")

def test_range():
    print("üîç Test: range filter year_founded in [2000,2010]")
    for r in tool.search(
        query="",
        filters={"year_founded": {"gte": 2000, "lte": 2010}},
        k=5
    ):
        print(f" ‚Ä¢ {r['payload']['company_name']} (founded={r['payload']['year_founded']})")

# 6Ô∏è‚É£ Run all tests
test_semantic()
print()
test_metadata()
print()
test_range()


üîç Test: pure semantic (no filters)


  results = self.client.search(


 ‚Ä¢ [0.7726] boat
 ‚Ä¢ [0.7726] None
 ‚Ä¢ [0.7726] None

üîç Test: metadata-only filter state=delhi
 ‚Ä¢ cred (state=delhi)
 ‚Ä¢ curefit (state=delhi)
 ‚Ä¢ tork motors (state=delhi)
 ‚Ä¢ lenskart (state=delhi)
 ‚Ä¢ yulu (state=delhi)

üîç Test: range filter year_founded in [2000,2010]
