In [87]:
from llama_index.llms.google_genai import GoogleGenAI
from dotenv import load_dotenv

# Pull API keys / connection strings (e.g. PG_CONN_STR used further down) from a
# local .env file into the process environment before any client is created.
load_dotenv(override=False)

# Shared Gemini LLM instance handed to every agent defined later in the notebook.
llm = GoogleGenAI(model="gemini-2.0-flash")

Python-dotenv could not parse statement starting at line 16
Python-dotenv could not parse statement starting at line 18


In [88]:
import json
import os
from tavily import AsyncTavilyClient
from llama_index.core.workflow import Context
from urllib.parse import urlparse
from llama_index.vector_stores.postgres import PGVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import VectorStoreIndex, StorageContext

# Cache of knowledge-base indexes keyed by connection string. Building the
# PGVectorStore and loading the HuggingFace embedding model is expensive, so do
# it once per kernel instead of on every tool call.
_KB_INDEX_CACHE: dict = {}


def _get_kb_index() -> VectorStoreIndex:
    """Return the (cached) index over the ``public.llamaindex_docs`` PGVector table.

    The connection parameters come from the ``PG_CONN_STR`` environment
    variable; a different value yields (and caches) a different index.

    Raises:
        RuntimeError: if ``PG_CONN_STR`` is not set.
    """
    conn_str = os.getenv("PG_CONN_STR")
    if not conn_str:
        raise RuntimeError(
            "PG_CONN_STR is not set; cannot connect to the knowledge base."
        )
    cached = _KB_INDEX_CACHE.get(conn_str)
    if cached is not None:
        return cached

    # Get Postgres credentials from connection string
    pg_url = urlparse(conn_str)

    # Vector store to store chunks + embeddings in
    vector_store = PGVectorStore.from_params(
        host=pg_url.hostname,
        port=pg_url.port,
        database=pg_url.path[1:],  # strip the leading "/" of the URL path
        user=pg_url.username,
        password=pg_url.password,
        table_name="llamaindex_docs",
        schema_name="public",
        hybrid_search=True,
        embed_dim=768,
        hnsw_kwargs={
            "hnsw_m": 16,
            "hnsw_ef_construction": 64,
            "hnsw_ef_search": 40,
            "hnsw_dist_method": "vector_cosine_ops",
        },
    )

    index = VectorStoreIndex.from_vector_store(
        vector_store=vector_store,
        embed_model=HuggingFaceEmbedding(
            model_name="sentence-transformers/all-mpnet-base-v2"
        ),
    )
    _KB_INDEX_CACHE[conn_str] = index
    return index


async def search(query: str) -> str:
    """Useful for using the knowledge base to answer questions."""
    # NOTE: the docstring above is presumably what the agent's LLM sees as the
    # tool description, so it is kept verbatim; edit it deliberately.
    #
    # Returns a JSON list of {"doc_id", "content"} for the top-10 hits of the
    # store's "sparse" query mode.
    text_retriever = _get_kb_index().as_retriever(
        vector_store_query_mode="sparse", similarity_top_k=10
    )

    results = text_retriever.retrieve(query)

    # Only the first 8 characters of each node id are exposed; the agents'
    # prompts ask for these short ids to be used as inline [doc_id] citations.
    return json.dumps(
        [{"doc_id": node.node_id[:8], "content": node.text} for node in results],
        indent=2,
    )


async def record_notes(ctx: Context, notes: str, notes_title: str) -> str:
    """Useful for recording notes on a given topic. Your input should be notes with a title to save the notes under. Any information you record should include an inline citation of the document id in the form [doc_id]."""
    # The docstring above doubles as the tool description for the agent, so it
    # is kept verbatim. Notes are stored in the shared "state" dict under
    # state["research_notes"][notes_title]; a repeated title overwrites.
    state = await ctx.get("state")
    notes_by_title = state.setdefault("research_notes", {})
    notes_by_title[notes_title] = notes
    await ctx.set("state", state)
    return "Notes recorded."


async def write_report(ctx: Context, report_content: str) -> str:
    """Useful for writing a report on a given topic. Your input should be a markdown formatted report."""
    # The docstring above doubles as the tool description for the agent, so it
    # is kept verbatim.
    # NOTE(review): a later cell defines an unrelated
    # `write_report(memory, user_request, stream)` under the same name; running
    # that cell rebinds `write_report`, so build the WriteAgent from this version.
    state = await ctx.get("state")
    state["report_content"] = report_content  # overwrite any earlier draft
    await ctx.set("state", state)
    return "Report written."


async def review_report(ctx: Context, review: str) -> str:
    """Useful for reviewing a report and providing feedback. Your input should be a review of the report."""
    # The docstring above doubles as the tool description for the agent, so it
    # is kept verbatim.
    state = await ctx.get("state")
    # Echo the report being reviewed to the notebook output (debug aid).
    report_under_review = state["report_content"]
    print("Current report:", report_under_review)
    state["review"] = review
    await ctx.set("state", state)
    return "Report reviewed."

In [98]:
from llama_index.core.agent.workflow import FunctionAgent, ReActAgent

# Three cooperating agents: ResearchAgent -> WriteAgent <-> ReviewAgent.
# The system prompts are built by implicit string concatenation, so every
# fragment except the last must end with a space.
research_agent = ReActAgent(
    name="ResearchAgent",
    # The agent's lookup tool is `search`, which queries the local Postgres
    # knowledge base (not the web), so describe it accordingly.
    description="Useful for searching the knowledge base for information on a given topic and recording notes on the topic.",
    system_prompt=(
        "You are the ResearchAgent that can search the knowledge base for information on a given topic and record notes on the topic. "
        "Once notes are recorded and you are satisfied, you should hand off control to the WriteAgent to write a report on the topic. "
        "You should have at least some notes on a topic before handing off control to the WriteAgent."
    ),
    llm=llm,
    tools=[search, record_notes],
    can_handoff_to=["WriteAgent"],
)

write_agent = ReActAgent(
    name="WriteAgent",
    description="Useful for writing a report on a given topic.",
    system_prompt=(
        "You are the WriteAgent that can write a report on a given topic. "
        "Your report should be in a markdown format. The content should be grounded in the research notes. "
        "The notes include document citations for each piece of information. Copy these citations exactly. Make sure to cite every piece of information you use. "
        "ALWAYS use the write_report tool first to write the report out. "
        "Once the report is written, ALWAYS get feedback at least once from the ReviewAgent."
    ),
    llm=llm,
    # Must be the `write_report(ctx, report_content)` tool; a later cell
    # redefines the name `write_report` with a different signature.
    tools=[write_report],
    can_handoff_to=["ReviewAgent", "ResearchAgent"],
)

review_agent = FunctionAgent(
    name="ReviewAgent",
    description="Useful for reviewing a report and providing feedback.",
    system_prompt=(
        "You are the ReviewAgent that can review the written report and provide feedback. "
        # NOTE(review): looks like a debugging probe that makes the agent echo
        # its prompt into its reply; drop it once no longer needed.
        "First, repeat what you have been prompted word for word. "
        "Your review should either approve the current report or request changes for the WriteAgent to implement. "
        "If you have feedback that requires changes, you should hand off control to the WriteAgent to implement the changes after submitting the review."
    ),
    llm=llm,
    tools=[review_report],
    can_handoff_to=["WriteAgent"],
)

In [99]:
from llama_index.core.agent.workflow import AgentWorkflow

# Wire the three agents together, starting with research. `initial_state`
# seeds the shared "state" dict the tools read/write via ctx.get / ctx.set.
agent_workflow = AgentWorkflow(
    root_agent=research_agent.name,
    agents=[research_agent, write_agent, review_agent],
    initial_state=dict(
        research_notes={},
        report_content="Not written yet.",
        review="Review required.",
    ),
)

In [100]:
from llama_index.core.agent.workflow import (
    AgentInput,
    AgentOutput,
    ToolCall,
    ToolCallResult,
    AgentStream,
)
from llama_index.core.workflow.handler import WorkflowHandler

handler = agent_workflow.run(
    user_msg=(
        "Can you summarize the key findings of “USING BIOLOGICAL PARAMETERS AND TRADITIONAL ECOLOGICAL KNOWLEDGE TO STUDY THE STATUS OF Spirinchus starksi (NIGHT SMELT) IN NORTHERN CALIFORNIA” by Z Zenobia?"
    )
)

current_agent = None
current_tool_calls = ""
async for event in handler.stream_events():
    if (
        hasattr(event, "current_agent_name")
        and event.current_agent_name != current_agent
    ):
        current_agent = event.current_agent_name
        print(f"\n{'='*50}")
        print(f"🤖 Agent: {current_agent}")
        print(f"{'='*50}\n")

    # if isinstance(event, AgentStream):
    #     if event.delta:
    #         print(event.delta, end="", flush=True)
    # elif isinstance(event, AgentInput):
    #     print("📥 Input:", event.input)
    elif isinstance(event, AgentOutput):
        if event.response.content:
            print("📤 Output:", event.response.content)
        if event.tool_calls:
            print(
                "🛠️  Planning to use tools:",
                [call.tool_name for call in event.tool_calls],
            )
    elif isinstance(event, ToolCallResult):
        print(f"🔧 Tool Result ({event.tool_name}):")
        print(f"  Arguments: {event.tool_kwargs}")
        print(f"  Output: {event.tool_output}")
    elif isinstance(event, ToolCall):
        print(f"🔨 Calling Tool: {event.tool_name}")
        print(f"  With arguments: {event.tool_kwargs}")


🤖 Agent: ResearchAgent

📤 Output: Thought: The current language of the user is: English. I need to use the search tool to find the key findings of the research paper and then record the findings in my notes.
Action: search
Action Input: {"query": "key findings of “USING BIOLOGICAL PARAMETERS AND TRADITIONAL ECOLOGICAL KNOWLEDGE TO STUDY THE STATUS OF Spirinchus starksi (NIGHT SMELT) IN NORTHERN CALIFORNIA” by Z Zenobia"}

🛠️  Planning to use tools: ['search']
🔨 Calling Tool: search
  With arguments: {'query': 'key findings of “USING BIOLOGICAL PARAMETERS AND TRADITIONAL ECOLOGICAL KNOWLEDGE TO STUDY THE STATUS OF Spirinchus starksi (NIGHT SMELT) IN NORTHERN CALIFORNIA” by Z Zenobia'}
🔧 Tool Result (search):
  Arguments: {'query': 'key findings of “USING BIOLOGICAL PARAMETERS AND TRADITIONAL ECOLOGICAL KNOWLEDGE TO STUDY THE STATUS OF Spirinchus starksi (NIGHT SMELT) IN NORTHERN CALIFORNIA” by Z Zenobia'}
  Output: [
  {
    "doc_id": "7c3786b6",
    "content": "## USING BIOLOGICAL PAR

In [93]:
state = await handler.ctx.get("state")
print(state["report_content"])

## Key Findings of Spirinchus starksi Study

The research used biological parameters and traditional ecological knowledge to study the status of the night smelt population [0af188b1]. The study aimed to understand the Indigenous role of smelt as a long-term dietary and subsistence species [b1eb1405]. The objectives included comparing the abundance, length, weight, age, sex ratio, and timing of reproduction of the night smelt population in Humboldt and Del Norte counties during 2014 and 2021 [98c16ca4]. The study also quantified their larvae in the nearshore ocean since 2007, analyzed commercial catches in Humboldt County during the last 20 years, and used traditional and local ecological knowledge to explore what significance night smelt hold for the Tribal nations [98c16ca4].


In [None]:
import logging
import os
import uuid
from typing import List, Literal, Optional

from llama_index.core.base.llms.types import (
    CompletionResponse,
    CompletionResponseAsyncGen,
)
from llama_index.core.indices.base import BaseIndex
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.memory.simple_composable_memory import SimpleComposableMemory
from llama_index.core.prompts import PromptTemplate
from llama_index.core.schema import MetadataMode, Node, NodeWithScore
from llama_index.core.settings import Settings
from llama_index.core.types import ChatMessage, MessageRole
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from pydantic import BaseModel, Field

# Global default LLM: plan_research, research and write_report below all call
# Settings.llm.
Settings.llm = GoogleGenAI(model="gemini-2.0-flash")

# Root logger (getLogger with no name) at INFO level.
logger = logging.getLogger(None)
logger.setLevel("INFO")


# Workflow events passed between DeepResearchWorkflow steps.
class PlanResearchEvent(Event):
    """Triggers the ``analyze`` step to decide the next research action."""


class ResearchEvent(Event):
    """One question to answer from ``context_nodes``; consumed by the ``answer`` step."""

    question_id: str
    question: str
    context_nodes: List[NodeWithScore]


class CollectAnswersEvent(Event):
    """An answered question; gathered by the ``collect_answers`` step."""

    question_id: str
    question: str
    answer: str


class ReportEvent(Event):
    """Triggers the ``report`` step to write the final report."""


# Events that are streamed to the frontend and rendered there
class DeepResearchEventData(BaseModel):
    """Progress payload: the phase, its state, and (for answers) the question/answer."""

    event: Literal["retrieve", "analyze", "answer"]
    state: Literal["pending", "inprogress", "done", "error"]
    id: Optional[str] = None
    question: Optional[str] = None
    answer: Optional[str] = None


class DataEvent(Event):
    """Stream-only event wrapping a ``DeepResearchEventData`` payload."""

    type: Literal["deep_research_event"]
    data: DeepResearchEventData

    def to_response(self):
        """Serialize this event to a plain dict via pydantic ``model_dump``."""
        return self.model_dump()


class DeepResearchWorkflow(Workflow):
    """
    A workflow to research and analyze documents from multiple perspectives and write a comprehensive report.

    Requirements:
    - An index containing the knowledge base related to the topic

    Steps:
    1. Retrieve information from the knowledge base
    2. Analyze the retrieved information and provide questions for answering
    3. Answer the questions
    4. Write the report based on the research results

    Start-event inputs (read in ``retrieve``):
    - ``user_msg`` (required): the research request.
    - ``chat_history`` (optional): messages used to seed the memory.
    - ``stream`` (optional, default ``True``): forwarded to ``write_report``;
      when true the final result is whatever ``Settings.llm.astream_complete``
      returns (an async generator of chunks) instead of a full completion.

    Progress is reported on the event stream as ``DataEvent``s.

    NOTE: per-run state (memory, retrieved nodes, request, stream flag) lives on
    the instance, so concurrent runs of the same instance are not supported.
    """

    memory: SimpleComposableMemory
    context_nodes: List[Node]
    index: BaseIndex
    user_request: str
    stream: bool = True

    def __init__(
        self,
        index: BaseIndex,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.index = index
        self._reset_run_state()

    def _reset_run_state(self) -> None:
        """Give the instance an empty node list and a fresh chat memory."""
        self.context_nodes = []
        self.memory = SimpleComposableMemory.from_defaults(
            primary_memory=ChatMemoryBuffer.from_defaults(),
        )

    def _emit_research_event(self, ctx: Context, **data) -> None:
        """Write a ``deep_research_event`` DataEvent with ``data`` as its payload."""
        ctx.write_event_to_stream(DataEvent(type="deep_research_event", data=data))

    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> PlanResearchEvent:
        """
        Initialize the run (memory, request, stream flag) and retrieve the
        context nodes for the user request from the index.

        Raises:
            ValueError: if the start event carries no ``user_msg``.
        """
        user_request = ev.get("user_msg")
        if not user_request:
            raise ValueError("DeepResearchWorkflow.run() requires a non-empty `user_msg`.")

        # The same instance may be run more than once; start from a clean slate
        # so nodes and Q&A from a previous run do not leak into this one.
        self._reset_run_state()
        self.stream = ev.get("stream", True)
        self.user_request = user_request
        chat_history = ev.get("chat_history")
        if chat_history is not None:
            self.memory.put_messages(chat_history)

        await ctx.set("total_questions", 0)

        # Add user message to memory
        self.memory.put_messages(
            messages=[
                ChatMessage(
                    role=MessageRole.USER,
                    content=self.user_request,
                )
            ]
        )
        self._emit_research_event(ctx, event="retrieve", state="inprogress")
        retriever = self.index.as_retriever(
            similarity_top_k=int(os.getenv("TOP_K", 10)),
        )
        nodes = retriever.retrieve(self.user_request)
        self.context_nodes.extend(nodes)  # type: ignore
        self._emit_research_event(ctx, event="retrieve", state="done")
        print("Retrieval done")
        return PlanResearchEvent()

    @step
    async def analyze(
        self, ctx: Context, ev: PlanResearchEvent
    ) -> ResearchEvent | ReportEvent | StopEvent:
        """
        Ask the planner (``plan_research``) what to do next.

        - "cancel": stop with the planner's cancel reason.
        - "write": send a ReportEvent, or stop with an error message if no
          question has been researched yet.
        - "research": send one ResearchEvent per question and record how many
          answers ``collect_answers`` must wait for.
        """
        print("Analyzing the retrieved information")
        self._emit_research_event(ctx, event="analyze", state="inprogress")
        total_questions = await ctx.get("total_questions")
        res = await plan_research(
            memory=self.memory,
            context_nodes=self.context_nodes,
            user_request=self.user_request,
            total_questions=total_questions,
        )

        # research_questions is Optional: the LLM may return null as well as [].
        questions = res.research_questions or []
        decision = res.decision
        if decision == "research" and not questions:
            # With nothing to research no ResearchEvent would be sent, so
            # collect_answers would never fire and the run would idle until the
            # timeout. Take the "write" path instead (it also refuses to write
            # when nothing has been researched yet).
            decision = "write"

        if decision == "cancel":
            self._emit_research_event(ctx, event="analyze", state="done")
            return StopEvent(
                result=res.cancel_reason,
            )

        if decision == "write":
            # Writing a report without any research context is not allowed.
            # It's a LLM hallucination.
            if total_questions == 0:
                self._emit_research_event(ctx, event="analyze", state="done")
                return StopEvent(
                    result="Sorry, I have a problem when analyzing the retrieved information. Please try again.",
                )

            self.memory.put(
                message=ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content="No more idea to analyze. We should report the answers.",
                )
            )
            ctx.send_event(ReportEvent())
        else:
            total_questions += len(questions)
            await ctx.set("total_questions", total_questions)  # For tracking
            # Number of CollectAnswersEvents collect_answers must wait for.
            await ctx.set("waiting_questions", len(questions))
            self.memory.put(
                message=ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content="We need to find answers to the following questions:\n"
                    + "\n".join(questions),
                )
            )
            for question in questions:
                question_id = str(uuid.uuid4())
                self._emit_research_event(
                    ctx,
                    event="answer",
                    state="pending",
                    id=question_id,
                    question=question,
                    answer=None,
                )
                ctx.send_event(
                    ResearchEvent(
                        question_id=question_id,
                        question=question,
                        context_nodes=self.context_nodes,
                    )
                )
        self._emit_research_event(ctx, event="analyze", state="done")
        # The next step is triggered by the events sent above.
        return None

    @step(num_workers=2)
    async def answer(self, ctx: Context, ev: ResearchEvent) -> CollectAnswersEvent:
        """
        Answer one research question from its context nodes. A failure is
        reported in the answer text instead of aborting the whole run.
        """
        self._emit_research_event(
            ctx,
            event="answer",
            state="inprogress",
            id=ev.question_id,
            question=ev.question,
        )
        try:
            answer = await research(
                context_nodes=ev.context_nodes,
                question=ev.question,
            )
        except Exception as e:
            print(f"Error answering question {ev.question}: {e}")
            answer = f"Got error when answering the question: {ev.question}"
        self._emit_research_event(
            ctx,
            event="answer",
            state="done",
            id=ev.question_id,
            question=ev.question,
            answer=answer,
        )

        return CollectAnswersEvent(
            question_id=ev.question_id,
            question=ev.question,
            answer=answer,
        )

    @step
    async def collect_answers(
        self, ctx: Context, ev: CollectAnswersEvent
    ) -> PlanResearchEvent:
        """
        Buffer answers until every question from the latest ``analyze`` round
        has arrived, then record them in memory and trigger re-planning.
        """
        num_questions = await ctx.get("waiting_questions")
        results = ctx.collect_events(
            ev,
            expected=[CollectAnswersEvent] * num_questions,
        )
        if results is None:
            # Still waiting for the remaining answers.
            return None
        for result in results:
            self.memory.put(
                message=ChatMessage(
                    role=MessageRole.ASSISTANT,
                    content=f"<Question>{result.question}</Question>\n<Answer>{result.answer}</Answer>",
                )
            )
        await ctx.set("waiting_questions", 0)
        self.memory.put(
            message=ChatMessage(
                role=MessageRole.ASSISTANT,
                content="Researched all the questions. Now, i need to analyze if it's ready to write a report or need to research more.",
            )
        )
        return PlanResearchEvent()

    @step
    async def report(self, ctx: Context, ev: ReportEvent) -> StopEvent:
        """
        Write the final report from the accumulated memory; the StopEvent result
        is whatever ``write_report`` returns (streaming or not per ``self.stream``).
        """
        res = await write_report(
            memory=self.memory,
            user_request=self.user_request,
            stream=self.stream,
        )
        return StopEvent(
            result=res,
        )


# Structured output of the planner: passed as `output_cls` to
# Settings.llm.astructured_predict in plan_research, so the field descriptions
# presumably reach the LLM as schema text. Edit them deliberately, and prefer a
# comment over a class docstring (a docstring would likely join the schema too).
class AnalysisDecision(BaseModel):
    # Which action the planner chose.
    decision: Literal["research", "write", "cancel"] = Field(
        description="Whether to continue research, write a report, or cancel the research after several retries"
    )
    # Follow-up questions; Optional, so it may come back as None or empty.
    research_questions: Optional[List[str]] = Field(
        default_factory=list,
        description="""
        If the decision is to research, provide a list of questions to research that related to the user request.
        Maximum 3 questions. Set to null or empty if writing a report or cancel the research.
        """,
    )
    # Only meaningful when decision == "cancel".
    cancel_reason: Optional[str] = Field(
        default=None,
        description="The reason for cancellation if the decision is to cancel research.",
    )


async def plan_research(
    memory: SimpleComposableMemory,
    context_nodes: List[Node],
    user_request: str,
    total_questions: int,
) -> AnalysisDecision:
    """
    Ask ``Settings.llm`` for a structured ``AnalysisDecision``: research more,
    write the report, or cancel.

    Args:
        memory: conversation so far; every message is rendered as "role: content".
        context_nodes: retrieved nodes, rendered with ``MetadataMode.LLM``.
        user_request: the original research request.
        total_questions: questions researched so far; at 0 or above 6 an extra
            steering paragraph is injected into the prompt.

    Returns:
        The parsed ``AnalysisDecision`` from ``astructured_predict``.
    """
    analyze_prompt = """
      You are a professor who is guiding a researcher to research a specific request/problem.
      Your task is to decide on a research plan for the researcher.

      The possible actions are:
      + Provide a list of questions for the researcher to investigate, with the purpose of clarifying the request.
      + Write a report if the researcher has already gathered enough research on the topic and can resolve the initial request.
      + Cancel the research if most of the answers from researchers indicate there is insufficient information to research the request. Do not attempt more than 3 research iterations or too many questions.

      The workflow should be:
      + Always begin by providing some initial questions for the researcher to investigate.
      + Analyze the provided answers against the initial topic/request. If the answers are insufficient to resolve the initial request, provide additional questions for the researcher to investigate.
      + If the answers are sufficient to resolve the initial request, instruct the researcher to write a report.

      Here are the context: 
      <Collected information>
      {context_str}
      </Collected information>

      <Conversation context>
      {conversation_context}
      </Conversation context>

      {enhanced_prompt}

      Now, provide your decision in the required format for this user request:
      <User request>
      {user_request}
      </User request>
      """
    # Steer the planner explicitly at both ends of the question budget rather
    # than trusting it to notice on its own.
    if total_questions == 0:
        # Nothing researched yet: push for initial questions instead of a report.
        enhanced_prompt = """
        
        The student has no questions to research. Let start by asking some questions.
        """
    elif total_questions > 6:
        # Many questions already: nudge towards cancelling if still insufficient.
        enhanced_prompt = f"""

        The student has researched {total_questions} questions. Should cancel the research if the context is not enough to write a report.
        """
    else:
        enhanced_prompt = ""

    history_lines = [
        f"{message.role}: {message.content}" for message in memory.get_all()
    ]
    context_chunks = [
        node.get_content(metadata_mode=MetadataMode.LLM) for node in context_nodes
    ]
    conversation_context = "\n".join(history_lines)
    context_str = "\n".join(context_chunks)

    return await Settings.llm.astructured_predict(
        output_cls=AnalysisDecision,
        prompt=PromptTemplate(template=analyze_prompt),
        user_request=user_request,
        context_str=context_str,
        conversation_context=conversation_context,
        enhanced_prompt=enhanced_prompt,
    )


async def research(
    question: str,
    context_nodes: List[NodeWithScore],
) -> str:
    """
    Answer one research question using only the supplied context nodes.

    Every node is wrapped by ``_get_text_node_content_for_citation`` in a
    ``<Citation id='...'>`` envelope so the LLM can cite it as
    ``[citation:id]``.

    Args:
        question: the question to answer.
        context_nodes: retrieved nodes forming the only allowed evidence.

    Returns:
        The text of ``Settings.llm.acomplete`` for the filled-in prompt.
    """
    # Fixed: the closing </Collected information> tag used to be followed by a
    # stray backtick that was sent to the LLM verbatim.
    prompt = """
    You are a researcher who is in the process of answering the question.
    The purpose is to answer the question based on the collected information, without using prior knowledge or making up any new information.
    Always add citations to the sentence/point/paragraph using the id of the provided content.
    The citation should follow this format: [citation:id] where id is the id of the content.
    
    E.g:
    If we have a context like this:
    <Citation id='abc-xyz'>
    Baby llama is called cria
    </Citation id='abc-xyz'>

    And your answer uses the content, then the citation should be:
    - Baby llama is called cria [citation:abc-xyz]

    Here is the provided context for the question:
    <Collected information>
    {context_str}
    </Collected information>

    No prior knowledge, just use the provided context to answer the question: {question}
    """
    context_str = "\n".join(
        [_get_text_node_content_for_citation(node) for node in context_nodes]
    )
    res = await Settings.llm.acomplete(
        prompt=prompt.format(question=question, context_str=context_str),
    )
    return res.text


async def write_report(
    memory: SimpleComposableMemory,
    user_request: str,
    stream: bool = False,
) -> CompletionResponse | CompletionResponseAsyncGen:
    """
    Write the final research report from everything recorded in ``memory``.

    Args:
        memory: the research conversation; every message is rendered as
            "role: content" into the prompt's research context.
        user_request: the original research request.
        stream: if True use ``Settings.llm.astream_complete`` (returns a
            streaming generator), else ``Settings.llm.acomplete``.

    Returns:
        Whatever the selected LLM completion call returns.

    NOTE(review): this shares its name with the `write_report(ctx,
    report_content)` agent tool defined in an earlier cell; running this cell
    rebinds the name, so agents built afterwards would receive this function.
    """
    # The citation syntax below must match what `research` asks the LLM to
    # emit ([citation:id]); it previously said `[citation:id]()`, a markdown
    # link form that also contradicted the "Do not add links" rule.
    report_prompt = """
    You are a researcher writing a report based on a user request and the research context.
    You have researched various perspectives related to the user request.
    The report should provide a comprehensive outline covering all important points from the researched perspectives.
    Create a well-structured outline for the research report that covers all the answers.

    # IMPORTANT when writing in markdown format:
    + Use tables or figures where appropriate to enhance presentation.
    + Preserve all citation syntax (the `[citation:id]` parts in the provided context). Keep these citations in the final report - no separate reference section is needed.
    + Do not add links, a table of contents, or a references section to the report.

    <User request>
    {user_request}
    </User request>

    <Research context>
    {research_context}
    </Research context>

    Now, write a report addressing the user request based on the research provided following the format and guidelines above.
    """
    research_context = "\n".join(
        [f"{message.role}: {message.content}" for message in memory.get_all()]
    )

    llm_complete_func = (
        Settings.llm.astream_complete if stream else Settings.llm.acomplete
    )

    res = await llm_complete_func(
        prompt=report_prompt.format(
            user_request=user_request,
            research_context=research_context,
        ),
    )
    return res


def _get_text_node_content_for_citation(node: NodeWithScore) -> str:
    """
    Wrap a retrieved node's LLM-facing text in a citation envelope.

    The result is ``<Citation id='ID'>`` + newline + the node's content
    (``MetadataMode.LLM``) + ``</Citation id='ID'>``, where ID is the
    underlying node's ``node_id``; the research prompt's example uses the
    same tags.
    """
    citation_id = node.node.node_id
    opening_tag = f"<Citation id='{citation_id}'>"
    body = node.get_content(metadata_mode=MetadataMode.LLM)
    closing_tag = f"</Citation id='{citation_id}'>"
    return opening_tag + "\n" + body + closing_tag

In [125]:
# Get Postgres credentials from connection string
pg_url = urlparse(os.getenv("PG_CONN_STR"))
host = pg_url.hostname
port = pg_url.port
database = pg_url.path[1:]
user = pg_url.username
password = pg_url.password

# Vector store to store chunks + embeddings in

vector_store = PGVectorStore.from_params(
    host=host,
    port=port,
    database=database,
    user=user,
    password=password,
    table_name="llamaindex_docs",
    schema_name="public",
    hybrid_search=True,
    embed_dim=768,
    hnsw_kwargs={
        "hnsw_m": 16,
        "hnsw_ef_construction": 64,
        "hnsw_ef_search": 40,
        "hnsw_dist_method": "vector_cosine_ops",
    },
)

index = VectorStoreIndex.from_vector_store(
    vector_store=vector_store,
    embed_model=HuggingFaceEmbedding(
        model_name="sentence-transformers/all-mpnet-base-v2"
    ),
)

# text_retriever = index.as_retriever(
#     vector_store_query_mode="sparse", similarity_top_k=10
# )


w = DeepResearchWorkflow(index=index, timeout=120)

handler = w.run(user_msg="Can you summarize the key findings of “USING BIOLOGICAL PARAMETERS AND TRADITIONAL ECOLOGICAL KNOWLEDGE TO STUDY THE STATUS OF Spirinchus starksi (NIGHT SMELT) IN NORTHERN CALIFORNIA” by Z Zenobia?")
async for ev in w.stream_events():
    print(ev) 

await handler

Batches: 100%|██████████| 1/1 [00:00<00:00, 55.70it/s]


Retrieval done
type='deep_research_event' data=DeepResearchEventData(event='retrieve', state='inprogress', id=None, question=None, answer=None)
type='deep_research_event' data=DeepResearchEventData(event='retrieve', state='done', id=None, question=None, answer=None)
Analyzing the retrieved information
type='deep_research_event' data=DeepResearchEventData(event='analyze', state='inprogress', id=None, question=None, answer=None)
type='deep_research_event' data=DeepResearchEventData(event='answer', state='pending', id='1b4a85ee-5bec-438c-a8a0-33c1be93ae3f', question='What were the key biological parameters studied in relation to Spirinchus starksi?', answer=None)
type='deep_research_event' data=DeepResearchEventData(event='answer', state='pending', id='765bfe80-7b83-44f9-a437-e8cbd044103c', question='How was Traditional Ecological Knowledge (TEK) incorporated into the study of Spirinchus starksi?', answer=None)
type='deep_research_event' data=DeepResearchEventData(event='answer', state='p

<async_generator object llm_completion_callback.<locals>.wrap.<locals>.wrapped_async_llm_predict.<locals>.wrapped_gen at 0x7fa9346c8640>

In [122]:
# print() already calls str(); judging by the output below, the handler's
# string form renders the run's final result (here a streaming generator).
print(handler)

<async_generator object llm_completion_callback.<locals>.wrap.<locals>.wrapped_async_llm_predict.<locals>.wrapped_gen at 0x7fa971233440>
