# Venue Search Agentic Pipeline

## Initial Imports

In [5]:
# Warning control
import warnings
warnings.filterwarnings('ignore')

# Load environment variables
from helper import load_env
load_env()

import os
import yaml
from crewai import Agent, Task, Crew

## Load API tokens for our 3rd party APIs

In [6]:
os.environ['OPENAI_MODEL_NAME'] = 'o1-mini'

## Loading Tasks and Agents YAML files

In [7]:
# Define file paths for YAML configurations
files = {
    'data_collection_agents': 'config/agents/data_collection_agents.yaml',
    'data_collection_tasks': 'config/tasks/data_collection_tasks.yaml',
    'location_analysis_agents': 'config/agents/location_analysis_agents.yaml',
    'location_analysis_tasks': 'config/tasks/location_analysis_tasks.yaml',
    'email_engagement_agents': 'config/agents/email_engagements_agents.yaml',
    'email_engagement_tasks': 'config/tasks/email_engagement_tasks.yaml',
    'feature_extraction_agents': 'config/agents/feature_extraction_agents.yaml',
    'feature_extraction_tasks': 'config/tasks/feature_extraction_tasks.yaml',
    'scoring_agents': 'config/agents/scoring_agents.yaml',
    'scoring_tasks': 'config/tasks/scoring_tasks.yaml',
    'reporting_agents': 'config/agents/reporting_agents.yaml',
    'reporting_tasks': 'config/tasks/reporting_tasks.yaml'
}

# Load configurations from YAML files
configs = {}
for config_type, file_path in files.items():
    with open(file_path, 'r') as file:
        configs[config_type] = yaml.safe_load(file)

# Assign loaded configurations to specific variables
data_collection_agents_config = configs['data_collection_agents']
data_collection_tasks_config = configs['data_collection_tasks']
location_analysis_agents_config = configs['location_analysis_agents']
location_analysis_tasks_config = configs['location_analysis_tasks']
feature_extraction_agents_config = configs['feature_extraction_agents']
feature_extraction_tasks_config = configs['feature_extraction_tasks']
scoring_agents_config = configs['scoring_agents']
scoring_tasks_config = configs['scoring_tasks']
email_engagement_agents_config = configs['email_engagement_agents']
email_engagement_tasks_config = configs['email_engagement_tasks']
reporting_agents_config = configs['reporting_agents']
reporting_tasks_config = configs['reporting_tasks']

## Create Pydantic Models for Structured Output

In [8]:
from pydantic import BaseModel, Field
from typing import Dict, Optional, List, Set, Tuple

# Venue Search Pydantic Models
class Venue(BaseModel):
    name: str = Field(..., description="The name of the venue.")
    location: str = Field(..., description="The location of the venue.")
    capacity: int = Field(..., description="The capacity of the venue in terms of attendees.")
    amenities: List[str] = Field(..., description="A list of amenities available at the venue.")
    accessibility: str = Field(..., description="The accessibility of the venue for attendees with disabilities.")
    pricing: float = Field(..., description="The pricing of the venue per attendee.")
    website: str = Field(..., description="The website of the venue.")
    contact_info: str = Field(..., description="The contact information of the venue.")
    reviews: List[str] = Field(..., description="A list of reviews of the venue.")
    photos: List[str] = Field(..., description="A list of photos of the venue.")
    capacity: int = Field(..., description="The capacity of the venue in terms of attendees.")
    amenities: List[str] = Field(..., description="A list of amenities available at the venue.")
    accessibility: str = Field(..., description="The accessibility of the venue for attendees with disabilities.")
    pricing: float = Field(..., description="The pricing of the venue per attendee.")
    website: str = Field(..., description="The website of the venue.")
    contact_info: str = Field(..., description="The contact information of the venue.")
    reviews: List[str] = Field(..., description="A list of reviews of the venue.")
    photos: List[str] = Field(..., description="A list of photos of the venue.")

class VenueScore(BaseModel):
    score: int = Field(..., ge=0, le=100, description="The final score assigned to the venue (0-100).")
    scoring_criteria: List[str] = Field(..., description="The criteria used to determine the venue's score.")
    validation_notes: Optional[str] = Field(None, description="Any notes regarding the validation of the venue score.")

class VenueScoringResult(BaseModel):
    venue: Venue = Field(..., description="The venue object with all its attributes.")
    venue_score: VenueScore = Field(..., description="The calculated score and related information for the venue.") 



## Importing Tools

In [9]:
from crewai_tools import SerperDevTool, ScrapeWebsiteTool

## Lead Qualification Crew, Agents and Tasks

In [10]:
# Creating Agents

data_collection_agent = Agent(
  config=data_collection_agents_config['data_collector'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

location_analysis_agent = Agent(
  config=location_analysis_agents_config['location_analyst'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

feature_extraction_agent = Agent(
  config=feature_extraction_agents_config['feature_extractor'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

scoring_agent = Agent(
  config=scoring_agents_config['scoring_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

email_engagement_agent = Agent(
  config=email_engagement_agents_config['email_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

reporting_agent = Agent(
  config=reporting_agents_config['reporting_agent'],
  tools=[SerperDevTool(), ScrapeWebsiteTool()]
)

# Creating Tasks
data_collection_task = Task(
  config=data_collection_tasks_config['collect_conference_data'],
  agent=data_collection_agent
)

location_analysis_task = Task(
  config=location_analysis_tasks_config['identify_venues'],
  agent=location_analysis_agent
)

feature_extraction_task = Task(
  config=feature_extraction_tasks_config['extract_features'],
  agent=feature_extraction_agent
)

scoring_task = Task(
  config=scoring_tasks_config['score_venues'],
  agent=scoring_agent,
  context=[data_collection_task, location_analysis_task, feature_extraction_task],
  output_pydantic=VenueScoringResult
)

email_engagement_task = Task(
  config=email_engagement_tasks_config['generate_emails'],
  agent=email_engagement_agent,
  context=[scoring_task],
  output_pydantic=VenueScoringResult
)

reporting_task = Task(
  config=reporting_tasks_config['generate_report'],
  agent=reporting_agent,
  context=[scoring_task],
  output_pydantic=VenueScoringResult
)

# Creating Crew
venue_scoring_crew = Crew(
  agents=[
    data_collection_agent,
    location_analysis_agent,
    feature_extraction_agent,
    scoring_agent,
    email_engagement_agent,
    reporting_agent
  ],
  tasks=[
    data_collection_task,
    location_analysis_task,
    feature_extraction_task,
    scoring_task,
    email_engagement_task,
    reporting_task
  ],
  verbose=True
)

## Creating Complete Venue Search Flow

In [11]:
from crewai import Flow
from crewai.flow.flow import listen, start

class VenueSearchPipeline(Flow):
    @start()
    def collect_venue_data(self):
        # Collect initial venue and conference data
        venues = [
            {
                "conference_name": "TECHSPO New York 2024",
                "city": "New York ",
                "radius_km": 10,
                "venue_data": {
                    "name": "New York Marriott at the Brooklyn Bridge Hotel", 
                    "location": "333 Adams St, Brooklyn, NY 11201, United States",
                    "capacity": 5000,
                    "amenities": ["TECHSPO Hall", "Technology Training Theater", "Networking Luncheon"]
                }
            }
        ]
        return venues

    @listen(collect_venue_data) 
    def analyze_locations(self, venues):
        analysis = venue_scoring_crew.kickoff_for_each(venues)
        self.state["venue_analysis_results"] = analysis
        return analysis

    @listen(analyze_locations)
    def extract_features(self, analysis):
        # Extract and analyze venue features
        return venue_scoring_crew.kickoff()

    @listen(extract_features)
    def score_venues(self, features):
        scores = venue_scoring_crew.kickoff()
        return [venue for venue in scores if venue['venue_score'].score > 80]

    @listen(score_venues)
    def generate_emails(self, scored_venues):
        venue_contacts = [venue.to_dict() for venue in scored_venues]
        emails = venue_scoring_crew.kickoff_for_each(venue_contacts)
        return emails

    @listen(generate_emails)
    def generate_report(self, outreach_data):
        # Generate final report with venue analysis and recommendations
        return venue_scoring_crew.kickoff()

flow = VenueSearchPipeline()

## Plotting the Flow

In [12]:
VenueSearchPipeline().plot()

Plot saved as crewai_flow.html


In [13]:
from IPython.display import IFrame

IFrame(src='./crewai_flow.html', width='150%', height=600)

## Flow Kickoff

In [14]:
import asyncio
try:
    loop = asyncio.get_event_loop()
    if loop.is_running():
        # If we're in a running loop, create a task
        emails = await flow.kickoff_async()
    else:
        # If no loop is running, we can create and run one
        emails = loop.run_until_complete(flow.kickoff_async())
except RuntimeError:
    # Fallback if we can't get the event loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    emails = loop.run_until_complete(flow.kickoff_async())
    loop.close()

emails



[1m[95m# Agent:[00m [1m[92mData Collection Agent[00m
[95m## Task:[00m [92mResearch and collect detailed information about TECHSPO New York 2024  in New York . Include the exact location, dates, and expected attendance.
[00m


[1m[95m# Agent:[00m [1m[92mData Collection Agent[00m
[95m## Thought:[00m [92mThought: I need to find information about TECHSPO New York 2024, including location, dates, and expected attendance.[00m
[95m## Using tool:[00m [92mSearch the internet[00m
[95m## Tool Input:[00m [92m
"{\"search_query\": \"TECHSPO New York 2024 conference details\"}"[00m
[95m## Tool Output:[00m [92m

Search results: Title: TECHSPO New York 2025 · Technology Expo · July 7 - 8, 2025 ...
Link: https://techsponyc.com/
Snippet: Where Business, Tech and Innovation Collide in NYC! TECHSPO New York is an annual 2 day Technology Expo which showcases the next generation of technology ...
---
Title: 2024 Technology Expos - techspo
Link: https://techspo.co/2024-technolog

## Usage Metrics and Costs

Let’s see how much it would cost each time if this crew runs at scale.

In [12]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([flow.state["score_crews_results"][0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0038


Unnamed: 0,total_tokens,prompt_tokens,cached_prompt_tokens,completion_tokens,successful_requests
0,25294,23256,10496,2038,10


In [13]:
import pandas as pd

# Convert UsageMetrics instance to a DataFrame
df_usage_metrics = pd.DataFrame([emails[0].token_usage.dict()])

# Calculate total costs
costs = 0.150 * df_usage_metrics['total_tokens'].sum() / 1_000_000
print(f"Total costs: ${costs:.4f}")

# Display the DataFrame
df_usage_metrics

Total costs: $0.0002


Unnamed: 0,total_tokens,prompt_tokens,cached_prompt_tokens,completion_tokens,successful_requests
0,1303,1071,0,232,2


## Inspecting Results

In [14]:
scores = flow.state["score_crews_results"]

In [15]:
import pandas as pd
from IPython.display import display, HTML

lead_scoring_result = scores[0].pydantic

# Create a dictionary with the nested structure flattened
data = {
    'Name': lead_scoring_result.personal_info.name,
    'Job Title': lead_scoring_result.personal_info.job_title,
    'Role Relevance': lead_scoring_result.personal_info.role_relevance,
    'Professional Background': lead_scoring_result.personal_info.professional_background,
    'Company Name': lead_scoring_result.company_info.company_name,
    'Industry': lead_scoring_result.company_info.industry,
    'Company Size': lead_scoring_result.company_info.company_size,
    'Revenue': lead_scoring_result.company_info.revenue,
    'Market Presence': lead_scoring_result.company_info.market_presence,
    'Lead Score': lead_scoring_result.lead_score.score,
    'Scoring Criteria': ', '.join(lead_scoring_result.lead_score.scoring_criteria),
    'Validation Notes': lead_scoring_result.lead_score.validation_notes
}

# Convert the dictionary to a DataFrame
df = pd.DataFrame.from_dict(data, orient='index', columns=['Value'])

# Reset the index to turn the original column names into a regular column
df = df.reset_index()

# Rename the index column to 'Attribute'
df = df.rename(columns={'index': 'Attribute'})

# Create HTML table with bold attributes and left-aligned values
html_table = df.style.set_properties(**{'text-align': 'left'}) \
                     .format({'Attribute': lambda x: f'<b>{x}</b>'}) \
                     .hide(axis='index') \
                     .to_html()

# Display the styled HTML table
display(HTML(html_table))

Attribute,Value
Name,João Moura
Job Title,Director of Engineering
Role Relevance,8
Professional Background,"João Moura has nearly 20 years of experience in the software engineering field. He is currently in a leadership role at Clearbit, where he has been integral in building and scaling the engineering team. His interests include data science, game development, and open source contributions. He previously worked as a senior software engineer and has a strong technical background in multiple programming languages, making him a well-rounded professional in his field."
Company Name,Clearbit
Industry,B2B Marketing Intelligence / Data Enrichment
Company Size,139
Revenue,31.500000
Market Presence,8
Lead Score,78


## Results

In [16]:
import textwrap

result_text = emails[0].raw
wrapped_text = textwrap.fill(result_text, width=80)
print(wrapped_text)

João, imagine transforming Clearbit's automation with CrewAI's groundbreaking
Multi-Agent Orchestration Platform. Seamlessly orchestrate AI Agents tailored
specifically for your B2B marketing intelligence needs. Picture enhanced data
enrichment processes and skyrocketing efficiencies.   Take the next step:
**[Schedule a meeting now](#)** to unveil how CrewAI can revolutionize your
engineering objectives. Don’t miss this opportunity to lead your team into an
automated future. **[Explore more](#)** today and see immediate benefits for
your engineering goals.


## How Complex Can it Get?

In [23]:
from crewai import Flow
from crewai.flow.flow import listen, start, and_, or_, router

class SalesPipeline(Flow):
    
  @start()
  def fetch_leads(self):
    # Pull our leads from the database
    # This is a mock, in a real-world scenario, this is where you would
    # fetch leads from a database
    leads = [
      {
        "lead_data": {
          "name": "João Moura",
          "job_title": "Director of Engineering",
          "company": "Clearbit",
          "email": "joao@clearbit.com",
          "use_case": "Using AI Agent to do better data enrichment."
        },
      },
    ]
    return leads

  @listen(fetch_leads)
  def score_leads(self, leads):
    scores = lead_scoring_crew.kickoff_for_each(leads)
    self.state["score_crews_results"] = scores
    return scores

  @listen(score_leads)
  def store_leads_score(self, scores):
    # Here we would store the scores in the database
    return scores

  @listen(score_leads)
  def filter_leads(self, scores):
    return [score for score in scores if score['lead_score'].score > 70]

  @listen(and_(filter_leads, store_leads_score))
  def log_leads(self, leads):
    print(f"Leads: {leads}")

  @router(filter_leads)
  def count_leads(self, scores):
    if len(scores) > 10:
      return 'high'
    elif len(scores) > 5:
      return 'medium'
    else:
      return 'low'

  @listen('high')
  def store_in_salesforce(self, leads):
    return leads

  @listen('medium')
  def send_to_sales_team(self, leads):
    return leads

  @listen('low')
  def write_email(self, leads):
    scored_leads = [lead.to_dict() for lead in leads]
    emails = email_writing_crew.kickoff_for_each(scored_leads)
    return emails

  @listen(write_email)
  def send_email(self, emails):
    # Here we would send the emails to the leads
    return emails

## Plotting the Flow

In [24]:
flow = SalesPipeline()
flow.plot()

Plot saved as crewai_flow.html


In [36]:
from IPython.display import IFrame

IFrame(src='./mixercloud_venue_search_flow.html', width='150%', height=600)


In [43]:
import dash
from dash import html

app = dash.Dash(__name__)

app.layout = html.Div([
    html.Iframe(src="./mixercloud_venue_search_flow.html", width="800", height="600")
])


ModuleNotFoundError: No module named 'dash'