In [1]:
# Warning control
import warnings
warnings.filterwarnings('ignore')

In [15]:
from crewai.flow.flow import Flow, listen, start
# from fastapi.testclient import TestClient
# from service.service import app

class OutputExampleFlow(Flow):
    @start()
    def first_method(self):
        message = "Output from first_method"
        print(message)
        return message

    @listen(first_method)
    def second_method(self, first_output):
        # api_client = TestClient(app)
        message = f"Second method received: {first_output}"
        print(message)
        return message


flow = OutputExampleFlow()
final_output = flow.kickoff()
final_output.send()
print("---- Final Output ----")
print(final_output)


TypeError: coroutine.send() takes exactly one argument (0 given)

In [19]:
from IPython.display import Markdown
Markdown(final_output)

TypeError: Markdown expects text, not <coroutine object Flow.kickoff at 0x000002909BD39210>

In [17]:
from crewai.flow.flow import Flow, listen, start

class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
        # The state automatically includes an 'id' field
        print(f"State ID: {self.state['id']}")
        self.state.message = "Hello from structured flow"
        self.state.counter = 0

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = UnstructuredExampleFlow()
flow.kickoff()


<coroutine object Flow.kickoff at 0x000002909F5F0D60>

In [None]:
# ruff: noqa
# pyright: reportUnusedVariable=false
# pyright: reportUnusedImport=false
import warnings

warnings.filterwarnings("ignore")
import json
from crewai.flow.flow import Flow, listen, start  # noqa: pyright
from pydantic import BaseModel  # noqa: pyright


class RecipeState(BaseModel):
    input: str = ""
    recipe_data: str = ""
    file_name: str = ""


class RecipeFlow(Flow[RecipeState]):
    @start()
    def extraction(self):
        extraction_crew_output = Extraction_Crew.kickoff({"input": self.state.input})
        # print("pydantic", extraction_crew_output.pydantic)
        # store the file name
        self.state.file_name = extraction_crew_output.pydantic.file_name
        return extraction_crew_output.pydantic

    @listen(extraction)
    def create_recipe(self, extraction_output):
        # print(f"Extraction in side create {extraction_output}")
        dict_input = extraction_output.dict()
        chef_crew_output = Chef_Crew.kickoff(dict_input)
        return chef_crew_output.pydantic

    @listen(create_recipe)
    def write_recipe(self, create_recipe_out):
        # print(f"Write recipe: {create_recipe_out}")
        to_write = create_recipe_out.dict()
        to_write["file_path"] = self.state.file_name
        print(f"Assembled to_write: {to_write}")
        writer_crew_output = Writer_Crew.kickoff(to_write)
        return writer_crew_output


makerecipe = RecipeFlow()
flow_output = makerecipe.kickoff(
    {
        "input": "Provide the Recipe for rice dumplings to serve 15 people, and write to ./chef_recipe.md"
    }
)
print(flow_output)