In [None]:
import operator
import os
from typing import List, Tuple, Annotated, TypedDict, Union, Literal

from langchain import hub
from langchain.tools import tool
from langchain_groq import ChatGroq
from langchain_community.chat_models import ChatOllama
from langchain.tools import tool
from duckduckgo_search import DDGS
from langchain_community.utilities import DuckDuckGoSearchAPIWrapper
from langgraph.prebuilt import create_react_agent
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph

In [None]:
#tools
#wrapper = DuckDuckGoSearchAPIWrapper(max_results=20)
#web_search_tool = DuckDuckGoSearchRun(api_wrapper=wrapper)
#web_search_tool = DuckDuckGoSearchResults(api_wrapper=wrapper)

@tool("search_duckduckgo", return_direct=True)
def search_duckduckgo(query: str, num_results: int = 5, content: bool = True):
    ''' Provides search results from duckduckgo '''
    # NOTE: the docstring above is intentionally left untouched -- the @tool
    # decorator presumably uses it as the tool description shown to the model.
    hits = DDGS().text(query, max_results=num_results)
    # Every entry carries the result URL; the snippet text is included only
    # when `content` is true. Maps output key -> key in the raw search hit.
    wanted = {"url": "href", "content": "body"} if content else {"url": "href"}
    return [
        {out_key: hit[src_key] for out_key, src_key in wanted.items()}
        for hit in hits
    ]


# Tools exposed to the ReAct agent.
tools = [search_duckduckgo]

# Prompt style: pull the ReAct executor prompt from the LangChain hub and
# print it so the reader can see what the agent is instructed to do.
prompt = hub.pull("wfh/react-agent-executor")
prompt.pretty_print()

# Creating an Agent Executor with tools.
# Prefer the GROQ_API_KEY environment variable so the real key never has to be
# pasted into (and saved with) the notebook. The placeholder remains as the
# fallback, so behaviour is unchanged when the variable is not set.
groq_api_key = os.environ.get("GROQ_API_KEY", "ADD_API_KEY_HERE")
llm = ChatGroq(temperature=0, groq_api_key=groq_api_key, model_name="llama3-70b-8192")
#llm = ChatOllama(model = "llama3", temperature=0)
agent_executor = create_react_agent(llm, tools, messages_modifier=prompt)

In [None]:
#agent_executor.invoke({"messages": [("user", "Give me a list of G20 Countries")]})

In [None]:
# Defining the State
class PlanExecute(TypedDict):
    """State dictionary shared by the nodes of the plan-and-execute graph."""
    # The user's objective; plan_step forwards it to the planner as the user message.
    input: str
    # Steps still to be carried out; written by plan_step and replan_step.
    plan: List[str]
    # Records of executed steps, written by execute_step as (task, result).
    # Annotated with operator.add, presumably so langgraph accumulates updates
    # to this key instead of overwriting them -- confirm against the installed
    # langgraph version.
    past_steps: Annotated[List[Tuple], operator.add]
    # Final answer text; written by replan_step when the re-planner chooses to
    # respond, and checked by should_end to decide where to route next.
    response: str

In [None]:
# Defining the planning step
class Plan(BaseModel):
    """Plan to follow in future"""
    # The docstring and the Field description below are deliberately left
    # as-is: they are presumably part of the schema handed to the model via
    # with_structured_output -- verify before rewording them.

    # Ordered step descriptions; plan_step and replan_step copy this list into
    # the graph state's "plan" key.
    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )

planner_prompt = ChatPromptTemplate.from_messages(
    [
        # System turn: planning instructions followed by a worked example.
        ("system", """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps. Break down
the given objective into simple tasks. For example: If i ask for the hometown of the current us open winner, the plan would be to 1. Find who the current
winner of US open, once you have the name 2. Find the home town"""),
        # Slot filled with the caller-supplied "messages" list.
        ("placeholder", "{messages}"),
    ]
)

# Planning chain: prompt piped into a ChatGroq model wrapped with
# with_structured_output(Plan).
planner = planner_prompt | ChatGroq(
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
    temperature=0,
).with_structured_output(Plan)

# planner.invoke(
#     {
#         "messages": [
#             ("user", "Compile table of electricity consumption per capita for g20 nations")
#         ]
#     }
# )

In [None]:
# Defining the re-planning step
class Response(BaseModel):
    """Response to user."""

    # Final answer text; replan_step copies it into the graph state's
    # "response" key.
    response: str


class Act(BaseModel):
    """Action to perform."""

    # Either a Response (final answer) or a Plan (steps still to run);
    # replan_step branches on which of the two was returned.
    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )


# Template variables {input}, {plan} and {past_steps} are filled from the graph
# state that replan_step passes in.
replanner_prompt = ChatPromptTemplate.from_template("""For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.Break down
the given objective into simple tasks. For example: If i ask for the hometown of the current us open winner, the plan would be to 1. Find who the current
winner of US open, once you have the name 2. Find the home town

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan.""")

# Re-planning chain: prompt piped into a ChatGroq model wrapped with
# with_structured_output(Act), so the result carries a Response or a Plan.
replanner = replanner_prompt | ChatGroq(
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
    temperature=0,
).with_structured_output(Act)

In [None]:
# Defining table_agent 
## (This is a custom output structure for the prompt "Compile table of electricity consumption per capita for g20 nations")
class Table(BaseModel):
    # Structured-output schema requested from the model by table_agent_chain.
    # No class docstring is added on purpose: it would presumably become part
    # of the schema description sent to the model.
    # NOTE(review): this describes a single country row, while the example
    # objective asks for a table covering all G20 nations -- confirm whether a
    # list of rows is what is actually wanted.
    country: str
    # NOTE(review): declared as int; per-capita figures may be fractional --
    # confirm the intended unit and precision.
    energy_consumption_per_capita : int
    source_url: str

# The prompt is just the final response text taken from the graph state.
table_agent_prompt = ChatPromptTemplate.from_template("""{response}""")

# Formatting chain: prompt piped into a ChatGroq model wrapped with
# with_structured_output(Table).
table_agent_chain = table_agent_prompt | ChatGroq(
    model_name="llama3-70b-8192",
    groq_api_key=groq_api_key,
    temperature=0,
).with_structured_output(Table)

In [None]:
# Define the node functions used by the graph
async def execute_step(state: PlanExecute):
    """Run the first pending plan step through the ReAct agent.

    The whole plan is shown to the agent for context, but only ``plan[0]`` is
    executed.

    Args:
        state: Graph state; only ``state["plan"]`` is read.

    Returns:
        A partial state update ``{"past_steps": [(task, result)]}`` where
        ``result`` is the content of the agent's last message.

    Raises:
        IndexError: If ``state["plan"]`` is empty.
    """
    plan = state["plan"]
    plan_str = "\n".join(f"{i}. {step}" for i, step in enumerate(plan, start=1))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step.{1}, {task}. \n\nYou only have one tool: search_duckduckgo, use it when needed. \n\nProvide sources with urls """
    agent_response = await agent_executor.ainvoke(
        {"messages": [("user", task_formatted)]}
    )
    # past_steps is a list combined with operator.add, so the update must be a
    # list as well: a bare tuple either cannot be concatenated to the
    # accumulated list or ends up flattened into it element by element.
    return {
        "past_steps": [(task, agent_response["messages"][-1].content)],
    }

async def plan_step(state: PlanExecute):
    """Ask the planner for an initial plan covering the user's objective.

    Args:
        state: Graph state; only ``state["input"]`` is read.

    Returns:
        A partial state update ``{"plan": [...]}`` holding the planner's steps.
    """
    user_turn = ("user", state["input"])
    drafted = await planner.ainvoke({"messages": [user_turn]})
    return {"plan": drafted.steps}


async def replan_step(state: PlanExecute):
    """Let the re-planner either finish or revise the remaining plan.

    Args:
        state: Graph state, passed to the re-planner chain unchanged.

    Returns:
        ``{"response": text}`` when the re-planner returned a ``Response``,
        otherwise ``{"plan": steps}`` with the updated list of steps.
    """
    decision = (await replanner.ainvoke(state)).action
    # Anything that is not a Response is treated as an updated plan.
    if not isinstance(decision, Response):
        return {"plan": decision.steps}
    return {"response": decision.response}


def should_end(state: PlanExecute) -> Literal["agent", "table_agent"]:
    """Pick the node that runs after re-planning.

    Returns:
        ``"table_agent"`` once the state holds a non-empty ``response``,
        otherwise ``"agent"`` so the next plan step gets executed.
    """
    # A missing key and an empty/None value are both treated as "not finished".
    return "table_agent" if state.get("response") else "agent"

async def table_agent(state: PlanExecute):
    """Run the table-formatting chain on the current state.

    Args:
        state: Graph state, passed to ``table_agent_chain`` unchanged.

    Returns:
        Whatever ``table_agent_chain`` produces for that state.
    """
    # NOTE(review): the chain output is returned directly rather than as a dict
    # of state updates -- confirm the installed langgraph version accepts this.
    return await table_agent_chain.ainvoke(state)

In [None]:
# Assemble the plan-and-execute graph over the shared PlanExecute state.
workflow = StateGraph(PlanExecute)

# Register every node first so all edge targets exist when edges are added.
workflow.add_node("planner", plan_step)        # drafts the initial plan
workflow.add_node("agent", execute_step)       # executes the next plan step
workflow.add_node("replan", replan_step)       # revises the plan or answers
workflow.add_node("table_agent", table_agent)  # formats the final answer

workflow.set_entry_point("planner")

# From plan to agent
workflow.add_edge("planner", "agent")

# From agent, to replan
workflow.add_edge("agent", "replan")

# After replan, should_end alone decides where to go: back to "agent" while
# steps remain, or on to "table_agent" once a response exists. There must be no
# additional unconditional replan -> table_agent edge, because that would
# schedule table_agent after every re-plan, even when no response is set yet.
workflow.add_conditional_edges(
    "replan",
    # Pass in the function that will determine which node is called next.
    should_end,
    # Explicit mapping from should_end's return values to node names.
    {"agent": "agent", "table_agent": "table_agent"},
)

workflow.set_finish_point("table_agent")

app = workflow.compile()

In [None]:
config = {"recursion_limit": 50}
inputs = {"input": "Compile table of electricity consumption per capita for g20 nations"}
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        print(v)