# Multi-Agent Scheduler System

- Author: [Ilgyun Jeong](https://github.com/johnny9210)
- Design: 
- Peer Review: [Mark()](https://github.com/obov), [Taylor(Jihyun Kim)](https://github.com/Taylor0819)
- This is a part of [LangChain Open Tutorial](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial)


[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/19-Cookbook/03-MultiAgentSystem/01-MultiAgentScheduler.ipynb) [![Open in GitHub](https://img.shields.io/badge/Open%20in%20GitHub-181717?style=flat-square&logo=github&logoColor=white)](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/19-Cookbook/03-MultiAgentSystem/01-MultiAgentScheduler.ipynb)


## Overview

The Multi-Agent Scheduler System represents an innovative approach to automating information retrieval and delivery through a coordinated network of specialized AI agents. At its core, this system transforms simple natural language requests into scheduled, automated search and delivery operations, making it particularly valuable for researchers, analysts, and anyone needing regular, scheduled information updates.

Imagine asking "Find me the latest RAG papers at 7 AM tomorrow." Instead of manually searching and compiling information early in the morning, the system automatically handles the entire process - from understanding your request to delivering a well-formatted email with relevant research papers at precisely 7 AM. This automation eliminates the need for manual intervention while ensuring timely delivery of crucial information.

### System Architecture

The system's architecture is built around five specialized agents, each handling a crucial aspect of the information retrieval and delivery process:

1. `Query Analysis Agent`
   This agent serves as the system's front door, interpreting natural language queries to extract critical information 

2. `Search Router`
   Acting as the system's traffic controller, the Search Router directs queries to the most appropriate specialized search agent:
   
3. `Response Agent`
   This agent transforms raw search results into well-structured, readable content by:

4. `Scheduling System and Email Service`
   The scheduling component manages the temporal aspects of the system:
   This ensures that all operations occur at their specified times without conflicts.
   The system implements a robust email delivery service using yagmail that provides:

### System Flow

The entire process follows this sequence:

![Multi-Agent Scheduler System Flow](assets/21-Multi-AgentSchedulerSystem.png)

This architecture ensures reliable, automated information retrieval and delivery, with each agent optimized for its specific role in the process.

### Table of Contents
- [Overview](#overview)
- [System Architecture](#system-architecture)
- [Environment Setup](#environment-setup)
- [Query Analysis Agent](#query-analysis-agent)
- [Search Router and Specialized Agents](#Search-Router-and-Specialized-Agents)
- [Response Agent](#response-agent)
- [Scheduling System and Email Service](#Scheduling-System-and-Email-Service)  

The system's modular design allows for easy expansion and customization, making it adaptable to various use cases while maintaining consistent performance and reliability. Whether you're tracking research papers, monitoring news, or gathering general information, the Multi-Agent Scheduler System automates the entire process from query to delivery, saving time and ensuring consistent, timely access to important information.

### References
- [How to get NewsAPI](https://newsapi.org)
- [How to get SerpAPI](https://serpapi.com/)
- [How to get Google password](https://support.google.com/accounts/answer/185833?visit_id=638745290390245053-2925662375&p=InvalidSecondFactor&rd=1)

## Environment Setup

Set up the environment. You may refer to [Environment Setup](https://wikidocs.net/257836) for more details.

**[Note]**

- `langchain-opentutorial` is a package that provides a set of easy-to-use environment setup, useful functions and utilities for tutorials.
- You can checkout the [`langchain-opentutorial`](https://github.com/LangChain-OpenTutorial/langchain-opentutorial-pypi) for more details.


In [1]:
%%capture --no-stderr
%pip install langchain-opentutorial

In [2]:
# Install required packages
from langchain_opentutorial import package

package.install(
    [
        "langsmith",
        "langchain",
        "chromadb",
        "langchain_chroma",
        "langchain_openai",
        "langchain_community",
        "pytz",
        "google-search-results",
        "yagmail",
        "schedule",
    ],
    verbose=False,
    upgrade=False,
)


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
# Set environment variables
from langchain_opentutorial import set_env

set_env(
    {
        "OPENAI_API_KEY": "",
        "LANGCHAIN_API_KEY": "",
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_PROJECT": "21-Multi-AgentSchedulerSystem",
        "NEWS_API_KEY": "",
        "SERPAPI_API_KEY": "",
    }
)

Environment variables have been set successfully.


In [4]:
from dotenv import load_dotenv

load_dotenv(override=True)

True

### Query Analysis Agent
The QueryAnalysisAgent serves as the initial interpreter in our multi-agent scheduler system, transforming natural language queries into structured data that other agents can process. This agent employs advanced language understanding capabilities through GPT-4 to accurately parse user intentions and timing requirements.

### Core Components
The agent is built around three essential classes:
- Time Extraction Processor: Handles temporal information
- Task Analysis Engine: Understands search requirements
- Query Coordinator: Combines and validates results


### Core Functionality
The agent performs two primary functions:

1. Time Extraction
```python
def extract_time(self, query: str) -> datetime:
    """Extracts time information from queries"""
    time_extraction_chain = self.time_extraction_prompt | self.llm
    time_str = time_extraction_chain.invoke({"query": query})
    # Converts to standardized datetime format
    return self._process_time(time_str)
```

2. Task Analysis
```python
def analyze_task(self, query: str) -> dict:
    """Analyzes query content for search parameters"""
    task_analysis_chain = self.task_analysis_prompt | self.llm
    response = task_analysis_chain.invoke({"query": query})
    return self._parse_response(response)
```

### Usage Example

```python
# Initialize agent
agent = QueryAnalysisAgent()

# Process a sample query
query = "Find RAG papers tomorrow at 9 AM"  # Changed from "내일 오전 9시에 RAG 논문 찾아줘"
result = agent.analyze_query(query)

Expected output:
```json
{
    "target_time": "2025-02-06 09:00:00+0000",
    "execution_time": "2025-02-06 08:55:00+0000",
    "task_type": "search",
    "search_type": "research_paper",
    "keywords": ["RAG", "papers"],  # Changed from ["RAG", "논문"]
    "requirements": "minimum 5 results",
    "time_sensitivity": "normal",
    "original_query": "Find RAG papers tomorrow at 9 AM",  # Changed from Korean query
    "status": "success"
}
```

This structured approach ensures reliable query interpretation while maintaining flexibility for various query types and formats.

Install and Import Required Libraries

In [5]:
# Cell 1: Importing Required Libraries
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from datetime import datetime, timedelta
import pytz
import json
import os

The QueryAnalysisAgent class represents a specialized natural language query processor that utilizes OpenAI's language models. Let's break down its core components:
The initialization method sets up the language model with temperature=0 to ensure consistent, deterministic responses:

The setup_prompt_templates method defines two essential templates:

1. Time Extraction Template
This template focuses solely on extracting and standardizing time information from queries.

2. Task Analysis Template
This template structures the query analysis with specific rules:

- Categorizes searches into three types: research_paper, news, or general
- Distinguishes between normal and urgent time sensitivity
- Separates search keywords from temporal terms
- Maintains consistent task typing as "search"

These templates work together to transform natural language queries into structured, actionable data that the rest of the system can process efficiently. The clear separation between time extraction and task analysis allows for precise handling of each aspect of the query.

For example, a query like "아침 7시에 RAG 논문 찾아줘" would be processed to extract both the time (07:00) and the search parameters (research papers about RAG), while filtering out temporal terms from the actual search keywords.

In [6]:
# Cell 2: Class Definition and __init__, setup_prompt_templates Methods
class QueryAnalysisAgent:
    def __init__(self, model_name="gpt-4o"):
        self.llm = ChatOpenAI(model_name=model_name, temperature=0)
        self.setup_prompt_templates()

    def setup_prompt_templates(self):
        self.time_extraction_prompt = PromptTemplate.from_template(
            "Extract time and convert to 24h format from: {query}\nReturn HH:MM only"
        )

        self.task_analysis_prompt = PromptTemplate.from_template(
            """Analyze the query and return only a JSON object. For the query: {query}

        Return this exact format:
        {{
            "task_type": "search",
            "search_type": "research_paper",
            "keywords": ["rag", "papers"],
            "requirements": "minimum 5 results",
            "time_sensitivity": "normal",
            "search_terms": ["rag", "papers"]  # Actual keywords to use for search
        }}

        Rules:
        - search_type must be one of: "research_paper", "news", "general"
        - time_sensitivity must be one of: "normal", "urgent"
        - keywords should include all words from query including time-related terms
        - search_terms should exclude time-related terms and only include actual search keywords
        - task_type should always be "search"
        """
        )

Core Methods 1

`extract_time()`
- Functionality: Extracts and processes time information from natural language queries
- Features:
  - Converts various time formats (e.g., "아침 7시", "오후 3:30") to standardized datetime objects
  - Maintains timezone awareness using pytz for accurate scheduling
  - Automatically schedules for next day if requested time has already passed
  - Strips unnecessary time components (seconds, microseconds) for cleaner scheduling
- Error Handling: Raises ValueError with detailed error messages for invalid time formats
- Returns: UTC-aware datetime object representing the target execution time

In [7]:
# Cell 3: Adding extract_time Method
def extract_time(self, query: str) -> datetime:
    """Extracts time information from the query and returns a datetime object."""
    time_extraction_chain = self.time_extraction_prompt | self.llm
    time_str = time_extraction_chain.invoke({"query": query})

    try:
        # Extract the actual time string from the ChatCompletion response
        time_str = time_str.content.strip()

        # Calculate the next scheduled time based on the current time
        current_time = datetime.now(pytz.utc)
        hour, minute = map(int, time_str.split(":"))

        target_time = current_time.replace(
            hour=hour, minute=minute, second=0, microsecond=0
        )

        # If the extracted time has already passed, set it for the next day
        if target_time <= current_time:
            target_time += timedelta(days=1)

        return target_time
    except Exception as e:
        raise ValueError(f"Time extraction failed: {e}")


# After executing this cell, the method should be added to the class
QueryAnalysisAgent.extract_time = extract_time

Core Methods 2

`analyze_task()`
- Functionality: Breaks down queries into structured task components
- Features:
  - Identifies search type (research_paper, news, general)
  - Extracts relevant keywords while filtering temporal terms
  - Determines task urgency (normal vs urgent)
  - Identifies specific requirements (e.g., minimum result count)
- Error Handling: Handles JSON parsing errors and invalid query formats
- Returns: Dictionary containing parsed task information and parameters

In [8]:
# Cell 4: Adding analyze_task Method
def analyze_task(self, query: str) -> dict:
    """Extracts task intent and keywords from the query."""
    task_analysis_chain = self.task_analysis_prompt | self.llm
    response = task_analysis_chain.invoke({"query": query})

    try:
        # Clean response content to ensure valid JSON format
        content = response.content.strip()
        content = content.replace("```json", "").replace("```", "")
        return json.loads(content)
    except json.JSONDecodeError as e:
        print(f"Original response: {response.content}")
        raise ValueError(f"Failed to parse task analysis result: {e}")


# Adding the method to the class
QueryAnalysisAgent.analyze_task = analyze_task

Core Methods 3

`analyze_query()`
- Functionality: Combines time extraction and task analysis into a complete query interpretation
- Features:
  - Coordinates between time extraction and task analysis
  - Sets execution time 5 minutes before target time
  - Validates and combines all query components
- Error Handling: Catches and reports errors from both time and task processing
- Returns: Combined dictionary with timing and task information

In [9]:
def analyze_query(self, query: str) -> dict:
    """Performs a full query analysis and returns the results.

    Args:
        query (str): The user query to analyze.

    Returns:
        dict: A dictionary containing the analysis results.
    """
    try:
        # Extract time information
        target_time = self.extract_time(query)

        # Analyze task information
        task_info = self.analyze_task(query)

        # Return the results including all necessary details
        return {
            "target_time": target_time,
            "execution_time": target_time - timedelta(minutes=5),
            "task_type": task_info["task_type"],
            "search_type": task_info["search_type"],  # Newly added
            "keywords": task_info["keywords"],
            "requirements": task_info["requirements"],
            "time_sensitivity": task_info["time_sensitivity"],  # Newly added
            "original_query": query,
            "status": "success",
        }
    except Exception as e:
        return {"status": "error", "error_message": str(e), "original_query": query}


# Adding the method to the class
QueryAnalysisAgent.analyze_query = analyze_query

Core Methods 4

`datetime_handler(obj)`
* Functionality: Converts datetime objects to JSON-serializable string format
* Features:
   * Accepts any object and checks if it's a datetime instance
   * Converts datetime to standardized string format (YYYY-MM-DD HH:MM:SS+ZZZZ)
   * Maintains timezone information in the output string
* Error Handling: Raises TypeError with descriptive message for non-datetime objects
* Returns: String representation of datetime in consistent format
* Use Cases:
   * JSON serialization for API responses
   * Database storage of temporal data
   * Logging and debugging timestamp formatting
* Examples:
   * Input: `datetime(2024, 2, 6, 15, 30, tzinfo=pytz.UTC)`
   * Output: `"2024-02-06 15:30:00+0000"`

The function serves as a critical utility for converting Python's datetime objects into a standardized string format that can be easily stored, transmitted, and later reconstructed. This is particularly important in our scheduling system where accurate time representation and timezone awareness are essential for reliable task execution.

In [10]:
# Cell 6: Testing QueryAnalysisAgent
def datetime_handler(obj):
    """Handler to convert datetime objects into JSON serializable strings.

    Args:
        obj: The object to convert.

    Returns:
        str: A formatted datetime string (Format: YYYY-MM-DD HH:MM:SS+ZZZZ).

    Raises:
        TypeError: Raised if the object is not a datetime instance.
    """
    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S%z")
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

Test

In [11]:
def run_query_test():
    """Runs a test for QueryAnalysisAgent and returns the results.

    Returns:
        str: The analysis result in JSON format.
    """
    # Create an instance of QueryAnalysisAgent
    agent = QueryAnalysisAgent()

    # Test query
    test_query = "Find and recommend news related to RAG at 7 AM."

    # Execute query analysis
    result = agent.analyze_query(test_query)

    # Convert the result to JSON format
    return json.dumps(result, indent=2, ensure_ascii=False, default=datetime_handler)


# Execute test and print results
if __name__ == "__main__":
    test_result = run_query_test()
    print("\n=== QueryAnalysisAgent Test Result ===")
    print(test_result)

Failed to multipart ingest runs: langsmith.utils.LangSmithAuthError: Authentication failed for https://api.smith.langchain.com/runs/multipart. HTTPError('401 Client Error: Unauthorized for url: https://api.smith.langchain.com/runs/multipart', '{"detail":"Invalid token"}')trace=7ecfe457-b9e4-4653-819e-3adb9473d2c5,id=7ecfe457-b9e4-4653-819e-3adb9473d2c5; trace=7ecfe457-b9e4-4653-819e-3adb9473d2c5,id=e61dafec-908c-48f9-a7fb-c19a038d24e8; trace=7ecfe457-b9e4-4653-819e-3adb9473d2c5,id=d1500e6f-541a-4920-896c-203d4fe745c8
Failed to multipart ingest runs: langsmith.utils.LangSmithAuthError: Authentication failed for https://api.smith.langchain.com/runs/multipart. HTTPError('401 Client Error: Unauthorized for url: https://api.smith.langchain.com/runs/multipart', '{"detail":"Invalid token"}')trace=b73e5269-54b3-4702-b8cb-479d0bcac8a8,id=b73e5269-54b3-4702-b8cb-479d0bcac8a8; trace=b73e5269-54b3-4702-b8cb-479d0bcac8a8,id=ca5225c3-c44f-45f9-be48-fe208bbb1065; trace=b73e5269-54b3-4702-b8cb-479d0bc


=== QueryAnalysisAgent Test Result ===
{
  "target_time": "2025-02-08 07:00:00+0000",
  "execution_time": "2025-02-08 06:55:00+0000",
  "task_type": "search",
  "search_type": "news",
  "keywords": [
    "rag",
    "7 AM"
  ],
  "requirements": "minimum 5 results",
  "time_sensitivity": "normal",
  "original_query": "Find and recommend news related to RAG at 7 AM.",
  "status": "success"
}


### Search Router and Specialized Agents

The Search Router system acts as an intelligent traffic controller, directing queries to the most appropriate specialized search agent based on the query analysis. This architecture ensures that each type of search request is handled by an agent specifically optimized for that domain.

### Core Components

```python
class SearchRouter:
    def __init__(self):
        # Initialize specialized search agents
        self.paper_search_agent = PaperSearchAgent()
        self.news_search_agent = NewsSearchAgent()
        self.general_search_agent = GeneralSearchAgent()
```
Each specialized agent is designed to handle specific types of searches:

1. `Paper Search Agent`
This agent specializes in academic paper searches, interfacing with arXiv's API to retrieve scholarly articles and research papers.

2. `News Search Agent`
This agent handles news-related searches, connecting to NewsAPI to gather current events and news articles.

3. `General Search Agent`
This agent manages general web searches using SerpAPI, handling broader information gathering needs.

This routing system ensures that each query is handled by the most appropriate agent while maintaining consistent error handling and result formatting across all search types. The modular design allows for easy addition of new specialized agents as needed, making the system highly extensible and maintainable.

Each agent provides standardized outputs despite their different data sources and search methodologies, enabling seamless integration with the rest of the system components.

`PaperSearchAgent`

This agent focuses on academic content retrieval. It interfaces with the arXiv API to fetch scholarly papers and research documents. Key features include filtering papers by relevance, date ranges, and processing XML responses into structured data. The agent is particularly useful for researchers and academics needing current papers in their field.

In [12]:
import urllib.request
import xml.etree.ElementTree as ET
from typing import Dict, Any, List
from datetime import datetime


class PaperSearchAgent:
    def __init__(self):
        self.base_url = "http://export.arxiv.org/api/query"

    def perform_search(self, query_info: Dict[str, Any]) -> Dict[str, Any]:
        try:
            keywords = self._process_keywords(query_info["keywords"])
            max_results = self._extract_max_results(query_info.get("requirements", ""))

            url = f"{self.base_url}?search_query=all:{keywords}&start=0&max_results={max_results}"
            response = urllib.request.urlopen(url)
            data = response.read().decode("utf-8")

            results = self._parse_arxiv_results(data)

            return {
                "status": "success",
                "results": results,
                "total_found": len(results),
                "returned_count": len(results),
                "query_info": query_info,
            }
        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def _process_keywords(self, keywords: List[str]) -> str:
        # Remove time-related keywords
        filtered_keywords = [
            k
            for k in keywords
            if not any(
                time in k.lower()
                for time in ["hour", "morning", "afternoon", "evening"]
            )
        ]
        return "+".join(filtered_keywords)

    def _extract_max_results(self, requirements: str) -> int:
        import re

        # extracting numbers
        numbers = re.findall(r"\d+", requirements)
        return int(numbers[0]) if numbers else 5

    def _parse_arxiv_results(self, data: str) -> List[Dict[str, Any]]:
        root = ET.fromstring(data)
        results = []

        for entry in root.findall("{http://www.w3.org/2005/Atom}entry"):
            title = entry.find("{http://www.w3.org/2005/Atom}title").text
            url = entry.find("{http://www.w3.org/2005/Atom}id").text
            published = entry.find("{http://www.w3.org/2005/Atom}published").text
            summary = entry.find("{http://www.w3.org/2005/Atom}summary").text

            results.append(
                {
                    "type": "research_paper",
                    "title": title,
                    "url": url,
                    "published_date": published[:10],
                    "summary": summary,
                    "source": "arxiv",
                }
            )

        return results

`NewsSearchAgent`

This agent handles current events and news article searches. It connects to NewsAPI to access a wide range of news sources. The agent supports features like language filtering, date range specification, and source selection. It's especially valuable for users needing real-time information or tracking specific topics in the news.

In [13]:
import requests
from urllib.parse import urlencode
import os
from datetime import datetime, timedelta
from typing import Dict, Any


class NewsSearchAgent:
    def __init__(self, api_key: str = None):
        """Initializes a news search agent using NewsAPI.

        NewsAPI follows a REST API structure with the base URL 'https://newsapi.org/v2'.
        It provides two main endpoints:
        - /everything: Searches the entire news archive.
        - /top-headlines: Retrieves the latest top headlines.
        """

        self.api_key = os.environ["NEWS_API_KEY"]

        if not self.api_key:
            raise ValueError("NewsAPI key is required")

        self.base_url = "https://newsapi.org/v2"

    def perform_search(
        self, query_info: Dict[str, Any], max_results: int = 5
    ) -> Dict[str, Any]:
        """Performs a news search based on the given query information.

        Args:
            query_info (Dict[str, Any]): Dictionary containing search parameters.
            max_results (int): Maximum number of results to return.

        Returns:
            Dict[str, Any]: A dictionary containing search results or an error message.
        """
        try:
            # Extract actual search terms (excluding time-related keywords)
            search_terms = query_info.get(
                "search_terms", query_info.get("keywords", [])
            )

            # Check if the search is for real-time news
            is_realtime = query_info.get("time_sensitivity") == "urgent"
            from_date = datetime.now() - timedelta(hours=1 if is_realtime else 24)

            # Configure parameters for the 'everything' endpoint
            params = {
                "q": " ".join(search_terms),  # Exclude time-related keywords
                "from": from_date.strftime("%Y-%m-%d"),
                "sortBy": "publishedAt",
                "language": "en",
                "apiKey": self.api_key,
            }

            # Construct API request URL
            url = f"{self.base_url}/everything?{urlencode(params)}"

            # Send API request
            response = requests.get(url)
            data = response.json()

            # Check response status
            if response.status_code != 200:
                return {
                    "status": "error",
                    "error_message": data.get("message", "Unknown error"),
                    "query_info": query_info,
                }

            # Process and format results
            articles = data.get("articles", [])
            formatted_results = []

            for article in articles[:max_results]:
                formatted_results.append(
                    {
                        "title": article.get("title"),
                        "description": article.get("description"),
                        "url": article.get("url"),
                        "published_at": article.get("publishedAt"),
                        "source": article.get("source", {}).get("name"),
                        "content": article.get("content"),
                    }
                )

            return {
                "status": "success",
                "results": formatted_results,
                "total_results": data.get("totalResults", 0),
                "returned_count": len(formatted_results),
                "search_parameters": {
                    "keywords": query_info["keywords"],
                    "from_date": from_date.strftime("%Y-%m-%d"),
                    "language": "en",
                },
            }

        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def get_top_headlines(
        self, country: str = "us", category: str = None
    ) -> Dict[str, Any]:
        """Fetches top news headlines.

        Args:
            country (str): Country code (default: 'us' for the United States).
            category (str, optional): News category (e.g., business, technology).

        Returns:
            Dict[str, Any]: A dictionary containing top headlines.
        """
        params = {"country": country, "apiKey": self.api_key}

        if category:
            params["category"] = category

        url = f"{self.base_url}/top-headlines?{urlencode(params)}"
        response = requests.get(url)

        return response.json()

`GeneralSearchAgent`

This agent manages broader web searches through SerpAPI. It handles diverse information needs that don't fit strictly into academic or news categories. The agent includes features like language-specific searches, result ranking, and content type filtering. It's particularly useful for general research, product information, or any broad information gathering needs.

In [14]:
from langchain_community.utilities import SerpAPIWrapper


class GeneralSearchAgent:
    def __init__(self, serpapi_key: str = None):
        if serpapi_key:
            os.environ["SERPAPI_API_KEY"] = serpapi_key
        self.search = SerpAPIWrapper()

    def setup_search_parameters(self, query_info: Dict[str, Any]) -> List[str]:
        """Constructs search queries for general search."""
        keywords = " ".join(query_info["keywords"])

        # Set up base search queries
        search_queries = [
            f"{keywords} lang:ko",
            keywords,
        ]  # Korean results  # General search

        return search_queries

    def perform_search(
        self, query_info: Dict[str, Any], max_results: int = 5
    ) -> Dict[str, Any]:
        """Performs general search and returns results."""
        try:
            search_queries = self.setup_search_parameters(query_info)
            all_results = []

            for query in search_queries:
                raw_results = self.search.run(query)
                parsed_results = self._parse_general_results(raw_results)
                all_results.extend(parsed_results)

            # Sort by relevance score
            sorted_results = sorted(
                all_results, key=lambda x: x.get("relevance_score", 0), reverse=True
            )[:max_results]

            return {
                "status": "success",
                "results": sorted_results,
                "total_found": len(all_results),
                "returned_count": len(sorted_results),
                "query_info": query_info,
            }

        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "query_info": query_info,
            }

    def _parse_general_results(self, raw_results: str) -> List[Dict[str, Any]]:
        """Parses general search results."""
        parsed_results = []

        for result in raw_results.split("\n"):
            if not result.strip():
                continue

            parsed_results.append(
                {
                    "type": "general",
                    "title": self._extract_title(result),
                    "content": result,
                    "url": self._extract_url(result),
                    "relevance_score": self._calculate_relevance(result),
                }
            )

        return parsed_results

    def _extract_title(self, result: str) -> str:
        """Extracts title from the result."""
        return result.split(".")[0].strip()[:100]

    def _extract_url(self, result: str) -> str:
        """Extracts URL from the result."""
        import re

        urls = re.findall(r"https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[^\s]*", result)
        return urls[0] if urls else ""

    def _calculate_relevance(self, result: str) -> float:
        """Calculates relevance score for the search result."""
        relevance_score = 0.5  # Base score

        # Calculate score based on keyword matching
        keywords = ["official", "guide", "tutorial", "review", "recommendation"]
        lower_result = result.lower()

        for keyword in keywords:
            if keyword in lower_result:
                relevance_score += 0.1

        return min(1.0, relevance_score)

`SearchRouter`: The System's Traffic Controller

The `SearchRouter` acts as the central coordinator for our multi-agent search system, intelligently directing queries to specialized search agents based on the type of information needed. Think of it as an expert traffic controller at a busy airport, making sure each "flight" (query) goes to the right "runway" (search agent).

The `SearchRouter`'s modular design allows for easy expansion - new specialized search agents can be added without modifying the existing code, making the system highly adaptable to evolving search needs.

Through this central coordination, the `SearchRouter` ensures efficient and reliable information retrieval across different types of searches while maintaining a consistent interface for the rest of the system.

In [15]:
class SearchRouter:
    def __init__(self):
        # Get API key directly or from environment variables
        # Initialize agents for each search type
        self.paper_search_agent = PaperSearchAgent()
        self.news_search_agent = NewsSearchAgent()
        self.general_search_agent = GeneralSearchAgent()

    def route_and_search(self, query_analysis: dict) -> dict:
        """Routes the search request to appropriate search agent based on query analysis

        Args:
            query_analysis (dict): Query analysis results from QueryAnalysisAgent

        Returns:
            dict: Dictionary containing search results, including success/failure status and related info
        """
        try:
            # Check search type
            search_type = query_analysis.get("search_type")

            # Record start time for logging
            start_time = datetime.now(pytz.utc)

            # Perform search
            if search_type == "research_paper":
                print("Performing research paper search...")
                result = self.paper_search_agent.perform_search(query_analysis)
            elif search_type == "news":
                print("Performing news search...")
                result = self.news_search_agent.perform_search(query_analysis)
            elif search_type == "general":
                print("Performing general search...")
                result = self.general_search_agent.perform_search(query_analysis)
            else:
                return {
                    "status": "error",
                    "error_message": f"Unsupported search type: {search_type}",
                    "original_query": query_analysis.get("original_query"),
                }

            # Calculate search duration
            end_time = datetime.now(pytz.utc)
            search_duration = (end_time - start_time).total_seconds()

            # Add metadata to results
            result.update(
                {
                    "search_type": search_type,
                    "search_duration": search_duration,
                    "search_timestamp": end_time.isoformat(),
                    "original_query": query_analysis.get("original_query"),
                }
            )

            return result

        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "original_query": query_analysis.get("original_query"),
                "search_type": query_analysis.get("search_type"),
            }

    # def get_agent_status(self) -> dict:
    #     """Check the status of each search agent.

    #     Returns:
    #         dict: Dictionary containing status information for each agent
    #     """
    #     return {
    #         "paper_search_agent": "ready",
    #         "news_search_agent": "ready",
    #         "general_search_agent": "ready",
    #         "last_checked": datetime.now(pytz.utc).isoformat(),
    #     }

### Response Agent
Crafting User-Friendly Information Delivery

The ResponseAgent serves as our system's expert communicator, transforming raw search results into well-structured, readable content that meets users' needs. This agent is particularly crucial as it represents the final step in our information delivery pipeline, ensuring that complex search results are presented in a clear, digestible format.

The agent maintains three specialized prompt templates for different types of content:

Key Features of the Response Agent:

1. Content Customization
   - Adapts formatting based on content type (papers, news, general)
   - Maintains consistent structure while accommodating different information types
   - Ensures appropriate context and explanations are included

2. Email Optimization
   - Creates clear, professional email subjects
   - Structures content for easy scanning and reading
   - Includes all necessary context and source information

The ResponseAgent represents the crucial final step in our information delivery pipeline, ensuring that users receive not just raw data, but well-organized, contextually relevant information that directly addresses their queries. Through its careful formatting and organization, it helps users quickly understand and act upon the information they've requested.

This agent demonstrates how automated systems can maintain a human touch in their communications, making complex information accessible and actionable for end users.

In [16]:
class ResponseAgent:
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o")
        self.setup_prompts()

    def setup_prompts(self):
        # Research paper search prompt
        self.paper_prompt = PromptTemplate.from_template(
            """Please organize the following research paper search results in email format.
            Search term: {query}
            Search results: {results}
            
            Format as follows:
            1. Email subject: "Research Paper Search Results for [search term]"
            2. Body:
               - Greeting
               - "Here are the organized research paper results for your search."
               - Number each paper and format as follows:
                 1. Paper title: [title]
                    - Summary: [core research content and key findings]
                    - URL: [link]
               - Closing remarks
            """
        )

        # News search prompt
        self.news_prompt = PromptTemplate.from_template(
            """Please organize the following news search results in email format.
            Search term: {query}
            Search results: {results}
            
            Format as follows:
            1. Email subject: "Latest News Updates for [search term]"
            2. Body:
               - Greeting
               - "Here are the latest news articles related to your search topic."
               - Number each news item and format as follows:
                 1. [title] - [news source]
                    - Main content: [key content summary]
                    - Published date: [date]
                    - URL: [link]
               - Closing remarks
            """
        )

        # General search prompt
        self.general_prompt = PromptTemplate.from_template(
            """Please organize the following search results in email format.
            Search term: {query}
            Search results: {results}
            
            Format as follows:
            1. Email subject: "Search Results for [search term]"
            2. Body:
               - Greeting
               - "Here are the organized results for your search."
               - Number each result and format as follows:
                 1. [title]
                    - Content: [main content summary]
                    - Source: [website or platform name]
                    - URL: [link]
               - Closing remarks
            """
        )

    def format_results(self, search_results):
        try:
            # Handle cases with no results or errors
            if search_results.get("status") == "error":
                return {
                    "subject": "Search Error Notification",
                    "body": f"An error occurred during search: {search_results.get('error_message', 'Unknown error')}",
                }

            # Handle cases with no results
            if not search_results.get("results"):
                return {
                    "subject": "No Search Results",
                    "body": f"No results found for search term '{search_results.get('original_query', '')}'.",
                }

            # Select prompt based on search type
            search_type = search_results.get("search_type", "general")
            if search_type == "research_paper":
                prompt = self.paper_prompt
            elif search_type == "news":
                prompt = self.news_prompt
            else:
                prompt = self.general_prompt

            # Prepare input for result formatting
            formatted_input = {
                "query": search_results.get("original_query", ""),
                "results": json.dumps(
                    search_results.get("results", []), ensure_ascii=False, indent=2
                ),
            }

            # Generate response through LLM
            response = prompt.format(**formatted_input)
            response = self.llm.invoke(response)

            try:
                # Attempt JSON parsing
                return json.loads(response.content)
            except json.JSONDecodeError:
                # Return default format if JSON parsing fails
                return {
                    "subject": f"Search Results for [{formatted_input['query']}]",
                    "body": response.content,
                }

        except Exception as e:
            return {
                "subject": "Result Processing Error",
                "body": f"An error occurred while processing results: {str(e)}\n\nOriginal query: {search_results.get('original_query', '')}",
            }

Test Search System

In [None]:
def test_search_system():
    query_analyzer = QueryAnalysisAgent()
    search_router = SearchRouter()
    response_agent = ResponseAgent()  # Added

    test_queries = [
        "recommend place to eat at seoul in 7 pm",
        "find rag persona paper at 3 pm",
        "find news us president speech in 7 am",
    ]

    for query in test_queries:
        print(f"\n{'='*60}")
        print(f"Test Query: {query}")
        print(f"{'='*60}")

        try:
            query_analysis = query_analyzer.analyze_query(query)
            print("\n1. Query Analysis Results:")
            print(
                json.dumps(
                    query_analysis,
                    indent=2,
                    ensure_ascii=False,
                    default=datetime_handler,
                )
            )

            if query_analysis["status"] != "success":
                print("Query analysis failed!")
                continue

            search_results = search_router.route_and_search(query_analysis)
            print("\n2. Search Results:")
            print(
                json.dumps(
                    search_results,
                    indent=2,
                    ensure_ascii=False,
                    default=datetime_handler,
                )
            )

            # Added: Result formatting
            print("\n3. Formatted Results:")
            formatted_results = response_agent.format_results(search_results)
            print(json.dumps(formatted_results, indent=2, ensure_ascii=False))

        except Exception as e:
            print(f"Error occurred during test: {str(e)}")


if __name__ == "__main__":
    test_search_system()

### Scheduling System and Email Service

The `ScheduledSearchSystem` manages the complete lifecycle of search tasks, from scheduling to result delivery. Here's its core structure and functionality:

### Key Components

1. Collection Management
2. Task Scheduling
3. Search Execution
4. Email Delivery


The system uses threading for non-blocking operation and includes comprehensive logging for monitoring task progress and debugging issues.

In [18]:
import schedule
import time
import yagmail
import chromadb
from chromadb.config import Settings
import threading
import time
from queue import Queue


class ScheduledSearchSystem:
    def __init__(self, email_config: Dict[str, Any]):
        print(f"\n[{self._get_current_time()}] Initializing system...")
        self.query_analyzer = QueryAnalysisAgent()
        self.search_router = SearchRouter()
        self.response_agent = ResponseAgent()
        self.client = chromadb.PersistentClient(path="./search_data")

        # Email configuration
        self.email_config = email_config
        self.yag = yagmail.SMTP(email_config["username"], email_config["password"])
        print(f"[{self._get_current_time()}] Email client configuration complete")

        self.scheduled_tasks = {}
        self.setup_collections()

        # Add completion flag
        self.is_completed = False
        self.completion_event = threading.Event()

        # Start scheduler
        self.scheduler_thread = threading.Thread(target=self._run_scheduler)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        print(f"[{self._get_current_time()}] System initialization complete\n")

    def _get_current_time(self):
        """Return current time as string"""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def setup_collections(self):
        """Set up ChromaDB collections"""
        print(f"[{self._get_current_time()}] Starting ChromaDB collection setup...")
        self.results_collection = self.client.get_or_create_collection(
            name="search_results", metadata={"description": "Raw search results"}
        )

        self.formatted_collection = self.client.get_or_create_collection(
            name="formatted_responses",
            metadata={"description": "Formatted responses for email delivery"},
        )

        self.schedule_collection = self.client.get_or_create_collection(
            name="scheduled_tasks",
            metadata={"description": "Scheduled search and email tasks"},
        )
        print(f"[{self._get_current_time()}] ChromaDB collection setup complete")

    def schedule_task(self, query: str, user_email: str) -> Dict[str, Any]:
        """Schedule a task"""
        try:
            print(f"\n[{self._get_current_time()}] Starting new task scheduling...")
            print(f"Query: {query}")
            print(f"Email: {user_email}")

            # Query analysis
            query_analysis = self.query_analyzer.analyze_query(query)
            execution_time = query_analysis["execution_time"]
            target_time = query_analysis["target_time"]

            print(f"Scheduled search execution time: {execution_time}")
            print(f"Scheduled email delivery time: {target_time}")

            # Generate task ID
            schedule_id = f"task_{datetime.now(pytz.UTC).timestamp()}"
            print(f"Generated task ID: {schedule_id}")

            # Save task information
            task_info = {
                "query": query,
                "email": user_email,
                "execution_time": execution_time.isoformat(),
                "target_time": target_time.isoformat(),
                "search_type": query_analysis["search_type"],
                "status": "scheduled",
            }

            self.scheduled_tasks[schedule_id] = task_info

            # Save to ChromaDB
            print(
                f"[{self._get_current_time()}] Saving task information to ChromaDB..."
            )
            self.schedule_collection.add(
                documents=[json.dumps(task_info)],
                metadatas=[{"type": "schedule", "status": "pending"}],
                ids=[schedule_id],
            )
            print(f"[{self._get_current_time()}] Task information saved")

            # Schedule search execution
            execution_time_str = execution_time.strftime("%H:%M")
            schedule.every().day.at(execution_time_str).do(
                self.execute_search, schedule_id=schedule_id
            ).tag(schedule_id)

            print(f"[{self._get_current_time()}] Search task scheduling complete")

            return {
                "status": "success",
                "message": "Task successfully scheduled",
                "schedule_id": schedule_id,
                "execution_time": execution_time,
                "target_time": target_time,
            }

        except Exception as e:
            print(f"[{self._get_current_time()}] Task scheduling failed: {str(e)}")
            return {"status": "error", "error_message": str(e)}

    def execute_search(self, schedule_id: str) -> bool:
        """Execute search"""
        try:
            print(
                f"\n[{self._get_current_time()}] Starting search execution (ID: {schedule_id})"
            )

            task_info = self.scheduled_tasks.get(schedule_id)
            if not task_info:
                print(f"[{self._get_current_time()}] Task information not found")
                return False

            print(f"[{self._get_current_time()}] Analyzing search query...")
            query_analysis = self.query_analyzer.analyze_query(task_info["query"])

            print(f"[{self._get_current_time()}] Performing search...")
            search_results = self.search_router.route_and_search(query_analysis)

            print(f"[{self._get_current_time()}] Formatting search results...")
            formatted_response = self.response_agent.format_results(search_results)

            # Save results
            print(f"[{self._get_current_time()}] Saving search results to ChromaDB...")
            response_id = f"response_{schedule_id}"
            self.formatted_collection.add(
                documents=[json.dumps(formatted_response)],
                metadatas=[
                    {
                        "schedule_id": schedule_id,
                        "email": task_info["email"],
                        "target_time": task_info["target_time"],
                    }
                ],
                ids=[response_id],
            )
            print(f"[{self._get_current_time()}] Search results saved")

            # Schedule email delivery
            target_time = datetime.fromisoformat(task_info["target_time"])
            target_time_str = target_time.strftime("%H:%M")

            schedule.every().day.at(target_time_str).do(
                self.send_email, schedule_id=schedule_id
            ).tag(f"email_{schedule_id}")

            print(
                f"[{self._get_current_time()}] Email delivery scheduled (Time: {target_time_str})"
            )
            return True

        except Exception as e:
            print(f"[{self._get_current_time()}] Search execution failed: {str(e)}")
            return False

    def send_email(self, schedule_id: str) -> bool:
        """Send email"""
        try:
            print(
                f"\n[{self._get_current_time()}] Starting email delivery (ID: {schedule_id})"
            )

            response_id = f"response_{schedule_id}"
            print(f"[{self._get_current_time()}] Retrieving saved search results...")
            response_results = self.formatted_collection.get(ids=[response_id])

            if not response_results["documents"]:
                print(f"[{self._get_current_time()}] Search results not found")
                return False

            formatted_response = json.loads(response_results["documents"][0])
            metadata = response_results["metadatas"][0]

            print(f"[{self._get_current_time()}] Sending email...")
            print(f"Recipient: {metadata['email']}")
            print(f"Subject: {formatted_response['subject']}")

            self.yag.send(
                to=metadata["email"],
                subject=formatted_response["subject"],
                contents=formatted_response["body"],
            )
            print(f"[{self._get_current_time()}] Email sent successfully")

            # Update task status
            print(f"[{self._get_current_time()}] Updating task status...")
            task_info = self.scheduled_tasks[schedule_id]
            task_info["status"] = "completed"
            self.schedule_collection.update(
                documents=[json.dumps(task_info)], ids=[schedule_id]
            )

            # Clear schedule
            schedule.clear(f"email_{schedule_id}")
            print(f"[{self._get_current_time()}] Task completion processing complete\n")

            # Set completion flag
            self.is_completed = True
            self.completion_event.set()
            print(
                f"[{self._get_current_time()}] All tasks completed. Shutting down system.\n"
            )

            return True

        except Exception as e:
            print(f"[{self._get_current_time()}] Email delivery failed: {str(e)}")
            return False

    def _run_scheduler(self):
        """Run scheduler"""
        print(f"[{self._get_current_time()}] Scheduler started...")
        while not self.is_completed:
            schedule.run_pending()
            time.sleep(1)  # Check every second

    def wait_for_completion(self, timeout=None):
        """Wait for task completion"""
        try:
            completed = self.completion_event.wait(timeout=timeout)
            if not completed:
                print(f"[{self._get_current_time()}] Task completion timeout")
            if hasattr(self, "yag"):
                self.yag.close()
        except KeyboardInterrupt:
            print(f"\n[{self._get_current_time()}] Terminated by user.")
            if hasattr(self, "yag"):
                self.yag.close()

### Multi-Agent Scheduler System Usage Guide
The work starts 5 minutes before the work request time.

1. Email Configuration
- Enable Gmail 2-Step Verification
- Generate App Password: https://myaccount.google.com/security > 2-Step Verification > App passwords
```python
email_config = {
    "username": "your_email@gmail.com",
    "password": "your_app_password",  # Gmail app password
    "smtp_server": "smtp.gmail.com",
    "smtp_port": 587
}
```

2. Initialize System and Schedule Task
```python
# Initialize system
system = ScheduledSearchSystem(email_config)

# Schedule task
result = system.schedule_task(
    query="find AI papers at 9 AM",  # Search query to execute
    user_email="your_email@gmail.com"  # Email to receive results
)
```

3. Wait for Completion
```python
# Wait for task completion (max 4 hours)
system.wait_for_completion(timeout=14400)
```

Search results will be automatically emailed to the specified address upon completion.

The system supports various query types:
- Research papers: "find RAG papers at 7 AM"
- News: "find AI news at 9 AM"
- General search: "find restaurants in Seoul at 6 PM"

In [19]:
# System configuration
email_config = {
    "username": "jik9210@gmail.com",
    "password": "yqfu wwpw otrl fgji",
}

print("\n=== Starting Scheduling System Test ===\n")

# Initialize system
system = ScheduledSearchSystem(email_config)

# Schedule task
result = system.schedule_task(
    query="find Modular RAG paper at 11:30 AM",  # Example: 7:45 PM
    user_email="jik9210@gmail.com",
)

print("\n=== Scheduling Result ===")
print(json.dumps(result, indent=2, default=str))
print("\n=== Task will execute at scheduled time... ===\n")

# Wait for task completion (maximum 4 hours)
system.wait_for_completion(timeout=14400)  # 4 hours = 14400 seconds


=== Starting Scheduling System Test ===


[2025-02-08 11:24:54] Initializing system...
[2025-02-08 11:24:54] Email client configuration complete
[2025-02-08 11:24:54] Starting ChromaDB collection setup...
[2025-02-08 11:24:54] ChromaDB collection setup complete
[2025-02-08 11:24:54] Scheduler started...
[2025-02-08 11:24:54] System initialization complete


[2025-02-08 11:24:54] Starting new task scheduling...
Query: find Modular RAG paper at 11:30 AM
Email: jik9210@gmail.com
Scheduled search execution time: 2025-02-08 11:25:00+00:00
Scheduled email delivery time: 2025-02-08 11:30:00+00:00
Generated task ID: task_1738981498.110638
[2025-02-08 11:24:58] Saving task information to ChromaDB...
[2025-02-08 11:24:59] Task information saved
[2025-02-08 11:24:59] Search task scheduling complete

=== Scheduling Result ===
{
  "status": "success",
  "message": "Task successfully scheduled",
  "schedule_id": "task_1738981498.110638",
  "execution_time": "2025-02-08 11:25:00+00:00",
  "target_ti

Add of existing embedding ID: response_task_1738168330.927504


[2025-02-08 11:25:10] Saving search results to ChromaDB...
[2025-02-08 11:25:10] Search results saved
[2025-02-08 11:25:10] Email delivery scheduled (Time: 11:30)

[2025-02-08 11:30:00] Starting email delivery (ID: task_1738981498.110638)
[2025-02-08 11:30:00] Retrieving saved search results...
[2025-02-08 11:30:00] Sending email...
Recipient: jik9210@gmail.com
Subject: Search Results for [find Modular RAG paper at 11:30 AM]
[2025-02-08 11:30:04] Email sent successfully
[2025-02-08 11:30:04] Updating task status...
[2025-02-08 11:30:04] Task completion processing complete

[2025-02-08 11:30:04] All tasks completed. Shutting down system.

