In [1]:
import os
from dotenv import load_dotenv
from rich import print as rich_print
import logfire
from __future__ import annotations


from dataclasses import dataclass, field
from pydantic import BaseModel
from typing import List, Optional, Literal
from pydantic_ai import Agent, NativeOutput, RunContext

load_dotenv()

LOGFIRE_WRITE_TOKEN = os.getenv("LOGFIRE_WRITE_TOKEN")
logfire.configure(token=LOGFIRE_WRITE_TOKEN)
logfire.instrument_pydantic_ai()
logfire.instrument_httpx()

run_tests = True

In [2]:
import ollama
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.ollama import OllamaProvider


OLLAMA_MODEL_NAME = "gpt-oss:20b"
OLLAMA_BASE_URL = "http://localhost:11434"

# Pull the model on the same Ollama server the agents will talk to. The
# module-level `ollama.pull` ignores OLLAMA_BASE_URL and uses the client's
# default host, so a non-default base URL would pull onto the wrong server.
ollama.Client(host=OLLAMA_BASE_URL).pull(OLLAMA_MODEL_NAME)

# Ollama exposes an OpenAI-compatible API under /v1; the api_key is a
# placeholder value (presumably ignored by a local Ollama server).
model = OpenAIChatModel(
    model_name=OLLAMA_MODEL_NAME,
    provider=OllamaProvider(base_url=f"{OLLAMA_BASE_URL}/v1", api_key="ollama"),
)

10:07:31.953 POST 127.0.0.1/api/pull


In [3]:
# Seed query for the first search round; later rounds may refine it.
initial_search_query = "executive leadership government defense jobs"

# The candidate's resume text, fed to the analyzer agents.
with open("GEORGE_WASHINGTON_RESUME.md", encoding="utf-8") as resume_file:
    candidate_resume = resume_file.read()

In [4]:
import os
from tavily import AsyncTavilyClient

# Async Tavily client shared by every search call; the key comes from the environment/.env.
tavily_client = AsyncTavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

In [5]:
class SearchResult(BaseModel):
    # One web-search hit, normalized so every field is a (possibly empty) string.
    # (Comments instead of a docstring: pydantic copies class docstrings into the
    # JSON schema.)
    page_title: str  # page title as reported by the search API
    page_description: str  # short description of the page
    page_content: str  # raw page content (requested as markdown)
    url: str  # page URL; hits without one are dropped at search time


class AnalyzedSearchResult(SearchResult):
    # A SearchResult plus the screening verdict against the candidate's resume.
    # (No docstring on purpose: pydantic would add it to the JSON schema.)
    job_title: Optional[str] = None  # title extracted from the posting, if any
    job_summary: Optional[str] = None  # short summary of the posting, if any
    should_skip: bool  # True when the page is not an individual job posting
    is_good_fit: bool  # True when the candidate is judged a good fit
    fit_reasoning: Optional[str] = None  # explanation of the verdict


class ProspectiveJob(BaseModel):
    # A posting worth applying to, with a drafted cover letter. This is the LLM
    # output schema of the prospective-job agent, so fields are documented with
    # comments rather than a docstring (pydantic puts docstrings in the schema).
    job_title: str
    job_summary: str
    url: str  # link to the original posting; used as the de-duplication key
    draft_cover_letter: str


class GraphOutput(BaseModel):
    # Final result of the job-search graph: every prospective job collected.
    prospective_jobs: List[ProspectiveJob]


@dataclass
class GraphDeps:
    """Tuning knobs passed to the job-search graph as its deps."""

    # How many results to request from each search call.
    search_max_results: int = 10
    # Stop after this many search rounds even if the target was not reached.
    max_search_iterations: int = 3
    # Stop as soon as at least this many prospective jobs have been collected.
    target_num_prospective_jobs: int = 5


@dataclass
class GraphState:
    """Mutable state shared by the graph's steps during a single run."""

    # Current query; search_prep replaces it with a refined one after round 1.
    search_query: str
    candidate_resume: str
    # Jobs accepted so far (analyze_prospective_job skips duplicate URLs).
    prospective_jobs: List[ProspectiveJob] = field(default_factory=list)
    # Number of searches performed so far.
    search_iteration: int = 0
    # Results of the most recent search only (overwritten each round).
    search_results: List[SearchResult] = field(default_factory=list)

In [6]:
from pydantic_graph.beta import GraphBuilder, StepContext, TypeExpression


# Builder for the job-search graph: GraphState is the shared mutable state,
# GraphDeps the run configuration, the graph takes no input (presumably the
# `None` type parameter, since .run() below passes no inputs) and ends with a
# GraphOutput.
gb = GraphBuilder[GraphState, GraphDeps, None, GraphOutput](
    output_type=GraphOutput,
    deps_type=GraphDeps,
    state_type=GraphState,
)

# Search Prep


In [7]:
@dataclass
class SearchPrepDeps:
    """Inputs the search-prep agent's dynamic instructions are built from."""

    # Query used for the previous search round.
    search_query: str
    # Results of that round; the most recent titles are used as context.
    search_results: List[SearchResult]


class SearchPrepOutput(BaseModel):
    # Structured output of the search-prep agent. Documented with comments
    # because pydantic would copy a class docstring into the JSON schema.
    search_query: str  # the refined query actually used for the next search
    reasoning: str  # the model's explanation (only search_query is read downstream)


# Agent that rewrites the search query between rounds. Its per-run instructions
# are added by the `@search_prep_agent.instructions` function that follows.
search_prep_agent = Agent[SearchPrepDeps, SearchPrepOutput](
    model,
    name="Search Prep Agent",
    deps_type=SearchPrepDeps,
    system_prompt="""
    you are an expert in job recruiting and job search.
    When given a search query, you can vary it in creative ways to find more job postings.
    """,
    # Ask for the model's native structured output matching SearchPrepOutput.
    output_type=NativeOutput[SearchPrepOutput](
        SearchPrepOutput,
        name="SearchPrepOutput",
        description="A refined search query for finding job postings",
    ),
)


@search_prep_agent.instructions
async def refine_search_instruction(ctx: RunContext[SearchPrepDeps]) -> Optional[str]:
    """Build the dynamic instructions asking the agent to refine the search query.

    Returns:
        The instruction text, or ``None`` (no extra instructions) when
        ``ctx.deps.search_query`` is empty. Titles of up to the five most recent
        search results are included as context when any are non-empty. With no
        titles, the "based on these titles" sentence is omitted, so the prompt
        does not contain an empty list.
    """
    deps = ctx.deps
    if not deps.search_query:
        return None
    logfire.info("refining search query")
    # Blank titles would only add ", , " noise to the prompt.
    recent_page_titles = [
        r.page_title for r in deps.search_results[-5:] if r.page_title
    ]
    lines = []
    if recent_page_titles:
        titles_str = ", ".join(recent_page_titles)
        lines.append(f"Based on these recent job posting titles we found: {titles_str}")
        lines.append(
            "Suggest a refined search query to find similar but different job postings."
        )
    else:
        lines.append("Suggest a refined search query to find more job postings.")
    lines.append(f"Current query: {deps.search_query}")
    lines.append("Provide a new refined search query that is under 400 characters.")
    return "\n".join(lines)


@logfire.instrument()
async def _search_prep(search_query: str, search_results: List[SearchResult]) -> str:
    """Ask the search-prep agent for a refined version of ``search_query``.

    Args:
        search_query: The query used for the previous search.
        search_results: That search's results, used as context by the agent.

    Returns:
        The ``search_query`` field of the agent's structured output.
    """
    logfire.info(f"refining {search_query=}")
    prep_deps = SearchPrepDeps(search_query=search_query, search_results=search_results)
    agent_run = await search_prep_agent.run(deps=prep_deps)
    refined_search_query = agent_run.output.search_query
    logfire.info(f"refined search query to {refined_search_query}")
    return refined_search_query


@gb.step
async def search_prep(ctx: StepContext[GraphState, GraphDeps, None]) -> str:
    """Prepares the search query for the job search."""
    state = ctx.state
    # First round: use the query exactly as given.
    if state.search_iteration <= 0:
        logfire.info(f"using initial search query {state.search_query}")
        return state.search_query
    # Later rounds: replace the stored query with the agent's refinement.
    state.search_query = await _search_prep(state.search_query, state.search_results)
    return state.search_query


In [8]:
if run_tests:
    refined_search_query = await _search_prep(initial_search_query, [])
    rich_print(refined_search_query)

10:07:32.942 Calling __main__._search_prep
10:07:32.945   refining search_query='executive leadership government defense jobs'
10:07:32.946   Search Prep Agent run
10:07:32.950     refining search query
10:07:32.951     chat gpt-oss:20b
10:07:33.116       POST localhost/v1/chat/completions
10:08:06.788   refined search query to ("executive leadership" OR "executive director" OR "senior exe...OR procurement OR risk) NOT (entry OR internship OR associate)


# Search Job Postings


In [9]:
# Applicant-tracking / job-board hosts that searches are restricted to
# (passed to Tavily as include_domains). Order is not significant.
JOB_BOARD_DOMAINS = [
    "myworkdayjobs.com",  # Workday
    "boards.greenhouse.io",  # Greenhouse
    "jobs.lever.co",  # Lever
    "icims.com",  # iCIMS
    "smartrecruiters.com",  # SmartRecruiters
    "workable.com",  # Workable
    "jobvite.com",  # Jobvite
    "bamboohr.com",  # BambooHR
    "breezyhr.com",  # Breezy HR
    "jobs.ashbyhq.com",  # Ashby
    "taleo.net",  # Taleo
    "paylocity.com",  # Paylocity
    "adp.com",  # ADP
    "jazz.co",  # JazzHR
]


@logfire.instrument()
async def _search_job_postings(
    search_query: str, search_max_results: int = 5
) -> List[SearchResult]:
    """Search the job-board domains in ``JOB_BOARD_DOMAINS`` via Tavily.

    Args:
        search_query: Free-text query sent to Tavily.
        search_max_results: Maximum number of results to request.

    Returns:
        One ``SearchResult`` per Tavily hit that has a URL. Missing text fields
        are normalized to empty strings.
    """
    search_response = await tavily_client.search(
        query=search_query,
        # Let Tavily infer the remaining search parameters from the query. (The
        # previous `auto_paragraphs` was a misspelling that the client forwards
        # verbatim and the API does not recognize.) NOTE: auto mode may pick
        # search_depth="advanced", which costs more API credits per request.
        auto_parameters=True,
        include_raw_content="markdown",
        max_results=search_max_results,
        include_domains=JOB_BOARD_DOMAINS,
        country="united states",
    )

    results = search_response.get("results", [])
    search_results = [
        SearchResult(
            page_title=result.get("title") or "",
            # Tavily returns the page snippet under "content"; there is no
            # "description" key, so reading it always produced "".
            page_description=result.get("content") or "",
            page_content=result.get("raw_content") or "",
            url=result.get("url") or "",
        )
        for result in results
        # Hits without a URL cannot be followed up or de-duplicated.
        if result.get("url")
    ]
    return search_results


@gb.step
async def search_job_postings(
    ctx: StepContext[GraphState, GraphDeps, str],
) -> List[SearchResult]:
    """Calls the tavily search tool to find new job postings, and tweaks the search query based on recent job postings."""
    state = ctx.state
    # Count this round before searching so output_decider sees the new total.
    state.search_iteration += 1
    found = await _search_job_postings(ctx.inputs, ctx.deps.search_max_results)
    # Keep only the latest round's results; search_prep refines from them.
    state.search_results = found
    return found

In [10]:
if run_tests:
    search_results = await _search_job_postings("AI engineer in Dallas, TX", 5)
    rich_print(search_results)

10:08:06.802 Calling __main__._search_job_postings
10:08:06.809   POST api.tavily.com/search


# Analyze Search Results


In [11]:
@dataclass
class SearchAnalyzerDeps:
    """Inputs for screening one search result against the candidate's resume."""

    # Resume text (truncated when rendered into the prompt).
    candidate_resume: str
    # The search hit being screened.
    search_result: SearchResult


class SearchResultAnalysis(BaseModel):
    # Structured verdict the analyzer LLM produces for one search result. The page
    # fields (title, description, content, url) are intentionally NOT in this
    # schema. That way the model neither spends tokens echoing the scraped page
    # nor alters the content/URL that later steps rely on. Those fields are copied
    # from the original SearchResult in `_analyze_search_result`.
    # (No docstring on purpose: pydantic would add it to the JSON schema.)
    job_title: Optional[str] = None
    job_summary: Optional[str] = None
    should_skip: bool
    is_good_fit: bool
    fit_reasoning: Optional[str] = None


# Screening agent. The deps type parameter matches `deps_type` and the
# RunContext used by its instructions function.
search_analyzer_agent = Agent[SearchAnalyzerDeps, SearchResultAnalysis](
    model,
    deps_type=SearchAnalyzerDeps,
    output_type=NativeOutput[SearchResultAnalysis](
        SearchResultAnalysis,
        name="SearchResultAnalysis",
        description="Analysis of a search result determining if it is an individual job posting that is a good fit for the candidate",
    ),
    name="Job Posting Analyzer",
)


@search_analyzer_agent.instructions
async def analyze_search_result_instructions(
    ctx: RunContext[SearchAnalyzerDeps],
) -> str:
    """Build the screening prompt for one search result.

    The page content and the resume are each cut to their first 2000
    characters before being inserted into the prompt.
    """
    deps = ctx.deps
    search_result = deps.search_result
    instructions = f"""
    Analyze this job posting and determine if the candidate is a good fit.

Job Posting:
Title: {search_result.page_title}
URL: {search_result.url}
Content: {search_result.page_content[:2000]}

Candidate Resume:
{ctx.deps.candidate_resume[:2000]}

Determine if this is:
1. A legitimate individual job posting (not a job board listing page or category page)
2. A good fit for the candidate based on their experience and skills


Set should_skip to true if this is not an individual job posting.
Set is_good_fit to true if the candidate is a good fit, false otherwise.
Provide clear reasoning for your decision.
Also set the job_title and job_summary fields to the job title and summary of the job posting.
"""
    return instructions


@logfire.instrument()
async def _analyze_search_result(
    search_result: SearchResult, candidate_resume: str
) -> AnalyzedSearchResult:
    """Screen one search result against the candidate's resume.

    Args:
        search_result: The search hit to screen.
        candidate_resume: The candidate's resume text.

    Returns:
        An ``AnalyzedSearchResult`` whose page fields are copied verbatim from
        ``search_result`` and whose verdict fields come from the LLM.
    """
    result = await search_analyzer_agent.run(
        deps=SearchAnalyzerDeps(
            candidate_resume=candidate_resume,
            search_result=search_result,
        )
    )
    # Dict merge (later keys win) rather than two ** expansions, so an input that
    # is already an AnalyzedSearchResult does not raise on duplicate keywords.
    merged = {**search_result.model_dump(), **result.output.model_dump()}
    return AnalyzedSearchResult(**merged)


@gb.step
async def analyze_search_result(
    ctx: StepContext[GraphState, GraphDeps, SearchResult],
) -> AnalyzedSearchResult:
    """Analyzes job postings to determine if they are a good fit for the candidate."""
    # One mapped branch per search result: ctx.inputs is a single SearchResult.
    analyzed_search_result = await _analyze_search_result(
        ctx.inputs, ctx.state.candidate_resume
    )
    logfire.info(f"{analyzed_search_result=}")
    return analyzed_search_result


In [12]:
if run_tests:
    analyzed_search_result = await _analyze_search_result(
        search_results[0], candidate_resume
    )
    rich_print(analyzed_search_result)


10:08:07.941 Calling __main__._analyze_search_result
10:08:07.943   Job Posting Analyzer run
10:08:07.944     chat gpt-oss:20b
10:08:07.945       POST localhost/v1/chat/completions


# Analyze Prospective Job


In [13]:
@dataclass
class ProspectiveJobAnalyzerDeps:
    """Inputs for drafting a cover letter for one screened posting."""

    # Resume text (truncated when rendered into the prompt).
    candidate_resume: str
    # Screening result for the posting the cover letter is written for.
    analyzed_search_result: AnalyzedSearchResult


# Cover-letter agent. The first type parameter must be the deps type: it was
# `None`, which contradicted `deps_type` and the RunContext[...] its instructions
# function declares (search_prep_agent already spells it correctly).
prospective_job_analyzer_agent = Agent[ProspectiveJobAnalyzerDeps, ProspectiveJob](
    model,
    deps_type=ProspectiveJobAnalyzerDeps,
    output_type=NativeOutput[ProspectiveJob](
        ProspectiveJob,
        name="ProspectiveJob",
        description="Job Posting which includes a cover letter draft for the candidate.",
    ),
    name="Prospective Job Analyzer",
)


@prospective_job_analyzer_agent.instructions
async def analyze_prospective_job_instructions(
    ctx: RunContext[ProspectiveJobAnalyzerDeps],
) -> str:
    """Build the prompt asking the agent to draft a cover letter for one posting.

    The posting content and the resume are each cut to their first 2000
    characters before being inserted into the prompt.
    """
    deps = ctx.deps
    analyzed_search_result = deps.analyzed_search_result
    # job_title is optional on the screening result; fall back to the page title
    # instead of rendering "Title: None" into the prompt.
    job_title = analyzed_search_result.job_title or analyzed_search_result.page_title

    instructions = f"""
    Analyze this job posting and draft a cover letter for the candidate appropriate for their resume.
    Include a cover letter draft in the draft_cover_letter field along with a clear job_title and job_summary.
    Be sure to keep the URL unchanged in the url field.

Job Posting:
Title: {job_title}
URL: {analyzed_search_result.url}
Job Description: {analyzed_search_result.page_content[:2000]}

Candidate Resume:
{deps.candidate_resume[:2000]}
"""
    return instructions


@logfire.instrument()
async def _analyze_prospective_job(
    candidate_resume: str, analyzed_search_result: AnalyzedSearchResult
) -> ProspectiveJob | None:
    """Draft a cover letter for a screened search result.

    Args:
        candidate_resume: The candidate's resume text.
        analyzed_search_result: Output of the screening step.

    Returns:
        A ``ProspectiveJob`` whose ``url`` is the screened result's URL, or
        ``None`` (without calling the LLM) when the result is flagged
        ``should_skip`` or was judged not a good fit.
    """
    if analyzed_search_result.should_skip or not analyzed_search_result.is_good_fit:
        return None
    result = await prospective_job_analyzer_agent.run(
        deps=ProspectiveJobAnalyzerDeps(
            candidate_resume=candidate_resume,
            analyzed_search_result=analyzed_search_result,
        )
    )
    # The prompt asks the model to keep the URL unchanged, but nothing enforces
    # it. Pin it to the source URL because callers de-duplicate jobs by `url`.
    prospective_job = result.output.model_copy(
        update={"url": analyzed_search_result.url}
    )
    logfire.info(f"{prospective_job=}")
    return prospective_job


@gb.step
async def analyze_prospective_job(
    ctx: StepContext[GraphState, GraphDeps, AnalyzedSearchResult],
) -> ProspectiveJob | None:
    """Analyzes a job posting to determine if it is a good fit for the candidate."""
    state = ctx.state
    prospective_job = await _analyze_prospective_job(state.candidate_resume, ctx.inputs)
    if prospective_job is not None:
        logfire.info(f"{prospective_job=}")
        # Record each posting once, keyed by URL.
        known_urls = {job.url for job in state.prospective_jobs}
        if prospective_job.url not in known_urls:
            state.prospective_jobs.append(prospective_job)
    return prospective_job

In [14]:
if run_tests:
    prospective_job = await _analyze_prospective_job(
        candidate_resume, analyzed_search_result
    )
    rich_print(prospective_job)

10:08:24.105 Calling __main__._analyze_prospective_job


# Construct the Agent Graph


In [15]:
from pydantic_graph.beta.join import reduce_list_append


# Join point for the mapped per-search-result branches. reduce_list_append
# presumably appends each analyze_prospective_job output to a list; note that
# output can be None for rejected postings. output_decider does not use this
# joined value; it reads the collected jobs from the graph state instead.
collect = gb.join(reduce_list_append, initial_factory=list[ProspectiveJob])


@gb.step
@logfire.instrument()
async def output_decider(
    ctx: StepContext[GraphState, GraphDeps, None],
) -> Literal["continue_search"] | GraphOutput:
    # Finish once enough jobs were collected or the search budget is spent;
    # otherwise signal another round. Jobs are read from state, not ctx.inputs.
    state, deps = ctx.state, ctx.deps
    found_enough = len(state.prospective_jobs) >= deps.target_num_prospective_jobs
    out_of_budget = state.search_iteration >= deps.max_search_iterations
    if found_enough or out_of_budget:
        return GraphOutput(prospective_jobs=state.prospective_jobs)
    return "continue_search"


# Wire the pipeline: prep query -> search -> one branch per search result
# (screen, then draft a cover letter) -> join -> decide to loop or finish.
gb.add(
    gb.edge_from(gb.start_node).to(search_prep),
    gb.edge_from(search_prep).to(search_job_postings),
    # .map() fans the list of search results out into one analyze_search_result
    # run per item (visible as repeated "run node analyze_search_result" spans).
    gb.edge_from(search_job_postings).map().to(analyze_search_result),
    gb.edge_from(analyze_search_result).to(analyze_prospective_job),
    gb.edge_from(analyze_prospective_job).to(collect),
    gb.edge_from(collect).to(output_decider),
    # Route on output_decider's return value: the literal "continue_search"
    # loops back to search_prep; a GraphOutput ends the run.
    gb.edge_from(output_decider).to(
        gb.decision()
        .branch(gb.match(TypeExpression[Literal["continue_search"]]).to(search_prep))
        .branch(gb.match(TypeExpression[GraphOutput]).to(gb.end_node))
    ),
)

job_search_graph = gb.build()

In [16]:
from mermaid import Mermaid

# Render the built graph as Mermaid source and wrap it for display; as the
# cell's last expression, `render` is presumably shown inline by Jupyter.
mermaid_diagram = job_search_graph.render()
render = Mermaid(mermaid_diagram)
render

In [17]:
prospective_jobs = await job_search_graph.run(
    state=GraphState(
        candidate_resume=candidate_resume,
        search_query=initial_search_query,
    ),
    deps=GraphDeps(
        search_max_results=5,
        max_search_iterations=3,
        target_num_prospective_jobs=2,
    ),
)

10:08:24.794 run graph job_search_graph
10:08:24.795   run node search_prep
10:08:24.796     using initial search query executive leadership government defense jobs
10:08:24.796   run node search_job_postings
10:08:24.797     Calling __main__._search_job_postings
10:08:24.804       POST api.tavily.com/search
10:08:25.049   run node analyze_search_result
10:08:25.049     Calling __main__._analyze_search_result
10:08:25.051       Job Posting Analyzer run
10:08:25.051   run node analyze_search_result
10:08:25.051     Calling __main__._analyze_search_result
10:08:25.052       Job Posting Analyzer run
10:08:25.052   run node analyze_search_result
10:08:25.052     Calling __main__._analyze_search_result
10:08:25.053       Job Posting Analyzer run
10:08:25.053   run node analyze_search_result
10:08:25.054     Calling __main__._analyze_search_result
10:08:25.054       Job Posting Analyzer run
               run node analyze_search_result
                 Calling __main__._analyze_search_result

In [18]:
# Pretty-print the graph run's output.
rich_print(prospective_jobs)

# Non-Graph Version


In [19]:
import asyncio


async def non_graph_job_search(
    candidate_resume: str,
    initial_search_query: str,
    search_max_results: int = 5,
    max_search_iterations: int = 1,
    target_num_prospective_jobs: int = 2,
) -> List[ProspectiveJob]:
    """Run the same search/screen/draft loop as the graph, with plain asyncio.

    Args:
        candidate_resume: The candidate's resume text.
        initial_search_query: Query for the first search round.
        search_max_results: Results requested per search.
        max_search_iterations: Stop after this many rounds (at least one runs).
        target_num_prospective_jobs: Stop once this many jobs were collected.

    Returns:
        Prospective jobs de-duplicated by URL (first occurrence wins), matching
        the graph version's ``analyze_prospective_job`` step.
    """
    search_query = initial_search_query
    prospective_jobs: List[ProspectiveJob] = []
    seen_urls: set[str] = set()
    search_iteration = 0
    while True:
        search_iteration += 1
        search_results = await _search_job_postings(search_query, search_max_results)
        # Screen all hits concurrently, then draft cover letters concurrently.
        analyzed_search_results = await asyncio.gather(
            *(_analyze_search_result(sr, candidate_resume) for sr in search_results)
        )
        maybe_prospective_jobs = await asyncio.gather(
            *(
                _analyze_prospective_job(candidate_resume, asr)
                for asr in analyzed_search_results
            )
        )
        # Later rounds can return postings already collected; keep each URL once.
        for job in maybe_prospective_jobs:
            if job is not None and job.url not in seen_urls:
                seen_urls.add(job.url)
                prospective_jobs.append(job)
        if (
            len(prospective_jobs) >= target_num_prospective_jobs
            or search_iteration >= max_search_iterations
        ):
            return prospective_jobs
        search_query = await _search_prep(search_query, search_results)


In [20]:
prospective_jobs = await non_graph_job_search(candidate_resume, initial_search_query)

10:15:32.245 Calling __main__._search_job_postings
10:15:32.255   POST api.tavily.com/search
10:15:32.512 Calling __main__._analyze_search_result
10:15:32.514   Job Posting Analyzer run
10:15:32.514 Calling __main__._analyze_search_result
10:15:32.515   Job Posting Analyzer run
10:15:32.515 Calling __main__._analyze_search_result
10:15:32.516   Job Posting Analyzer run
10:15:32.517 Calling __main__._analyze_search_result
10:15:32.517   Job Posting Analyzer run
             Calling __main__._analyze_search_result
               Job Posting Analyzer run
10:15:32.520     chat gpt-oss:20b
10:15:32.523       POST localhost/v1/chat/completions
             Calling __main__._analyze_search_result
               Job Posting Analyzer run
10:15:32.524     chat gpt-oss:20b
10:15:32.526       POST localhost/v1/chat/completions
             Calling __main__._analyze_search_result
               Job Posting Analyzer run
10:15:32.526     chat gpt-oss:20b
10:15:32.527       POST localhost/v1/chat/comp

In [21]:
# Pretty-print the list returned by non_graph_job_search.
rich_print(prospective_jobs)