In [None]:
!pip install  dspy-ai==2.5.0 

In [22]:
import importlib.metadata
print(importlib.metadata.version('dspy-ai'))

2.5.0


In [None]:
pip install  datasets

In [None]:
!pip install langchain-huggingface

In [None]:
!pip install "dspy-ai[chromadb]"


In [2]:
from datasets import load_dataset
import os
import json
from dotenv import load_dotenv
from openai import OpenAI

In [3]:
# Initialization

load_dotenv(override=True)

openai_api_key = os.getenv('OPENAI_API_KEY')
if openai_api_key:
    print(f"OpenAI API Key exists and begins {openai_api_key[:8]}")
else:
    print("OpenAI API Key not set")

MODEL = "gpt-4o-mini"
openai = OpenAI()


OpenAI API Key exists and begins sk-proj-


In [8]:
lm = dspy.LM('ollama_chat/llama3', api_base='http://localhost:11434', api_key='')
#lm = dspy.LM('openai/gpt-4o-mini')

In [4]:
cot = dspy.ChainOfThought('question -> response')

In [5]:
import chromadb

Define the embedding function

In [6]:
from chromadb.utils import embedding_functions
# Native Chroma way to load the same model
ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)

Define the retriever class

In [51]:
class LocalChromaRetriever(dspy.Retrieve):
    def __init__(self, collection_name, persist_path, ef, k=3):
        self.client = chromadb.PersistentClient(path=persist_path)
        self.collection = self.client.get_or_create_collection(
            name=collection_name, embedding_function=ef
        )
        self.k = k

    def forward(self, query_or_queries, k=None):
        queries = [query_or_queries] if isinstance(query_or_queries, str) else query_or_queries
        k = k if k else self.k

        results = self.collection.query(query_texts=queries, n_results=k)

        passages = [
            {"text": doc, "id": id_, "metadata": meta}
            for docs, ids, metas in zip(results["documents"], results["ids"], results["metadatas"])
            for doc, id_, meta in zip(docs, ids, metas)
        ]

        return dspy.Prediction(passages=passages)


In [112]:
# 1. Define the signature for generating a search query at each hop
class GenerateSearchQuery(dspy.Signature):
    """
     Generate a search query for facts NOT already in the context.
     - DO NOT assume anything without evidence  
    Write a search query that will help answer a complex question."""
    context = dspy.InputField(desc="may contain relevant facts already retrieved")
    question = dspy.InputField()
    query = dspy.OutputField(desc="search query to find missing information")

# 2. Define the signature for generating the final answer
class GenerateAnswer(dspy.Signature):
    """Answer the question based on the provided context."""
    context = dspy.InputField(desc="may contain relevant facts")
    question = dspy.InputField()
    response = dspy.OutputField(desc="often between 1 and 8 words")
    

In [113]:
# 3. Build the RAG Module
class RAG(dspy.Module):
    def __init__(self, num_passages=3,retriever=None):
        super().__init__()
        # Use the retriever you configured 
        self.retrieve = retriever or dspy.Retrieve(k=num_passages)
        self.generate_answer = dspy.ChainOfThought(GenerateAnswer)
    
    def forward(self, question):
        context = self.retrieve(question).passages
        prediction = self.generate_answer(context=context, question=question)
        # Use 'response' instead of 'answer' for compatibility with metrics 
        return dspy.Prediction(context=context, response=prediction.response)

**Load HotPot Data**

In [13]:
# Load the dataset
hotpot = load_dataset("hotpot_qa", "fullwiki")

# Filter ONLY the train split
multihop_train = hotpot['train'].filter(lambda x: x["type"] in ["bridge", "comparison"])

singlehop_ds = hotpot['train'].filter(lambda x: x["type"] not in ["bridge", "comparison"])

print(f"Multi-hop count: {len(multihop_train)}")


# Inspect one example
print(multihop_train[0])


Filter:   0%|          | 0/90447 [00:00<?, ? examples/s]

Filter:   0%|          | 0/90447 [00:00<?, ? examples/s]

Multi-hop count: 90447
{'id': '5a7a06935542990198eaf050', 'question': "Which magazine was started first Arthur's Magazine or First for Women?", 'answer': "Arthur's Magazine", 'type': 'comparison', 'level': 'medium', 'supporting_facts': {'title': ["Arthur's Magazine", 'First for Women'], 'sent_id': [0, 0]}, 'context': {'title': ['Radio City (Indian radio station)', 'History of Albanian football', 'Echosmith', "Women's colleges in the Southern United States", 'First Arthur County Courthouse and Jail', "Arthur's Magazine", '2014–15 Ukrainian Hockey Championship', 'First for Women', 'Freeway Complex Fire', 'William Rast'], 'sentences': [["Radio City is India's first private FM radio station and was started on 3 July 2001.", ' It broadcasts on 91.1 (earlier 91.0 in most cities) megahertz from Mumbai (where it was started in 2004), Bengaluru (started first in 2001), Lucknow and New Delhi (since 2003).', ' It plays Hindi, English and regional songs.', ' It was launched in Hyderabad in March 2

In [14]:
# Build DSPy Examples with titles preserved
examples = []

# Just take the first 100 "True" Multi-hop questions
multihop_train_subset = multihop_train.select(range(100))

# Hugging Face datasets allow direct slicing to get a list of dicts 
# or you can select a range:
for ex in multihop_train_subset:   
    
    # each row is a dict
   
    titles = ex["context"]["title"]
    sentences_list = ex["context"]["sentences"]
    
    # Note: In the 'hotpot_qa' dataset specifically, 
    # context is often a dict: {"title": [...], "sentences": [[...], [...]]}
    # We need to zip them together.
    docs = [f"Title: {t}. {' '.join(s)}" for t, s in zip(titles, sentences_list)]

    example = dspy.Example(
        question=ex["question"],
        response=ex["answer"],
        context=docs
    ).with_inputs("question")

    examples.append(example)

    
print("Built", len(examples), "examples")
print("Sample Example:\n", examples[0])


Built 100 examples
Sample Example:
 Example({'question': "Which magazine was started first Arthur's Magazine or First for Women?", 'response': "Arthur's Magazine", 'context': ["Title: Radio City (Indian radio station). Radio City is India's first private FM radio station and was started on 3 July 2001.  It broadcasts on 91.1 (earlier 91.0 in most cities) megahertz from Mumbai (where it was started in 2004), Bengaluru (started first in 2001), Lucknow and New Delhi (since 2003).  It plays Hindi, English and regional songs.  It was launched in Hyderabad in March 2006, in Chennai on 7 July 2006 and in Visakhapatnam October 2007.  Radio City recently forayed into New Media in May 2008 with the launch of a music portal - PlanetRadiocity.com that offers music related news, videos, songs, and other music-related features.  The Radio station currently plays a mix of Hindi and Regional music.  Abraham Thomas is the CEO of the company.", "Title: History of Albanian football. Football in Albania e

In [15]:

# Setup persistent storage
client = chromadb.PersistentClient(path="./hotpot_chroma")
# Define the exact same embedding function used in your retriever
ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)

#  Create collection (Chroma uses Sentence Transformers by default)
try:
    client.delete_collection(name="hotpot_v1")
except:
    pass # Collection didn't exist, that's fine
collection = client.get_or_create_collection(
    name="hotpot_v1", 
    embedding_function=ef
)


# 1. Collect all docs from your 100 examples (approx 1,000 docs)
docs_to_index = {}
for ex in examples:
    for doc_text in ex.context:
        docs_to_index[doc_text] = doc_text 

small_subset_texts = list(docs_to_index.values())

# 2. Ingest ONLY this small subset (< 5 seconds on M4 Mini)
collection.add(
    documents=small_subset_texts,
    ids=[f"hp_small_{i}" for i in range(len(small_subset_texts))]
)

print(f"Ingested {len(small_subset_texts)} docs. You can now test your RAG!")

Ingested 992 docs. You can now test your RAG!


**Create a  retriever instance pointing to the   database**

In [16]:
# Create a  retriever instance pointing to the   database
hotpot_rm = LocalChromaRetriever(
    collection_name="hotpot_v1", 
    persist_path="./hotpot_chroma", 
    ef=ef
)
dspy.settings.configure(lm=lm, rm=hotpot_rm)

Explicitly accumulates all contexts (all_context.extend(...)).

Deduplication is concise and order‑preserving.

Clear separation of hops with comments.

In [92]:
class GenerateAnswer(dspy.Signature):
    """Answer the question based on the provided context."""
    context = dspy.InputField()
    question = dspy.InputField()
    answer = dspy.OutputField()

class DiagnoseQuery(dspy.Signature):
    """Analyze a failed search and provide a DIFFERENT search strategy."""
    question = dspy.InputField()
    history = dspy.InputField()
    failed_query = dspy.InputField()
    reason = dspy.InputField()
    instruction = dspy.OutputField(desc="A NEW search strategy, e.g., 'Search for aliases' or 'Search for the subject's occupation'")


class ExtractEntityRelation(dspy.Signature):
    """
    Identify the main entity and the relation type being asked about in the question.
    Works for people (spouse, nationality), products (manufacturer), places (capital), etc.
    """
    question = dspy.InputField(desc="the original user question")
    entity = dspy.OutputField(desc="the canonical entity name mentioned in the question")
    relation_type = dspy.OutputField(desc="the relation or attribute being asked about, e.g., spouse, manufacturer, capital")
    reasoning = dspy.OutputField(desc="explanation of how entity and relation were determined")

class ResolveAlias(dspy.Signature):
    """
    Resolve the canonical alias, pseudonym, or stage name for the entity in the question.
    Must explicitly return a known alias or stage name found in the retrieved context.
    """
    context = dspy.InputField(desc="retrieved passages about the entity")
    entity = dspy.InputField(desc="the entity name from the question")
    alias = dspy.OutputField(desc="the canonical alias, pseudonym, or stage name")
    reasoning = dspy.OutputField(desc="explanation of how the alias was determined")
class VerifyConnection(dspy.Signature):
    """
    Verify if the discovered relation/attribute actually belongs to the specific entity in the question.
    Works for people (spouse, nationality), products (manufacturer), places (capital city), etc.
    """
    context = dspy.InputField(desc="retrieved passages or evidence")
    entity = dspy.InputField(desc="the main entity from the question, e.g., person or product")
    relation_type = dspy.InputField(desc="the type of relation being checked, e.g., spouse, manufacturer, capital")
    candidate = dspy.InputField(desc="the discovered candidate relation/attribute")
    
    is_correct = dspy.OutputField(desc="boolean: True if candidate is correctly linked to the entity")
    reasoning = dspy.OutputField(desc="explanation of why the candidate is or is not correct")
class ExtractCandidate(dspy.Signature):
    """Extract the specific candidate that fulfills the relation_type for the entity."""
    context = dspy.InputField(desc="Retrieved research passages")
    entity = dspy.InputField(desc="The main subject (e.g., Henry , IPhone)")
    relation_type = dspy.InputField(desc="The relation to find (e.g., spouse, manufacturer)")
    candidate = dspy.OutputField(desc="The specific name or entity found, or 'None' if not found")

class FinalReasoning(dspy.Signature):
    """Answer the question. Use the verified_candidate as the absolute subject 
    and ignore any other similar names in the context."""
    context = dspy.InputField()
    question = dspy.InputField()
    verified_candidate = dspy.InputField(desc="The ONLY correct subject to answer about")
    answer = dspy.OutputField()

In [124]:
class MultiHopRetrieval(dspy.Module):
    def __init__(self, retriever, max_hops=3, num_passages=3):
        super().__init__()
        self.max_hops = max_hops
        self.retriever = retriever  
        self.num_passages = num_passages

        # Predictors with specific roles
        self.extract_er = dspy.Predict(ExtractEntityRelation)
        self.resolve_alias = dspy.ChainOfThought(ResolveAlias)
        # Force the model to generate a query even if it feels the context is 'unrelated'
        self.generate_query = dspy.ChainOfThought(
            "context, question, history, hint -> query",
            instructions="""You are an expert researcher. 
            1. If the answer is not already in the context, you MUST generate a new search query.
            2. Use the 'hint' to pivot your search if previous attempts failed.
            3. Even if you think the context is unrelated, generate a query for the Target Entity mentioned in the hint."""
        )
        self.diagnose = dspy.Predict(DiagnoseQuery) 
        self.extract_candidate = dspy.Predict(ExtractCandidate)
        self.verify = dspy.ChainOfThought(VerifyConnection)
        self.generate_answer = dspy.ChainOfThought(FinalReasoning)
        
    @staticmethod
    def is_too_similar(query, history, threshold=0.8):
        if not history: return False
        query_str = str(query).lower()
        words_new = set(query_str.split())
        for past_query in history:
            past_str = str(past_query).lower()
            words_old = set(past_str.split())
            if len(words_new & words_old) / max(len(words_new), 1) > threshold:
                return True
        return False

    def forward(self, question):
        context = []
        prev_queries = []
        hint = "Identify the entity and any potential aliases."

        # Step 0: Extract entity and relation together 
        er_pred = dspy.Predict(ExtractEntityRelation)(question=question) 
        entity = er_pred.entity 
        relation_type = er_pred.relation_type
        
        # Step 1: Initial Discovery & Alias Resolution
        # We need context first to know James Henry Miller is Ewan MacColl
        initial_passages = self.retriever(question, k=self.num_passages).passages
        context.extend(initial_passages)
        
        alias_res = self.resolve_alias(context=context, entity=entity)
        canonical_name = alias_res.alias # e.g., "Ewan MacColl"

        verified_candidate = "none"
        last_check = None 
        
        # Step 2: Multi-hop Loop using the Canonical Name
        for hop in range(self.max_hops):
            # Pass the canonical name in the hint to keep the model on track
            current_hint = f"{hint} (Target Entity: {canonical_name})"
            pred = self.generate_query(context=context, question=question, history=prev_queries, hint=current_hint)
            query = pred.query

            if query in prev_queries or self.is_too_similar(query, prev_queries):
                diagnosis = self.diagnose(question=question, history=prev_queries, failed_query=query, reason="Duplicate search")
                pred = self.generate_query(context=context, question=question, history=prev_queries, hint=diagnosis.instruction)
                query = pred.query
            
            passages = self.retriever(query, k=self.num_passages).passages
            context = list(set(context + passages))
            prev_queries.append(query)
            
            # Step 3: extract candidate and verify
            extraction = self.extract_candidate(context=context, entity=canonical_name, relation_type=relation_type)
            candidate = extraction.candidate
            
            # If we found a candidate, we can choose to stop early or keep refining
            if candidate.lower() != "none":
                check = self.verify(context=context, entity=canonical_name, relation_type=relation_type, candidate=candidate)
                
                if "true" in str(check.is_correct).lower():
                    verified_candidate = candidate
                    # SUCCESS: We break the loop because we found a verified relation
                    break 
                else:
                    # 1. Remove the rejected candidate's name from the context entirely
                    context = [doc for doc in context if candidate.lower() not in str(doc).lower()]
                    
                    # 2. Update the hint with a NEGATIVE CONSTRAINT
                    hint = (
                        f"CRITICAL: {candidate} is WRONG. Do NOT search for or mention {candidate}. "
                        f"Search for a DIFFERENT spouse for the folk singer {canonical_name}."
                    )

        # Final Handling
        if verified_candidate == "none":
            fail_msg = last_check.reasoning if last_check else "No candidate found."
            return dspy.Prediction(response=f"Not Found: {fail_msg}", context=context)

        # Step 4: Final Answer with Anchor (Fixes the Prompt 9 drift)
        res = self.generate_answer(
            context=context, 
            question=question, 
            verified_candidate=verified_candidate
        )
        
        return dspy.Prediction(response=res.answer, context=context, reasoning=check.reasoning)


In [125]:
rag_multi = MultiHopRetrieval(retriever=hotpot_rm)   
rag_single=RAG(retriever=hotpot_rm)
from dspy.evaluate import SemanticF1

metric = SemanticF1(decompositional=True)

In [126]:
dspy.settings.lm.history = []
# Assume you already built `examples` from HotpotQA
sample = examples[3]   # pick the first example, or any index

print("Question:", sample.question)
print("Gold Answer:", sample.response)

# Run it through your retriever + Baleen pipeline
prediction = rag_multi(sample.question)

print("\nPredicted Answer:", prediction.response)
print("Retrieved Context (first 3 docs):")
for doc in prediction.context[:3]:
    print("-", doc[:200], "...")


Question:  What nationality was James Henry Miller's wife?
Gold Answer: American

Predicted Answer: American
Retrieved Context (first 3 docs):
- Title: Ewan MacColl. James Henry Miller (25 January 1915 – 22 October 1989), better known by his stage name Ewan MacColl, was an English folk singer, songwriter, communist, labour activist, actor, poe ...
- Title: Alicia McCormack. Alicia McCormack (born 7 June 1983 in Helensburgh, New South Wales) is an Australian water polo goalkeeper.  Her playing career started at the age of fourteen with the Kirrawe ...
- Title: Peggy Seeger. Margaret "Peggy" Seeger (born June 17, 1935) is an American folksinger.  She is also well known in Britain, where she has lived for more than 30 years, and was married to the sing ...


In [128]:
for i, h in enumerate(dspy.settings.lm.history):
    print(f"--- Prompt {i} ---")
    print("Prompt:", h.get("prompt"))
    print("Raw Outputs:", h.get("outputs"))


--- Prompt 0 ---
Prompt: None
Raw Outputs: ['[[ ## entity ## ]]\nJames Henry Miller\n\n[[ ## relation_type ## ]]\nwife\n\n[[ ## reasoning ## ]]\nThe question mentions James Henry Miller as a specific person, indicating that he is the main entity being referred to. The phrase "What nationality was..." suggests that we are looking for an attribute or characteristic of this person\'s wife, which is the relation type being asked about.\n\n[[ ## completed ## ]]']
--- Prompt 1 ---
Prompt: None
Raw Outputs: ['[[ ## reasoning ## ]]\nThe context suggests that James Henry Miller is also known as Ewan MacColl, a stage name he used in his career as an English folk singer, songwriter, communist, labour activist, actor, poet, playwright and record producer.\n\n[[ ## alias ## ]]\nEwan MacColl\n\n[[ ## completed ## ]]']
--- Prompt 2 ---
Prompt: None
Raw Outputs: ["[[ ## reasoning ## ]]\nThe question is asking about the nationality of James Henry Miller's wife, but since Ewan MacColl (James Henry Mille

In [127]:

# Take a small slice of HotpotQA validation set
subset = multihop_train_subset.select(range(5))   # 5 examples for demo

scores_single = []
scores_multi = []

for ex in subset:
    # Wrap into DSPy Example
    example = dspy.Example(
        question=ex["question"],
        response=ex["answer"]
    ).with_inputs("question")

    # Run single-hop RAG
    pred_single = rag_single(example.question)
    score_single = metric(example, pred_single)
    scores_single.append(score_single)

    # Run multi-hop RAG
    pred_multi = rag_multi(example.question)
    score_multi = metric(example, pred_multi)
    scores_multi.append(score_multi)

print("Average SemanticF1 (Single-Hop RAG):", sum(scores_single)/len(scores_single))
print("Average SemanticF1 (Multi-Hop RAG):", sum(scores_multi)/len(scores_multi))


Average SemanticF1 (Single-Hop RAG): 0.7333333333333333
Average SemanticF1 (Multi-Hop RAG): 0.6666666666666666


Let us see the prompts for the last few results

In [None]:
dspy.settings.lm.inspect_history(n=3)

In [61]:
dspy.inspect_history(n=2)






[34m[2026-02-18T18:22:05.032975][0m

[31mSystem message:[0m

Your input fields are:
1. `context` (str): 
2. `question` (str): 
3. `history` (str): 
4. `hint` (str):
Your output fields are:
1. `reasoning` (str): 
2. `query` (str):
All interactions will be structured in the following way, with the appropriate values filled in.

[[ ## context ## ]]
{context}

[[ ## question ## ]]
{question}

[[ ## history ## ]]
{history}

[[ ## hint ## ]]
{hint}

[[ ## reasoning ## ]]
{reasoning}

[[ ## query ## ]]
{query}

[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        Given the fields `context`, `question`, `history`, `hint`, produce the fields `query`.


[31mUser message:[0m

[[ ## context ## ]]
[1] «Title: Ewan MacColl. James Henry Miller (25 January 1915 – 22 October 1989), better known by his stage name Ewan MacColl, was an English folk singer, songwriter, communist, labour activist, actor, poet, playwright and record producer.»
[2] «Title: Aleksander Ford

In [65]:
from dspy.teleprompt import BootstrapFewShot

# 1. Define the Optimizer
# 'max_bootstrapped_demos' tells DSPy to find 2 perfect examples to put in the prompt
optimizer = BootstrapFewShot(metric=metric, max_bootstrapped_demos=2)

# 2. Compile the Baleen Module
# It's 'training' the Librarian to search better and the Researcher to answer shorter.
compiled_baleen = optimizer.compile(rag_multi, trainset=examples[:20])

 10%|███████████████████████▍                                                                                                                                                                                                                  | 2/20 [00:00<00:04,  3.85it/s]

Bootstrapped 2 full traces after 2 examples for up to 1 rounds, amounting to 2 attempts.





In [70]:
import json

# 1. Manually extract state only from the components that have 'dump_state' working
# We target the 'generate_query' and 'answer_gen' specifically
program_state = {
    "generate_query": compiled_baleen.generate_query.dump_state(),
    "answer_gen": compiled_baleen.answer_gen.dump_state()
}

# 2. Save to JSON
with open("hotpot_baleen_compiled.json", "w") as f:
    json.dump(program_state, f, indent=4)

print("Saved Predictor states manually, bypassing the Retriever bug!")


Saved Predictor states manually, bypassing the Retriever bug!


In [71]:
# Create a fresh module
new_baleen = MultiHopBaleen(retriever=hotpot_rm)

# Load the states into the specific sub-modules
with open("hotpot_baleen_compiled.json", "r") as f:
    loaded_state = json.load(f)

new_baleen.generate_query.load_state(loaded_state["generate_query"])
new_baleen.answer_gen.load_state(loaded_state["answer_gen"])

print("Successfully re-assembled the Optimized Baleen!")


Successfully re-assembled the Optimized Baleen!


In [72]:
# Use the compiled version we just built
# If you re-loaded it, ensure you use the correct variable name
model_to_test = compiled_baleen 

scores_single = []
scores_multi_compiled = []

# Take a fresh slice of 10-20 examples to see the "Bridge" logic in action
eval_set = examples[20:40] 

for ex in eval_set:
    # 1. Setup Example
    example = dspy.Example(
        question=ex.question,
        response=ex.response
    ).with_inputs("question")

    # 2. Run Single-Hop (The Baseline)
    pred_single = rag_single(example.question)
    scores_single.append(metric(example, pred_single))

    # 3. Run Compiled Multi-Hop (The Optimized Librarian)
    pred_multi = model_to_test(example.question)
    scores_multi_compiled.append(metric(example, pred_multi))

print(f"Average SemanticF1 (Single-Hop): {sum(scores_single)/len(scores_single):.4f}")
print(f"Average SemanticF1 (Compiled Multi-Hop): {sum(scores_multi_compiled)/len(scores_multi_compiled):.4f}")


Average SemanticF1 (Single-Hop): 0.7288
Average SemanticF1 (Compiled Multi-Hop): 0.6750
