# Aysnc Mastermind - Designinig End to End Rsearch Pipelines


## Objective 

 * Build and end to end autonomous research pipeline: 
    1. Aysnchronous parallel search for speed 
    2. Structured planning and reasoning 
    3. Markdown reporting
    4. Final results sent via HTML email



<img src="../End to end research pipelines.png" width="400" height="1200" />

In [2]:
from agents import function_tool
from sendgrid.helpers.mail import Email, To, Content, Mail
import sendgrid, os

# Email Sender Tool
@function_tool
def send_email(subject: str, html_body: str):
     """Send an HTML email with subject and body to the recipient list."""
     sg = sendgrid.SendGridAPIClient(api_key=os.environ.get("SENDGRID_API_KEY"))
     sender = Email("*******@gmail.com")
     recipient = To("*******@icloud.com")
     content = Content("text/html", html_body)
     email = Mail(sender, recipient, subject, content).get()
     response = sg.client.mail.send.post(request_body=email)
     return {"status": "Success" , "code": response.status_code}

In [None]:
###  This agent takes a full research report, converts it into styled HTML, and triggers send_email.

# Email Agent
INSTRUCTIONS = """You are able to send a nicely formatted HTML email based on a detailed report.
You will be provided with a detailed report. You should use your tool to send one email, providing the 
report converted into clean, well presented HTML with an appropriate subject line."""

email_agent = Agent(
    name="EmailAgent",
    instructions=INSTRUCTIONS,
    tools=[send_email],
    model="gpt-4o-mini",
)

In [None]:
## This agent takes results from all search tasks and writes a long-form, structured markdown report.

class ReportData(BaseModel):
    short_summary: str
    """A short 2-3 sentence summary of the findings."""

    markdown_report: str
    """The final report"""

    follow_up_questions: list[str]
    """Suggested topics to research further"""

# Report Generator Agent
INSTRUCTIONS = (
    "You are a senior researcher tasked with writing a cohesive report for a research query. "
    "You will be provided with the original query, and some initial research done by a research assistant.\n"
    "You should first come up with an outline for the report that describes the structure and "
    "flow of the report. Then, generate the report and return that as your final output.\n"
    "The final output should be in markdown format, and it should be lengthy and detailed. Aim "
    "for 5â€“10 pages of content, at least 1000 words."
)

writer_agent = Agent(
    name="WriterAgent",
    instructions=INSTRUCTIONS,
    model="gpt-4o-mini",
    output_type=ReportData
)

# Async Pipeline Functions

In [None]:
## First generate a reasoned plan before searching.
import asyncio
async def search_planner(query: str):
    """Use planner_agent to generate a structured list of web searches for a given research query."""
    print("Planning for searches...")
    result = await Runner.run(search_planner_agent, query)
    print(f"Will perform {len(result.final_output.searches)} searches")
    return result.final_output

## Function to Perform WebSearch : OpenAI WebSearch Tool
async def web_search(search_task: SearchTask):
    """Run a single web search using the search_agent for a given query and reason."""
    input = f"Search term: {search_task.query}\nReason for searching: {search_task.reason}"
    result = await Runner.run(web_search_agent, input)
    return result.final_output

## Running web_search in parallel
async def perform_parallel_searches(search_plan: SearchPlan):
    """Perform web searches in parallel using asyncio to speed up data collection."""
    print("Perform Parallel Searching...")
    tasks = [asyncio.create_task(web_search(task)) for task in search_plan.searches]
    results = await asyncio.gather(*tasks)
    print("Finished searching !!!")
    return results

In [None]:
## Generate Report from Search Results
async def generate_detailed_report(user_query: str, search_summaries: list[str]):
    """Use the WriterAgent to compile a structured, markdown-based research report."""
    print("Thinking deeply about the report...")
    prompt_input = f"Original query: {user_query}\nSummarized search results: {search_summaries}"
    result = await Runner.run(writer_agent, prompt_input)
    print("Finished writing the report!")
    return result.final_output

## Email the Final Report
async def dispatch_report_via_email(report: ReportData):
    """Use the EmailAgent to send the formatted report via HTML email."""
    print("Preparing email...")
    result = await Runner.run(email_agent, report.markdown_report)
    print("Email sent successfully!")
    return report

In [None]:
 Final Pipeline Execution

# Research Query
research_query = "Comparison of growth vs value investing strategies."

# Track the entire flow
with trace("Financial Research trace"):
    print("Starting full research pipeline...")

    # Step 1: Plan web searches
    search_plan = await search_planner(research_query)

    # Step 2: Perform all searches asynchronously
    search_results = await perform_parallel_searches(search_plan)

    # Step 3: Generate structured markdown report
    final_report = await generate_detailed_report(research_query, search_results)

    # Step 4: Email the report as HTML
    await dispatch_report_via_email(final_report)

    print("ðŸŽ‰ Research workflow completed! Check your inbox.")