In [1]:
import asyncio
from datetime import datetime
from typing import List

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langgraph.constants import END
from langgraph.graph import StateGraph
from tavily import TavilyClient

from agents.websearchagent.prompts import SUMMARIZE_CHAT_PROMPT, QUERY_PLAN_PROMPT, SEARCH_QUERY_PROMPT, CHAT_PROMPT
from agents.websearchagent.state import WebSearchState
from llm.llm import LLMFactory
from schemas import (
    ChatRequest, QueryPlan, StepContext, QueryPlanStep, QueryStepExecution, SingleStepResults,
    SearchResult,
)

In [1]:
!pip install aiosqlite



In [2]:
async def rephrase_query_with_history_v0(state: WebSearchState, config: RunnableConfig):
    """Rewrite the user's query in light of the chat history.

    With no history the raw query is passed through unchanged. Otherwise the
    history is flattened to ``role: content`` lines and sent, together with the
    query, through ``SUMMARIZE_CHAT_PROMPT`` (presumably producing a standalone
    question).

    Args:
        state: Graph state; ``state.request`` supplies ``query`` and ``history``.
        config: Runnable config; ``config["configurable"]["model"]`` picks the LLM,
            defaulting to ``"gpt-4o"``.

    Returns:
        Partial state update ``{"query": <str>}``.
    """
    request = state.request
    if request.history:
        transcript = "\n".join(
            f"{message.role}: {message.content}" for message in request.history
        )
        llm = LLMFactory().get_llm_by_name(config["configurable"].get("model", "gpt-4o"))
        rewrite_chain = (
            ChatPromptTemplate.from_messages([("user", SUMMARIZE_CHAT_PROMPT)])
            | llm
            | StrOutputParser()
        )
        standalone_query = await rewrite_chain.ainvoke(
            {"chat_history": transcript, "question": request.query}
        )
        return {"query": standalone_query}
    return {"query": request.query}


async def generate_plan_v0(state: WebSearchState, config: RunnableConfig):
    """Ask the configured LLM for a structured search plan.

    The LLM is bound to the ``QueryPlan`` schema via ``with_structured_output`` and
    prompted with ``QUERY_PLAN_PROMPT`` filled with the (possibly rewritten) query.

    Args:
        state: Graph state; ``state.query`` is the query to plan for.
        config: Runnable config; ``config["configurable"]["model"]`` picks the LLM,
            defaulting to ``"gpt-4o"``.

    Returns:
        Partial state update ``{"plan": QueryPlan}``.
    """
    planner = (
        LLMFactory()
        .get_llm_by_name(config["configurable"].get("model", "gpt-4o"))
        .with_structured_output(QueryPlan)
    )
    plan: QueryPlan = await planner.ainvoke(QUERY_PLAN_PROMPT.format(query=state.query))
    return {"plan": plan}


async def step_executor(state: WebSearchState, config: RunnableConfig):
    """Execute the next plan step and record its web-search results.

    Builds a search prompt from the user query, the current plan step, the results
    of the steps it depends on and the current date. It then asks the LLM for search
    queries (``QueryStepExecution``), runs them through the Tavily helper and
    appends the results to ``search_result_tracker``.

    Args:
        state: Graph state holding ``plan``, ``current_step_idx``, ``query`` and
            ``search_result_tracker``.
        config: Runnable config; ``config["configurable"]["model"]`` picks the LLM,
            defaulting to ``"gpt-4o"``.

    Returns:
        Partial state update with the extended ``search_result_tracker`` and
        ``current_step_idx`` advanced by one. When no step remains (e.g. the planner
        returned an empty plan) it returns an unchanged ``current_step_idx`` so the
        router can move on instead of raising ``IndexError``.
    """
    steps = state.plan.steps
    if state.current_step_idx >= len(steps):
        # Nothing left to run; re-write the same index so the node still emits an update.
        return {"current_step_idx": state.current_step_idx}

    step: QueryPlanStep = steps[state.current_step_idx]

    model_name = config["configurable"].get("model", "gpt-4o")
    llm = LLMFactory().get_llm_by_name(model_name)
    search_query_llm = llm.with_structured_output(QueryStepExecution)

    # Context from the dependency steps' search results.
    # NOTE(review): the tracker is indexed by dependency id, which assumes step ids
    # equal their position in the plan and that steps run in order — confirm.
    relevant_context_str = "\n".join(
        build_context(state.search_result_tracker[dep]) for dep in step.dependencies
    )

    search_prompt = SEARCH_QUERY_PROMPT.format(
        user_query=state.query,
        current_step=step.step,
        prev_steps_context=relevant_context_str,
        date=str(datetime.now()),
    )

    query_step_execution: QueryStepExecution = await search_query_llm.ainvoke(search_prompt)

    # The Tavily helper is synchronous (blocking HTTP calls); run it in a worker
    # thread so it does not stall the event loop driving astream/astream_events.
    search_results = await asyncio.to_thread(
        ranked_search_results_and_images_from_queries,
        step.step,
        query_step_execution.search_queries,
    )

    single_step_result = SingleStepResults(step=step.step, results=search_results)

    return {
        "search_result_tracker": state.search_result_tracker + [single_step_result],
        "current_step_idx": state.current_step_idx + 1,
    }


def check_if_summarize(state: WebSearchState):
    """Router after ``step_executor``: keep executing until the plan is exhausted.

    Returns:
        ``"chat_response"`` once ``current_step_idx`` has reached (or passed) the
        number of plan steps, otherwise ``"step_executor"``.
    """
    # `>=` rather than `==`: an index that overshoots the plan must end the loop
    # instead of sending step_executor past the end of `plan.steps`.
    if state.current_step_idx >= len(state.plan.steps):
        return "chat_response"
    return "step_executor"


async def summarize_results(state: WebSearchState, config: RunnableConfig):
    """Produce the final answer from every executed step's search results.

    Each ``SingleStepResults`` in the tracker is rendered with ``build_context``,
    and the renderings are joined into ``CHAT_PROMPT`` alongside the query. The
    LLM output is parsed as a plain string.

    Args:
        state: Graph state; reads ``query`` and ``search_result_tracker``.
        config: Runnable config; ``config["configurable"]["model"]`` picks the LLM,
            defaulting to ``"gpt-4o"``.

    Returns:
        Partial state update ``{"response": <str>}``.
    """
    llm = LLMFactory().get_llm_by_name(config["configurable"].get("model", "gpt-4o"))
    context_blocks = (build_context(step_result) for step_result in state.search_result_tracker)
    final_prompt = CHAT_PROMPT.format(
        my_query=state.query,
        my_context="\n".join(context_blocks),
    )
    answer_chain = llm | StrOutputParser()
    return {"response": await answer_chain.ainvoke(final_prompt)}


def ranked_search_results_and_images_from_queries(step: str,
                                                  queries: list[str],
                                                  ) -> List[SearchResult]:
    """Run each search query through Tavily and collect the hits in order.

    Args:
        step: The plan step these queries serve. Currently unused; kept for the
            existing call signature.
        queries: Search queries; one ``TavilyClient.search`` call is made per query.

    Returns:
        A flat list of ``SearchResult`` (url + content): query order first, then
        the order Tavily returned the hits in. Empty when ``queries`` is empty.
        NOTE(review): despite the name, no re-ranking or image retrieval happens here.
    """
    if not queries:
        # Avoid constructing a client (and needing credentials) for no work.
        return []

    client = TavilyClient()

    search_results: List[SearchResult] = []
    for query in queries:
        tavily_resp = client.search(query)
        # extend() instead of `list + list` keeps accumulation linear.
        search_results.extend(
            SearchResult(url=hit['url'], content=hit['content'])
            for hit in tavily_resp['results']
        )

    return search_results


def build_context(step_result: SingleStepResults):
    """Render one step's search results as a prompt-ready text block.

    The block is a ``Step:`` line with the step text, followed by a line starting
    with a space and ``Context:``, then the ``str()`` of every result joined by
    newlines.
    """
    rendered_results = "\n".join(map(str, step_result.results))
    return "Step: {}\n Context: {}".format(step_result.step, rendered_results)


def format_step_context(context: List[StepContext]):
    """Join ``StepContext`` entries into newline-separated ``Step:``/``Context:`` pairs."""
    rendered = []
    for step_context in context:
        rendered.append(f"Step: {step_context.step}\nContext: {step_context.context}")
    return "\n".join(rendered)

In [5]:
class WebSearchAgent:
    """Builds and compiles the LangGraph web-search workflow.

    Flow: summarize_query -> generate_plan -> step_executor (repeated while
    ``check_if_summarize`` says plan steps remain) -> chat_response -> END.
    """

    def __init__(self):
        workflow = StateGraph(WebSearchState)
        workflow.add_node("summarize_query", rephrase_query_with_history_v0)
        workflow.add_node("generate_plan", generate_plan_v0)
        workflow.add_node("step_executor", step_executor)
        workflow.add_node("chat_response", summarize_results)

        workflow.add_edge("summarize_query", "generate_plan")
        workflow.add_edge("generate_plan", "step_executor")
        # Declare the router's possible destinations; without a path_map the
        # compiled graph cannot know where the branch may go (validation/drawing).
        workflow.add_conditional_edges(
            "step_executor",
            check_if_summarize,
            ["chat_response", "step_executor"],
        )
        workflow.add_edge("chat_response", END)

        workflow.set_entry_point("summarize_query")

        self.graph = workflow.compile()

    def get_agent(self):
        """Return the compiled graph (used below via ``astream``/``astream_events``)."""
        return self.graph

In [6]:
import json

from dotenv import load_dotenv

# Load variables from a local .env file before any client is built
# (presumably the LLM and Tavily API keys — confirm against the .env file).
_ = load_dotenv()
agent = WebSearchAgent().get_agent()

In [7]:
# Sample question used by the streaming demos below.
query: str = "News about war between Russia and ukraine"
request = ChatRequest(query=query)

In [6]:
for msg, metadata in agent.stream({"request": request}, stream_mode="messages"):
    print(metadata, end="|", flush=True)

TypeError: No synchronous function provided to "summarize_query".
Either initialize with a synchronous function or invoke via the async API (ainvoke, astream, etc.)

In [8]:
async for msg, metadata in agent.astream({"request": request}, stream_mode="messages"):
    if (
        msg.content
        and metadata["langgraph_node"] == "chat_response"
    ):
        print(msg.content, end="|", flush=True)

In [9]:
from pydantic import BaseModel, Field

def convert_pydantic_to_dict(obj):
    """Recursively replace Pydantic models with plain dicts so ``json.dumps`` can handle them.

    Args:
        obj: Any value. ``BaseModel`` instances, dicts, lists and tuples are walked.

    Returns:
        The same structure with every ``BaseModel`` replaced by its dict dump and
        tuples turned into lists (as JSON would emit them). Other values are
        returned unchanged.
    """
    if isinstance(obj, BaseModel):
        # Pydantic v2 deprecates `.dict()` in favour of `.model_dump()`; fall back for
        # v1-style models. The dump is walked again defensively in case it still
        # contains models.
        dumped = obj.model_dump() if hasattr(obj, "model_dump") else obj.dict()
        return convert_pydantic_to_dict(dumped)
    elif isinstance(obj, dict):
        return {k: convert_pydantic_to_dict(v) for k, v in obj.items()}
    elif isinstance(obj, (list, tuple)):
        # Tuples were previously returned untouched, leaving any models inside them
        # unconverted.
        return [convert_pydantic_to_dict(i) for i in obj]
    else:
        return obj

In [10]:
import pprint
async for event in agent.astream_events({"request": request}, version="v2"):
    print(json.dumps(convert_pydantic_to_dict(event), indent=4))
    print("*******************************************")
    

{
    "event": "on_chain_start",
    "data": {
        "input": {
            "request": {
                "query": "News about war between Russia and ukraine",
                "history": [],
                "llm_model_name": null
            }
        }
    },
    "name": "LangGraph",
    "tags": [],
    "run_id": "ede951cc-a1de-41dd-94f7-20c8308500d9",
    "metadata": {},
    "parent_ids": []
}
*******************************************
{
    "event": "on_chain_start",
    "data": {
        "input": {
            "request": {
                "query": "News about war between Russia and ukraine",
                "history": [],
                "llm_model_name": null
            }
        }
    },
    "name": "__start__",
    "tags": [
        "graph:step:0",
        "langsmith:hidden",
        "langsmith:hidden"
    ],
    "run_id": "95622b74-2cb2-4ea8-9bc2-52b5fe82b485",
    "metadata": {
        "langgraph_step": 0,
        "langgraph_node": "__start__",
        "langgraph_triggers":

{
    "event": "on_chain_start",
    "data": {
        "input": {
            "plan": {
                "steps": [
                    {
                        "id": 0,
                        "step": "Search for the latest news articles about the war between Russia and Ukraine",
                        "dependencies": []
                    },
                    {
                        "id": 1,
                        "step": "Identify key events and developments in the Russia-Ukraine war from the news articles",
                        "dependencies": [
                            0
                        ]
                    }
                ]
            }
        }
    },
    "name": "_write",
    "tags": [
        "seq:step:2",
        "langsmith:hidden"
    ],
    "run_id": "a441aae4-d176-499b-929b-9b0b50e2d61a",
    "metadata": {
        "langgraph_step": 2,
        "langgraph_node": "generate_plan",
        "langgraph_triggers": [
            "summarize_query"
        ]

{
    "event": "on_chain_start",
    "data": {
        "input": {
            "search_result_tracker": [
                {
                    "step": "Search for the latest news articles about the war between Russia and Ukraine",
                    "results": [
                        {
                            "url": "https://www.aljazeera.com/news/2024/11/5/russia-ukraine-war-list-of-key-events-day-984",
                            "content": "Russia-Ukraine war: List of key events, day 984 | Russia-Ukraine war News | Al Jazeera Russia-Ukraine war News|Russia-Ukraine war Russia-Ukraine war: List of key events, day 984 A tank destroyed in the course of Russia's war on Ukraine outside Donetsk, in Russian-occupied Ukraine, on November 3, 2024 [Alexander Ermochenko/Reuters] Ukrainian Minister of Foreign Affairs Andrii Sybiha said he discussed with his German counterpart, Annalena Baerbock, the \u201cneed for decisive action\u201d in response to North Korea\u2019s involvement in Russ

{
    "event": "on_chain_start",
    "data": {
        "input": {
            "search_result_tracker": [
                {
                    "step": "Search for the latest news articles about the war between Russia and Ukraine",
                    "results": [
                        {
                            "url": "https://www.aljazeera.com/news/2024/11/5/russia-ukraine-war-list-of-key-events-day-984",
                            "content": "Russia-Ukraine war: List of key events, day 984 | Russia-Ukraine war News | Al Jazeera Russia-Ukraine war News|Russia-Ukraine war Russia-Ukraine war: List of key events, day 984 A tank destroyed in the course of Russia's war on Ukraine outside Donetsk, in Russian-occupied Ukraine, on November 3, 2024 [Alexander Ermochenko/Reuters] Ukrainian Minister of Foreign Affairs Andrii Sybiha said he discussed with his German counterpart, Annalena Baerbock, the \u201cneed for decisive action\u201d in response to North Korea\u2019s involvement in Russ

{
    "event": "on_chain_start",
    "data": {
        "input": {
            "response": "# Latest Developments in the Russia-Ukraine War\n\n## Key Events and Military Actions\n\nRecent developments in the ongoing conflict between Russia and Ukraine have seen significant military engagements and geopolitical maneuvers. Ukrainian forces are reportedly holding back a powerful Russian offensive, marking one of the most intense military actions since the beginning of Russia's full-scale invasion [4]. In Kyiv, Ukrainian air defense units have been actively repelling Russian drone attacks, highlighting the ongoing aerial threat faced by the capital [6].\n\n## International Involvement and Reactions\n\nThe involvement of North Korea in the conflict has drawn international condemnation. Ukrainian Foreign Minister Andrii Sybiha, along with his German counterpart Annalena Baerbock, emphasized the need for decisive action against North Korea's support for Russia. This sentiment was echoed in a j

In [14]:
import pprint
async for event in agent.astream_events({"request": request}, version="v2"):
    # Understanding query when summarize_query node completes
    is_on_chain_end = event["event"] == "on_chain_end"
    is_graph_step = any(t.startswith("graph:step:") for t in event.get("tags", []))
    
    if is_on_chain_end and is_graph_step and event["name"] == "summarize_query":
        print("Understanding query")
    
    # Generate plan when generate_plan node starts
    is_on_chain_start = event["event"] == "on_chain_start"
    
    if is_on_chain_start and is_graph_step and event["name"] == "generate_plan":
        print("Generating plan")
    
    # Display generated plan when generate_plan node completes
    if is_on_chain_end and is_graph_step and event["name"] == "generate_plan":
        print("Generated plan is:")
        print(json.dumps(convert_pydantic_to_dict(event["data"]["output"]["plan"]), indent=4))
    
    if is_on_chain_start and is_graph_step and event["name"] == "step_executor":
        print("Thinking...")
    
    if is_on_chain_end and is_graph_step and event["name"] == "chat_response":
        print("Final Response")
        print(json.dumps(convert_pydantic_to_dict(event["data"]["output"]["response"])))
    


Understanding query
Generating plan
Generated plan is:
{
    "steps": [
        {
            "id": 0,
            "step": "Search for the latest news on the war between Russia and Ukraine",
            "dependencies": []
        },
        {
            "id": 1,
            "step": "Identify key events and developments in the Russia-Ukraine war from recent news articles",
            "dependencies": [
                0
            ]
        }
    ]
}
Thinking...
Thinking...
Final Response
"# Latest Developments in the Russia-Ukraine War\n\n## Key Events and Military Developments\n\nRecent reports indicate that the conflict between Russia and Ukraine continues to escalate with significant military engagements and geopolitical implications. Ukrainian forces have been actively defending against Russian advances, particularly in the eastern regions. Russian forces have launched multiple attacks on the Pokrovsk sector, a critical area along the 1,000-km front line in eastern Ukraine [4]. A

rephrasing
