In [1]:
from qdrant_client import QdrantClient
from qdrant_client.http import models
import openai
import groq
from typing import List, Dict, Optional

In [2]:
from dotenv import load_dotenv
import os

# Load variables from a local .env file into the process environment so the
# os.getenv(...) lookups in the following cells can find the API keys.
load_dotenv()

True

In [3]:
# Client for the Qdrant Cloud cluster that the search cells below query.
# The API key comes from the QDRANT_CLOUD_API_KEY environment variable;
# os.getenv returns None when it is unset.
client = QdrantClient(
    url="https://3973cdf9-4ba6-40b1-ae92-b2f952f82fb9.europe-west3-0.gcp.cloud.qdrant.io:6333", 
    api_key=os.getenv("QDRANT_CLOUD_API_KEY"),
)

In [4]:
# List the collections available on the cluster (displayed as the cell output).
client.get_collections()

CollectionsResponse(collections=[CollectionDescription(name='podcasts')])

In [5]:
# Pass the key explicitly so the client's configuration is visible right here,
# the same way the Qdrant and Groq clients in this notebook are constructed.
# When the variable is unset this passes None and the client falls back to its
# own environment lookup, exactly as the bare ``openai.Client()`` call did.
openai_client = openai.Client(api_key=os.getenv("OPENAI_API_KEY"))

# Kept for any code that still goes through the module-level ``openai`` API.
# NOTE(review): this assignment does not reconfigure ``openai_client`` above,
# which was already built by the time this line runs.
openai.api_key = os.getenv("OPENAI_API_KEY")

# Embedding dimensionality.
# NOTE(review): presumably the output size of the "text-embedding-3-small"
# model used by get_embedding -- confirm against the collection's vector config.
VECTOR_SIZE = 1536

def get_embedding(text: str) -> List[float]:
    """Return the OpenAI embedding vector for ``text``.

    The text is sent to the embeddings endpoint of the module-level
    ``openai_client`` using the ``text-embedding-3-small`` model, and the
    embedding of the first item in the response is returned.

    Args:
        text: The text to embed.

    Returns:
        The embedding of ``text`` as returned by the API.
    """
    embeddings_api = openai_client.embeddings
    reply = embeddings_api.create(model="text-embedding-3-small", input=text)
    first_item = reply.data[0]
    return first_item.embedding

In [9]:
# System prompt sent with every get_entities() request: it instructs the model
# to reply with nothing but the entities found in the query, separated by commas.
KEYWORD_PROMPT="""
Your task is to analyse the query and identify the entities in the query.
The output must contain only the entities separated by comma and no other details. 
Do not share anything other than what you are asked to.
You must strictly follow the instruction.
only provide the keywords found and nothing else.
"""

# Groq client used by get_entities(); the key is read from the GROQ_API_KEY
# environment variable (os.getenv returns None when it is unset).
groq_client = groq.Groq(
    api_key=os.getenv("GROQ_API_KEY"),
)

def get_entities(text: str) -> List[str]:
    """Extract the entities mentioned in ``text`` using a Groq-hosted LLM.

    ``KEYWORD_PROMPT`` is sent as the system message and ``text`` as the user
    message to the ``llama3-8b-8192`` model through the module-level
    ``groq_client``. The reply is expected to be a comma-separated list.

    Args:
        text: The query to analyse.

    Returns:
        The entities found, each stripped of surrounding whitespace. Blank
        items are dropped, so an empty or missing reply yields ``[]``.
    """
    response = groq_client.chat.completions.create(
        messages=[{"role": "system", "content": KEYWORD_PROMPT}, {"role": "user", "content": text}],
        model="llama3-8b-8192",
    )
    # The message content may be None; treat that like an empty reply.
    raw_reply = response.choices[0].message.content or ""
    # Split on the bare comma and strip each piece: model output does not
    # reliably use ", " and often carries trailing spaces or newlines.
    return [piece.strip() for piece in raw_reply.split(",") if piece.strip()]

In [33]:
def _hit_to_doc(hit) -> Dict:
    """Flatten one search hit into the plain dict returned by ``hybrid_search``."""
    # Guard against a missing payload so ``.get`` is never called on None.
    payload = hit.payload or {}
    return {
        "id": hit.id,
        "subtopic": payload.get("subtopic"),
        "speakers": payload.get("speakers"),
        "content": payload.get("content"),
        "title": payload.get("title"),
        "url": payload.get("url"),
        "timestamp": payload.get("timestamp"),
        "score": hit.score,
    }


def hybrid_search(
    collection_name: str,
    query: str,
    limit: int = 5,
    subtopic: Optional[str] = None,
    speakers: Optional[List[str]] = None,
    title: Optional[str] = None,
    full_text_search: bool = True
) -> List[Dict]:
    """Search a collection by vector similarity, optionally merged with keyword-filtered hits.

    The query is embedded with ``get_embedding`` and searched against the
    collection, restricted by any metadata filters given. When
    ``full_text_search`` is enabled, ``get_entities`` extracts keywords from the
    query and a second vector search is run that additionally carries one
    text-match condition on ``content`` per keyword. The hits of both searches
    are merged, de-duplicated by point id and sorted by descending score.

    Args:
        collection_name: The name of the collection to search in.
        query: The query text.
        limit: The number of results requested from *each* underlying search.
            Because the two result sets are merged without truncation, up to
            ``2 * limit`` documents can be returned when ``full_text_search``
            is enabled.
        subtopic: If given, only documents with this subtopic are returned.
        speakers: If given, only documents matching any of these speakers are
            returned.
        title: If given, only documents with this title are returned.
        full_text_search: Whether to also run the keyword-filtered search.

    Returns:
        A list of dictionaries with the keys ``id``, ``subtopic``, ``speakers``,
        ``content``, ``title``, ``url``, ``timestamp`` and ``score``, ordered
        from highest to lowest score.
    """
    # Get the embeddings for the query text.
    query_embedding = get_embedding(query)

    # Metadata filtering: every condition supplied here has to match.
    # NOTE(review): the speaker/title filters address "metadata.speakers" and
    # "metadata.title", while the results below are read from the top-level
    # "speakers"/"title" payload keys -- confirm which layout the collection uses.
    must_conditions = []
    if subtopic:
        must_conditions.append(models.FieldCondition(key="subtopic", match=models.MatchValue(value=subtopic)))
    if speakers:
        must_conditions.append(models.FieldCondition(key="metadata.speakers", match=models.MatchAny(any=speakers)))
    if title:
        must_conditions.append(models.FieldCondition(key="metadata.title", match=models.MatchValue(value=title)))

    def run_search(query_filter):
        """Run one vector search with the shared settings and the given filter."""
        return client.search(
            collection_name=collection_name,
            query_vector=query_embedding,
            query_filter=query_filter,
            limit=limit,
            with_payload=True,
            score_threshold=0.0
        )

    hits = []

    # Full-text search: one text-match condition per extracted keyword.
    if full_text_search:
        should_conditions = [
            models.FieldCondition(key="content", match=models.MatchText(text=word.strip()))
            for word in get_entities(query)
            # Skip blank keywords; an empty text match carries no information.
            if word and word.strip()
        ]
        # Without any usable keyword this search would just repeat the plain
        # vector search below, so it is only issued when there is one.
        if should_conditions:
            hits.extend(run_search(models.Filter(must=must_conditions, should=should_conditions)))

    # Plain vector search, restricted by the metadata filters only.
    hits.extend(run_search(models.Filter(must=must_conditions)))

    # Remove duplicates (first occurrence wins) and sort by score.
    seen_ids = set()
    unique_docs = []
    for hit in hits:
        if hit.id in seen_ids:
            continue
        seen_ids.add(hit.id)
        unique_docs.append(_hit_to_doc(hit))
    unique_docs.sort(key=lambda doc: doc["score"], reverse=True)
    return unique_docs

In [34]:
# Example query against the "podcasts" collection using the defaults
# (limit=5, no metadata filters, full-text search enabled).
results = hybrid_search(
    collection_name="podcasts",
    query="Is Israel doing the correct thing attacking Gaza?",
)

In [35]:
# Display the merged, de-duplicated hits, highest score first.
results

[{'id': 'd1b9f130-24d6-4878-ba77-344767d23313',
  'subtopic': 'Israel-Palestine',
  'speakers': ['Fridman', 'Destiny', 'Shapiro'],
  'content': 'Fridman: Before we go to Ukraine, can I ask about Israel? So you’re both mostly in agreement, but what is Israel? \n Destiny: I don’t know if I’d say that. \n Fridman: Okay, but as I’m learning what is Israel doing right? What is Israel doing wrong in this very specific current war in Gaza? \n Shapiro: I mean, frankly, I think that what Israel’s doing wrong is if I were Israel, again, America’s interests are not coincident with Israel’s interests. If I were an Israeli leader, I would’ve swiveled up and I would’ve knocked the bleep out of Hezbollah early. What does that mean mean? What does that mean? So I would have Yoav Galant, who is the defense minister of Israel, was encouraging Netanyahu, who’s the prime minister and the war cabinet, including Benny Gantz. People talk about the Netanyahu government. That’s not what’s in place right now. T