In [10]:
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
from IPython.display import display

def log(msg: str) -> None:
    """Show message in Jupyter output (works inside CrewAI flow steps)."""
    display(msg)

In [11]:
# Define your state model
class UserPreferences(BaseModel):
    theme: str = "light"
    language: str = "English"
    __private_field: str = "This is a private field"

class AppState(BaseModel):
    user_name: str = ""
    preferences: UserPreferences = UserPreferences()
    items: List[str] = []
    processed: bool = False
    completion_percentage: float = 0.0

In [None]:
# Create a flow with typed state
class StructuredStateFlow(Flow[AppState]):
    @start()
    def initialize_data(self):
        print("Initializing flow data")
        # Set state values (type-checked)
        self.state.user_name = "Ali Khan"
        self.state.preferences.theme = "White"
        self.state.preferences.__private_field = " private field written again "

        # The ID field is automatically available
        # print(f"Flow ID: {self.state.id}")
        log(f"Flow ID: {self.state.id}")
        return "Initialized data done"

    @listen(initialize_data)
    def process_data(self, previous_result):
        print(f"Processing data for {self.state.user_name}")

        # Modify state (with type checking)
        self.state.items.append("item1")
        self.state.items.append("item2")
        self.state.processed = True
        self.state.completion_percentage = 50.0
        log(f"Previous result: {previous_result}")
        return "Processed data done"

    @listen(process_data)
    def generate_summary(self, previous_result):
        # Access state (with autocompletion)
        summary = f"User {self.state.user_name} has {len(self.state.items)} items "
        summary += f"with {self.state.preferences.theme} theme. "
        summary += f"Private field: {self.state.preferences.__private_field}"
        summary += "Data is processed." if self.state.processed else "Data is not processed."
        summary += f" Completion: {self.state.completion_percentage}%"
        log(f"Previous result: {previous_result}")
        return summary



In [13]:
# Run the flow
flow = StructuredStateFlow()
result = await flow.kickoff_async()
print(f"Final result: {result}")
print(f"Final state: {flow.state}")

Flow started with ID: 35b12f18-5bac-4415-b851-71bbc76855e4


[1m[35mFlow started with ID: 35b12f18-5bac-4415-b851-71bbc76855e4[0m


Initializing flow data
Flow ID: 35b12f18-5bac-4415-b851-71bbc76855e4


'Flow ID: 35b12f18-5bac-4415-b851-71bbc76855e4'

'Previous result: Initialized data done'

Output()

'Previous result: Processed data done'

Final result: User Ali Khan has 2 items with White theme. Private field: private field written againData is processed. Completion: 50.0%
Final state: id='35b12f18-5bac-4415-b851-71bbc76855e4' user_name='Ali Khan' preferences=UserPreferences(theme='White', language='English') items=['item1', 'item2'] processed=True completion_percentage=50.0
