#### Ensuring Factual Correctness using Refine

In [None]:
import dspy

class FactualityJudge(dspy.Signature):
    """Determine if a statement is factually accurate."""
    statement: str = dspy.InputField()
    is_factual: bool = dspy.OutputField()

factuality_judge = dspy.ChainOfThought(FactualityJudge)

def factuality_reward(args, pred: dspy.Prediction) -> float:
    statement = pred.answer    
    result = factuality_judge(statement)    
    return 1.0 if result.is_factual else 0.0

refined_qa = dspy.Refine(
    module=dspy.ChainOfThought("question -> answer"),
    N=3,
    reward_fn=factuality_reward,
    threshold=1.0
)

result = refined_qa(question="Tell me about Belgium's capital city.")
print(result.answer)

#### Controlling Response Length using BestOfN

In [None]:
import dspy

def ideal_length_reward(args, pred: dspy.Prediction) -> float:
    """
    Reward the summary for being close to 75 words with a tapering off for longer summaries.
    """
    word_count = len(pred.summary.split())
    distance = abs(word_count - 75)
    return max(0.0, 1.0 - (distance / 125))

optimized_summarizer = dspy.BestOfN(
    module=dspy.ChainOfThought("text -> summary"),
    N=50,
    reward_fn=ideal_length_reward,
    threshold=0.9
)

result = optimized_summarizer(
    text="[Long text to summarize...]"
)
print(result.summary)

#### ReAct

In [None]:
import dspy

DOCS = {}

def search(query: str, k: int) -> list[str]:
    results = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')(query, k=k)
    results = [x['text'] for x in results]

    for result in results:
        title, text = result.split(" | ", 1)
        DOCS[title] = text

    return results

def search_wikipedia(query: str) -> list[str]:
    """Returns top-5 results and then the titles of the top-5 to top-30 results."""

    topK = search(query, 30)
    titles, topK = [f"`{x.split(' | ')[0]}`" for x in topK[5:30]], topK[:5]
    return topK + [f"Other retrieved pages have titles: {', '.join(titles)}."]

def lookup_wikipedia(title: str) -> str:
    """Returns the text of the Wikipedia page, if it exists."""

    if title in DOCS:
        return DOCS[title]

    results = [x for x in search(title, 10) if x.startswith(title + " | ")]
    if not results:
        return f"No Wikipedia page found for title: {title}"
    return results[0]


instructions = "Find all Wikipedia titles relevant to verifying (or refuting) the claim."
signature = dspy.Signature("claim -> titles: list[str]", instructions)
react = dspy.ReAct(signature, tools=[search_wikipedia, lookup_wikipedia], max_iters=20)

#### Multi-Hop RAG

In [None]:
import dspy

DOCS = {}

def search(query: str, k: int) -> list[str]:
    results = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')(query, k=k)
    results = [x['text'] for x in results]

    for result in results:
        title, text = result.split(" | ", 1)
        DOCS[title] = text

    return results

class Hop(dspy.Module):
    def __init__(self, num_docs=10, num_hops=4):
        self.num_docs, self.num_hops = num_docs, num_hops
        self.generate_query = dspy.ChainOfThought('claim, notes -> query')
        self.append_notes = dspy.ChainOfThought('claim, notes, context -> new_notes: list[str], titles: list[str]')

    def forward(self, claim: str) -> list[str]:
        notes = []
        titles = []

        for _ in range(self.num_hops):
            query = self.generate_query(claim=claim, notes=notes).query
            context = search(query, k=self.num_docs)
            prediction = self.append_notes(claim=claim, notes=notes, context=context)
            notes.extend(prediction.new_notes)
            titles.extend(prediction.titles)
        
        return dspy.Prediction(notes=notes, titles=list(set(titles)))

#### Entity Extraction

In [None]:
import dspy

class PeopleExtraction(dspy.Signature):
    """
    Extract contiguous tokens referring to specific people, if any, from a list of string tokens.
    Output a list of tokens. In other words, do not combine multiple tokens into a single value.
    """
    tokens: list[str] = dspy.InputField(desc="tokenized text")
    extracted_people: list[str] = dspy.OutputField(desc="all tokens referring to specific people extracted from the tokenized text")

people_extractor = dspy.ChainOfThought(PeopleExtraction)

#### Classification

In [None]:
import dspy
from typing import Literal

class Categorize(dspy.Signature):
    """Classify historic events."""

    event: str = dspy.InputField()
    category: Literal[
        "Wars and Conflicts",
        "Politics and Governance",
        "Science and Innovation",
        "Cultural and Artistic Movements",
        "Exploration and Discovery",
        "Economic Events",
        "Social Movements",
        "Man-Made Disasters and Accidents",
        "Natural Disasters and Climate",
        "Sports and Entertainment",
        "Famous Personalities and Achievements"
    ] = dspy.OutputField()
    confidence: float = dspy.OutputField()

classify = dspy.Predict(Categorize)

# Here is how we call this module
classification = classify(event="[YOUR HISTORIC EVENT")

### Advanced Tool Use

In [None]:
import inspect
# from func_timeout import func_set_timeout

def fn_metadata(func):
    signature = inspect.signature(func)
    docstring = inspect.getdoc(func) or "No docstring."
    return dict(function_name=func.__name__, arguments=str(signature), docstring=docstring)

def wrap_function_with_timeout(fn):
    # @func_set_timeout(10)
    def wrapper(*args, **kwargs):
        try:
            return {"return_value": fn(*args, **kwargs), "errors": None}
        except Exception as e:
            return {"return_value": None, "errors": str(e)}

    return wrapper


class Agent(dspy.Module):
    def __init__(self, max_steps=5):
        self.max_steps = max_steps
        instructions = "For the final answer, produce short (not full sentence) answers in which you format dates as YYYY-MM-DD, names as Firstname Lastname, and numbers without leading 0s."
        signature = dspy.Signature('question, trajectory, functions -> next_selected_fn, args: dict[str, Any]', instructions)
        self.react = dspy.ChainOfThought(signature)

    def forward(self, question, functions):
        tools = {fn_name: fn_metadata(fn) for fn_name, fn in functions.items()}
        trajectory = []

        for _ in range(self.max_steps):
            pred = self.react(question=question, trajectory=trajectory, functions=tools)
            selected_fn = pred.next_selected_fn.strip('"').strip("'")
            fn_output = wrap_function_with_timeout(functions[selected_fn])(**pred.args)
            trajectory.append(dict(reasoning=pred.reasoning, selected_fn=selected_fn, args=pred.args, **fn_output))

            if selected_fn == "finish":
                break

        return dspy.Prediction(answer=fn_output.get("return_value", ''), trajectory=trajectory)

#### [Tool Retriever](https://www.databricks.com/blog/optimizing-databricks-llm-pipelines-dspy)

In [None]:
import dspy

class ToolChoice(dspy.Signature):
   """Determines a tool to choose from a list of tools based on a query"""

   list_of_tools = dspy.InputField(desc="list of tools available to the agent")
   query = dspy.InputField()
   selected_tool = dspy.OutputField(desc="returns a single tool based on the query from the list_of_tools input")

class GenerateAnswer(dspy.Signature):
   """Answer questions with informative summary of an answer to user's question."""

   context = dspy.InputField(desc="may contain relevant facts")
   question = dspy.InputField()
   answer = dspy.OutputField(desc="informative summary of an answer to user's question")

class ToolRetriever(dspy.Module):
   def __init__(self):
       self.generate_query = dspy.ChainOfThought("context, question -> query")
       self.choose_tool = dspy.ChainOfThought(ToolChoice)
       self.generate_answer = dspy.ChainOfThought(GenerateAnswer)
       self.tools = "[answer_payroll_faq, irrelevant_content]"


   def irrelevant_content(self):
       return "Ask something else."


   def forward(self, question):
       client = OpenAI(api_key=openai_api_key)
       retrieve = DatabricksRM()


       context = []
       query_output = self.generate_query(context = context, question=question)
       tool_choice = self.choose_tool(list_of_tools=self.tools, query=query_output.query)


       if tool_choice.selected_tool == "irrelevant_content":
           return self.irrelevant_content()      
       else:
           search_query_embedding = client.embeddings.create(model="text-embedding-ada-002", input=[question]).data[0].embedding
           retrieved_context = retrieve(search_query_embedding, 1)


           context += retrieved_context
           return self.generate_answer(context=context, question=question)