# L3: Agentic Sales Pipeline

In [12]:
from crewai import Agent, Task, Crew

In [13]:
import os 
import yaml
import json
from dotenv import load_dotenv
load_dotenv('../../../.env')

model=os.environ.get('model')
print(model)

azure/gpt-4


In [14]:
# Define file paths for YAML configurations
files = {
    'lead_agents': 'config/3_sales/lead_qualification_agents.yaml',
    'lead_tasks': 'config/3_sales/lead_qualification_tasks.yaml',
    'email_agents': 'config/3_sales/email_engagement_agents.yaml',
    'email_tasks': 'config/3_sales/email_engagement_tasks.yaml'
}

# Load configurations from YAML files
configs = {}
for config_type, file_path in files.items():
    with open(file_path, 'r') as file:
        configs[config_type] = yaml.safe_load(file)

# Assign loaded configurations to specific variables
lead_agents_config = configs['lead_agents']
lead_tasks_config = configs['lead_tasks']
email_agents_config = configs['email_agents']
email_tasks_config = configs['email_tasks']

## Create Pydantic Models for Structured Output

In [15]:
from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

class LeadPersonalInfo(BaseModel):
    name: str = Field(..., description="The full name of the lead.")
    job_title: str = Field(..., description="The job title of the lead.")
    role_relevance: int = Field(..., ge=0, le=10, description="A score representing how relevant the lead's role is to the decision-making process (0-10).")
    professional_background: Optional[str] = Field(..., description="A brief description of the lead's professional background.")

class CompanyInfo(BaseModel):
    company_name: str = Field(..., description="The name of the company the lead works for.")
    industry: str = Field(..., description="The industry in which the company operates.")
    company_size: int = Field(..., description="The size of the company in terms of employee count.")
    revenue: Optional[float] = Field(None, description="The annual revenue of the company, if available.")
    market_presence: int = Field(..., ge=0, le=10, description="A score representing the company's market presence (0-10).")

class LeadScore(BaseModel):
    score: int = Field(..., ge=0, le=100, description="The final score assigned to the lead (0-100).")
    scoring_criteria: List[str] = Field(..., description="The criteria used to determine the lead's score.")
    validation_notes: Optional[str] = Field(None, description="Any notes regarding the validation of the lead score.")

class LeadScoringResult(BaseModel):
    personal_info: LeadPersonalInfo = Field(..., description="Personal information about the lead.")
    company_info: CompanyInfo = Field(..., description="Information about the lead's company.")
    lead_score: LeadScore = Field(..., description="The calculated score and related information for the lead.")

## Importing Tools

In [16]:
from crewai_tools import SerperDevTool, ScrapeWebsiteTool

## Lead Qualification Crew, Agents and Tasks

In [17]:
# Creating Agents
lead_data_agent = Agent(
  config=lead_agents_config['lead_data_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

cultural_fit_agent = Agent(
  config=lead_agents_config['cultural_fit_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

scoring_validation_agent = Agent(
  config=lead_agents_config['scoring_validation_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

# Creating Tasks
lead_data_task = Task(
  config=lead_tasks_config['lead_data_collection'],
  agent=lead_data_agent
)

cultural_fit_task = Task(
  config=lead_tasks_config['cultural_fit_analysis'],
  agent=cultural_fit_agent
)

scoring_validation_task = Task(
  config=lead_tasks_config['lead_scoring_and_validation'],
  agent=scoring_validation_agent,
  context=[lead_data_task, cultural_fit_task],
  output_pydantic=LeadScoringResult
)

# Creating Crew
lead_scoring_crew = Crew(
  agents=[
    lead_data_agent,
    cultural_fit_agent,
    scoring_validation_agent
  ],
  tasks=[
    lead_data_task,
    cultural_fit_task,
    scoring_validation_task
  ],
  verbose=True
)



## Email Engagement Crew

In [18]:
# Creating Agents
email_content_specialist = Agent(
  config=email_agents_config['email_content_specialist']
)

engagement_strategist = Agent(
  config=email_agents_config['engagement_strategist']
)

# Creating Tasks
email_drafting = Task(
  config=email_tasks_config['email_drafting'],
  agent=email_content_specialist
)

engagement_optimization = Task(
  config=email_tasks_config['engagement_optimization'],
  agent=engagement_strategist
)

# Creating Crew
email_writing_crew = Crew(
  agents=[
    email_content_specialist,
    engagement_strategist
  ],
  tasks=[
    email_drafting,
    engagement_optimization
  ],
  verbose=True
)



## Creating Complete Sales Flow

In [19]:
from crewai import Flow
from crewai.flow.flow import listen, start

class SalesPipeline(Flow):
    @start()
    def fetch_leads(self):
        print('*** Inside method fetch_leads')
        # Pull our leads from the database
        leads = [
            {
                "id": 1001,
                "lead_data": {
                    "name": "João Moura",
                    "job_title": "Director of Engineering",
                    "company": "Clearbit",
                    "email": "joao@clearbit.com",
                    "use_case": "Using AI Agent to do better data enrichment."
                },
            },
        ]
        return leads

    @listen(fetch_leads)
    def score_leads(self, leads):
        print('*** Inside method score_leads')
        scores = lead_scoring_crew.kickoff_for_each(leads)
        self.state["score_crews_results"] = scores
        return scores

    @listen(score_leads)
    def store_leads_score(self, scores):
        print('*** Inside method store_leads_score')
        # Here we would store the scores in the database
        return scores

    @listen(score_leads)
    def filter_leads(self, scores):
        print('*** Inside method filter_leads')
        return [score for score in scores if score['lead_score'].score > 70]

    @listen(filter_leads)
    def write_email(self, leads):
        print('*** Inside method write_email')
        scored_leads = [lead.to_dict() for lead in leads]
        emails = email_writing_crew.kickoff_for_each(scored_leads)
        return emails

    @listen(write_email)
    def send_email(self, emails):
        print('*** Inside method send_email')
        # Here we would send the emails to the leads
        return emails

flow = SalesPipeline()

## Plotting the Flow

In [20]:
flow.plot()

Plot saved as crewai_flow.html


In [21]:
from IPython.display import IFrame

IFrame(src='./crewai_flow.html', width='150%', height=600)

## Kickoff Flow

In [22]:
import nest_asyncio
nest_asyncio.apply()

emails = await flow.kickoff_async()



*** Inside method fetch_leads
*** Inside method score_leads
[1m[95m# Agent:[00m [1m[92mLead Data Specialist\n[00m
[95m## Task:[00m [92mCollect and analyze the following information about the lead:
- Personal Information:
  - Name: Obtain the full name of the lead.
  - Job Title: Determine the lead's current job title.
  - Role Relevance: Assess how relevant the lead's role is to the decision-making process on a scale from 0 to 10.
  - Professional Background: Optionally, gather a brief description of the lead's professional background.

- Company Information:
  - Company Name: Identify the name of the company the lead works for.
  - Industry: Determine the industry in which the company operates.
  - Company Size: Estimate the size of the company in terms of employee count.
  - Revenue: If available, collect information on the annual revenue of the company.
  - Market Presence: Evaluate the company's market presence on a scale from 0 to 10.

- Our Company and Product:
  - Compa





[1m[95m# Agent:[00m [1m[92mLead Scorer and Validator\n[00m
[95m## Final Answer:[00m [92m
{
  "personal_info": {
    "name": "João Moura",
    "job_title": "Director of Engineering",
    "role_relevance": 9,
    "professional_background": "Previous Senior Engineering Manager at Clearbit and likely CEO of CrewAI. Experience in building/scaling engineering teams, former startup CTO, speaker at various global events, instrumental in AI at Clearbit, teaching experience."
  },
  "company_info": {
    "company_name": "Clearbit",
    "industry": "B2B Marketing Intelligence Solutions",
    "company_size": 8,
    "revenue": null,
    "market_presence": 9
  },
  "lead_score": {
    "score": 85,
    "scoring_criteria": [
      "Role relevance indicates high decision-making power and direct relevance to the product offered by CrewAI.",
      "Company size suggests a broad scope of operation and potential needs for extensive AI orchestration.",
      "Market presence denotes significant i

## Usage Metrics and Costs

Let’s see how much it would cost each time if this crew runs at scale.

In [24]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([flow.state["score_crews_results"][0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0190


C:\Users\320086703\AppData\Local\Temp\ipykernel_14040\1963678899.py:4: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  df_usage_metrics = pd.DataFrame([flow.state["score_crews_results"][0].token_usage.dict()])


Unnamed: 0,total_tokens,prompt_tokens,cached_prompt_tokens,completion_tokens,successful_requests
0,126560,123024,0,3536,17


In [25]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([emails[0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0003


C:\Users\320086703\AppData\Local\Temp\ipykernel_14040\99626748.py:4: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  df_usage_metrics = pd.DataFrame([emails[0].token_usage.dict()])


Unnamed: 0,total_tokens,prompt_tokens,cached_prompt_tokens,completion_tokens,successful_requests
0,1685,1234,0,451,2


## Inspecting Results

In [26]:
scores = flow.state["score_crews_results"]

In [27]:
import pandas as pd
from IPython.display import display, HTML

lead_scoring_result = scores[0].pydantic

# Create a dictionary with the nested structure flattened
data = {
    'Name': lead_scoring_result.personal_info.name,
    'Job Title': lead_scoring_result.personal_info.job_title,
    'Role Relevance': lead_scoring_result.personal_info.role_relevance,
    'Professional Background': lead_scoring_result.personal_info.professional_background,
    'Company Name': lead_scoring_result.company_info.company_name,
    'Industry': lead_scoring_result.company_info.industry,
    'Company Size': lead_scoring_result.company_info.company_size,
    'Revenue': lead_scoring_result.company_info.revenue,
    'Market Presence': lead_scoring_result.company_info.market_presence,
    'Lead Score': lead_scoring_result.lead_score.score,
    'Scoring Criteria': ', '.join(lead_scoring_result.lead_score.scoring_criteria),
    'Validation Notes': lead_scoring_result.lead_score.validation_notes
}

# Convert the dictionary to a DataFrame
df = pd.DataFrame.from_dict(data, orient='index', columns=['Value'])

# Reset the index to turn the original column names into a regular column
df = df.reset_index()

# Rename the index column to 'Attribute'
df = df.rename(columns={'index': 'Attribute'})

# Create HTML table with bold attributes and left-aligned values
html_table = df.style.set_properties(**{'text-align': 'left'}) \
                     .format({'Attribute': lambda x: f'<b>{x}</b>'}) \
                     .hide(axis='index') \
                     .to_html()

# Display the styled HTML table
display(HTML(html_table))

Attribute,Value
Name,João Moura
Job Title,Director of Engineering
Role Relevance,9
Professional Background,"Previous Senior Engineering Manager at Clearbit and likely CEO of CrewAI. Experience in building/scaling engineering teams, former startup CTO, speaker at various global events, instrumental in AI at Clearbit, teaching experience."
Company Name,Clearbit
Industry,B2B Marketing Intelligence Solutions
Company Size,8
Revenue,
Market Presence,9
Lead Score,85


## Results

In [29]:
import textwrap

result_text = emails[0].raw
wrapped_text = textwrap.fill(result_text, width=80)
print(wrapped_text)

I see you're at the cutting edge of AI at Clearbit—an exciting place to be.
CrewAI's platform can amplify that edge.   Our platform integrates AI Agents
across various verticals, custom-tailored to enterprises eyeing agentic
automation. With CrewAI, imagine your engineering efforts magnified, teams
unified, and operations streamlined. Unique to your role and experience, I'm
certain the synergy between Clearbit and CrewAI could be transformative.  I'm
keen on discussing how we might elevate Clearbit's AI capabilities. Are you
available for a quick call this week to explore this potential? Let's maximize
your team's productivity together.   Click here [Insert Calendar Link] to book a
meeting, or reply with a time that suits you.   Looking forward to the
opportunity to collaborate,  [Your Name] [Your Position] CrewAI


## More Complex Workflows

In [32]:
from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router

class SalesPipeline(Flow):
    
  @start()
  def fetch_leads(self):
    # Pull our leads from the database
    # This is a mock, in a real-world scenario, this is where you would
    # fetch leads from a database
    leads = [
      {
        "lead_data": {
          "name": "João Moura",
          "job_title": "Director of Engineering",
          "company": "Clearbit",
          "email": "joao@clearbit.com",
          "use_case": "Using AI Agent to do better data enrichment."
        },
      },
    ]
    return leads

  @listen(fetch_leads)
  def score_leads(self, leads):
    scores = lead_scoring_crew.kickoff_for_each(leads)
    self.state["score_crews_results"] = scores
    return scores

  @listen(score_leads)
  def store_leads_score(self, scores):
    # Here we would store the scores in the database
    return scores

  @listen(score_leads)
  def filter_leads(self, scores):
    return [score for score in scores if score['lead_score'].score > 70]

  @listen(and_(filter_leads, store_leads_score))
  def log_leads(self, leads):
    print(f"Leads: {leads}")

  @router(filter_leads)
  def count_leads(self, scores):
    if len(scores) > 10:
      return 'high'
    elif len(scores) > 5:
      return 'medium'
    else:
      return 'low'

  @listen('high')
  def store_in_salesforce(self, leads):
    return leads

  @listen('medium')
  def send_to_sales_team(self, leads):
    return leads

  @listen('low')
  def write_email(self, leads):
    scored_leads = [lead.to_dict() for lead in leads]
    emails = email_writing_crew.kickoff_for_each(scored_leads)
    return emails

  @listen(write_email)
  def send_email(self, emails):
    # Here we would send the emails to the leads
    return emails

In [34]:
#Plot the flow

flow = SalesPipeline()
flow.plot()

from IPython.display import IFrame

IFrame(src='./crewai_flow.html', width='150%', height=600)

Plot saved as crewai_flow.html
