### FLow Persistance in CrewAI Flow

#### The @persist decorator enables automatic state persistence in CrewAI Flows, allowing you to maintain flow state across restarts or different workflow executions.`


In [1]:
import nest_asyncio
nest_asyncio.apply()

#### 1.Class Level Persistance

In [2]:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from dotenv import load_dotenv
from pydantic import BaseModel

class CounterState(BaseModel):
    counter: int = 0

@persist() # Using SQLiteFlowPersistence by default
class MyFlow(Flow[CounterState]):
    @start()
    def initialize_flow(self):
        # This method will automatically have its state persisted
        self.state.counter = 1
        print("Initialized flow. State ID:", self.state.id)

    @listen(initialize_flow)
    def next_step(self):
        # The state (including self.state.id) is automatically reloaded
        self.state.counter += 1
        print("Flow state is persisted. Counter:", self.state.counter)

In [4]:
flow = MyFlow()
result = flow.kickoff()
print(f"Flow completed with final state: {result}")

Flow completed with final state: None


In [14]:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist
from pydantic import BaseModel

class CounterState(BaseModel):
    value: int = 0

@persist()  # Apply to the entire flow class
class PersistentCounterFlow(Flow[CounterState]):
    @start()
    def increment(self):
        self.state.value += 1
        print(f"Incremented to {self.state.value}")
        return self.state.value

    @listen(increment)
    def double(self, value):
        self.state.value = value * 2
        print(f"Doubled to {self.state.value}")
        return self.state.value

# First run
flow1 = PersistentCounterFlow()
result1 = flow1.kickoff()
print(f"First run result: {result1}")

# Second run - state is automatically loaded
flow2 = PersistentCounterFlow()
result2 = flow2.kickoff()
print(f"Second run result: {result2}")  # Will be higher due to persisted state

[2K[1A[2K[1A[2K[1A[2K[1;34m🌊 Flow: [0m[34mMyFlow[0m
[37mID: [0m[34m32bd1c22-910a-4832-9e2d-e40a8a9ca6cf[0m
├── [31m❌ Flow Step Failed[0m
└── [1;31m❌ Failed:[0m[1;31m initialize_flow[0m
[?25h[34m┌─[0m[34m─────────────────────────────[0m[34m Flow Execution [0m[34m──────────────────────────────[0m[34m─┐[0m
[34m│[0m                                                                             [34m│[0m
[34m│[0m  [1;34mStarting Flow Execution[0m                                                    [34m│[0m
[34m│[0m  [37mName: [0m[34mPersistentCounterFlow[0m                                                [34m│[0m
[34m│[0m  [37mID: [0m[34m3e07bb5f-6e12-4cb7-a21c-dff12bd73f78[0m                                   [34m│[0m
[34m│[0m  [37mTool Args: [0m                                                                [34m│[0m
[34m│[0m                                                                             [34m│[0m
[34m│[0m         

### 
Method-Level Persistence

In [7]:
from crewai.flow.flow import Flow, listen, start
from crewai.flow.persistence import persist

class SelectivePersistFlow(Flow):
    @start()
    def first_step(self):
        self.state["count"] = 1
        return "First step"

    @persist()  # Only persist after this method
    @listen(first_step)
    def important_step(self, prev_result):
        self.state["count"] += 1
        self.state["important_data"] = "This will be persisted"
        return "Important step completed"

    @listen(important_step)
    def final_step(self, prev_result):
        self.state["count"] += 1
        return f"Complete with count {self.state['count']}"

In [8]:
flow= SelectivePersistFlow()
result = flow.kickoff()
print(f"Flow completed with result: {result}")

Flow completed with result: Complete with count 3


In [None]:
### Example of using the flow with CrewAI: Condition Based
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel

class PaymentState(BaseModel):
    amount: float = 0.0
    is_approved: bool = False
    retry_count: int = 0

class PaymentFlow(Flow[PaymentState]):
    @start()
    def process_payment(self):
        # Simulate payment processing
        # self.state.amount = 100.0
        self.state.is_approved = self.state.amount < 1000
        return "Payment processed"

    @router(process_payment)
    def check_approval(self, previous_result):
        if self.state.is_approved:
            return "approved"
        elif self.state.retry_count < 3:
            return "retry"
        else:
            return "rejected"

    @listen("approved")
    def handle_approval(self):
        return f"Payment of ${self.state.amount} approved!"

    @listen("retry")
    def handle_retry(self):
        self.state.retry_count += 1
        print(f"Retrying payment (attempt {self.state.retry_count})...")
        # Could implement retry logic here
        return "Retry initiated"

    @listen("rejected")
    def handle_rejection(self):
        return f"Payment of ${self.state.amount} rejected after {self.state.retry_count} retries."

In [None]:
flow = PaymentFlow(state=PaymentState(amount=20000.0))
result = flow.kickoff()
print(f"{result}")

Output()