In [1]:
import os
from dotenv import load_dotenv
load_dotenv()

from openai import OpenAI
oai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

from qdrant_client import QdrantClient
qdrant = QdrantClient(path="qdrant_local")

COLLECTION = "toy_agent_docs"
EMBED_MODEL = "text-embedding-3-small"
CHAT_MODEL  = "gpt-4o-mini"


In [2]:
def embed_one(text: str):
    return oai.embeddings.create(
        model=EMBED_MODEL,
        input=[text]
    ).data[0].embedding


In [3]:
def qdrant_similarity_search(client, collection_name, query_vector, limit=5):
    # Old API
    if hasattr(client, "search"):
        return client.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=limit,
            with_payload=True
        )
    # New API
    if hasattr(client, "query_points"):
        res = client.query_points(
            collection_name=collection_name,
            query=query_vector,
            limit=limit,
            with_payload=True
        )
        return res.points

    raise AttributeError("No compatible Qdrant search method found.")


In [4]:
def retrieve(question: str, top_k: int = 5):
    qv = embed_one(question)
    hits = qdrant_similarity_search(
        qdrant,
        collection_name=COLLECTION,
        query_vector=qv,
        limit=top_k
    )

    results = []
    for h in hits:
        payload = getattr(h, "payload", {}) or {}
        score = float(getattr(h, "score", 0.0))

        results.append({
            "score": score,
            "chunk_id": payload.get("chunk_id"),
            "doc_id": payload.get("doc_id"),
            "title": payload.get("title"),
            "text": payload.get("text"),
            "metadata": payload.get("metadata", {}),
        })
    return results


In [5]:
r = retrieve("What is hypertension?", top_k=3)
[(x["score"], x["title"], x["metadata"].get("file_name")) for x in r]


[(0.5011598937534347, 'Hypertension Basics', 'hypertension_basics.html'),
 (0.48651523947195147, 'Hypertension Basics', 'hypertension_basics.xml'),
 (0.48459389587869073, 'Hypertension Basics', 'hypertension_basics.pdf')]

In [6]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Dict, Any


In [7]:
class AgentState(TypedDict):
    question: str
    retrieved: List[Dict[str, Any]]
    answer: str
    sources: List[Dict[str, Any]]


In [8]:
def retrieve_node(state: AgentState) -> AgentState:
    docs = retrieve(state["question"], top_k=5)
    return {
        **state,
        "retrieved": docs
    }


In [9]:
def generate_node(state: AgentState) -> AgentState:
    context_blocks = []
    sources = []

    for i, d in enumerate(state["retrieved"], start=1):
        meta = d["metadata"] or {}

        sources.append({
            "id": i,
            "title": d["title"],
            "file_name": meta.get("file_name"),
            "source_path": meta.get("source_path"),
            "chunk_id": d["chunk_id"],
            "score": d["score"],
            "snippet": d["text"][:250]
        })

        context_blocks.append(
            f"[{i}] {d['title']} ({meta.get('file_name')})\n{d['text']}"
        )

    prompt = f"""
You are a retrieval-grounded assistant.

RULES:
- Use ONLY the sources below.
- If the answer is not in the sources, say:
  "I don't know based on the provided documents."
- Keep the answer concise.
- Add citations like [1], [2].

QUESTION:
{state['question']}

SOURCES:
{chr(10).join(context_blocks)}
"""

    response = oai.chat.completions.create(
        model=CHAT_MODEL,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2
    )

    answer = response.choices[0].message.content

    return {
        **state,
        "answer": answer,
        "sources": sources
    }


In [10]:
graph_builder = StateGraph(AgentState)

graph_builder.add_node("retrieve", retrieve_node)
graph_builder.add_node("generate", generate_node)

graph_builder.set_entry_point("retrieve")
graph_builder.add_edge("retrieve", "generate")
graph_builder.add_edge("generate", END)

rag_graph = graph_builder.compile()


In [11]:
result = rag_graph.invoke({
    "question": "What is an asthma action plan?",
    "retrieved": [],
    "answer": "",
    "sources": []
})

result["answer"]


'An asthma action plan is a personalized document that outlines how to manage asthma symptoms and attacks. It typically includes information on recognizing symptoms, medication usage, and steps to take during an asthma episode. The plan is designed to help individuals understand their specific asthma triggers and how to respond effectively. Diagnosis and treatment decisions should be made by licensed clinicians, as symptoms and experiences can vary from person to person [1][2][3][4][5].'

In [None]:
result["sources"][:2]


[{'id': 1,
  'title': 'Asthma Action Plan',
  'file_name': 'asthma_action_plan.xml',
  'source_path': 'docs\\asthma_action_plan.xml',
  'chunk_id': 'asthma_action_plan::0',
  'score': 0.6560706392531483,
  'snippet': 'Asthma Action Plan (Educational) 2025-12-29 demo, rag, medical, asthma Synthetic (generated for demo) generated Summary This is a fictional educational document about Asthma Action Plan (Educational). It is for software testing and does not provide m'},
 {'id': 2,
  'title': 'Asthma Action Plan',
  'file_name': 'asthma_action_plan.html',
  'source_path': 'docs\\asthma_action_plan.html',
  'chunk_id': 'asthma_action_plan::0',
  'score': 0.6246622457238499,
  'snippet': 'Asthma Action Plan (Educational) Asthma Action Plan (Educational) Fictional educational content for software testing only. Metadata created: 2025-12-29 tags: demo, rag, medical, asthma license: Synthetic (generated for demo) source: generated Summary'}]

: 