In [None]:
from dotenv import load_dotenv
from agents import set_default_openai_client, Agent, Runner, function_tool
from typing import Dict
import os
import requests
import asyncio
from openai import AsyncOpenAI

In [None]:
from agents import set_tracing_disabled

# Turn off the agents library's tracing for everything that runs below.
set_tracing_disabled(True)

In [None]:
# Load variables from a .env file into the process environment. override=True
# asks python-dotenv to let values from the file replace variables that are
# already set.
load_dotenv(override=True)

In [None]:
# Async OpenAI client pointed at a specific Azure-hosted endpoint. No api_key is
# passed here -- presumably the client reads it from the environment populated
# by load_dotenv(); confirm which variable it expects.
client = AsyncOpenAI(base_url="https://exploration-openai-nithin.cognitiveservices.azure.com/openai/v1/")
# Make this client the default one used by the agents library.
set_default_openai_client(client)

In [None]:
PUBLIC_KEY: str =  os.getenv('EJS_PUBLIC_KEY')
SERVICE_ID: str = os.getenv('EJS_SERVICE_ID')
TEMPLATE_ID: str = os.getenv('EJS_TEMPLATE_ID')
ACCESS_TOKEN: str = os.getenv('EJS_ACCESS_TOKEN')
EMAIL_API_URL: str = os.getenv('EJS_EMAIL_API_URL')
SELF_COPY_EMAIL: str = os.getenv('EJS_SELF_EMAIL')

In [None]:
def build_email_payload(email: str, subject: str, html_body: str) -> Dict:
    """Assemble the JSON body for one email-service request.

    Args:
        email: Recipient address, sent as ``to_email``.
        subject: Subject line of the message.
        html_body: HTML content, sent as ``html_formatted_content``.

    Returns:
        A dict holding the module-level service settings plus the
        per-message values under ``template_params``.
    """
    # Per-message values that fill in the email template.
    template_params = {
        "to_email": email,
        "subject": subject,
        "html_formatted_content": html_body,
    }
    # Service-level settings come from the module constants.
    payload: Dict = {
        "service_id": SERVICE_ID,
        "template_id": TEMPLATE_ID,
        "user_id": PUBLIC_KEY,
        "accessToken": ACCESS_TOKEN,
    }
    payload["template_params"] = template_params
    return payload

In [None]:
@function_tool
def send_email(subject: str, html_body: str, email: str) -> Dict[str, str]:
    """Send an HTML email with the given subject and body to one recipient.

    Args:
        subject: Subject line of the email.
        html_body: Body of the email as HTML.
        email: Recipient address. The literal text "None" means that no
            recipient is configured, in which case nothing is sent.

    Returns:
        ``{"status": "ok"}`` when the request succeeded or was skipped, or
        ``{"status": "error", "detail": ...}`` when the request failed.
    """
    # The recipient reaches this tool as text, so an unset address shows up as
    # the string "None"; that case stays a silent no-op.
    if email == "None":
        return {"status": "ok"}

    user_payload = build_email_payload(email, subject, html_body)
    try:
        # requests waits indefinitely unless a timeout is given.
        response = requests.post(url=EMAIL_API_URL, json=user_payload, timeout=30)
        print(response.text)
        # Surface 4xx/5xx responses instead of reporting them as success.
        response.raise_for_status()
    except requests.RequestException as exc:
        return {"status": "error", "detail": str(exc)}
    return {"status": "ok"}

In [None]:
# System prompt for the email agent: convert the supplied report to HTML and
# send it with a single tool call. The text is passed to the model verbatim.
EMAIL_AGENT_INSTRUCTIONS = """
You are able to send a nicely formatted HTML email based on a detailed report.
You will be provided with a detailed report. You should use your tool to send one email,
providing the report converted into clean, well presented HTML with an appropriate subject line.
"""

In [None]:
# Agent that turns a finished report into an HTML email and sends it through
# the send_email tool.
email_agent = Agent(
    name="Email agent",
    model="gpt-4o-mini",
    tools=[send_email],
    instructions=EMAIL_AGENT_INSTRUCTIONS,
)

In [None]:
@function_tool
def google_search(query: str):
    """Searches Google for real-time information.

    Args:
        query: The search terms to look up.

    Returns:
        The decoded JSON response of the search API, or a dict with a single
        ``error`` key when the API key is missing, the request fails, or the
        response body is not valid JSON.
    """
    api_key = os.environ.get("SERPER_API_KEY")
    if not api_key:
        # Fail with a clear message rather than sending an unauthenticated request.
        return {"error": "SERPER_API_KEY is not set"}

    url = "https://google.serper.dev/search"
    payload = {"q": query}
    headers = {
        'X-API-KEY': api_key,
        'Content-Type': 'application/json'
    }
    try:
        # requests waits indefinitely unless a timeout is given.
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        # Hand the failure back as data so the calling agent can react to it.
        return {"error": f"Search request failed: {exc}"}

In [None]:
from datetime import datetime
# Current year and month (YYYY-MM, local clock), interpolated into the agent prompts.
now_str = datetime.now().strftime("%Y-%m")

In [None]:
# Number of search terms the planner is asked to produce.
HOW_MANY_SEARCHES = 2

# System prompt for the planner agent. Adjacent string literals keep the space
# between sentences explicit at every line break.
PLANNER_AGENT_INSTRUCTIONS = (
    "You are a helpful research assistant. Given a query, come up with a set of web searches "
    "to perform to best answer the query. You are given the ability to search web for the latest "
    f"results as of date {now_str} using google search tool. "
    f"Output {HOW_MANY_SEARCHES} terms to query for."
)

In [None]:
from pydantic import BaseModel, Field
from agents import Agent,ModelSettings

# One planned web search: the term to look up and the planner's reason for it.
# The Field descriptions are part of the model definition, not comments.
class WebSearchItem(BaseModel):
    reason: str = Field(description="Your reasoning for why this search is important to the query.")
    query: str = Field(description="The search term to use for the web search.")


# Structured output of the planner agent: the full list of searches to run.
class WebSearchPlan(BaseModel):
    searches: list[WebSearchItem] = Field(description="A list of web searches to perform to best answer the query.")

In [None]:
# Agent that turns a research query into a WebSearchPlan; it is also given the
# google_search tool.
planner_agent = Agent(
    name="PlannerAgent",
    model="gpt-4o",
    tools=[google_search],
    output_type=WebSearchPlan,
    instructions=PLANNER_AGENT_INSTRUCTIONS,
)

In [None]:
# System prompt for the search agent. Adjacent string literals are used so that
# source indentation cannot end up inside the prompt text.
SEARCH_AGENT_INSTRUCTIONS = (
    "You are a research assistant. Given a search term, you search the web using google_search tool for that term, "
    f"get the latest results as on date {now_str} and produce a concise summary of the results. "
    "The summary must 2-3 paragraphs and less than 300 "
    "words. Capture the main points. Write succintly, no need to have complete sentences or good grammar. "
    "This will be consumed by someone synthesizing a report, "
    "so its vital you capture the essence and ignore any fluff. "
    "Do not include any additional commentary other than the summary itself."
)


In [None]:
# Agent that runs one web search through google_search and summarises the results.
search_agent = Agent(
    name="Search agent",
    model="gpt-4o",
    tools=[google_search],
    instructions=SEARCH_AGENT_INSTRUCTIONS,
)

In [None]:
# System prompt for the writer agent: outline first, then a long markdown
# report built from the query and the search summaries.
WRITER_AGENT_INSTRUCTIONS = (
    "You are a senior researcher tasked with writing a cohesive report for a research query. "
    "You will be provided with the original query, and some initial research done by a research assistant.\n"
    "You should first come up with an outline for the report that describes the structure and "
    "flow of the report. Then, generate the report and return that as your final output.\n"
    "The final output should be in markdown format, and it should be lengthy and detailed. Aim "
    "for 5-10 pages of content, at least 1000 words."
)

In [None]:
# Structured output of the writer agent.
class ReportData(BaseModel):
    # Brief overview of the findings.
    short_summary: str = Field(description="A short 2-3 sentence summary of the findings.")

    # Full report text; this is the part that gets emailed.
    markdown_report: str = Field(description="The final report")

    # Ideas for further research suggested by the writer.
    follow_up_questions: list[str] = Field(description="Suggested topics to research further")

In [None]:
# Agent that writes the final report and returns it as ReportData. It has no tools.
writer_agent = Agent(
    name="WriterAgent",
    model="gpt-4o",
    output_type=ReportData,
    instructions=WRITER_AGENT_INSTRUCTIONS,
)

## Let's stitch things together

In [None]:
async def plan_searches(query: str) -> WebSearchPlan:
    """Ask the planner agent which web searches to run for ``query``.

    Args:
        query: The research question as entered by the user.

    Returns:
        The planner's final output, checked to be a WebSearchPlan.
    """
    print("Planning searches...")
    planner_run = await Runner.run(planner_agent, f"Query: {query}")
    planned = planner_run.final_output.searches
    print(f"Will perform {len(planned)} searches")
    return planner_run.final_output_as(WebSearchPlan)

In [None]:
async def search(item: WebSearchItem) -> str | None:
    """Run the search agent for one planned search and return its summary.

    Args:
        item: The planned search (search term plus the reason for it).

    Returns:
        The agent's final output as a string, or None if the run failed.
    """
    prompt = f"Search term: {item.query}\nReason for searching: {item.reason}"
    try:
        result = await Runner.run(search_agent, prompt)
    except Exception as exc:
        # Best effort: one failed search must not abort the others, but report
        # it so that missing results can be diagnosed.
        print(f"Search for {item.query!r} failed: {exc!r}")
        return None
    return str(result.final_output)

In [None]:
async def perform_searches(search_plan: WebSearchPlan) -> list[str]:
    """Run every planned search concurrently and collect the summaries.

    Summaries are gathered in the order the searches finish; a search that
    returned None is counted as completed but left out of the result.

    Args:
        search_plan: The plan whose ``searches`` are executed.

    Returns:
        The non-None search summaries.
    """
    print("Searching...")
    pending = [asyncio.create_task(search(planned)) for planned in search_plan.searches]
    total = len(pending)
    summaries = []
    for finished, next_done in enumerate(asyncio.as_completed(pending), start=1):
        summary = await next_done
        if summary is not None:
            summaries.append(summary)
        print(f"Searching... {finished}/{total} completed")
    print("Finished searching")
    return summaries

In [None]:
async def write_report(query: str, search_results: list[str]) -> ReportData:
    """Have the writer agent produce the report from the search summaries.

    Args:
        query: The original research question.
        search_results: Summaries returned by the searches; the list is
            interpolated into the prompt as-is.

    Returns:
        The writer's final output, checked to be a ReportData.
    """
    print("Thinking about report...")
    prompt = f"Original query: {query}\nSummarized search results: {search_results}"
    writer_run = await Runner.run(writer_agent, prompt)
    print("Finished writing report")
    return writer_run.final_output_as(ReportData)

In [None]:
async def send_email(report: ReportData, email: str | None):
    """Ask the email agent to send the report to the given address.

    Args:
        report: The finished report; its ``markdown_report`` is emailed.
        email: Recipient address, interpolated into the agent prompt. A value
            of None is rendered as the text "None".

    Returns:
        The result object of the agent run; callers read ``final_output``
        from it.
    """
    # NOTE: this definition rebinds the module-level name ``send_email`` that
    # the function tool was bound to. ``email_agent`` still holds the tool
    # object it was built with, but re-running the agent-construction cell
    # after this one would pass this coroutine function to ``tools`` instead.
    print("Writing email...")
    prompt = f"Email this report: \n\n{report.markdown_report} to {email}"
    result = await Runner.run(email_agent, prompt)
    print("Email sent")
    return result

In [None]:
# End-to-end run: plan the searches, execute them, write the report, email it.
# Uses top-level await, so it has to run where that is supported (for example
# a notebook cell).
query = "Best Agentic AI frameworks in 2025"
search_plan = await plan_searches(query)
search_results = await perform_searches(search_plan)
report = await write_report(query, search_results)
# The report goes to the address configured in SELF_COPY_EMAIL.
result = await send_email(report, SELF_COPY_EMAIL)
print(result.final_output)