# Installs

In [0]:
%pip install unitycatalog-ai[databricks] unitycatalog-langchain[databricks] langgraph==0.2.74 databricks-langchain==0.3.0

In [0]:
dbutils.library.restartPython()

In [0]:
import os
import requests
from typing import List

from databricks_langchain import ChatDatabricks
from langchain_core.prompts import ChatPromptTemplate
from langgraph.prebuilt import create_react_agent

# Create The Web Search Tool

## Get our tool from Unity Catalog!

In [0]:
from unitycatalog.ai.core.databricks import DatabricksFunctionClient
client = DatabricksFunctionClient()

In [0]:
CATALOG = "advancing_ai"
SCHEMA = "agentic-tools"

In [0]:
from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit

function_name = f"{CATALOG}.{SCHEMA}.execute_bing_web_search"

uc_toolkit = UCFunctionToolkit(function_names=[function_name], client=client)

# Create LLM + ReAct Agent

In [0]:
llm = ChatDatabricks(
    endpoint= "gpt-4o"
)

In [0]:
prompt = "You are an AI agent who answers questions. You have access to a web search tool if you need to find out any information. You do not have to use the tools available to you if you are certain of the answer. The current year is 2025"

In [0]:
agent_executor = create_react_agent(llm, uc_toolkit.tools, prompt=prompt)

In [0]:
result = agent_executor.invoke({"messages": [("user", "Who is the current Prime Minister of the United Kingdom?")]})

# Let's Get Planning!

In [0]:
from pydantic import BaseModel, Field
from typing import List

class Plan(BaseModel):
    """Plan to follow in future"""
    
    # A list of steps to follow (in order)
    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )

In [0]:
planner_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """
            For the given objective, come up with a simple step by step plan. 
            This plan should involve individual tasks, that if executed correctly will yield the correct answer. 
            Do not add any superfluous steps. 
            The result of the final step should be the final answer. Make sure that each step has all the information needed.
            Do not skip steps.
            """,
        ),
        (
            "placeholder", 
            "{messages}"
        ),
    ]
)

# Use a structured output to ensure consistency
# This basically forces the LLM to output its response in the same Pydantic structure every time
planner = planner_prompt | llm.with_structured_output(Plan)

In [0]:
# Visualise what our plan ends up looking like
generated_plan = planner.invoke(
    {
        "messages": [
            (
                "user", 
                """
                Who is older in 2025? The Prime Minister of the UK, or the President of the USA?
                """
            )
        ]
    }
)

In [0]:
from typing import Union

# Enforce the response as a string (unlikely it would be anything else, but good to be safe!)
class Response(BaseModel):
    """Response to user."""

    response: str

# Create a structure for a plan - combine the current information with the next step of the plan.
class Act(BaseModel):
    """Action to perform."""

    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )


In [0]:
replanner_prompt = ChatPromptTemplate.from_template(
    """
    For the given objective, come up with a simple step by step plan. 
    This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. 
    The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

    Your objective was this:
    {input}

    Your original plan was this:
    {plan}

    You have currently done the follow steps:
    {past_steps}

    Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. 
    Only add steps to the plan that still NEED to be done. 
    Encourage the use of tools as part of the plan.
    """
)


replanner = replanner_prompt | llm.with_structured_output(Act)

# Graph The Planner

In [0]:
import operator
from typing import Annotated, Tuple
from typing_extensions import TypedDict

class PlanExecute(TypedDict):
    input: str
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str

In [0]:
from typing import Literal
from langgraph.graph import END

async def execute_step(state: PlanExecute):
    # Get our current plan
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]

    # Tell the llm what the plan is, and what the next step is
    task_formatted = f"""
    For the following plan:
    {plan_str}\n
    You are tasked with executing step {1}, {task}.
    """
    
    # Get the llm to execute this stage of the plan
    agent_response = await agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}
    )

    # Return the past steps of the plan
    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],
    }

# Ask the planner LLM to create a plan given the input - only called once
async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})
    return {"plan": plan.steps}

# Ask the replanner to take the current plan and outputs of the plan to refine next steps
async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        return {"plan": output.action.steps}

# Check if we have reached the end
def should_end(state: PlanExecute):
    if "response" in state and state["response"]:
        return END
    else:
        return "execute_step"

In [0]:
from langgraph.graph import StateGraph, START

workflow = StateGraph(PlanExecute)

# Add the plan node
workflow.add_node("plan_step", plan_step)

# Add the execution step
workflow.add_node("execute_step", execute_step)

# Add a replan node
workflow.add_node("replan_step", replan_step)

workflow.add_edge(START, "plan_step")

# From plan we go to agent
workflow.add_edge("plan_step", "execute_step")

# From agent, we replan
workflow.add_edge("execute_step", "replan_step")

workflow.add_conditional_edges(
    "replan_step",
    # Next, we pass in the function that will determine which node is called next.
    should_end,
    ["execute_step", END],
)

# Finally, we compile it!
# This compiles it into a LangChain Runnable,
# meaning you can use it as you would any other runnable
app = workflow.compile()

In [0]:
from IPython.display import Image, display

display(Image(app.get_graph(xray=True).draw_mermaid_png()))

In [0]:
config = {"recursion_limit": 50}
inputs = {"input": "Who is older in 2025? The Prime Minister of the UK, or the President of the USA?"}
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v)

In [0]:
config = {"recursion_limit": 50}
inputs = {"input": "Would I survive falling off the world's tallest tree?"}
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v)

In [0]:
result = await app.ainvoke(inputs, config=config)

In [0]:
result.get("response")