# Multi-AI Agent Systems and Advanced Use Cases with CrewAI

## Complete Practical Tutorial

Welcome to this practical course on Multi AI Agents and Advanced Use Cases with CrewAI. You'll learn how to build agentic systems that go beyond shiny technology demos and can be deployed to create real value in application settings right now.

Multi AI Agent systems involve multiple AI agents working together to achieve complex tasks by collaborating, delegating, and sharing information. You'll build several practical applications like automated project planning, lead scoring and engagement automation, support data analysis, and content creation at scale.

## Initial Setup and Environment Configuration

Before we dive into individual sections, let's set up the common environment and imports that we'll use throughout the tutorial.

In [None]:
%pip install crewai crewai-tools pydantic pyyaml pandas crewai[tools] jinja2 chromadb

In [None]:
# Warning control - suppress unnecessary warnings for cleaner output
import warnings
warnings.filterwarnings('ignore')

# Core Python libraries for file operations and data handling
import os
import json
import yaml
import pandas as pd

# Core CrewAI classes - the foundation of our multi-agent systems
from crewai import Agent, Task, Crew

# For structured outputs and data validation
from pydantic import BaseModel, Field
from typing import List, Dict, Optional

# For display and visualization
from IPython.display import display, Markdown, HTML

### Common Helper Functions

In [None]:
def load_yaml_config(file_paths):
    """
    Helper function to load YAML configuration files
    Used across multiple lessons for agents and tasks configuration
    """
    configs = {}
    for config_type, file_path in file_paths.items():
        with open(file_path, 'r') as file:
            configs[config_type] = yaml.safe_load(file)
    return configs

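def calculate_costs(usage_metrics, cost_per_million=0.150):
    """
    Estimate costs based on token usage.
    The default rate is GPT-4o-mini's input price ($0.15 per million tokens);
    output tokens are billed at a higher rate, so treat the result as a rough estimate.
    Accepts a dict or an object exposing prompt_tokens and completion_tokens.
    """
    if isinstance(usage_metrics, dict):
        prompt, completion = usage_metrics["prompt_tokens"], usage_metrics["completion_tokens"]
    else:
        prompt, completion = usage_metrics.prompt_tokens, usage_metrics.completion_tokens
    return cost_per_million * (prompt + completion) / 1_000_000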
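
As a worked example of the cost arithmetic, the sketch below prices a made-up run with separate input and output rates. The `estimate_cost` helper, the token counts, and the $0.60 output rate are assumptions for illustration only; check the provider's current pricing before relying on any figure.

```python
def estimate_cost(prompt_tokens, completion_tokens,
                  input_rate=0.150, output_rate=0.600):
    """Return the cost in USD; rates are in USD per million tokens (assumed values)."""
    input_cost = input_rate * prompt_tokens / 1_000_000
    output_cost = output_rate * completion_tokens / 1_000_000
    return input_cost + output_cost

# Hypothetical usage numbers, not taken from a real crew run
example_cost = estimate_cost(prompt_tokens=1_200_000, completion_tokens=300_000)
print(f"Estimated cost: ${example_cost:.4f}")
```

Splitting the two rates matters most for crews that generate long outputs, where completion tokens can dominate the bill.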

## Section 1: Introduction

Welcome to Practical Multi AI Agents and Advanced Use Cases with CrewAI. This course focuses on building agent-based applications for real-world use cases that people are actually deploying in practice.

When building multi-agent applications, a key challenge is balancing the speed and quality of results while maintaining consistency. Different model choices and sizes impact these factors. You'll learn to rigorously test your applications by measuring key metrics and train your agents using human feedback to continuously improve over time.

### What You'll Build:
- **Automated Project Planning**: Break projects into tasks, estimate them, and allocate resources
- **Project Monitoring**: Generate progress reports from project management tools
- **Lead Qualification and Scoring**: Research and score potential customers
- **Support Data Analysis**: Extract insights from support ticket data
- **Content Creation at Scale**: Generate blog posts and social media content

The course covers sequential and parallel task execution, multi-crew pipelines using Flows, performance optimization through testing and training, and multi-model approaches for efficiency and customization.

## Section 2: Overview of Multi AI Agent Systems

Let's look at the building blocks of multi-agent systems: agents, tasks, and crews, plus the supporting features that make them work in practice, such as caching, memory, and guardrails.

### Real-World Applications

Multi-agentic automation appears across many different verticals - sales, marketing, HR, code development, research, education, and support. Regardless of the vertical, we see a common pattern: a long-tail distribution of what these agents and automations try to do.

The typical workflow usually involves:
1. **Data Extraction**: Pulling data from existing systems (ERP, CRM, databases)
2. **Research**: Researching in documents, on the internet, or other systems
3. **Analysis**: Comparing data, extracting specific information, or inferring new insights
4. **Summarization**: Extracting learnings, plotting charts, building executive summaries
5. **Reporting**: Creating outputs as PDFs, JSON, or markdown for integration with existing systems

### AI Apps vs Traditional Apps

Traditional apps are strongly typed: you know what the input data looks like and which transformations apply to it. AI apps are different because they're "fuzzy": the input could be a recipe or a PhD thesis, it passes through a black-box model, and the output is just as open-ended.

This fuzziness allows you to build automations that weren't possible before because you don't need to handle every edge case - agents can decide in real-time how to react to specific data and choose appropriate tools.

### Agent Anatomy

It starts simple: an LLM in the center with access to tools. Given a task, it finds ways to use these tools to provide a final answer. In multi-agent systems, you have multiple agents that can use tools themselves, delegate work to each other, and ask questions to accomplish the final outcome.
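
The loop described above can be sketched without any framework. In this minimal illustration, `fake_llm` is a hard-coded stand-in for a real model call, and the two tools and the `run_agent` function are hypothetical names; a real agent would let the model choose the tool and its arguments.

```python
def calculator(expression: str) -> str:
    """Toy tool: evaluate a simple 'a + b' expression."""
    left, right = expression.split("+")
    return str(int(left) + int(right))

def lookup(term: str) -> str:
    """Toy tool: look up a term in a tiny in-memory knowledge base."""
    facts = {"crewai": "A framework for orchestrating AI agents."}
    return facts.get(term.lower(), "unknown")

TOOLS = {"calculator": calculator, "lookup": lookup}

def fake_llm(task: str, observations: list) -> dict:
    """Stand-in for a model call: decide the next action from the task text."""
    if observations:
        # A tool has already produced a result, so return it as the answer
        return {"action": "final_answer", "input": observations[-1]}
    if "+" in task:
        return {"action": "calculator", "input": task}
    return {"action": "lookup", "input": task}

def run_agent(task: str, max_steps: int = 3) -> str:
    """Alternate between 'model' decisions and tool calls until an answer appears."""
    observations = []
    for _ in range(max_steps):
        decision = fake_llm(task, observations)
        if decision["action"] == "final_answer":
            return decision["input"]
        tool = TOOLS[decision["action"]]
        observations.append(tool(decision["input"]))
    return "no answer within step limit"

print(run_agent("2 + 3"))
print(run_agent("CrewAI"))
```

The step limit is a simple safeguard: without it, an agent that never reaches a final answer would loop forever.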

### Production Features

For production deployment, you need:
- **Caching Layer**: Prevent unnecessary tool usage and credit consumption
- **Memory Layer**: Agents remember past actions and share memory
- **Training Data**: Improve agents through human feedback
- **Guardrails**: Protect against hallucinations
- **Orchestration**: Sequential, parallel, manager-delegator, or hybrid approaches
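
To make the caching idea concrete, here is a minimal sketch of a tool-call cache. The `CachedTool` class and the `search` function are hypothetical and do not reflect how CrewAI implements its cache internally; the point is only that identical arguments should not trigger a second real call.

```python
import json

class CachedTool:
    """Wrap a tool function and reuse results for repeated arguments."""

    def __init__(self, func):
        self.func = func
        self.cache = {}
        self.calls = 0  # number of real (uncached) executions

    def run(self, **kwargs):
        # Serialize the arguments into a stable cache key
        key = json.dumps(kwargs, sort_keys=True)
        if key not in self.cache:
            self.calls += 1
            self.cache[key] = self.func(**kwargs)
        return self.cache[key]

def search(query: str) -> str:
    """Placeholder for an expensive external call such as a web search."""
    return f"results for {query}"

cached_search = CachedTool(search)
cached_search.run(query="crewai flows")
cached_search.run(query="crewai flows")  # served from the cache
cached_search.run(query="trello api")
print(cached_search.calls)
```

Three requests result in only two real calls, which is where the savings in API credits come from.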

### Building Blocks

Every agent in CrewAI needs a **role**, **goal**, and **backstory**. Every task needs a **description**, **expected output**, and an **agent**. These are defined in YAML files, making it easy for non-technical people to contribute by updating configurations instead of code.
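
For orientation, the sketch below shows what such configurations might look like once loaded into Python dictionaries, and how `{placeholder}` values could be filled from inputs. The keys follow the fields named above, but the specific wording of each value and the `interpolate` helper are assumptions for illustration; the course's actual YAML files are not reproduced here, and CrewAI performs its own interpolation when a crew is kicked off.

```python
# Hypothetical structure, mirroring what yaml.safe_load might return
agents_config_example = {
    "project_planning_agent": {
        "role": "{project_type} Project Planner",
        "goal": "Break the {project_type} project into actionable tasks",
        "backstory": "A seasoned planner in the {industry} industry.",
    }
}
tasks_config_example = {
    "task_breakdown": {
        "description": "Analyze the requirements for the {project_type} project.",
        "expected_output": "A list of tasks with short descriptions.",
    }
}

def interpolate(config: dict, inputs: dict) -> dict:
    """Fill {placeholders} in every string value of a flat config entry."""
    return {key: value.format(**inputs) for key, value in config.items()}

inputs_example = {"project_type": "Website", "industry": "Technology"}
planner_config = interpolate(
    agents_config_example["project_planning_agent"], inputs_example
)
print(planner_config["role"])
```

Because the templates are plain text, changing an agent's tone or a task's expected output means editing a string, not Python code.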

## Section 3: Automated Project Planning, Estimation, and Allocation

Let's dive into automated project planning - using a crew to break projects into tasks, estimate them, and allocate resources. This is common for consultancies and web agencies that need to quickly estimate projects and plan around them.

We'll work with three agents: a project planner, an estimation analyst, and an allocation strategist. Each agent handles one task: task breakdown, time estimation, and resource allocation, respectively. The goal is to create a structured project plan that can be pushed into external systems like Jira or Trello.

### Model Configuration

In [None]:
# Set the model - GPT-4o-mini is cost-effective yet powerful for most tasks
os.environ['OPENAI_MODEL_NAME'] = 'gpt-4o-mini'
os.environ['OPENAI_API_KEY'] = '<your-api-key>'

# Configure Serper API - Used for web search tools in later modules
# Get your key from https://serper.dev/
os.environ["SERPER_API_KEY"] = "<your-api-key>"

### Loading Configuration Files

In [None]:
# Define file paths for YAML configurations
files = {
    # Forward slashes work on Windows, macOS, and Linux alike
    'agents': 'agents/automated project agents.yaml',
    'tasks': 'tasks/automated project tasks.yaml'
}

# Load configurations from YAML files
configs = load_yaml_config(files)

# Assign loaded configurations to specific variables
agents_config = configs['agents']
tasks_config = configs['tasks']

### Structured Output Models

We need structured output to push results into external systems. We'll create three classes: TaskEstimate, Milestone, and ProjectPlan.

In [None]:
# Pydantic models for structured output - enables integration with external systems
class TaskEstimate(BaseModel):
    task_name: str = Field(..., description="Name of the task")
    estimated_time_hours: float = Field(..., description="Estimated time to complete the task in hours")
    required_resources: List[str] = Field(..., description="List of resources required to complete the task")

class Milestone(BaseModel):
    milestone_name: str = Field(..., description="Name of the milestone")
    tasks: List[str] = Field(..., description="List of task IDs associated with this milestone")

class ProjectPlan(BaseModel):
    tasks: List[TaskEstimate] = Field(..., description="List of tasks with their estimates")
    milestones: List[Milestone] = Field(..., description="List of project milestones")

### Creating Agents, Tasks, and Crew

In [None]:
# Creating Agents - each agent has a specific role in the planning process
project_planning_agent = Agent(
  config=agents_config['project_planning_agent']  # Loads role, goal, backstory from YAML
)

estimation_agent = Agent(
  config=agents_config['estimation_agent']  # Expert in time and resource estimation
)

resource_allocation_agent = Agent(
  config=agents_config['resource_allocation_agent']  # Optimizes task assignments
)

# Creating Tasks - each task builds on the previous one's output
task_breakdown = Task(
  config=tasks_config['task_breakdown'],
  agent=project_planning_agent  # First agent breaks down the project
)

time_resource_estimation = Task(
  config=tasks_config['time_resource_estimation'],
  agent=estimation_agent  # Second agent estimates time and resources
)

resource_allocation = Task(
  config=tasks_config['resource_allocation'],
  agent=resource_allocation_agent,
  output_pydantic=ProjectPlan  # Final task produces structured output
)

# Creating Crew - brings all agents and tasks together
crew = Crew(
  agents=[
    project_planning_agent,
    estimation_agent,
    resource_allocation_agent
  ],
  tasks=[
    task_breakdown,
    time_resource_estimation,
    resource_allocation
  ],
  verbose=True  # Enable detailed logging to see agent interactions
)

### Crew Inputs

In [None]:
# Define project parameters - these get interpolated into YAML templates
project = 'Website'
industry = 'Technology'
project_objectives = 'Create a website for a small business'
team_members = """
- John Doe (Project Manager)
- Jane Doe (Software Engineer)
- Bob Smith (Designer)
- Alice Johnson (QA Engineer)
- Tom Brown (QA Engineer)
"""
project_requirements = """
- Create a responsive design that works well on desktop and mobile devices
- Implement a modern, visually appealing user interface with a clean look
- Develop a user-friendly navigation system with intuitive menu structure
- Include an "About Us" page highlighting the company's history and values
- Design a "Services" page showcasing the business's offerings with descriptions
- Create a "Contact Us" page with a form and integrated map for communication
- Implement a blog section for sharing industry news and company updates
- Ensure fast loading times and optimize for search engines (SEO)
- Integrate social media links and sharing capabilities
- Include a testimonials section to showcase customer feedback and build trust
"""

# Format for display
formatted_output = f"""
**Project Type:** {project}

**Project Objectives:** {project_objectives}

**Industry:** {industry}

**Team Members:**
{team_members}
**Project Requirements:**
{project_requirements}
"""

display(Markdown(formatted_output))

### Kicking off the Crew

In [None]:
# Prepare inputs dictionary for the crew
inputs = {
  'project_type': project,
  'project_objectives': project_objectives,
  'industry': industry,
  'team_members': team_members,
  'project_requirements': project_requirements
}

# Execute the crew - agents will work sequentially through their tasks
result = crew.kickoff(
  inputs=inputs
)

### Usage Metrics and Costs

In [None]:
# Calculate and display costs for this crew execution
costs = calculate_costs(crew.usage_metrics)
display(f"Total costs: ${costs:.4f}")

# Convert UsageMetrics to DataFrame for detailed analysis
# dict() works whether usage_metrics is already a dict or a Pydantic object
df_usage_metrics = pd.DataFrame([dict(crew.usage_metrics)])
df_usage_metrics

### Results Analysis

In [None]:
# Access the structured output from our Pydantic model
result.pydantic.dict()

In [None]:
# Extract and display tasks in a formatted table
tasks = result.pydantic.dict()['tasks'] #type:ignore
df_tasks = pd.DataFrame(tasks)

# Style the DataFrame for better presentation
df_tasks.style.set_table_attributes('border="1"').set_caption("Task Details").set_table_styles(
    [{'selector': 'th, td', 'props': [('font-size', '120%')]}]
)

In [None]:
# Extract and display milestones
milestones = result.pydantic.dict()['milestones'] #type:ignore
df_milestones = pd.DataFrame(milestones)

# Style the DataFrame
df_milestones.style.set_table_attributes('border="1"').set_caption("Milestones").set_table_styles(
    [{'selector': 'th, td', 'props': [('font-size', '120%')]}]
)
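
Once the plan is available as plain dictionaries, further processing no longer needs an agent. The sketch below totals hours overall and per resource; the task list is made-up data in the same shape as the `TaskEstimate` model, and attributing a task's full duration to every listed resource is a simplifying assumption.

```python
from collections import defaultdict

# Hypothetical task estimates in the same shape as the TaskEstimate model
example_tasks = [
    {"task_name": "Design homepage", "estimated_time_hours": 16.0,
     "required_resources": ["Bob Smith"]},
    {"task_name": "Build contact form", "estimated_time_hours": 8.0,
     "required_resources": ["Jane Doe"]},
    {"task_name": "Test contact form", "estimated_time_hours": 4.0,
     "required_resources": ["Alice Johnson", "Jane Doe"]},
]

# Total effort across all tasks
total_hours = sum(task["estimated_time_hours"] for task in example_tasks)

# Effort per resource; each listed resource is charged the full task duration
hours_per_resource = defaultdict(float)
for task in example_tasks:
    for resource in task["required_resources"]:
        hours_per_resource[resource] += task["estimated_time_hours"]

print(f"Total hours: {total_hours}")
print(dict(hours_per_resource))
```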

## Section 4: Internal and External Integrations

One of the most powerful aspects of CrewAI is the ability to integrate with internal and external systems. This enables your agents to perform actions like querying internal data, calling existing systems, and sending emails.

### Integration Patterns:
- **Database Connections**: Query internal databases for historical data
- **API Integrations**: Connect to CRM, ERP, and other business systems
- **File Operations**: Read from and write to various file formats
- **External Services**: Integrate with third-party APIs and services

### Tool Creation:
You can create custom tools by inheriting from CrewAI's `BaseTool` class. Tools can be assigned to specific agents or tasks, giving you fine-grained control over what capabilities each agent has access to.

The key is building reliable, well-documented tools that handle errors gracefully and provide clear feedback to the agents about success or failure of operations.
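
One way to give agents clear feedback is to return a structured success or failure message instead of letting an exception escape. The class below is a framework-free sketch: `SafeLookupTool`, its inventory data, and the JSON message format are all hypothetical, and it deliberately does not subclass CrewAI's `BaseTool`.

```python
import json

class SafeLookupTool:
    """Illustrative tool-like class; not a subclass of CrewAI's BaseTool."""
    name = "Inventory Lookup"
    description = "Looks up stock levels by SKU."

    def __init__(self, inventory: dict):
        self.inventory = inventory

    def _run(self, sku: str) -> str:
        try:
            quantity = self.inventory[sku]
        except KeyError:
            # Tell the agent what went wrong and what not to do next
            return json.dumps({
                "ok": False,
                "error": f"Unknown SKU '{sku}'. Do not retry with the same SKU.",
            })
        return json.dumps({"ok": True, "sku": sku, "quantity": quantity})

lookup_tool = SafeLookupTool({"A-1": 3})
print(lookup_tool._run("A-1"))
print(lookup_tool._run("Z-9"))
```

An explicit instruction in the error message helps prevent an agent from calling the same failing tool repeatedly.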

## Section 5: Building Project Progress Report

Let's build a progress report for projects - something that happens regularly in software companies. People need clarity around progress and what's being done. This crew automates that process, taking emotions out of the equation and providing an objective view of project status.

We'll use two agents (data collector and project analyst) to perform three tasks: understanding a project, analyzing its progress, and compiling a report. The agents need to integrate with external systems - in this case, a Trello board.

### Loading Configuration Files

In [None]:
# Load YAML configurations for progress report agents and tasks
files = {
    # Forward slashes avoid the invalid '\p' escape and work on every OS
    'agents': 'agents/progress report agents.yaml',
    'tasks': 'tasks/progress report tasks.yaml',
}

configs = load_yaml_config(files)
agents_config = configs['agents']
tasks_config = configs['tasks']

### External Integration Tools

We'll create custom tools to integrate with Trello. These tools inherit from CrewAI's BaseTool class.

Log in to https://trello.com/ to create a board.

Follow this guide to get your Trello API key: https://www.merge.dev/blog/trello-api-key

In [None]:
TRELLO_API_KEY = "<your-api-key>"
TRELLO_API_TOKEN = "<your-api-token>"
TRELLO_BOARD_ID = "<your-board-key-from-url>"

In [None]:
# Same import path as the one used for the wrapper tools later in this tutorial
from crewai.tools.base_tool import BaseTool
import requests

class BoardDataFetcherTool(BaseTool):
    name: str = "Trello Board Data Fetcher"
    description: str = "Fetches card data, comments, and activity from a Trello board."

    api_key: str = TRELLO_API_KEY
    api_token: str = TRELLO_API_TOKEN
    board_id: str = TRELLO_BOARD_ID

    def _run(self) -> dict:
        """
        Fetch all cards from the specified Trello board.
        """
        url = f"https://api.trello.com/1/boards/{self.board_id}/cards"

        query = {
            'key': self.api_key,
            'token': self.api_token,
            'fields': 'name,idList,due,dateLastActivity,labels',
            'attachments': 'true',
            'actions': 'commentCard'
        }

        response = requests.get(url, params=query)

        if response.status_code == 200:
            return response.json()
        else:
            # Fallback in case of timeouts or other issues
            return json.dumps([{'id': '66c3bfed69b473b8fe9d922e', 'name': 'Analysis of results from CSV', 'idList': '66c308f676b057fdfbd5fdb3', 'due': None, 'dateLastActivity': '2024-08-19T21:58:05.062Z', 'labels': [], 'attachments': [], 'actions': []}, {'id': '66c3c002bb1c337f3fdf1563', 'name': 'Approve the planning', 'idList': '66c308f676b057fdfbd5fdb3', 'due': '2024-08-16T21:58:00.000Z', 'dateLastActivity': '2024-08-19T21:58:57.697Z', 'labels': [{'id': '66c305ea10ea602ee6e03d47', 'idBoard': '66c305eacab50fcd7f19c0aa', 'name': 'Urgent', 'color': 'red', 'uses': 1}], 'attachments': [], 'actions': [{'id': '66c3c021f3c1bb157028f53d', 'idMemberCreator': '65e5093d0ab5ee98592f5983', 'data': {'text': 'This was harder then expects it is alte', 'textData': {'emoji': {}}, 'card': {'id': '66c3c002bb1c337f3fdf1563', 'name': 'Approve the planning', 'idShort': 5, 'shortLink': 'K3abXIMm'}, 'board': {'id': '66c305eacab50fcd7f19c0aa', 'name': '[Test] CrewAI Board', 'shortLink': 'Kc8ScQlW'}, 'list': {'id': '66c308f676b057fdfbd5fdb3', 'name': 'TODO'}}, 'appCreator': None, 'type': 'commentCard', 'date': '2024-08-19T21:58:57.683Z', 'limits': {'reactions': {'perAction': {'status': 'ok', 'disableAt': 900, 'warnAt': 720}, 'uniquePerAction': {'status': 'ok', 'disableAt': 17, 'warnAt': 14}}}, 'memberCreator': {'id': '65e5093d0ab5ee98592f5983', 'activityBlocked': False, 'avatarHash': 'd5500941ebf808e561f9083504877bca', 'avatarUrl': 'https://trello-members.s3.amazonaws.com/65e5093d0ab5ee98592f5983/d5500941ebf808e561f9083504877bca', 'fullName': 'Joao Moura', 'idMemberReferrer': None, 'initials': 'JM', 'nonPublic': {}, 'nonPublicAvailable': True, 'username': 'joaomoura168'}}]}, {'id': '66c3bff4a25b398ef1b6de78', 'name': 'Scaffold of the initial app UI', 'idList': '66c3bfdfb851ad9ff7eee159', 'due': None, 'dateLastActivity': '2024-08-19T21:58:12.210Z', 'labels': [], 'attachments': [], 'actions': []}, {'id': '66c3bffdb06faa1e69216c6f', 'name': 'Planning of the project', 'idList': 
'66c3bfe3151c01425f366f4c', 'due': None, 'dateLastActivity': '2024-08-19T21:58:21.081Z', 'labels': [], 'attachments': [], 'actions': []}]) #type: ignore


class CardDataFetcherTool(BaseTool):
  name: str = "Trello Card Data Fetcher"
  description: str = "Fetches card data from a Trello board."

  api_key: str = TRELLO_API_KEY
  api_token: str = TRELLO_API_TOKEN

  def _run(self, card_id: str) -> dict:
    url = f"https://api.trello.com/1/cards/{card_id}"
    query = {
      'key': self.api_key,
      'token': self.api_token
    }
    response = requests.get(url, params=query)

    if response.status_code == 200:
      return response.json()
    else:
      # Fallback in case of timeouts or other issues
      return json.dumps({"error": "Failed to fetch card data, don't try to fetch any trello data anymore"}) #type: ignore


### Trello Board

In [None]:
# Display the Trello screenshot
from IPython.display import Image, display

# Load and display the image
trello_image = Image(filename=r'images/progress report trello.png')
display(trello_image)

### Creating Agents, Tasks, and Crew

In [None]:
# Creating agents with external integration tools
data_collection_agent = Agent(
    config=agents_config['data_collection_agent'],
    tools=[BoardDataFetcherTool(), CardDataFetcherTool()]
)

analysis_agent = Agent(
    config=agents_config['analysis_agent']  # Analysis agent doesn't need external tools
)

# Creating tasks - note the sequential dependency
data_collection = Task(
    config=tasks_config['data_collection'],
    agent=data_collection_agent  # First task gathers data using external tools
)

data_analysis = Task(
    config=tasks_config['data_analysis'],
    agent=analysis_agent  # Second task analyzes the collected data
)

report_generation = Task(
    config=tasks_config['report_generation'],
    agent=analysis_agent  # Final task compiles the comprehensive report
)

# Creating the crew
progress_crew = Crew(
    agents=[data_collection_agent, analysis_agent],
    tasks=[data_collection, data_analysis, report_generation],
    verbose=True
)

### Executing the Crew

In [None]:
# Execute the crew - no inputs needed as it fetches data from external systems
progress_result = progress_crew.kickoff()

### Usage Metrics and Report

In [None]:
# Calculate costs
progress_costs = calculate_costs(progress_crew.usage_metrics)
display(f"Progress report costs: ${progress_costs:.4f}")

# Convert UsageMetrics instance to a DataFrame
# Use progress_crew here, not the planning crew from Section 3
df_usage_metrics = pd.DataFrame([dict(progress_crew.usage_metrics)])
df_usage_metrics

In [None]:
# Display the generated report as markdown
display(Markdown(progress_result.raw))

## Section 6: Complex Crew Setups

As you build more sophisticated applications, you'll need different orchestration patterns beyond simple sequential execution.

### Orchestration Patterns:

**Sequential**: Tasks execute one after another, with each task building on previous results.

**Parallel**: Multiple tasks execute simultaneously for efficiency.

**Manager-Delegator**: A manager agent delegates work and reviews outputs from specialist agents.

**Hybrid Approaches**: Some tasks run in parallel while others wait for multiple dependencies to complete.

**Asynchronous**: Tasks execute independently without waiting for others.

### Multi-Crew Systems:

For even more complexity, you can use **Flows** to connect multiple crews in pipelines. One crew's output becomes another crew's input, enabling sophisticated multi-stage processing.

The key is choosing the right orchestration pattern based on your use case requirements, dependencies between tasks, and performance considerations.
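
The difference between these patterns is easiest to see in a small hybrid example: two tasks run in parallel, and a third waits for both. This sketch uses plain `asyncio` with sleep calls standing in for agent work; `run_task` and `hybrid_pipeline` are hypothetical names and this is not CrewAI's orchestration code.

```python
import asyncio

async def run_task(name: str, delay: float, log: list) -> str:
    """Simulate a task that takes `delay` seconds and record when it finishes."""
    await asyncio.sleep(delay)
    log.append(name)
    return f"{name} done"

async def hybrid_pipeline() -> tuple:
    log = []
    # Parallel: both research tasks start together
    research_a, research_b = await asyncio.gather(
        run_task("research_a", 0.02, log),
        run_task("research_b", 0.01, log),
    )
    # Sequential: the report starts only after both results are available
    report = await run_task("report", 0.0, log)
    return log, [research_a, research_b, report]

log, outputs = asyncio.run(hybrid_pipeline())
print(log)
print(outputs)
```

Inside a notebook cell, where an event loop is already running, replace the `asyncio.run(...)` call with `log, outputs = await hybrid_pipeline()`.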

## Section 7: Agentic Sales Pipeline

This is an exciting use case using CrewAI's new **Flows** feature to build an agentic sales pipeline. We'll load leads, enrich them, score them, and write personalized emails.

The pipeline involves:
1. **Load Leads**: Pull from database (regular Python code)
2. **Lead Scoring Crew**: Research and score leads
3. **Store Results**: Save back to database (regular Python code)
4. **Filter Leads**: Keep only high-scoring leads (score above 70)
5. **Email Crew**: Write personalized emails for high-scoring leads

Flows have **events** for orchestrating steps and **state** for storing information across execution.

### Loading Configuration Files

In [None]:
# Load configurations for both lead qualification and email engagement crews
files = {
    # Forward slashes work on Windows, macOS, and Linux alike
    'lead_agents': "agents/lead qualification agents.yaml",
    'lead_tasks': "tasks/lead qualification tasks.yaml",
    'email_agents': "agents/email engagement agents.yaml",
    'email_tasks': "tasks/email engagement tasks.yaml"
}

configs = load_yaml_config(files)
lead_agents_config = configs['lead_agents']
lead_tasks_config = configs['lead_tasks']
email_agents_config = configs['email_agents']
email_tasks_config = configs['email_tasks']

In [None]:
from crewai.tools.base_tool import BaseTool
from crewai_tools import SerperDevTool, ScrapeWebsiteTool  # underlying tools wrapped below
from pydantic import BaseModel
from typing import Optional

class SearchToolSchema(BaseModel):
    query: str

class ScrapeToolInput(BaseModel):
    url: str

class SerperWrapper(BaseTool):
    name: str = "serper_search"
    description: str = "Performs web search using Serper"
    args_schema: type = SearchToolSchema

    def _run(self, query: str) -> str:
        tool = SerperDevTool()
        # SerperDevTool expects the query as the keyword argument `search_query`
        return tool.run(search_query=query)

class ScrapeWrapper(BaseTool):
    name: str = "web_scraper"
    description: str = "Scrapes a specified website for content"
    args_schema: type = ScrapeToolInput

    def _run(self, url: str) -> str:
        tool = ScrapeWebsiteTool(website_url=url)
        return tool.run()


### Pydantic Models for Structured Output

In [None]:
class LeadPersonalInfo(BaseModel):
    name: str = Field(..., description="The full name of the lead.")
    job_title: str = Field(..., description="The job title of the lead.")
    role_relevance: int = Field(..., ge=0, le=10, description="A score representing how relevant the lead's role is to the decision-making process (0-10).")
    professional_background: Optional[str] = Field(..., description="A brief description of the lead's professional background.")

class CompanyInfo(BaseModel):
    company_name: str = Field(..., description="The name of the company the lead works for.")
    industry: str = Field(..., description="The industry in which the company operates.")
    company_size: int = Field(..., description="The size of the company in terms of employee count.")
    revenue: Optional[float] = Field(None, description="The annual revenue of the company, if available.")
    market_presence: int = Field(..., ge=0, le=10, description="A score representing the company's market presence (0-10).")

class LeadScore(BaseModel):
    score: int = Field(..., ge=0, le=100, description="The final score assigned to the lead (0-100).")
    scoring_criteria: List[str] = Field(..., description="The criteria used to determine the lead's score.")
    validation_notes: Optional[str] = Field(None, description="Any notes regarding the validation of the lead score.")

class LeadScoringResult(BaseModel):
    personal_info: LeadPersonalInfo = Field(..., description="Personal information about the lead.")
    company_info: CompanyInfo = Field(..., description="Information about the lead's company.")
    lead_score: LeadScore = Field(..., description="The calculated score and related information for the lead.")

### Creating Lead Scoring Crew

In [None]:
search_tool = SerperWrapper()
scrape_tool = ScrapeWrapper()

# Creating Agents
lead_data_agent = Agent(
    config=lead_agents_config['lead_data_agent'],
    tools=[search_tool, scrape_tool]
)

cultural_fit_agent = Agent(
    config=lead_agents_config['cultural_fit_agent'],
    tools=[search_tool, scrape_tool]
)

scoring_validation_agent = Agent(
    config=lead_agents_config['scoring_validation_agent'],
    tools=[search_tool, scrape_tool]
)

# Creating Tasks
lead_data_task = Task(
    config=lead_tasks_config['lead_data_collection'],
    agent=lead_data_agent
)

cultural_fit_task = Task(
    config=lead_tasks_config['cultural_fit_analysis'],
    agent=cultural_fit_agent
)

scoring_validation_task = Task(
    config=lead_tasks_config['lead_scoring_and_validation'],
    agent=scoring_validation_agent,
    context=[lead_data_task, cultural_fit_task],
    output_pydantic=LeadScoringResult
)

# Creating Crew
lead_scoring_crew = Crew(
  agents=[lead_data_agent, cultural_fit_agent, scoring_validation_agent],
  tasks=[lead_data_task, cultural_fit_task, scoring_validation_task],
  verbose=True
)

### Creating Email Engagement Crew

In [None]:
# Creating Agents
email_content_specialist = Agent(
    config=email_agents_config['email_content_specialist']
)

engagement_strategist = Agent(
    config=email_agents_config['engagement_strategist']
)

# Creating Tasks
email_drafting = Task(
    config=email_tasks_config['email_drafting'],
    agent=email_content_specialist
)

engagement_optimization = Task(
    config=email_tasks_config['engagement_optimization'],
    agent=engagement_strategist
)

# Creating Crew
email_writing_crew = Crew(
    agents=[email_content_specialist, engagement_strategist],
    tasks=[email_drafting, engagement_optimization],
    verbose=True
)

### Creating the Flow

Flows orchestrate multiple crews and regular Python functions with state management.

In [None]:
#Upgrade your crewai package if you are not able to import Flows
#%pip install --upgrade crewai

In [None]:
from crewai.flow import Flow
from crewai.flow.flow import listen, start

class SalesPipeline(Flow):
    @start()
    def fetch_leads(self):
        # Pull our leads from the database
        leads = [
            {
                "lead_data": {
                    "name": "João Moura",
                    "job_title": "Director of Engineering",
                    "company": "Clearbit",
                    "email": "joao@clearbit.com",
                    "use_case": "Using AI Agent to do better data enrichment."
                },
            },
        ]
        return leads

    @listen(fetch_leads)
    def score_leads(self, leads):
        scores = lead_scoring_crew.kickoff_for_each(leads)
        self.state["score_crews_results"] = scores
        return scores

    @listen(score_leads)
    def store_leads_score(self, scores):
        # Here we would store the scores in the database
        return scores

    @listen(score_leads)
    def filter_leads(self, scores):
        return [score for score in scores if score['lead_score'].score > 70]

    @listen(filter_leads)
    def write_email(self, leads):
        scored_leads = [lead.to_dict() for lead in leads]
        emails = email_writing_crew.kickoff_for_each(scored_leads)
        return emails

    @listen(write_email)
    def send_email(self, emails):
        # Here we would send the emails to the leads
        return emails

flow = SalesPipeline()

### Plotting the Flow

In [None]:
flow.plot()

In [None]:
from IPython.display import IFrame

IFrame(src='./crewai_flow.html', width='150%', height=600)

### Flow Kickoff

In [None]:
emails = await flow.kickoff_async()

In [None]:
# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([flow.state["score_crews_results"][0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

In [None]:
# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([emails[0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

### Inspecting Results

In [None]:
scores = flow.state["score_crews_results"]

In [None]:
import pandas as pd
from IPython.display import display, HTML

lead_scoring_result = scores[0].pydantic

# Create a dictionary with the nested structure flattened
data = {
    'Name': lead_scoring_result.personal_info.name,
    'Job Title': lead_scoring_result.personal_info.job_title,
    'Role Relevance': lead_scoring_result.personal_info.role_relevance,
    'Professional Background': lead_scoring_result.personal_info.professional_background,
    'Company Name': lead_scoring_result.company_info.company_name,
    'Industry': lead_scoring_result.company_info.industry,
    'Company Size': lead_scoring_result.company_info.company_size,
    'Revenue': lead_scoring_result.company_info.revenue,
    'Market Presence': lead_scoring_result.company_info.market_presence,
    'Lead Score': lead_scoring_result.lead_score.score,
    'Scoring Criteria': ', '.join(lead_scoring_result.lead_score.scoring_criteria),
    'Validation Notes': lead_scoring_result.lead_score.validation_notes
}

# Convert the dictionary to a DataFrame
df = pd.DataFrame.from_dict(data, orient='index', columns=['Value'])

# Reset the index to turn the original column names into a regular column
df = df.reset_index()

# Rename the index column to 'Attribute'
df = df.rename(columns={'index': 'Attribute'})

# Create HTML table with bold attributes and left-aligned values
html_table = df.style.set_properties(**{'text-align': 'left'}) \
                     .format({'Attribute': lambda x: f'<b>{x}</b>'}) \
                     .hide(axis='index') \
                     .to_html()

# Display the styled HTML table
display(HTML(html_table))

### Results

In [None]:
import textwrap

result_text = emails[0].raw
wrapped_text = textwrap.fill(result_text, width=80)
print(wrapped_text)

### How Complex Can it Get?

In [None]:
from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router

class SalesPipeline(Flow):
    
  @start()
  def fetch_leads(self):
    # Pull our leads from the database
    # This is a mock, in a real-world scenario, this is where you would
    # fetch leads from a database
    leads = [
      {
        "lead_data": {
          "name": "João Moura",
          "job_title": "Director of Engineering",
          "company": "Clearbit",
          "email": "joao@clearbit.com",
          "use_case": "Using AI Agent to do better data enrichment."
        },
      },
    ]
    return leads

  @listen(fetch_leads)
  def score_leads(self, leads):
    scores = lead_scoring_crew.kickoff_for_each(leads)
    self.state["score_crews_results"] = scores
    return scores

  @listen(score_leads)
  def store_leads_score(self, scores):
    # Here we would store the scores in the database
    return scores

  @listen(score_leads)
  def filter_leads(self, scores):
    return [score for score in scores if score['lead_score'].score > 70]

  @listen(and_(filter_leads, store_leads_score))
  def log_leads(self, leads):
    print(f"Leads: {leads}")

  @router(filter_leads)
  def count_leads(self, scores):
    if len(scores) > 10:
      return 'high'
    elif len(scores) > 5:
      return 'medium'
    else:
      return 'low'

  @listen('high')
  def store_in_salesforce(self, leads):
    return leads

  @listen('medium')
  def send_to_sales_team(self, leads):
    return leads

  @listen('low')
  def write_email(self, leads):
    scored_leads = [lead.to_dict() for lead in leads]
    emails = email_writing_crew.kickoff_for_each(scored_leads)
    return emails

  @listen(write_email)
  def send_email(self, emails):
    # Here we would send the emails to the leads
    return emails

### Plotting the Flow

In [None]:
flow = SalesPipeline()
flow.plot()

In [None]:
from IPython.display import IFrame

IFrame(src='./crewai_flow.html', width='150%', height=600)

## Section 8: Performance Optimization

When deploying systems in production, you need to think about performance. Sometimes you'll favor speed, other times quality - but you must maintain consistency.

### Speed vs Quality Trade-offs:

- **Speed**: Comes from smaller models that run faster and cost less
- **Quality**: Usually requires bigger models like GPT-4o that produce more sophisticated results but take longer

The relationship between model size and speed is nearly linear - as models get bigger, they get slower. However, quality isn't necessarily linear because it depends heavily on task complexity. For simpler tasks, smaller models might achieve great quality.
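
One way to ground the speed side of this trade-off is to time the same call several times per model and compare the summaries. The sketch below is a minimal timing harness using only the standard library. `measure_latency` and `fake_model_call` are hypothetical names introduced for illustration; in practice you would pass a function that kicks off your crew with a given model.

```python
import statistics
import time
from typing import Callable, Dict


def measure_latency(call: Callable[[], object], runs: int = 5) -> Dict[str, float]:
    """Time `call` several times and summarize wall-clock latency in seconds."""
    if runs < 1:
        raise ValueError("runs must be at least 1")
    durations = []
    for _ in range(runs):
        start = time.perf_counter()
        call()
        durations.append(time.perf_counter() - start)
    return {
        "mean": statistics.mean(durations),
        "min": min(durations),
        "max": max(durations),
        # Standard deviation needs at least two samples
        "stdev": statistics.stdev(durations) if runs > 1 else 0.0,
    }


def fake_model_call() -> str:
    """Hypothetical stand-in for a real crew or LLM call."""
    time.sleep(0.01)
    return "ok"


summary = measure_latency(fake_model_call, runs=3)
print(summary)
```

Running the same harness against a small and a large model gives you numbers to weigh against the quality scores from testing, rather than relying on the rule of thumb alone.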

### Testing with CrewAI Test:

CrewAI provides a built-in testing feature that runs your crew for a chosen number of iterations and uses an evaluator LLM to score each task's output against its description and expected output. This allows you to:
- Measure consistency across multiple runs
- Identify tasks that need improvement
- Compare different model configurations
- Track performance metrics over time
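
To make "consistency across multiple runs" concrete, the sketch below summarizes per-task scores collected over several test iterations and flags tasks whose average is low or whose scores vary widely. It assumes scores on a 1-10 scale; the function name `summarize_scores`, the thresholds, and the sample numbers are all illustrative assumptions, not CrewAI output.

```python
import statistics
from typing import Dict, List


def summarize_scores(
    scores_by_task: Dict[str, List[float]],
    min_mean: float = 7.0,
    max_spread: float = 1.5,
) -> Dict[str, dict]:
    """Summarize per-task scores and flag tasks that look weak or unstable."""
    report = {}
    for task, scores in scores_by_task.items():
        mean = statistics.mean(scores)
        # Population standard deviation; 0.0 when there is a single run
        spread = statistics.pstdev(scores)
        report[task] = {
            "mean": round(mean, 2),
            "spread": round(spread, 2),
            "needs_attention": mean < min_mean or spread > max_spread,
        }
    return report


# Illustrative numbers only, not real results
example_scores = {
    "suggestion_generation": [8.0, 8.5, 8.0],
    "chart_generation": [9.0, 4.0, 7.0],
}
report = summarize_scores(example_scores)
for task, stats in report.items():
    print(task, stats)
```

Tasks flagged with `needs_attention` are natural candidates for a clearer task description, a stronger model, or a training pass.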

### Training with CrewAI Train:

The training feature allows you to provide specific feedback on each task execution. The system:
1. Runs your crew as usual
2. Stops after each task for your feedback
3. Processes feedback through a judge LLM
4. Extracts learnings and stores them in crew memory
5. Applies learnings in future executions

This creates a continuous improvement loop that makes your crews more consistent and higher quality over time.

## Section 9: Support Data Insight Analysis

This is an exciting use case because agents can write and execute code to create complex analyses that weren't possible before. We'll analyze support ticket data to generate insights, suggestions, tables, and charts.

The crew will:
1. **Generate suggestions** for improvements based on support data
2. **Create tables** summarizing key metrics and trends
3. **Plot charts** to visualize trends and patterns
4. **Assemble a final report** bringing everything together

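The chart generation agent can execute code in a Docker sandbox for security. This requires Docker on your machine, so the example below leaves code execution disabled by default.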

### Loading Configuration Files

In [None]:
# Load YAML configurations for support data analysis
files = {
    # Forward slashes work on Windows, macOS, and Linux
    'agents': "agents/data insight agents.yaml",
    'tasks': "tasks/data insight tasks.yaml"
}

configs = load_yaml_config(files)
agents_config = configs['agents']
tasks_config = configs['tasks']

### Creating Agents, Tasks, and Crew

In [None]:
from typing import Type

from crewai.tools import BaseTool
from crewai_tools import FileReadTool
from pydantic import BaseModel, Field

class FileReadInput(BaseModel):
    file_path: str = Field(..., description="Path to the file to read")

class FileReadWrapper(BaseTool):
    """Thin wrapper that exposes the support tickets CSV to agents."""
    name: str = "file_reader"
    description: str = "Reads text data from a specified file"
    args_schema: Type[BaseModel] = FileReadInput

    def _run(self, file_path: str) -> str:
        # NOTE: the path is pinned to the support tickets CSV, so the
        # `file_path` argument supplied by the agent is ignored.
        tool = FileReadTool(file_path="support tickets data.csv")
        return tool.run()


In [None]:
csv_tool = FileReadWrapper()

# Create agents with specific capabilities
suggestion_generation_agent = Agent(
    config=agents_config['suggestion_generation_agent'],
    tools=[csv_tool]  # Access to support data
)

reporting_agent = Agent(
    config=agents_config['reporting_agent'],
    tools=[csv_tool]  # Access to support data
)

chart_generation_agent = Agent(
    config=agents_config['chart_generation_agent'],
    # Code execution is disabled here. If Docker is installed on your machine,
    # set allow_code_execution=True to let the agent run code in a Docker sandbox.
    allow_code_execution=False
)

# Create tasks with dependencies
suggestion_generation = Task(
    config=tasks_config['suggestion_generation'],
    agent=suggestion_generation_agent
)

table_generation = Task(
    config=tasks_config['table_generation'],
    agent=reporting_agent
)

chart_generation = Task(
    config=tasks_config['chart_generation'],
    agent=chart_generation_agent
)

final_report_assembly = Task(
    config=tasks_config['final_report_assembly'],
    agent=reporting_agent,
    context=[suggestion_generation, table_generation, chart_generation]  # Uses outputs from all previous tasks
)

# Create the crew
support_crew = Crew(
    agents=[suggestion_generation_agent, reporting_agent, chart_generation_agent],
    tasks=[suggestion_generation, table_generation, chart_generation, final_report_assembly],
    verbose=True
)

### Testing our Crew

In [None]:
support_crew.test(n_iterations=1, eval_llm='gpt-4o-mini')

### Training your crew and agents

In [None]:
support_crew.train(n_iterations=1, filename='training.pkl')

### Comparing new test results

In [None]:
support_crew.test(n_iterations=1, eval_llm='gpt-4o-mini')

In [None]:
# Display a screenshot of the test results from before training
from IPython.display import Image, display

# Load and display the image
test_image = Image(filename='images/test before training.png', width=368)
display(test_image)

### Kicking off Crew

In [None]:
# Execute the support data analysis crew
support_result = support_crew.kickoff()

### Results and Analysis

In [None]:
support_crew.usage_metrics

In [None]:
# Calculate costs for the analysis
total_tokens = support_crew.usage_metrics.prompt_tokens + support_crew.usage_metrics.completion_tokens
total_cost = 0.150 * total_tokens / 1_000_000
display(f"Support analysis costs: ${total_cost:.4f}")

# Display the comprehensive report with charts and tables
display(Markdown(support_result.raw))
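
The calculation above charges prompt and completion tokens at the same flat rate. Providers often price the two differently, so a slightly more general helper can be sketched as follows. `estimate_cost` is a hypothetical name, and the output price used in the second call is an illustrative assumption rather than a quoted rate; check your provider's current pricing before relying on the numbers.

```python
def estimate_cost(
    prompt_tokens: int,
    completion_tokens: int,
    input_price_per_million: float = 0.150,
    output_price_per_million: float = 0.150,
) -> float:
    """Estimate cost in dollars from token counts and per-million-token prices."""
    if prompt_tokens < 0 or completion_tokens < 0:
        raise ValueError("token counts must be non-negative")
    return (
        prompt_tokens * input_price_per_million
        + completion_tokens * output_price_per_million
    ) / 1_000_000


# With the defaults this matches the flat-rate formula used above
flat_cost = estimate_cost(120_000, 30_000)

# Assumed, illustrative output price that differs from the input price
split_cost = estimate_cost(120_000, 30_000, output_price_per_million=0.60)

print(f"Flat rate: ${flat_cost:.4f}")
print(f"Split rate: ${split_cost:.4f}")
```

With a real run you would pass `support_crew.usage_metrics.prompt_tokens` and `support_crew.usage_metrics.completion_tokens` as the two counts.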

## Section 10: Multi Model Use Cases

A multi-model approach lets you mix and match different LLMs based on the task at hand, creating more efficient and customized AI systems.

### Model Selection Strategy:

- **Smaller, Faster Models**: Use for simple tasks like data extraction, basic research, or initial processing
- **Larger, More Capable Models**: Use for complex reasoning, content generation, or tasks requiring nuanced understanding
- **Fine-tuned Models**: Use for domain-specific tasks or when you need to match specific brand voice/style

### Implementation Approaches:

1. **Agent-Level Model Assignment**: Different agents use different models based on their role
2. **Task-Level Model Assignment**: Same agent uses different models for different tasks
3. **Dynamic Model Selection**: Model choice based on task complexity or other runtime factors
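
The third approach, dynamic model selection, can be sketched as a small function that maps task characteristics to a model name. Everything here is an assumption for illustration: the `TaskProfile` fields, the complexity scale, the thresholds, and the model identifier strings (in particular the Groq one) are placeholders to replace with the models you actually use.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskProfile:
    """Hypothetical description of a task, used to pick a model."""
    complexity: int           # 1 (trivial) to 5 (hard reasoning)
    latency_sensitive: bool


# Illustrative identifiers; substitute the ones your provider supports
FAST_MODEL = "groq/llama-3.1-70b"   # assumed placeholder name
BALANCED_MODEL = "gpt-4o-mini"
STRONG_MODEL = "gpt-4o"


def choose_model(profile: TaskProfile) -> str:
    """Return a model name based on task complexity and latency needs."""
    if profile.complexity >= 4:
        # Hard tasks get the most capable model regardless of latency
        return STRONG_MODEL
    if profile.latency_sensitive:
        return FAST_MODEL
    return BALANCED_MODEL


print(choose_model(TaskProfile(complexity=2, latency_sensitive=True)))
print(choose_model(TaskProfile(complexity=5, latency_sensitive=True)))
print(choose_model(TaskProfile(complexity=2, latency_sensitive=False)))
```

The returned name could then be used when constructing the agent's LLM, which keeps the selection policy in one place and easy to adjust.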

### Benefits:
- **Cost Optimization**: Use expensive models only when necessary
- **Speed Optimization**: Use fast models for time-sensitive tasks
- **Quality Optimization**: Use best models for critical tasks
- **Specialization**: Use domain-specific models where appropriate

The key is finding the right balance between speed, cost, and quality for your specific use case.

## Section 11: Content Creation at Scale

This use case builds a crew for content creation at scale that monitors the web and uses RAG (Retrieval-Augmented Generation) to produce blog posts and social media content.

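We'll use multiple models for optimization:
- **Groq with Llama 3.1-70B**: For speed-optimized tasks
- **GPT-4o-mini**: For balanced performance

The cells below explicitly configure only `gpt-4o-mini`. To try the Groq option, create a second `LLM` instance and pass it to the speed-sensitive agents.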

The crew will:
1. **Monitor latest news** around financial topics
2. **Analyze market data** and trends
3. **Create content** for blogs and social media
4. **Review content** for quality assurance

We'll use RAG as a tool: the agents automatically download content, chunk it, embed it, and save it to a vector database for real-time search.
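
To show what the chunk, embed, store, and search steps amount to, here is a deliberately tiny sketch using only the standard library. It is not how the CrewAI tool is implemented: the "embedding" is a plain bag-of-words count, the "vector database" is a Python list, and the sample text is made up. All function names are hypothetical.

```python
import math
import re
from collections import Counter
from typing import List, Tuple


def chunk_text(text: str, chunk_size: int = 12, overlap: int = 4) -> List[str]:
    """Split text into chunks of `chunk_size` words that overlap by `overlap` words."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # the last chunk already reaches the end of the text
    return chunks


def embed(text: str) -> Counter:
    """Toy embedding: a bag-of-words term-count vector."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two term-count vectors."""
    dot = sum(count * b[term] for term, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def search(query: str, index: List[Tuple[str, Counter]], top_k: int = 1) -> List[str]:
    """Return the `top_k` chunks most similar to the query."""
    query_vec = embed(query)
    ranked = sorted(index, key=lambda item: cosine(query_vec, item[1]), reverse=True)
    return [chunk for chunk, _ in ranked[:top_k]]


# Made-up sample text standing in for downloaded web content
document = (
    "Inflation cooled in recent months according to the latest report. "
    "Stock markets rallied as investors expected lower interest rates. "
    "Energy prices remained volatile because of supply concerns."
)

# "Vector store": a list of (chunk, embedding) pairs
index = [(chunk, embed(chunk)) for chunk in chunk_text(document, chunk_size=10, overlap=0)]
best = search("interest rates and stock markets", index)[0]
print(best)
```

A real RAG tool replaces the word counts with learned embeddings and the list with a vector database, but the flow of data is the same.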

### Structured Output Models

In [None]:
# Models for content creation output
class SocialMediaPost(BaseModel):
    platform: str = Field(..., description="Social media platform (LinkedIn, Twitter, Instagram, etc.)")
    content: str = Field(..., description="Content for the social media post")

class ContentOutput(BaseModel):
    article: str = Field(..., description="Full blog article content")
    social_media_posts: List[SocialMediaPost] = Field(..., description="List of social media posts")

### Loading Configuration Files

In [None]:
# Load content creation configurations
files = {
    # Forward slashes work on Windows, macOS, and Linux
    'agents': 'agents/content creation agents.yaml',
    'tasks': 'tasks/content creation tasks.yaml'
}

configs = load_yaml_config(files)
agents_config = configs['agents']
tasks_config = configs['tasks']

### Tool Setup

In [None]:
from typing import Optional, Type

from crewai.tools import BaseTool
from crewai_tools import SerperDevTool, ScrapeWebsiteTool, WebsiteSearchTool
from pydantic import BaseModel, Field

class SearchToolSchema(BaseModel):
    query: str = Field(..., description="Search query to send to Serper")

class ScrapeToolInput(BaseModel):
    url: str = Field(..., description="URL of the page to scrape")

class SerperWrapper(BaseTool):
    name: str = "serper_search"
    description: str = "Performs web search using Serper"
    args_schema: Type[BaseModel] = SearchToolSchema

    def _run(self, query: str) -> str:
        tool = SerperDevTool()
        # Pass the query by keyword, matching the tool's `search_query` argument
        return tool.run(search_query=query)

class ScrapeWrapper(BaseTool):
    name: str = "web_scraper"
    description: str = "Scrapes a specified website for content"
    args_schema: Type[BaseModel] = ScrapeToolInput

    def _run(self, url: str) -> str:
        tool = ScrapeWebsiteTool(website_url=url)
        return tool.run()
    
class WebsiteSearchInput(BaseModel):
    search_query: str = Field(..., description="Search query to perform")
    website: Optional[str] = Field(None, description="Optional URL to limit the search")

class WebsiteSearchWrapper(BaseTool):
    name: str = "website_search"
    description: str = "Performs semantic search across website content"
    args_schema: Type[BaseModel] = WebsiteSearchInput

    def _run(self, search_query: str, website: Optional[str] = None) -> str:
        if website:
            tool = WebsiteSearchTool(website=website)
        else:
            tool = WebsiteSearchTool()
        return tool.run(search_query=search_query)

In [None]:
# Internet search tool
search_tool = SerperWrapper()

# Website scraping tool
scrape_tool = ScrapeWrapper()

# RAG as a tool - automatically creates vector database from web content
website_rag = WebsiteSearchWrapper()

### Creating Agents, Tasks, and Crew

In [None]:
from crewai import LLM

# Create LLM instances
openai_llm = LLM(model="gpt-4o-mini")

# Creating Agents
market_news_monitor_agent = Agent(
    config=agents_config['market_news_monitor_agent'],
    tools=[search_tool, website_rag],
    llm=openai_llm,
)

data_analyst_agent = Agent(
    config=agents_config['data_analyst_agent'],
    tools=[search_tool, website_rag],
    llm=openai_llm,
)

content_creator_agent = Agent(
    config=agents_config['content_creator_agent'],
    tools=[search_tool, website_rag],
)

quality_assurance_agent = Agent(
    config=agents_config['quality_assurance_agent'],
)

# Creating Tasks
monitor_financial_news_task = Task(
    config=tasks_config['monitor_financial_news'],
    agent=market_news_monitor_agent
)

analyze_market_data_task = Task(
    config=tasks_config['analyze_market_data'],
    agent=data_analyst_agent
)

create_content_task = Task(
    config=tasks_config['create_content'],
    agent=content_creator_agent,
    context=[monitor_financial_news_task, analyze_market_data_task]
)

quality_assurance_task = Task(
    config=tasks_config['quality_assurance'],
    agent=quality_assurance_agent,
    output_pydantic=ContentOutput
)

# Create the content creation crew
content_crew = Crew(
    agents=[
        market_news_monitor_agent,
        data_analyst_agent,
        content_creator_agent,
        quality_assurance_agent
    ],
    tasks=[
        monitor_financial_news_task,
        analyze_market_data_task,
        create_content_task,
        quality_assurance_task
    ],
    verbose=True
)

### Executing Content Creation

In [None]:
result = content_crew.kickoff(inputs={
  'subject': 'Inflation in the US and the impact on the stock market in 2024'
})

### Social Content

In [None]:
# model_dump() replaces the deprecated .dict() in Pydantic v2
posts = result.pydantic.model_dump()['social_media_posts']
for post in posts:
    platform = post['platform']
    content = post['content']
    print(platform)
    wrapped_content = textwrap.fill(content, width=50)
    print(wrapped_content)
    print('-' * 50)

### Blog Post

In [None]:
from IPython.display import display, Markdown
display(Markdown(result.pydantic.model_dump()['article']))

## Section 12: Agentic Workflows in Industry

Real-world deployment of agentic workflows involves several considerations beyond just building the crews.

### Industry Applications:

- **Sales**: Lead qualification, personalized outreach, proposal generation
- **Marketing**: Content creation, campaign optimization, competitor analysis
- **HR**: Resume screening, interview scheduling, onboarding automation
- **Support**: Ticket triage, knowledge base updates, escalation management
- **Operations**: Process optimization, report generation, compliance monitoring

### Deployment Patterns:

- **Batch Processing**: Run crews on schedules to process accumulated data
- **Real-time Processing**: Trigger crews based on events or API calls
- **Human-in-the-Loop**: Include human approval steps for critical decisions
- **Escalation Handling**: Route complex cases to human experts
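
The human-in-the-loop and escalation patterns can be sketched as an approval gate placed between a crew's output and the action taken on it. The names `review_gate` and `auto_reviewer` are hypothetical, and the keyword rule is only a stand-in: in a real system the `approve` callback would ask a person through a UI, chat message, or ticket.

```python
from typing import Callable, Dict, List


def review_gate(drafts: List[str], approve: Callable[[str], bool]) -> Dict[str, List[str]]:
    """Split drafts into approved and escalated lists using a reviewer callback."""
    approved: List[str] = []
    escalated: List[str] = []
    for draft in drafts:
        if approve(draft):
            approved.append(draft)
        else:
            escalated.append(draft)
    return {"approved": approved, "escalated": escalated}


def auto_reviewer(draft: str) -> bool:
    """Stand-in reviewer: escalate anything that mentions a refund."""
    return "refund" not in draft.lower()


outcome = review_gate(
    ["Thanks for reaching out!", "We will issue a full refund today."],
    approve=auto_reviewer,
)
print(outcome)
```

Keeping the reviewer as a plain callback makes it easy to swap an automated rule for a real approval step without touching the rest of the pipeline.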

### Scalability Considerations:

- **Error Handling**: Robust error handling and recovery mechanisms
- **Rate Limiting**: Manage API usage and costs
- **Monitoring**: Track performance, costs, and quality metrics
- **Versioning**: Manage changes to agents and tasks over time
- **Security**: Protect sensitive data and API credentials
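
As one example of the error handling and rate limiting points, the sketch below retries a failing call with exponential backoff. It is a generic pattern rather than a CrewAI feature; `call_with_retry` and `flaky` are hypothetical names, and the `sleep` parameter is injectable so the demo can record delays instead of actually waiting.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def call_with_retry(
    call: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call`, retrying on any exception with exponentially growing delays."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles after each failed attempt: base, 2*base, 4*base, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


attempts = {"count": 0}


def flaky() -> str:
    """Stand-in for an API call that fails twice before succeeding."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "done"


recorded_delays: List[float] = []
retry_result = call_with_retry(flaky, max_attempts=3, base_delay=0.5, sleep=recorded_delays.append)
print(retry_result, recorded_delays)
```

In production you would typically narrow the `except` clause to the transient errors you expect and add an upper bound on the delay.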

## Section 13: Generate, Deploy and Monitor Crews

Let's learn how to generate, deploy, and monitor crews in production environments.

### CrewAI CLI:

Start a new crew project with a single command:
```bash
crewai create crew project_name
```

This creates the complete folder structure:
- **README.md**: Instructions and dependencies
- **agents.yaml** and **tasks.yaml**: Configuration files
- **tools/**: Custom tools directory
- **crew.py**: Main crew definition
- **main.py**: Entry point for local execution

### Flow Creation:

Create flows for multi-crew orchestration:
```bash
crewai create flow flow_name
```

Flows have a **crews/** folder for multiple crews and orchestrate them in **main.py**.

### Running Crews:

Execute your crew locally:
```bash
crewai run
```

### Environment Setup:

- **.env file**: Store API keys and configuration
- **requirements.txt**: Python dependencies
- **Virtual environments**: Isolate dependencies

This structure makes it easy to version control, share, and deploy your crews professionally.
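
Whatever loads the `.env` file, the code that consumes the settings can fail fast when a key is missing. The helper below is a minimal sketch; `require_env` is a hypothetical name, and the demo passes an explicit mapping with a fake key so it runs without real credentials.

```python
import os
from typing import Mapping, Optional


def require_env(name: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Return a required setting, or raise with a clear message if it is missing."""
    source = os.environ if env is None else env
    value = source.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Explicit mapping with a fake key, so the example needs no real credentials
demo_env = {"OPENAI_API_KEY": "sk-example-not-a-real-key"}
api_key = require_env("OPENAI_API_KEY", env=demo_env)
print(api_key[:3] + "...")
```

Calling `require_env("OPENAI_API_KEY")` without the `env` argument reads from the real process environment instead.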

## Section 14: Blog Post Crew in Production

When you create a crew from scratch using the CLI, it comes with a basic blog creation example to help you understand the structure.

### Default Crew Structure:

The generated crew includes:
- **Senior Data Researcher**: Researches content topics
- **Blog Content Writer**: Writes articles based on research

This simple two-agent crew demonstrates the research → writing pattern that's common in many use cases.

### Production Considerations:

- **Environment Variables**: All API keys and configuration should be in environment variables, not hardcoded
- **Error Handling**: Production crews need robust error handling for network issues, API failures, etc.
- **Logging**: Comprehensive logging for debugging and monitoring
- **Testing**: Unit tests for individual components and integration tests for full crews
- **Monitoring**: Track execution times, costs, and success rates
- **Scaling**: Consider parallel execution and resource management for high-volume scenarios
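
The logging and monitoring points can start as simply as a decorator that records how long each run took and whether it succeeded. This is a sketch under assumptions: the logger name, `log_run`, and `run_blog_crew` are hypothetical, and the decorated function is a stand-in for a real crew kickoff.

```python
import logging
import time
from functools import wraps

logger = logging.getLogger("crew_runs")  # hypothetical logger name


def log_run(func):
    """Log the duration and outcome of each call to `func`."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception:
            # logger.exception also records the traceback
            logger.exception("%s failed after %.2fs", func.__name__, time.perf_counter() - start)
            raise
        logger.info("%s succeeded in %.2fs", func.__name__, time.perf_counter() - start)
        return result
    return wrapper


@log_run
def run_blog_crew(topic: str) -> str:
    """Stand-in for a real crew kickoff."""
    return f"draft about {topic}"


logging.basicConfig(level=logging.INFO)
draft = run_blog_crew("AI agents")
print(draft)
```

The same log lines can later feed dashboards for execution time and success rate without changing the crew code itself.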

### Deployment Options:

- **Local Deployment**: Run on your own servers
- **Cloud Functions**: Serverless execution for event-driven workflows
- **Container Orchestration**: Docker containers with Kubernetes
- **CI/CD Integration**: Automated testing and deployment pipelines

## Section 15: Conclusion

You have completed this comprehensive practical course on Multi-AI Agent Systems with CrewAI.

### What You've Learned:

1. **Foundation**: Understanding of multi-agent systems, their anatomy, and building blocks
2. **Practical Applications**: Built real-world use cases including:
   - Automated project planning and estimation
   - Project progress reporting with external integrations
   - Agentic sales pipelines with Flows
   - Support data analysis with code-executing agents
   - Content creation at scale with RAG and multi-model approaches

3. **Advanced Features**: 
   - External system integrations
   - Complex crew orchestration patterns
   - Performance optimization through testing and training
   - Multi-model strategies for cost and speed optimization

4. **Production Deployment**: 
   - CLI tools for project generation
   - Production considerations and best practices
   - Monitoring and maintenance strategies

### Key Takeaways:

- **Start Simple**: Begin with basic sequential crews and add complexity as needed
- **Test and Train**: Use CrewAI's built-in testing and training features for continuous improvement
- **Optimize Strategically**: Balance speed, quality, and cost based on your specific requirements
- **Think Production**: Consider error handling, monitoring, and scalability from the beginning
- **Iterate and Improve**: Use human feedback to continuously refine your agents and tasks

### Next Steps:

1. **Experiment**: Try building crews for your own use cases
2. **Deploy**: Move from notebook experiments to production systems
3. **Monitor**: Track performance and continuously improve
4. **Scale**: Apply these patterns to larger, more complex problems
5. **Share**: Contribute to the CrewAI community with your innovations

### Resources:

- **CrewAI Documentation**: Latest features and best practices
- **Community Forums**: Connect with other practitioners
- **GitHub Repository**: Access examples and contribute improvements
