In [2]:
# Warning control
import warnings
warnings.filterwarnings('ignore')


import os
import yaml
from crewai import Agent, Task, Crew, LLM
from dotenv import load_dotenv
import pandas as pd

In [3]:
# Populate os.environ from a local .env file (if one exists) before any
# os.getenv lookups; load_dotenv is imported in the first cell but was never
# called, so GROQ_API_KEY and the GOOGLE_* settings read later could be None.
load_dotenv()

# Groq-hosted Llama 3 (8B) model, authenticated with GROQ_API_KEY.
llm = LLM(
    model="groq/llama3-8b-8192",
    api_key=os.getenv("GROQ_API_KEY")
)

In [4]:
# YAML files holding the agent and task definitions for both crews,
# keyed by the name used to look them up below.
files = {
    'lead_agents': 'config/lead_qualification_agents.yaml',
    'lead_tasks': 'config/lead_qualification_tasks.yaml',
    'email_agents': 'config/email_engagement_agents.yaml',
    'email_tasks': 'config/email_engagement_tasks.yaml'
}

# Load configurations from YAML files.
configs = {}
for config_type, file_path in files.items():
    # Explicit UTF-8 so the result does not depend on the platform's locale.
    with open(file_path, 'r', encoding='utf-8') as file:
        loaded = yaml.safe_load(file)
    # Later cells index these by agent/task name, so each must be a mapping;
    # fail here with the offending path instead of a TypeError further down
    # (safe_load returns None for an empty file).
    if not isinstance(loaded, dict):
        raise ValueError(f"Config file {file_path!r} must contain a YAML mapping, got {type(loaded).__name__}.")
    configs[config_type] = loaded

In [5]:
# Give each loaded YAML mapping its own name for the agent/task cells below.
(
    lead_agents_config,
    lead_tasks_config,
    email_agents_config,
    email_tasks_config,
) = (configs[key] for key in ('lead_agents', 'lead_tasks', 'email_agents', 'email_tasks'))

In [6]:
from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

class LeadPersonalInfo(BaseModel):
    """Personal details about a lead; nested in LeadScoringResult.personal_info."""
    name: str = Field(..., description="The full name of the lead.")
    job_title: str = Field(..., description="The job title of the lead.")
    role_relevance: int = Field(..., ge=0, le=10, description="A score representing how relevant the lead's role is to the decision-making process (0-10).")
    # Optional fields default to None (as CompanyInfo.revenue does) so the
    # field may be omitted, instead of being Optional-typed yet still required.
    professional_background: Optional[str] = Field(None, description="A brief description of the lead's professional background.")

class CompanyInfo(BaseModel):
    # Details about the lead's employer; nested in LeadScoringResult.company_info.
    # Every field is required except `revenue`, which defaults to None.
    company_name: str = Field(
        default=...,
        description="The name of the company the lead works for.",
    )
    industry: str = Field(
        default=...,
        description="The industry in which the company operates.",
    )
    company_size: int = Field(
        default=...,
        description="The size of the company in terms of employee count.",
    )
    revenue: Optional[float] = Field(
        default=None,
        description="The annual revenue of the company, if available.",
    )
    # Bounded 0..10 inclusive.
    market_presence: int = Field(
        default=...,
        description="A score representing the company's market presence (0-10).",
        ge=0,
        le=10,
    )

class LeadScore(BaseModel):
    # Final lead score (bounded 0..100 inclusive) and the reasoning behind it.
    score: int = Field(
        default=...,
        description="The final score assigned to the lead (0-100).",
        ge=0,
        le=100,
    )
    scoring_criteria: List[str] = Field(
        default=...,
        description="The criteria used to determine the lead's score.",
    )
    validation_notes: Optional[str] = Field(
        default=None,
        description="Any notes regarding the validation of the lead score.",
    )

class LeadScoringResult(BaseModel):
    # Structured output requested from scoring_validation_task (output_pydantic):
    # who the lead is, where they work, and the score they were given.
    personal_info: LeadPersonalInfo = Field(
        default=...,
        description="Personal information about the lead.",
    )
    company_info: CompanyInfo = Field(
        default=...,
        description="Information about the lead's company.",
    )
    lead_score: LeadScore = Field(
        default=...,
        description="The calculated score and related information for the lead.",
    )

In [7]:
from crewai_tools import SerperDevTool, ScrapeWebsiteTool

In [8]:
# Agents
lead_data_agent = Agent(
  config=lead_agents_config['lead_data_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

cultural_fit_agent = Agent(
  config=lead_agents_config['cultural_fit_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

scoring_validation_agent = Agent(
  config=lead_agents_config['scoring_validation_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

# Tasks
lead_data_task = Task(
  config=lead_tasks_config['lead_data_collection'],
  agent=lead_data_agent
)

cultural_fit_task = Task(
  config=lead_tasks_config['cultural_fit_analysis'],
  agent=cultural_fit_agent
)

scoring_validation_task = Task(
  config=lead_tasks_config['lead_scoring_and_validation'],
  agent=scoring_validation_agent,
  context=[lead_data_task, cultural_fit_task],
  output_pydantic=LeadScoringResult
)

lead_scoring_crew = Crew(
  agents=[
    lead_data_agent,
    cultural_fit_agent,
    scoring_validation_agent
  ],
  tasks=[
    lead_data_task,
    cultural_fit_task,
    scoring_validation_task
  ],
  verbose=True
)

In [9]:
# Creating Agents
email_content_specialist = Agent(
  config=email_agents_config['email_content_specialist']
)

engagement_strategist = Agent(
  config=email_agents_config['engagement_strategist']
)

# Creating Tasks
email_drafting = Task(
  config=email_tasks_config['email_drafting'],
  agent=email_content_specialist
)

engagement_optimization = Task(
  config=email_tasks_config['engagement_optimization'],
  agent=engagement_strategist
)

# Creating Crew
email_writing_crew = Crew(
  agents=[
    email_content_specialist,
    engagement_strategist
  ],
  tasks=[
    email_drafting,
    engagement_optimization
  ],
  verbose=True
)



In [10]:
import gspread
from google.oauth2.service_account import Credentials
from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router

In [23]:
class SalesPipeline(Flow):
  """Lead-qualification flow backed by a Google Sheet.

  Steps:
    1. read leads from the leads worksheet;
    2. score each one with `lead_scoring_crew`;
    3. write the scores back into a "Lead Score" column;
    4. keep leads scoring above LEAD_SCORE_THRESHOLD;
    5. route them to a "Hot Leads" worksheet ('high') or to
       `email_writing_crew` ('low').

  Environment variables:
    GOOGLE_SHEET_ID              key of the spreadsheet (always required)
    GOOGLE_SERVICE_ACCOUNT_JSON  path to the service-account key file (always required)
    WORKSHEET_NAME               worksheet holding the leads, header in row 1
                                 (required by the steps that read/write leads)
  """

  # Leads whose score is strictly greater than this are kept by filter_leads.
  LEAD_SCORE_THRESHOLD = 65
  # More qualified leads than this routes to 'high', otherwise to 'low'.
  HIGH_VOLUME_LEAD_COUNT = 30
  # OAuth scopes requested for the service account.
  GOOGLE_SCOPES = (
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/drive",
  )
  # Header of the column store_leads_score writes into.
  LEAD_SCORE_HEADER = "Lead Score"
  # Worksheet that store_in_newsheet fills with the qualified leads.
  HOT_LEADS_WORKSHEET = "Hot Leads"

  def _open_spreadsheet(self, require_worksheet_name=True):
    """Authorize with the service account and open the configured spreadsheet.

    Returns a `(spreadsheet, worksheet_name)` tuple, where `worksheet_name`
    is the WORKSHEET_NAME value (may be None when not required).

    Raises:
      ValueError: if GOOGLE_SHEET_ID or GOOGLE_SERVICE_ACCOUNT_JSON is unset,
        or WORKSHEET_NAME is unset while `require_worksheet_name` is true.
    """
    google_sheet_id = os.getenv("GOOGLE_SHEET_ID")
    worksheet_name = os.getenv("WORKSHEET_NAME")
    service_account_json = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON")

    missing_worksheet_name = require_worksheet_name and not worksheet_name
    if not google_sheet_id or not service_account_json or missing_worksheet_name:
      raise ValueError("Missing required environment variables for Google Sheets configuration.")

    credentials = Credentials.from_service_account_file(
      service_account_json,
      scopes=list(self.GOOGLE_SCOPES)
    )
    client = gspread.authorize(credentials)
    return client.open_by_key(google_sheet_id), worksheet_name

  @staticmethod
  def _cell_text(row, column):
    """Return `row[column]` as a stripped string ("" when missing or None).

    Record values are not guaranteed to be strings (e.g. numeric cells), so
    they are converted with str() before stripping.
    """
    value = row.get(column, "")
    return "" if value is None else str(value).strip()

  @staticmethod
  def _lead_score(result):
    """Return the numeric score from one lead_scoring_crew result."""
    return result['lead_score'].score

  @staticmethod
  def _flatten(data, prefix=""):
    """Flatten nested dicts into one level with dotted keys (e.g. "lead_score.score").

    Lists/tuples are joined with "; " and None becomes "" so every value is
    a scalar that can be written to a single cell.
    """
    flat = {}
    for key, value in data.items():
      column = f"{prefix}{key}"
      if isinstance(value, dict):
        flat.update(SalesPipeline._flatten(value, f"{column}."))
      elif isinstance(value, (list, tuple)):
        flat[column] = "; ".join(str(item) for item in value)
      elif value is None:
        flat[column] = ""
      else:
        flat[column] = value
    return flat

  @start()
  def fetch_leads(self):
    """Read all leads from the leads worksheet.

    Returns one `{"lead_data": {...}}` dict per data row, shaped as inputs
    for `lead_scoring_crew.kickoff_for_each`.
    """
    spreadsheet, worksheet_name = self._open_spreadsheet()
    rows = spreadsheet.worksheet(worksheet_name).get_all_records()
    return [
      {
        "lead_data": {
          "name": self._cell_text(row, "Name"),
          "job_title": self._cell_text(row, "Job Title"),
          "company": self._cell_text(row, "Company"),
          "email": self._cell_text(row, "Email"),
          "use_case": self._cell_text(row, "Use Case"),
        }
      }
      for row in rows
    ]

  @listen(fetch_leads)
  def score_leads(self, leads):
    """Run lead_scoring_crew once per lead; results are also kept in flow state."""
    scores = lead_scoring_crew.kickoff_for_each(leads)
    self.state["score_crews_results"] = scores
    return scores

  @listen(score_leads)
  def store_leads_score(self, scores):
    """Write each lead's numeric score into the "Lead Score" column.

    The column is appended to the header row if missing. Scores are matched
    to sheet rows by position, so the worksheet must hold exactly one data
    row per scored lead. Returns `scores` unchanged.

    Raises:
      ValueError: on missing configuration, or if the row count differs
        from the number of scores.
    """
    spreadsheet, worksheet_name = self._open_spreadsheet()
    sheet = spreadsheet.worksheet(worksheet_name)

    rows = sheet.get_all_records()
    if len(scores) != len(rows):
      raise ValueError("The number of scores does not match the number of rows in the Google Sheet.")

    headers = sheet.row_values(1)
    if self.LEAD_SCORE_HEADER not in headers:
      headers.append(self.LEAD_SCORE_HEADER)
      # Grow the grid first so the new header column lies inside the sheet.
      if len(headers) > sheet.col_count:
        sheet.add_cols(len(headers) - sheet.col_count)
      # Keyword arguments: gspread changed the positional order of update().
      sheet.update(range_name="1:1", values=[headers])

    lead_score_column_index = headers.index(self.LEAD_SCORE_HEADER) + 1

    # Store the numeric score (the value filter_leads compares), not the
    # whole crew result object, and write the column in a single request.
    score_values = [[self._lead_score(score)] for score in scores]
    if score_values:
      first_cell = gspread.utils.rowcol_to_a1(2, lead_score_column_index)  # row 1 holds headers
      last_cell = gspread.utils.rowcol_to_a1(len(score_values) + 1, lead_score_column_index)
      sheet.update(range_name=f"{first_cell}:{last_cell}", values=score_values)

    return scores

  @listen(score_leads)
  def filter_leads(self, scores):
    """Keep only results whose score is greater than LEAD_SCORE_THRESHOLD."""
    return [score for score in scores if self._lead_score(score) > self.LEAD_SCORE_THRESHOLD]

  @listen(and_(filter_leads, store_leads_score))
  def log_leads(self, leads):
    """Print the value passed in once both filter_leads and store_leads_score have run."""
    print(f"Leads: {leads}")

  @router(filter_leads)
  def count_leads(self, scores):
    """Route to 'high' when more than HIGH_VOLUME_LEAD_COUNT leads qualified, else 'low'."""
    return 'high' if len(scores) > self.HIGH_VOLUME_LEAD_COUNT else 'low'

  @listen('high')
  def store_in_newsheet(self, leads):
    """Replace the contents of the "Hot Leads" worksheet with the qualified leads.

    Each lead's `to_dict()` is flattened to dotted column names because
    nested dicts/lists cannot be written to a cell. Row 1 holds the headers.
    Returns `leads` unchanged.
    """
    spreadsheet, _ = self._open_spreadsheet(require_worksheet_name=False)

    try:
      hot_leads_sheet = spreadsheet.worksheet(self.HOT_LEADS_WORKSHEET)
    except gspread.WorksheetNotFound:
      hot_leads_sheet = spreadsheet.add_worksheet(title=self.HOT_LEADS_WORKSHEET, rows=100, cols=20)
    hot_leads_sheet.clear()

    if leads:
      flat_leads = [self._flatten(lead.to_dict()) for lead in leads]
      # Union of keys in first-seen order, in case some leads lack a field.
      headers = list(dict.fromkeys(key for flat in flat_leads for key in flat))
      if len(headers) > hot_leads_sheet.col_count:
        hot_leads_sheet.add_cols(len(headers) - hot_leads_sheet.col_count)
      table = [headers] + [[flat.get(header, "") for header in headers] for flat in flat_leads]
      # One request for the whole table instead of one append per lead.
      hot_leads_sheet.append_rows(table)

    return leads

  @listen('low')
  def write_email(self, leads):
    """Draft one email per qualified lead with email_writing_crew."""
    scored_leads = [lead.to_dict() for lead in leads]
    emails = email_writing_crew.kickoff_for_each(scored_leads)
    return emails

  @listen(write_email)
  def send_email(self, emails):
    """Placeholder delivery step: prints the drafted emails and returns them."""
    print(emails)
    return emails

In [24]:
# Instantiate the pipeline defined above; it is executed by the
# `flow.kickoff()` cell further down.
flow = SalesPipeline()

In [25]:
# Render the flow graph (per the cell output, written to crewai_flow.html).
flow.plot()

Plot saved as crewai_flow.html


In [None]:
# Run the whole flow using the notebook's top-level await.
# NOTE(review): this assumes a crewai version where `Flow.kickoff()` returns an
# awaitable; if the installed version has a synchronous `kickoff()` plus
# `kickoff_async()`, use `await flow.kickoff_async()` here instead — confirm.
# NOTE(review): the result is presumably the output of the last method the flow
# ran, which is not the drafted emails on the 'high' route — verify before
# relying on the `emails` name.
emails = await flow.kickoff()