# AI Agents Crash Course - Part 3 Implementation

This notebook demonstrates the key concepts from the Daily Dose of Data Science AI Agents article by Avi Chawla and Akshay Pachaar.

## What you'll learn:
- **CrewAI Flows**: a feature for building structured, event-driven workflows for AI agents. This notebook moves beyond basic agent configurations toward more complex systems that combine deterministic processes with AI-driven autonomy through flow control mechanisms.

## Prerequisites:
- Install required packages
- Set up API keys
- Choose your LLM provider

## 📦 Installation and Setup

First, let's install the required packages and set up our environment.

In [1]:
# Install required packages (run this cell first)
#!pip install crewai crewai-tools python-dotenv pyyaml IPython

In [None]:
import os
import yaml
from dotenv import load_dotenv
from IPython.display import Markdown, display
import warnings
warnings.filterwarnings('ignore')

# CrewAI imports
from crewai import Agent, Task, Crew, Process, LLM
from crewai_tools import SerperDevTool, FileReadTool

print("✅ All packages imported successfully!")

## 🔑 Environment Configuration

Set up your environment variables. You have two options:

### Option 1: Create a `.env` file with:
```
SERPER_API_KEY="your-serper-api-key"
AZURE_OPENAI_API_KEY="your-azure-openai-key"
AZURE_OPENAI_ENDPOINT="your-azure-openai-endpoint"
AZURE_OPENAI_API_VERSION="your-azure-openai-api-version"
AZURE_OPENAI_MODEL_NAME="your-azure-openai-model-name"
```

### Option 2: Set them directly in this notebook (not recommended for production):

In [None]:
import os

# Load environment variables
load_dotenv()

# Option 2: Uncomment and set your API keys directly (not recommended for production)
# os.environ['OPENAI_API_KEY'] = 'your-openai-api-key-here'
# os.environ['SERPER_API_KEY'] = 'your-serper-api-key-here'

# Configure LLM - Choose one of the options below:

# Option A: Local Ollama (as mentioned in the article)
#llm = LLM(
#    model="ollama/llama3.2:1b",
#    base_url="http://localhost:11434"
#)

# Option B: OpenAI GPT-4 (uncomment to use)
# llm = LLM(model="gpt-4")

# Option C: OpenAI GPT-3.5-turbo (cheaper alternative)
# llm = LLM(model="gpt-3.5-turbo")

# Option D: Azure OpenAI
openai_api_key = os.getenv("AZURE_OPENAI_API_KEY")
openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
openai_api_version = os.getenv("AZURE_OPENAI_API_VERSION")
openai_model_name = os.getenv("AZURE_OPENAI_MODEL_NAME")

llm = LLM(
    model="azure/gpt-4o-mini",
    api_key=openai_api_key,
    base_url=openai_endpoint,
    api_version=openai_api_version,
    azure=True
)

print("🚀 Environment configured!")
print(f"LLM Model: {llm.model}")
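
Before constructing the LLM, it can help to confirm that the variables from the `.env` example are actually set. The helper below is a minimal sketch: `missing_env_vars` and `REQUIRED_VARS` are hypothetical names introduced here, not part of CrewAI or python-dotenv.

```python
import os

# Names taken from the .env example in this notebook.
REQUIRED_VARS = [
    "AZURE_OPENAI_API_KEY",
    "AZURE_OPENAI_ENDPOINT",
    "AZURE_OPENAI_API_VERSION",
    "AZURE_OPENAI_MODEL_NAME",
]

def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]

# Explicit mapping so the example does not depend on your machine's environment.
example_env = {"AZURE_OPENAI_API_KEY": "abc", "AZURE_OPENAI_ENDPOINT": ""}
print(missing_env_vars(REQUIRED_VARS, example_env))
```

Calling `missing_env_vars(REQUIRED_VARS)` without a mapping checks the real environment after `load_dotenv()` has run.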

## 🤖 Example 1: Basic Flow

In this walkthrough, we’ll explore how CrewAI Flows allow you to effortlessly manage sequences of tasks powered by AI.

To keep things practical, we'll build a simple yet interesting scenario: first generating a random movie genre, then using that genre to suggest a popular movie recommendation.

In [4]:
import openai
from crewai.flow.flow import Flow, start, listen

openai_client = openai.AzureOpenAI(
    api_key=openai_api_key,
    api_version=openai_api_version,
    azure_endpoint=openai_endpoint
)

class MovieRecommendationFlow(Flow):

    @start()
    def generate_genre(self):
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "user",
                    "content": "Give me a random movie genre.",
                },
            ],
        )

        random_genre = response.choices[0].message.content.strip()
        self.state["genre"] = random_genre
        return random_genre
    

    @listen(generate_genre)
    def recommend_movie(self, random_genre):
    
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "user",
                    "content": f"Recommend a movie in {random_genre} genre.",
                },
            ],
        )

        movie_recommendation = response.choices[0].message.content.strip()
        self.state["recommendation"] = movie_recommendation
        return movie_recommendation

In [None]:
flow = MovieRecommendationFlow()
final_result = await flow.kickoff_async()

print(f"\nMovie Recommendation: {final_result}")


In [None]:
from pprint import pprint
pprint(flow.state)

Although this basic demo could have been done with a single LLM call, it illustrates why this design pattern is useful:

- Flows automate sequential, AI-driven task execution without manual intervention.
- Tasks can share data easily, which keeps context consistent.
- More tasks and dependencies can be added with little effort.
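
To make the start-and-listen pattern concrete, here is a toy runner in plain Python. It only mimics the idea of one step triggering the next and sharing state. `MiniFlow`, `on_start`, and `after` are hypothetical names for this sketch, and it is not how CrewAI implements Flows.

```python
# Toy illustration in plain Python. MiniFlow, on_start, and after are
# hypothetical names for this sketch; they are not CrewAI APIs.

def on_start(func):
    func._trigger = None  # entry point: runs when the flow is kicked off
    return func

def after(upstream):
    def decorate(func):
        func._trigger = upstream.__name__  # runs once `upstream` has finished
        return func
    return decorate

class MiniFlow:
    def __init__(self):
        self.state = {}  # shared dictionary, visible to every step

    def kickoff(self):
        # Collect every method that was tagged by one of the decorators.
        steps = [getattr(self, name) for name in dir(type(self))]
        steps = [step for step in steps if hasattr(step, "_trigger")]
        # Entry points run first, with no arguments.
        queue = [(step, ()) for step in steps if step._trigger is None]
        result = None
        while queue:
            step, args = queue.pop(0)
            result = step(*args)
            # Hand the output to every step that listens for this one.
            queue.extend((s, (result,)) for s in steps if s._trigger == step.__name__)
        return result

class MovieFlow(MiniFlow):
    @on_start
    def generate_genre(self):
        self.state["genre"] = "sci-fi"  # fixed value instead of an LLM call
        return self.state["genre"]

    @after(generate_genre)
    def recommend_movie(self, genre):
        self.state["recommendation"] = f"A popular {genre} movie"
        return self.state["recommendation"]

flow = MovieFlow()
result = flow.kickoff()
print(result)
print(flow.state)
```

The output of `generate_genre` is passed to `recommend_movie` as an argument, and both steps also write to the shared `state`, which mirrors the two ways data moves between steps in the example above.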

## Example 2: States in Flows
Managing the state effectively is essential for building robust and maintainable AI workflows. CrewAI Flows provides two approaches for handling state:

- Unstructured state management, which is more flexible and dynamic.
- Structured state management, which enforces schema validation using a model-based approach.

In this guide, we'll go beyond just storing intermediate values and explore how to modify, update, and manage the state effectively throughout a flow.

Instead of generating movie recommendations, this time we’ll build a simple task management flow that:

1. Generates a task for a software engineer (the task text is hard-coded to keep the focus on state).
2. Updates the task’s status to "In Progress."
3. Marks the task as "Completed."


### Unstructured state
In unstructured state management, all state attributes are stored dynamically in `self.state`, which behaves much like a Python dictionary. This allows the flow to store and modify state values on the fly, without requiring a predefined schema.

Here’s how we can implement a task management system using an unstructured state:

In [7]:
from crewai.flow.flow import Flow, listen, start

class TaskManagementFlow(Flow):

    @start()
    def generate_task(self):
        print(f"Flow started. State ID: {self.state['id']}")
        
        # Step 1: Generate a new task
        self.state["task"] = "Fix a critical bug in the payment system"
        self.state["status"] = "Pending"
        print(f"Task generated: {self.state['task']} (Status: {self.state['status']})")

    @listen(generate_task)
    def start_task(self):
        # Step 2: Update task status to 'In Progress'
        self.state["status"] = "In Progress"
        print(f"Task status updated: {self.state['status']}")

    @listen(start_task)
    def complete_task(self):
        # Step 3: Mark task as 'Completed'
        self.state["status"] = "Completed"
        print(f"Task status updated: {self.state['status']}")
        print(f"Final Task State: {self.state}")

In [None]:
# Execute the flow
flow = TaskManagementFlow()
final_result = await flow.kickoff_async()
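
One trade-off of dictionary-style state is that a misspelled key is accepted silently. The sketch below shows this with a plain `dict` standing in for `self.state`. `update_existing` is a hypothetical helper added for illustration, not a CrewAI function.

```python
state = {"task": "Fix a critical bug in the payment system", "status": "Pending"}

# Intended update.
state["status"] = "In Progress"

# Typo: this creates a new key instead of updating "status", and no error is raised.
state["staus"] = "Completed"

print(state["status"])  # still the earlier value
print(sorted(state))    # the misspelled key is now part of the state

def update_existing(state, key, value):
    """Hypothetical guard: only update keys that already exist."""
    if key not in state:
        raise KeyError(f"Unknown state key: {key!r}")
    state[key] = value

try:
    update_existing(state, "statsu", "Completed")
    typo_caught = False
except KeyError:
    typo_caught = True
print(typo_caught)
```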

### Structured state
While unstructured state management is flexible, structured state management enforces a predefined schema, ensuring that all state values conform to a specific format. This prevents accidental errors and makes debugging easier.

As with the structured outputs covered in Part 2, we’ll use Pydantic to define a strict schema for the task state.

In [10]:
from pydantic import BaseModel

# Defining a structured state model
class TaskState(BaseModel):
    task: str = "None"
    status: str = "None"

In [11]:
from crewai.flow.flow import Flow, listen, start

class StructuredTaskFlow(Flow[TaskState]):

    @start()
    def generate_task(self):
        print(f"Flow started. State ID: {self.state.id}")
        self.state.task = "Develop a new API endpoint"
        self.state.status = "Pending"
        print(f"Task generated: {self.state.task} (Status: {self.state.status})")

    @listen(generate_task)
    def start_task(self):
        self.state.status = "In Progress"
        print(f"Task status updated: {self.state.status}")

    @listen(start_task)
    def complete_task(self):
        self.state.status = "Completed"
        print(f"Task status updated: {self.state.status}")
        print(f"Final Task State: {self.state}")

In [None]:
# Execute the flow
flow = StructuredTaskFlow()
final_result = await flow.kickoff_async()

In [None]:
from pprint import pprint
pprint(flow.state)
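
The schema can be made stricter than plain `str` fields. As a sketch, the model below restricts `status` to the three values used in this example. `StrictTaskState` and the choice of `Literal` are assumptions made for illustration, not part of the original flow. By default, Pydantic runs this check when the model is created.

```python
from typing import Literal

from pydantic import BaseModel, ValidationError

class StrictTaskState(BaseModel):
    task: str = ""
    # Only these three values are accepted for status.
    status: Literal["Pending", "In Progress", "Completed"] = "Pending"

ok = StrictTaskState(task="Develop a new API endpoint", status="In Progress")
print(ok.status)

try:
    StrictTaskState(task="Develop a new API endpoint", status="Done")
    rejected = False
except ValidationError:
    rejected = True  # "Done" is not one of the allowed values
print(rejected)
```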

## Conditional Flow Control
Managing the flow of tasks dynamically is a crucial part of AI-powered workflows. CrewAI Flows provides powerful tools to control execution paths based on conditions.

Below, we shall cover three important mechanisms for flow control:

- **or_ function**: Triggers a task when at least one of multiple conditions is met.
- **and_ function**: Triggers a task only when all specified conditions are met.
- **@router() decorator**: Directs the flow dynamically based on decision logic.

### OR conditional logic
The or_ function allows a method to be executed when any one of multiple tasks is completed. This is useful when a task should proceed regardless of which event happens first.

Let’s build a system where a support request can come from two sources:

- Live chat
- Email ticket

If a request is received from either channel, it should be logged.

In [15]:
from crewai.flow.flow import Flow, listen, or_, start

class SupportFlow(Flow):

    @start()
    def live_chat_request(self):
        return "Support request received via live chat"

    @start()
    def email_ticket_request(self):
        return "Support request received via email ticket"

    @listen(or_(live_chat_request, email_ticket_request))
    def log_request(self, request_source):
        print(f"Logging request: {request_source}")

In [None]:
# Execute the flow
flow = SupportFlow()
final_result = await flow.kickoff_async()

### AND conditional logic
The and_ function ensures that a task only executes when multiple conditions are satisfied. For instance, we can escalate a support ticket only after user response and agent review.

In this example, a ticket should only be escalated if:

- The user confirms they still need help.
- A support agent has reviewed the request.

In [17]:
from crewai.flow.flow import Flow, and_, listen, start

class TicketEscalationFlow(Flow):

    @start()
    def user_confirms_issue(self):
        self.state["user_confirmation"] = True
        print("User confirmed they still need assistance.")

    @listen(user_confirms_issue)
    def agent_reviews_ticket(self):
        self.state["agent_review"] = True
        print("Support agent has reviewed the ticket.")

    @listen(and_(user_confirms_issue, agent_reviews_ticket))
    def escalate_ticket(self):
        print("Escalating ticket to Level 2 support!")

In [None]:
# Execute the flow
flow = TicketEscalationFlow()
final_result = await flow.kickoff_async()
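
Conceptually, the difference between the two conditions comes down to "any" versus "all" over the set of completed steps. The plain-Python sketch below illustrates that idea only. `or_ready` and `and_ready` are hypothetical names, and this is not how CrewAI evaluates `or_` and `and_` internally.

```python
def or_ready(triggers, completed):
    """True once at least one of the trigger steps has completed."""
    return any(name in completed for name in triggers)

def and_ready(triggers, completed):
    """True only once every trigger step has completed."""
    return all(name in completed for name in triggers)

triggers = ["user_confirms_issue", "agent_reviews_ticket"]
completed = {"user_confirms_issue"}  # only the first step has finished so far

print(or_ready(triggers, completed))   # True
print(and_ready(triggers, completed))  # False

completed.add("agent_reviews_ticket")
print(and_ready(triggers, completed))  # True
```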

### Router logic
The @router() decorator allows a method to decide the next task based on a condition. This is useful when different cases require different handling strategies. For instance, some logic (which could be AI-driven) can determine if a support request is urgent or non-urgent:

- Urgent tickets should be assigned to a live support agent.
- Non-urgent tickets should be sent to email support.

The key concepts required in this workflow are:

- Pydantic for structured state management
- Dynamic ticket classification
- Routing the execution flow using `@router()`
- Listening for specific values using `@listen()`

In [27]:
from pydantic import BaseModel

class TicketState(BaseModel):
    priority: str = "low"


In [30]:
from crewai.flow.flow import Flow, listen, router, start
import random

class TicketRoutingFlow(Flow[TicketState]):
    @start()
    def classify_ticket(self):
        print("Classifying support ticket...")
        self.state.priority = random.choice(["high", "low"])
        print(f"Ticket classified as: {self.state.priority}")

    @router(classify_ticket)
    def route_ticket(self):
        if self.state.priority == "high":
            return "urgent_support"
        else:
            return "email_support"
        
    @listen("urgent_support")
    def assign_to_agent(self):
        print("Urgent ticket assigned to a live support agent!")

    @listen("email_support")
    def send_to_email_queue(self):
        print("Non-urgent ticket sent to email support queue.")

In [None]:
# Execute the flow
flow = TicketRoutingFlow()
final_result = await flow.kickoff_async()
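
The routing idea can also be read as a label lookup: the router returns a string, and whichever handler is registered for that string runs next. The plain-Python sketch below mirrors that with a dictionary. `HANDLERS` and `handle` are hypothetical names, and the priority is passed in explicitly instead of chosen at random so the result is repeatable.

```python
def route_ticket(priority):
    """Return a label, as the router method above does."""
    return "urgent_support" if priority == "high" else "email_support"

def assign_to_agent():
    return "Urgent ticket assigned to a live support agent!"

def send_to_email_queue():
    return "Non-urgent ticket sent to email support queue."

# Label -> handler table, standing in for the @listen("...") registrations.
HANDLERS = {
    "urgent_support": assign_to_agent,
    "email_support": send_to_email_queue,
}

def handle(priority):
    label = route_ticket(priority)
    return HANDLERS[label]()

for priority in ("high", "low"):
    print(priority, "->", handle(priority))
```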