In [16]:
import time
import uuid

from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Command

from typing import Any


In [10]:
@task
def write_essay(topic: str) -> str:
    """Write an essay about the given topic."""
    time.sleep(1) # This is a placeholder for a long-running task.
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=MemorySaver())
def workflow(topic: str) -> dict:
    """A simple workflow that writes an essay and asks for a review."""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        "essay": essay, 
        "action": "Please approve/reject the essay",
    })

    return {
        "essay": essay, # The essay that was generated
        "is_approved": is_approved, # Response from HIL
    }

thread_id = str(uuid.uuid4())

config = {
    "configurable": {
        "thread_id": thread_id
    }
}

for item in workflow.stream("cat", config):
    print(item)

# Get review from a user (e.g., via a UI)
# In this case, we're using a bool, but this can be any json-serializable value.
human_review = True

hh = Command(resume=human_review)
for item in workflow.stream(Command(resume=human_review), config):
    print(item)

{'write_essay': 'An essay about topic: cat'}
{'__interrupt__': (Interrupt(value={'essay': 'An essay about topic: cat', 'action': 'Please approve/reject the essay'}, resumable=True, ns=['workflow:0855fb8b-7fe5-eb8e-f570-2edc1bb0d804'], when='during'),)}
{'workflow': {'essay': 'An essay about topic: cat', 'is_approved': True}}


In [18]:
@entrypoint(checkpointer=MemorySaver())
def my_workflow(number: int, *, previous: Any = None) -> int:
    previous = previous or 0
    return number + previous

config = {
    "configurable": {
        "thread_id": "some_thread_id"
    }
}

print (my_workflow.invoke(1, config))  # 1 (previous was None)
my_workflow.invoke(2, config)

1


3

In [1]:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context
)
from llama_index.llms.gemini import Gemini

llm = Gemini(
    model="models/gemini-1.5-flash",
    # api_key="some key",  # uses GOOGLE_API_KEY env var by default
)

class JokeEvent(Event):
    joke: str


joke_flow = Workflow(timeout=60, verbose=True)


@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:
    topic = ev.topic

    prompt = f"Write your best joke about {topic}."

    response = await llm.acomplete(prompt)
    print ('\n * one * \n')
    print (response)
    return JokeEvent(joke=str(response))


@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:
    joke = ev.joke

    prompt = (
        f"Give a thorough analysis and critique of the following joke: {joke}"
    )
    response = await llm.acomplete(prompt)

    return StopEvent(result=str(response))

In [3]:
w = joke_flow

handler = w.run(topic="Pirates")
result = await handler
#print ('\n * two * \n')
#print (result)

print (handler.ctx)
# continue with next run
handler = w.run(ctx=handler.ctx, topic="Pirates")
result = await handler
#print ('\n * thee * \n')
#print (result)


Running step generate_joke

 * one * 

Why do pirates make terrible dancers?  Because they always have one leg!

Step generate_joke produced event JokeEvent
Running step critique_joke
Step critique_joke produced event StopEvent
<llama_index.core.workflow.context.Context object at 0x11a041d00>
Running step generate_joke

 * one * 

Why do pirates make terrible dancers?  Because they always have one leg!

Step generate_joke produced event JokeEvent
Running step critique_joke
Step critique_joke produced event StopEvent


In [19]:
from llama_index.core.workflow import WorkflowCheckpointer

w = joke_flow
w_cptr = WorkflowCheckpointer(workflow=w)

# to checkpoint a run, use the `run` method from w_cptr
handler = w_cptr.run(topic="Pirates")
await handler

print ('here')
# to view the stored checkpoints of this run
for c in w_cptr.checkpoints[handler.run_id]:
    print ('yo')
    print (c)

# to run from one of the checkpoints, use `run_from` method
ckpt = w_cptr.checkpoints[handler.run_id][0]
handler = w_cptr.run_from(topic="Ships", checkpoint=ckpt)
await handler

Running step generate_joke

 * one * 

Why do pirates make terrible dancers?  Because they always have one leg!

Step generate_joke produced event JokeEvent
Running step critique_joke
Step critique_joke produced event StopEvent
here
yo
id_='f3cdb4ad-4c38-4262-b291-2c85af5075c1' last_completed_step='generate_joke' input_event=StartEvent() output_event=JokeEvent(joke='Why do pirates make terrible dancers?  Because they always have one leg!\n') ctx_state={'globals': {}, 'streaming_queue': '[]', 'queues': {'aa5644d7-5345-46c5-bf3e-b320828cbe8e': '["{\\"__is_pydantic\\": true, \\"value\\": {\\"_data\\": {\\"topic\\": \\"Pirates\\"}}, \\"qualified_name\\": \\"llama_index.core.workflow.events.StartEvent\\"}"]', '_done': '[]', 'generate_joke': '[]', 'critique_joke': '[]'}, 'stepwise': False, 'events_buffer': {}, 'in_progress': {'generate_joke': []}, 'accepted_events': [('generate_joke', 'StartEvent'), ('critique_joke', 'JokeEvent')], 'broker_log': ['{"__is_pydantic": true, "value": {"_data": {

'This joke relies on a simple pun, exploiting the double meaning of "one leg."  Let\'s analyze its strengths and weaknesses:\n\n**Strengths:**\n\n* **Simplicity and Immediate Understanding:** The setup is clear and concise.  Most people understand the stereotypical image of a pirate with a missing leg. The punchline is instantly grasped.  This makes it accessible to a wide audience.\n* **Surprise and Unexpectedness:** While the setup hints at a physical limitation, the punchline plays on the literal interpretation of "one leg" in relation to dancing, creating a small but satisfying surprise.  The unexpectedness is key to the humor.\n* **Reliance on a Common Stereotype:** The pirate with a missing leg is a well-established trope in popular culture. This pre-existing knowledge makes the joke\'s premise instantly relatable and understandable, requiring minimal explanation.\n\n\n**Weaknesses:**\n\n* **Over-reliance on a Single Pun:** The entire joke hinges on a single pun.  This limits its

In [2]:
class InputEvent(Event):
    input: str


class SetupEvent(Event):
    error: bool


class QueryEvent(Event):
    query: str


class CollectExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up
        if not hasattr(self, "setup") or not self.setup:
            self.setup = True
            print("I got set up")
        return SetupEvent(error=False)

    @step
    async def collect_input(self, ev: StartEvent) -> InputEvent:
        if hasattr(ev, "input"):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)

    @step
    async def parse_query(self, ev: StartEvent) -> QueryEvent:
        if hasattr(ev, "query"):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)

    @step
    async def run_query(
        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
    ) -> StopEvent | None:
        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
        if ready is None:
            print("Not enough events yet")
            return None

        # run the query
        print("Now I have all the events")
        print(ready)

        result = f"Ran query '{ready[0].query}' on input '{ready[1].input}'"
        return StopEvent(result=result)

In [3]:
c = CollectExampleFlow()
result = await c.run(input="Here's some input", query="Here's my question")
print(result)

I got some input
I got a query
Not enough events yet
Not enough events yet
Now I have all the events
[QueryEvent(query="Here's my question"), InputEvent(input="Here's some input"), SetupEvent(error=False)]
Ran query 'Here's my question' on input 'Here's some input'


In [4]:
from llama_index.core.agent.workflow import (
    AgentWorkflow,
    FunctionAgent,
    ReActAgent,
)
from llama_index.core.tools import FunctionTool

llm = Gemini(
    model="models/gemini-1.5-flash",
    # api_key="some key",  # uses GOOGLE_API_KEY env var by default
)

# Define some tools
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b


def subtract(a: int, b: int) -> int:
    """Subtract two numbers."""
    return a - b


# Create agent configs
# NOTE: we can use FunctionAgent or ReActAgent here.
# FunctionAgent works for LLMs with a function calling API.
# ReActAgent works for any LLM.
calculator_agent = FunctionAgent(
    name="calculator",
    description="Performs basic arithmetic operations",
    system_prompt="You are a calculator assistant.",
    tools=[
        FunctionTool.from_defaults(fn=add),
        FunctionTool.from_defaults(fn=subtract),
    ],
    llm=llm,
)

retriever_agent = FunctionAgent(
    name="retriever",
    description="Manages data retrieval",
    system_prompt="You are a retrieval assistant.",
    llm=llm,
)

# Create and run the workflow
workflow = AgentWorkflow(
    agents=[calculator_agent, retriever_agent], root_agent="calculator"
)

# Run the system
response = await workflow.run(user_msg="Can you add 5 and 3?")

#  Or stream the events
handler = workflow.run(user_msg="Can you add 5 and 3?")
async for event in handler.stream_events():
    if hasattr(event, "delta"):
        print(event.delta, end="", flush=True)

The answer is 8.


In [5]:
from llama_index.core.agent.react import ReActChatFormatter, ReActOutputParser


In [7]:
ReActOutputParser?

[0;31mInit signature:[0m [0mReActOutputParser[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m      ReAct Output parser.
[0;31mFile:[0m           ~/Documents/Github/100-days-of-grind/.venv/lib/python3.12/site-packages/llama_index/core/agent/react/output_parser.py
[0;31mType:[0m           ABCMeta
[0;31mSubclasses:[0m     