# Multi-Agent Scheduler System

- Author: [Ilgyun Jeong](https://github.com/johnny9210)
- Design: 
- Peer Review: [Mark()](https://github.com/obov), [Taylor(Jihyun Kim)](https://github.com/Taylor0819)
- This is a part of [LangChain Open Tutorial](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial)

[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/19-Cookbook/03-MultiAgentSystem/01-MultiAgentScheduler.ipynb) [![Open in GitHub](https://img.shields.io/badge/Open%20in%20GitHub-181717?style=flat-square&logo=github&logoColor=white)](https://github.com/LangChain-OpenTutorial/LangChain-OpenTutorial/blob/main/19-Cookbook/03-MultiAgentSystem/01-MultiAgentScheduler.ipynb)

## Overview

The Multi-Agent Scheduler System is an advanced automation solution designed to process, schedule, and deliver information based on user queries. The system employs multiple specialized agents working in coordination, backed by a vector database for efficient information storage and retrieval. This system particularly excels at handling complex queries that involve timing specifications and information retrieval tasks.

For example, when a user requests "Find and recommend RAG-related papers at 7 AM," the system coordinates multiple agents to analyze the query, schedule the task, gather information, and deliver results at the specified time.

### System Architecture

1) Query Analysis and Task Distribution
- Specialized QueryAnalysisAgent breaks down complex user requests
- Extracts timing information, search parameters, and user intent
- Distributes tasks to appropriate specialized agents

2) Information Management
- Integrates with SerpAPI for real-time web information retrieval
- Utilizes ChromaDB as a vector database for efficient information storage
- Implements RAG (Retrieval-Augmented Generation) for enhanced response quality

3) Scheduling and Delivery
- Manages timing of information delivery through dedicated SchedulerAgent
- Handles multiple concurrent requests and scheduling conflicts
- Provides status updates and delivery confirmations

**Key Components**:

- **Specialized Agents**: 
  - SchedulerAgent: Time management and delivery coordination
  - QueryAnalysisAgent: User intent analysis and task distribution
  - SearchAgent: SerpAPI integration and information gathering
  - RAGAgent: Information processing and summarization
  - ResponseAgent: Final response generation and delivery

- **Vector Database Integration**: 
  - ChromaDB implementation for efficient information storage
  - Vector similarity search for relevant information retrieval
  - Caching mechanism for improved response times

- **Error Handling and Recovery**:  
  - Robust error detection and recovery mechanisms
  - Fallback options for agent failures
  - API retry logic and conflict resolution
  
- **Evaluation System**: 
  - Response quality metrics
  - User feedback collection and analysis
  - System performance monitoring
  - Continuous improvement mechanisms

### Process Flow

1. Query Reception and Analysis
   - Natural language query input
   - Intent extraction and task categorization
   - Timing requirement analysis

2. Task Distribution
   - Agent selection based on query requirements
   - Resource allocation
   - Priority assignment

3. Information Gathering
   - SerpAPI integration for web searches
   - Vector database querying
   - Information validation and filtering

4. Processing and Synthesis
   - RAG-based information processing
   - Content summarization and recommendation
   - Quality assurance checks

5. Scheduling and Delivery
   - Timing coordination
   - Delivery method selection
   - Status tracking and updates

### Table of Contents

- [Overview](#overview)
- [Environment Setup](#environment-setup)
- [Agent Architecture](#agent-architecture)
  - [QueryAnalysisAgent Implementation](#queryanalysisagent-implementation)
  - [SearchAgent and SerpAPI Integration](#searchagent-and-serpapi-integration)
  - [RAGAgent Implementation](#ragagent-implementation)
  - [SchedulerAgent Design](#scheduleragent-design)
  - [ResponseAgent Implementation](#responseagent-implementation)
- [Vector Database Setup](#vector-database-setup)
- [Mail Sending](#mail-sending)

### References


---


## Environment Setup

Set up the environment. You may refer to [Environment Setup](https://wikidocs.net/257836) for more details.

**[Note]**

- `langchain-opentutorial` is a package that provides easy-to-use environment setup helpers, functions, and utilities for the tutorials.
- You can check out [`langchain-opentutorial`](https://github.com/LangChain-OpenTutorial/langchain-opentutorial-pypi) for more details.


In [1]:
%%capture --no-stderr
%pip install langchain-opentutorial

In [2]:
# Install required packages
from langchain_opentutorial import package

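package.install(
    [
        "langsmith",
        "langchain",
        "langchain_community",  # provides SerpAPIWrapper used by SearchAgent
        "google-search-results",  # SerpAPI client required by SerpAPIWrapper
        "chromadb",
        "langchain_chroma",
        "langchain_openai",
        "pytz",
    ],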
    verbose=False,
    upgrade=False,
)

In [None]:
# Set environment variables
from langchain_opentutorial import set_env

set_env(
    {
        "OPENAI_API_KEY": "",
        "LANGCHAIN_API_KEY": "",
        "LANGCHAIN_TRACING_V2": "true",
        "LANGCHAIN_ENDPOINT": "https://api.smith.langchain.com",
        "LANGCHAIN_PROJECT": "ResumeRecommendationReview",
        "UPSTAGE_API_KEY": "",
        "SERPAPI_API_KEY": "",  # read by SerpAPIWrapper in SearchAgent
    }
)

In [54]:
from dotenv import load_dotenv

load_dotenv(override=True)

True

## Agent Architecture

This section explains the structure and roles of each Agent that composes the Multi-Agent Scheduler System. The system consists of five key Agents, each responsible for a specific set of tasks.

`QueryAnalysisAgent`
Analyzes the user's natural language query to extract time-related information and search requirements. For example, from the query “Please find RAG-related papers and recommend them to me at 7 in the morning,” it identifies the execution time and relevant search keywords.

`SearchAgent`
Performs web-based searches using SerpAPI. This includes gathering and organizing information from various sources such as arXiv, Papers with Code, GitHub, and more.

`RAGAgent`
Analyzes and processes the collected information, then generates recommendations that meet the user’s requirements. It integrates with a vector database to facilitate efficient information retrieval and analysis.

`SchedulerAgent`
Responsible for managing time-based tasks. It executes tasks according to the user’s specified schedule and monitors the status of each task.

`ResponseAgent`
Formats the processed information so that it can be conveniently delivered to the user. This can include sending the results via email alerts or similar means.

----
**Agent Flow**

```mermaid
graph TD
    A[User Query] --> B[QueryAnalysisAgent]
    B --> C[SearchAgent]
    C --> D[RAGAgent]
    D --> E[ResponseAgent]
    E --> F[User Notification]
    B --> G[SchedulerAgent]
    G --> E
```
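
The flow in the diagram can be sketched in plain Python. This is only an illustration: the functions `analyze`, `search`, `summarize`, `respond`, and `run_pipeline` are hypothetical stand-ins that return canned data, not the agent classes defined later in this tutorial. The sketch shows the two paths leaving the analysis step: the search/RAG path that produces the content, and the scheduling path that decides when the response is delivered.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict


# Hypothetical stand-ins for the agents; each one passes plain dicts along.
def analyze(query: str) -> Dict[str, Any]:
    # Fixed "now" so the demo is reproducible (assumption for illustration)
    now = datetime(2025, 1, 21, 12, 0, tzinfo=timezone.utc)
    return {
        "keywords": ["RAG"],
        "target_time": now + timedelta(hours=19),
        "original_query": query,
    }


def search(info: Dict[str, Any]) -> Dict[str, Any]:
    return {"status": "success", "results": [{"title": "demo paper"}]}


def summarize(found: Dict[str, Any]) -> Dict[str, Any]:
    return {"summary": f"{len(found['results'])} result(s) found"}


def respond(summary: Dict[str, Any], deliver_at: datetime) -> Dict[str, Any]:
    return {"deliver_at": deliver_at, "body": summary["summary"]}


def run_pipeline(query: str) -> Dict[str, Any]:
    info = analyze(query)  # QueryAnalysisAgent step
    found = search(info)  # SearchAgent step
    if found["status"] != "success":
        return {"status": "error", "stage": "search"}
    summary = summarize(found)  # RAGAgent step
    # SchedulerAgent's role: the response is held until the target time
    return respond(summary, info["target_time"])  # ResponseAgent step


message = run_pipeline("Find RAG papers at 7 AM")
print(message["deliver_at"].isoformat(), "|", message["body"])
```

Keeping every hand-off a plain dict with a `status` field mirrors the return formats used by the agents in the following sections.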

### QueryAnalysisAgent Implementation
The QueryAnalysisAgent is a specialized component of a multi-agent scheduler system designed to parse and analyze user queries. 

Its primary purpose is to extract timing information and task-related details from natural language inputs, enabling scheduled task execution.

Core Functionality
The agent performs two main functions:
- Time extraction from natural language queries
- Task analysis and intent recognition
----
Usage Example
```python
agent = QueryAnalysisAgent()
query = "아침 7시에 RAG 관련 논문을 찾아서 추천해줘"  # "Find RAG-related papers and recommend them at 7 AM"
result = agent.analyze_query(query)
```

Expected output:
```json
{
    "target_time": "2025-01-22 07:00:00+0000",
    "execution_time": "2025-01-22 06:55:00+0000",
    "task_type": "추천",
    "keywords": ["아침", "7시", "RAG", "관련", "논문"],
    "requirements": "없음",
    "original_query": "아침 7시에 RAG 관련 논문을 찾아서 추천해줘",
    "status": "success"
}
```
----
Integration Points
The QueryAnalysisAgent is designed to work seamlessly with:
- SchedulerAgent for timing coordination
- SearchAgent for executing search tasks
- RAGAgent for processing retrieved information
- ResponseAgent for formatting results

Install and Import Required Libraries

In [1]:
# Cell 1: Import the required libraries
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from datetime import datetime, timedelta
import pytz
import json
import os


Prompt Templates
The agent uses two main prompt templates:

a) Time Extraction Prompt:
- Purpose: Converts time expressions in the query into 24-hour `HH:MM` format
- Examples: 
  - "아침 7시" (7 in the morning) → "07:00"
  - "저녁 9시" (9 in the evening) → "21:00"
  - "오후 3시" (3 in the afternoon) → "15:00"

b) Task Analysis Prompt:
- Purpose: Extracts task information in JSON format
- Output Structure:
  ```json
  {
      "task_type": "one of 검색 (search), 요약 (summarize), or 추천 (recommend)",
      "keywords": ["keyword1", "keyword2"],
      "requirements": "additional requirements"
  }
  ```

In [2]:
# Cell 2: Class definition with __init__ and setup_prompt_templates
class QueryAnalysisAgent:
    def __init__(self, model_name="gpt-3.5-turbo"):
        self.llm = ChatOpenAI(model_name=model_name, temperature=0)
        self.setup_prompt_templates()

    def setup_prompt_templates(self):
        # Prompt for time extraction (kept in Korean to match the Korean demo queries)
        self.time_extraction_prompt = PromptTemplate.from_template(
            """
            사용자의 쿼리에서 시간 정보를 추출하고 24시간 형식으로 변환하세요.
            예시: 
            - "아침 7시" -> "07:00"
            - "저녁 9시" -> "21:00"
            - "오후 3시" -> "15:00"
            
            쿼리: {query}
            시간 (HH:MM 형식):
        """
        )

        # Prompt for task-intent analysis
        self.task_analysis_prompt = PromptTemplate.from_template(
            """
            사용자의 쿼리에서 다음 정보를 JSON 형식으로 추출하세요.
            
            쿼리: {query}
            
            다음 형식으로 응답해주세요:
            {{
                "task_type": "검색 또는 요약 또는 추천 중 하나",
                "keywords": ["키워드1", "키워드2"],
                "requirements": "추가 요구사항"
            }}
            
            JSON 형식으로만 응답해주세요.
        """
        )

Core Methods 1

`extract_time()`
- Functionality: Extracts and processes time information from queries
- Features:
  - Converts the extracted `HH:MM` string into a timezone-aware datetime object
  - Works in UTC (via pytz), so "7 AM" is interpreted as 07:00 UTC
  - Rolls over to the next day if the time has already passed today
- Error Handling: Raises ValueError for invalid time formats
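
The roll-over rule (use tomorrow if the time has already passed) is easy to check in isolation. A minimal sketch, assuming the model returns a clean `HH:MM` string; the helper name `next_occurrence` and the fixed UTC+9 offset are illustrative choices, not part of the agent. Passing `now` in explicitly makes the function testable, and the regex rejects malformed replies before they reach `int()`.

```python
import re
from datetime import datetime, timedelta, timezone

# Accepts 0:00 through 23:59, with or without a leading zero on the hour
HHMM = re.compile(r"^([01]?\d|2[0-3]):([0-5]\d)$")


def next_occurrence(time_str: str, now: datetime) -> datetime:
    """Return the next datetime at which the clock reads time_str (HH:MM)."""
    match = HHMM.match(time_str.strip())
    if match is None:
        raise ValueError(f"not an HH:MM time: {time_str!r}")
    hour, minute = int(match.group(1)), int(match.group(2))
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        # The requested time has already passed today, so use tomorrow
        target += timedelta(days=1)
    return target


KST = timezone(timedelta(hours=9))  # assumption: fixed offset, demo only
now = datetime(2025, 1, 21, 9, 30, tzinfo=KST)
morning = next_occurrence("07:00", now)
evening = next_occurrence("21:00", now)
print(morning.isoformat())
print(evening.isoformat())
```

Since `extract_time` computes against UTC, a request for 7 AM is scheduled for 07:00 UTC; passing a local-time `now`, as in the sketch, is one way to schedule against the user's clock instead.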


In [3]:
# Cell 3: Add the extract_time method
def extract_time(self, query: str) -> datetime:
    """Extract the time from the query and return it as a datetime object."""
    time_extraction_chain = self.time_extraction_prompt | self.llm
    time_str = time_extraction_chain.invoke({"query": query})

    try:
        # Pull the actual time string out of the chat model response
        time_str = time_str.content.strip()

        # Compute the next scheduled time relative to the current time (UTC)
        current_time = datetime.now(pytz.utc)
        hour, minute = map(int, time_str.split(":"))

        target_time = current_time.replace(
            hour=hour, minute=minute, second=0, microsecond=0
        )

        # If the time has already passed today, schedule it for tomorrow
        if target_time <= current_time:
            target_time += timedelta(days=1)

        return target_time
    except Exception as e:
        raise ValueError(f"Failed to extract time: {e}") from e


# After running this cell, the method must be attached to the class
QueryAnalysisAgent.extract_time = extract_time

Core Methods 2

`analyze_task()`
- Functionality: Analyzes query content for task information
- Features:
  - Extracts task type, keywords, and requirements
  - Returns the parsed result as a Python dict
  - Leaves the original query to be attached later by `analyze_query()`
- Error Handling: Raises ValueError when the model response is not valid JSON
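
Chat models sometimes add a sentence before or after the JSON they were asked for, in which case `json.loads` on the raw text fails. The sketch below shows one defensive way to parse such replies. `parse_llm_json` is a hypothetical helper, not part of the agent, and it assumes the reply contains exactly one top-level JSON object; the sample reply is made up.

```python
import json


def parse_llm_json(text: str) -> dict:
    """Parse the span from the first '{' to the last '}' as a JSON object."""
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object found in model reply")
    try:
        return json.loads(text[start : end + 1])
    except json.JSONDecodeError as exc:
        raise ValueError(f"invalid JSON in model reply: {exc}") from exc


reply = (
    "Sure, here is the analysis:\n"
    '{"task_type": "recommend", "keywords": ["RAG", "paper"], "requirements": "none"}'
)
task = parse_llm_json(reply)
print(task["task_type"], task["keywords"])
```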

In [4]:
# Cell 4: Add the analyze_task method
def analyze_task(self, query: str) -> dict:
    """Extract the task intent and keywords from the query."""
    task_analysis_chain = self.task_analysis_prompt | self.llm
    response = task_analysis_chain.invoke({"query": query})

    try:
        # Extract and parse the JSON from the chat model response
        return json.loads(response.content.strip())
    except json.JSONDecodeError as e:
        raise ValueError(f"Failed to parse task analysis result: {e}") from e


# Attach the method to the class
QueryAnalysisAgent.analyze_task = analyze_task

Core Methods 3

`analyze_query()`
- Functionality: Combines time extraction and task analysis
- Output Structure:
  ```python
  {
      "target_time": datetime,
      "execution_time": datetime,
      "task_type": str,
      "keywords": List[str],
      "requirements": str,
      "original_query": str,
      "status": str
  }
  ```

In [5]:
# Cell 5: Add the analyze_query method
def analyze_query(self, query: str) -> dict:
    """Run the full query analysis and return the combined result."""
    try:
        target_time = self.extract_time(query)
        task_info = self.analyze_task(query)

        return {
            "target_time": target_time,
            "execution_time": target_time - timedelta(minutes=5),
            "task_type": task_info["task_type"],
            "keywords": task_info["keywords"],
            "requirements": task_info["requirements"],
            "original_query": query,
            "status": "success",
        }
    except Exception as e:
        return {"status": "error", "error_message": str(e), "original_query": query}


# Attach the method to the class
QueryAnalysisAgent.analyze_query = analyze_query

Test

In [6]:
# Cell 6: Test (with datetime serialization handling)
def datetime_handler(obj):
    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S%z")
    raise TypeError(f"Object of type {type(obj)} is not JSON serializable")


def test_query():
    agent = QueryAnalysisAgent()
    query = "아침 7시에 RAG 관련 논문을 찾아서 추천해줘"
    result = agent.analyze_query(query)
    return json.dumps(result, indent=2, ensure_ascii=False, default=datetime_handler)


print(test_query())

{
  "target_time": "2025-01-22 07:00:00+0000",
  "execution_time": "2025-01-22 06:55:00+0000",
  "task_type": "추천",
  "keywords": [
    "아침",
    "7시",
    "RAG",
    "관련",
    "논문"
  ],
  "requirements": "없음",
  "original_query": "아침 7시에 RAG 관련 논문을 찾아서 추천해줘",
  "status": "success"
}


### SearchAgent and SerpAPI Integration


The SearchAgent is a specialized component that performs web searches using SerpAPI to gather information related to RAG (Retrieval Augmented Generation). 

It processes search results into a structured format that can be utilized by other agents in the system.

----

Key Features

1. Multi-Source Search
   - arXiv: Academic paper search
   - Google Scholar: Scholarly material search
   - Papers with Code: Papers with implementations
   - GitHub: Implementation repositories

2. Result Filtering and Sorting
   - Relevance score-based filtering
   - Publication year-based filtering (prioritizing recent materials)
   - Source-based result integration and sorting

----

Usage Example

```python
# Initialize SearchAgent
agent = SearchAgent(serpapi_key="your_api_key")

# Perform search
# Note: queries are only built when the keyword "논문" ("paper") is present
query_info = {
    "keywords": ["RAG", "논문", "latest"],
    "original_query": "Find RAG related papers"
}

results = agent.perform_search(query_info)
```

Example Search Result

```json
{
    "status": "success",
    "results": [
        {
            "type": "paper",
            "source": "arxiv",
            "title": "Recent Advances in RAG",
            "content": "This paper discusses recent advances in RAG...",
            "url": "https://arxiv.org/...",
            "year": "2024",
            "relevance_score": 1.0
        }
    ],
    "total_results_found": 13,
    "filtered_results_count": 8,
    "sources_searched": [
        "arXiv",
        "Google Scholar",
        "Papers with Code",
        "GitHub"
    ]
}
```
----

In [85]:
# Cell 1: Import the required libraries
from langchain_community.utilities import SerpAPIWrapper
from typing import List, Dict, Any

The SearchAgent class is designed to perform web searches across multiple academic and technical sources using the SerpAPI wrapper. 

Here's a detailed breakdown of its implementation:

1. `Class Initialization`
- Takes an optional SerpAPI key
- Sets up the API key as an environment variable if provided
- Initializes the SerpAPI wrapper for search operations

2. `Search Parameter Setup`
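This method constructs search queries for different sources. Queries are built only when the keyword list contains "논문" ("paper"); otherwise it returns an empty list and no search is run: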
- arXiv for academic papers
- Google Scholar for academic research
- Papers with Code for implementations
- GitHub for code repositories

Each source gets a specialized query format. For example:
- arXiv: `site:arxiv.org {keywords} RAG Retrieval Augmented Generation`
- GitHub: `site:github.com {keywords} RAG implementation`

3. `Search Execution`
Main features:
- Executes searches across all configured sources
- Handles each source's results separately
- Limits results per source using max_results parameter
- Returns a structured response with:
  - Search status
  - Combined results
  - List of sources searched
  - Original query reference

4. `Result Parsing`
Processes results based on source type:
- arXiv papers: 
  ```python
  {
      "type": "paper",
      "source": "arXiv",
      "title": parsed_title,
      "content": result,
      "url": extracted_url
  }
  ```
- GitHub implementations:
  ```python
  {
      "type": "implementation",
      "source": "GitHub",
      "title": parsed_title,
      "content": result,
      "url": extracted_url
  }
  ```
- Other sources:
  ```python
  {
      "type": "reference",
      "source": source_name,
      "title": parsed_title,
      "content": result,
      "url": extracted_url
  }
  ```

5. `URL Extraction`
- Placeholder method for URL extraction
- Intended to be implemented with regex or other parsing logic
- Currently returns empty string as temporary implementation
----
Return Format
Successful search result:
```python
{
    "status": "success",
    "results": [parsed_results],
    "sources_searched": ["arXiv", "Google Scholar", "Papers with Code", "GitHub"],
    "original_query": original_query
}
```

Error response:
```python
{
    "status": "error",
    "error_message": error_description,
    "original_query": original_query
}
```

This implementation provides a framework for multi-source academic and technical searches, with structured result parsing and error handling. It focuses on RAG-related content across academic papers and technical implementations.
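
Each query targets exactly one source, so one option is to keep the source name next to the query instead of recovering it later from the `site:` filter in the query text. A minimal sketch under that assumption; `SOURCE_TEMPLATES` and `build_queries` are hypothetical names, and the templates mirror the query strings built by `setup_search_parameters`.

```python
from typing import Dict, List

# Assumption: one template per source, mirroring the tutorial's query formats
SOURCE_TEMPLATES: Dict[str, str] = {
    "arxiv": "site:arxiv.org {keywords} RAG Retrieval Augmented Generation",
    "scholar": "site:scholar.google.com {keywords} RAG LLM",
    "papers_with_code": "site:paperswithcode.com {keywords} RAG",
    "github": "site:github.com {keywords} RAG implementation",
}


def build_queries(keywords: List[str]) -> Dict[str, str]:
    """Return a {source: query} mapping for the given keywords."""
    joined = " ".join(keywords)
    return {
        source: template.format(keywords=joined)
        for source, template in SOURCE_TEMPLATES.items()
    }


queries = build_queries(["RAG", "latest"])
for source, query in queries.items():
    print(f"{source:17s}{query}")
```

Iterating over `queries.items()` gives the search loop both the source label and the query, so adding a fifth source only requires a new dictionary entry.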

In [86]:
class SearchAgent:
    def __init__(self, serpapi_key: str = None):
        if serpapi_key:
            os.environ["SERPAPI_API_KEY"] = serpapi_key
        self.search = SerpAPIWrapper()

    def setup_search_parameters(self, query_info: Dict[str, Any]) -> List[str]:
        """Build the search strategy."""
        search_queries = []

        # Build queries for academic paper search (only when "논문", i.e. "paper", is a keyword)
        if "논문" in query_info["keywords"]:
            # arXiv search
            search_queries.append(
                f"site:arxiv.org {' '.join(query_info['keywords'])} RAG Retrieval Augmented Generation"
            )

            # Google Scholar search
            search_queries.append(
                f"site:scholar.google.com {' '.join(query_info['keywords'])} RAG LLM"
            )

            # Papers with Code search
            search_queries.append(
                f"site:paperswithcode.com {' '.join(query_info['keywords'])} RAG"
            )

            # GitHub implementation search
            search_queries.append(
                f"site:github.com {' '.join(query_info['keywords'])} RAG implementation"
            )

        return search_queries

    def perform_search(
        self, query_info: Dict[str, Any], max_results: int = 5
    ) -> Dict[str, Any]:
        """Run the search across multiple sources."""
        try:
            search_queries = self.setup_search_parameters(query_info)
            all_results = []

            for query in search_queries:
                raw_results = self.search.run(query)

                # Identify the source from the site: filter in the query
                source = (
                    "arxiv"
                    if "arxiv.org" in query
                    else (
                        "scholar"
                        if "scholar.google.com" in query
                        else (
                            "papers_with_code"
                            if "paperswithcode.com" in query
                            else "github"
                        )
                    )
                )

                # Parse the results and attach metadata
                parsed_results = self._parse_results(raw_results, source)
                all_results.extend(parsed_results[:max_results])

            return {
                "status": "success",
                "results": all_results,
                "sources_searched": [
                    "arXiv",
                    "Google Scholar",
                    "Papers with Code",
                    "GitHub",
                ],
                "original_query": query_info["original_query"],
            }

        except Exception as e:
            return {
                "status": "error",
                "error_message": str(e),
                "original_query": query_info["original_query"],
            }

    def _parse_results(self, raw_results: str, source: str) -> List[Dict[str, str]]:
        """Parse and structure the search results."""
        parsed_results = []
        for result in raw_results.split("\n"):
            if result.strip():
                # Source-specific parsing logic
                if source == "arxiv":
                    # Extract arXiv paper info
                    parsed_results.append(
                        {
                            "type": "paper",
                            "source": "arXiv",
                            "title": (
                                result.split(" - ")[0] if " - " in result else result
                            ),
                            "content": result,
                            "url": self._extract_url(result),
                        }
                    )
                elif source == "github":
                    # Extract GitHub implementation info
                    parsed_results.append(
                        {
                            "type": "implementation",
                            "source": "GitHub",
                            "title": (
                                result.split(" - ")[0] if " - " in result else result
                            ),
                            "content": result,
                            "url": self._extract_url(result),
                        }
                    )
                else:
                    # Extract info for other sources
                    parsed_results.append(
                        {
                            "type": "reference",
                            "source": source,
                            "title": (
                                result.split(" - ")[0] if " - " in result else result
                            ),
                            "content": result,
                            "url": self._extract_url(result),
                        }
                    )

        return parsed_results

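    def _extract_url(self, result: str) -> str:
        """Extract a URL from the result."""
        # Placeholder for the URL extraction logic
        # A real implementation would extract the URL with a regex or similar
        return ""  # Temporary return value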

Search Result Utility Functions

1. `_clean_and_parse_results`
Purpose: Cleans and parses raw search result strings into a structured format.

Key features:
- Handles string-to-list conversion
- Validates string format
- Uses safe evaluation of string content

2. `_extract_url_from_content`
Purpose: Extracts URLs from content using regex pattern matching.

Features:
- Uses regex pattern for URL detection
- Handles HTTP and HTTPS URLs
- Returns first found URL or empty string

3. `_extract_year`
Purpose: Extracts year information from content text.

Features:
- Extracts years starting with '20'
- Returns most recent year if multiple found
- Returns empty string if no year found

----
Usage Context
These utility functions are used within the SearchAgent class to:
1. Process raw search results from SerpAPI
2. Extract metadata from search results
3. Standardize result format for further processing

This set of utility functions provides robust preprocessing of search results, ensuring clean and structured data for the rest of the system to process.
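
The three helpers can be exercised without calling SerpAPI. The sketch below re-implements them as standalone functions for illustration; the names `parse_result_list`, `first_url`, and `latest_year` are hypothetical, and the sample string is made up. For the string-to-list step it uses `ast.literal_eval`, which only accepts Python literals and therefore cannot execute code embedded in a search result.

```python
import ast
import re
from typing import List


def parse_result_list(raw: str) -> List[str]:
    """Turn a string that looks like a list literal into a list of strings."""
    text = raw.strip()
    if text.startswith("[") and text.endswith("]"):
        try:
            value = ast.literal_eval(text)
        except (ValueError, TypeError, SyntaxError):
            return [raw]  # not a pure literal: keep the raw text as one result
        if isinstance(value, list):
            return [str(item) for item in value]
    return [raw]


def first_url(content: str) -> str:
    """Return the first http(s) URL in content, or an empty string."""
    match = re.search(r"https?://[^\s'\"<>]+", content)
    return match.group(0) if match else ""


def latest_year(content: str) -> str:
    """Return the largest 20xx year found in content, or an empty string."""
    years = re.findall(r"20\d{2}", content)
    return max(years) if years else ""


raw = "['RAG survey (2023), updated 2024: https://example.org/rag', 'No metadata here']"
items = parse_result_list(raw)
print(items)
print(first_url(items[0]), latest_year(items[0]))

# A list-shaped string containing a function call is rejected, not executed
unsafe = "[__import__('os').getcwd()]"
print(parse_result_list(unsafe))
```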

In [87]:
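# Cell 3: Utility functions for processing search results
import ast
import re


def _clean_and_parse_results(self, raw_results) -> List[str]:
    """Normalize raw search output into a list of result strings."""
    # Assumption: some wrapper versions may return a list directly
    if isinstance(raw_results, list):
        return [str(item) for item in raw_results]
    if isinstance(raw_results, str):
        text = raw_results.strip()
        # If the string looks like a list literal, convert it to a real list
        if text.startswith("[") and text.endswith("]"):
            try:
                # literal_eval accepts only Python literals, so it cannot run code
                results = ast.literal_eval(text)
                if isinstance(results, list):
                    return [str(item) for item in results]
            except (ValueError, TypeError, SyntaxError):
                pass
    return [raw_results]


def _extract_url_from_content(self, content: str) -> str:
    """Extract the first URL from the content."""
    # Match scheme, host, and path up to the next whitespace, quote, or angle bracket
    url_pattern = r"https?://[^\s'\"<>]+"
    urls = re.findall(url_pattern, content)
    # Drop trailing punctuation that belongs to the sentence, not the URL
    return urls[0].rstrip(".,;:)") if urls else ""


def _extract_year(self, content: str) -> str:
    """Extract the most recent 20xx year from the content."""
    year_pattern = r"(20\d{2})"
    years = re.findall(year_pattern, content)
    return max(years) if years else ""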

SearchAgent's perform_search Method Documentation

A method that performs searches across multiple academic and technical sources and processes the results.

Key Features

1. `Search Execution`
- Generates search queries using setup_search_parameters
- Initializes list for storing results

2. `Source-Specific Processing`
- Executes each search query
- Cleans and parses results

3. `Source Identification`
- Identifies source based on query URL
- Supports arXiv, Google Scholar, Papers with Code, GitHub

4. `Result Entry Creation`
- Creates structured result data
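- Assigns a relevance score: 1 if the result text contains "RAG" (case-insensitive), otherwise 0.5

5. `Result Filtering and Sorting`
- Keeps results with relevance score > 0.5, which in practice means only results that mention RAG
- Keeps results dated 2022 or later, plus results with no detectable year
- Sorts by relevance, then by year, in descending order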

The method runs the RAG-related searches and converts the raw output into a filtered, sorted list of structured entries for the downstream agents.
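
The filtering and sorting rules can be reproduced on a few made-up entries. In the sketch below, `score` and `filter_and_sort` are hypothetical helpers and the sample data is invented. The scoring deliberately deviates from a plain substring test: it requires that "RAG" is not surrounded by other ASCII letters, so words such as "storage" do not count, while Korean text directly attached to the term (for example "RAG관련") still matches.

```python
import re
from typing import Any, Dict, List

# "RAG" not preceded or followed by an ASCII letter (assumption for this sketch)
RAG_TERM = re.compile(r"(?<![A-Za-z])RAG(?![A-Za-z])", re.IGNORECASE)


def score(text: str) -> float:
    return 1.0 if RAG_TERM.search(text) else 0.5


def filter_and_sort(
    entries: List[Dict[str, Any]], min_year: int = 2022
) -> List[Dict[str, Any]]:
    kept = [
        e
        for e in entries
        if e["relevance_score"] > 0.5
        and (not e["year"] or int(e["year"]) >= min_year)
    ]
    # Highest score first, then most recent year; missing years sort last
    return sorted(
        kept, key=lambda e: (e["relevance_score"], e["year"] or "0"), reverse=True
    )


samples = [  # made-up entries for illustration
    {"title": "RAG in retrospect", "year": "2021"},
    {"title": "Object storage tuning", "year": "2024"},
    {"title": "RAG evaluation", "year": ""},
    {"title": "Advanced RAG pipelines", "year": "2024"},
]
for entry in samples:
    entry["relevance_score"] = score(entry["title"])

ranked = filter_and_sort(samples)
print([e["title"] for e in ranked])
```

The 2021 entry is dropped by the year rule and the storage entry by the score rule, which leaves the two RAG entries with the dated one ranked first.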

In [88]:
# Cell 4: Improved perform_search method
def perform_search(
    self, query_info: Dict[str, Any], max_results: int = 5
) -> Dict[str, Any]:
    """Run the search across multiple sources."""
    try:
        search_queries = self.setup_search_parameters(query_info)
        all_results = []

        for query in search_queries:
            try:
                # Run the SerpAPI search
                raw_results = self.search.run(query)
                results_list = self._clean_and_parse_results(raw_results)

                # Identify the source
                source = (
                    "arxiv"
                    if "arxiv.org" in query
                    else (
                        "scholar"
                        if "scholar.google.com" in query
                        else (
                            "papers_with_code"
                            if "paperswithcode.com" in query
                            else "github"
                        )
                    )
                )

                # Process each result
                for result in results_list:
                    url = self._extract_url_from_content(result)
                    year = self._extract_year(result)

                    # Build the structured result entry
                    result_entry = {
                        "type": (
                            "paper"
                            if source in ["arxiv", "scholar"]
                            else "implementation"
                        ),
                        "source": source,
                        "title": (
                            result[: result.find(".")]
                            if "." in result
                            else result[:100]
                        ),
                        "content": result,
                        "url": url,
                        "year": year,
                        "relevance_score": 1 if "RAG" in result.upper() else 0.5,
                    }

                    all_results.append(result_entry)

            except Exception as query_error:
                print(f"Query '{query}' failed: {str(query_error)}")
                continue

        # Filter and sort the results
        filtered_results = [
            r
            for r in all_results
            if r["relevance_score"] > 0.5
            and (
                not r["year"] or int(r["year"]) >= 2022
            )  # from 2022 onward, or no year detected
        ]

        sorted_results = sorted(
            filtered_results,
            key=lambda x: (x["relevance_score"], x["year"] if x["year"] else "0"),
            reverse=True,
        )

        return {
            "status": "success",
            "results": sorted_results[:max_results],
            "total_results_found": len(all_results),
            "filtered_results_count": len(filtered_results),
            "sources_searched": [
                "arXiv",
                "Google Scholar",
                "Papers with Code",
                "GitHub",
            ],
            "original_query": query_info["original_query"],
        }

    except Exception as e:
        return {
            "status": "error",
            "error_message": str(e),
            "original_query": query_info.get("original_query", ""),
        }


# Attach the methods to the class
SearchAgent._clean_and_parse_results = _clean_and_parse_results
SearchAgent._extract_url_from_content = _extract_url_from_content
SearchAgent._extract_year = _extract_year
SearchAgent.perform_search = perform_search

Test

In [97]:
# Cell 5: Test
def test_search():
    # Simulate the result received from QueryAnalysisAgent
    query_info = {
        "keywords": ["RAG", "논문", "최신"],
        "original_query": "논문 Find RAG related paper",
    }

    agent = SearchAgent()
    results = agent.perform_search(query_info)
    return json.dumps(results, indent=2, ensure_ascii=False)


print(test_search())

Query 'site:scholar.google.com RAG 논문 최신 RAG LLM' failed: Got error from SerpAPI: Google hasn't returned any results for this query.
{
  "status": "success",
  "results": [
    {
      "type": "implementation",
      "source": "github",
      "title": "논문 리뷰와 코드를 준비했습니다 - YouTube · RAG관련 최신 논문 RAPTOR: Recursive Abstractive Processing for Tree-Organized Retrieval(2024) · VidiGo | 비디고 ",
      "content": "논문 리뷰와 코드를 준비했습니다 - YouTube · RAG관련 최신 논문 RAPTOR: Recursive Abstractive Processing for Tree-Organized Retrieval(2024) · VidiGo | 비디고 ...",
      "url": "",
      "year": "2024",
      "relevance_score": 1
    },
    {
      "type": "paper",
      "source": "arxiv",
      "title": "Retrieval-augmented generation (RAG) addresses these limitations by integrating an external knowledge source, such as a database or search ",
      "content": "Retrieval-augmented generation (RAG) addresses these limitations by integrating an external knowledge source, such as a database or search ...",
      

In [98]:
data = test_search()

Query 'site:scholar.google.com RAG 논문 최신 RAG LLM' failed: Got error from SerpAPI: Google hasn't returned any results for this query.


In [99]:
data

'{\n  "status": "success",\n  "results": [\n    {\n      "type": "implementation",\n      "source": "github",\n      "title": "논문 리뷰와 코드를 준비했습니다 - YouTube · RAG관련 최신 논문 RAPTOR: Recursive Abstractive Processing for Tree-Organized Retrieval(2024) · VidiGo | 비디고 ",\n      "content": "논문 리뷰와 코드를 준비했습니다 - YouTube · RAG관련 최신 논문 RAPTOR: Recursive Abstractive Processing for Tree-Organized Retrieval(2024) · VidiGo | 비디고 ...",\n      "url": "",\n      "year": "2024",\n      "relevance_score": 1\n    },\n    {\n      "type": "paper",\n      "source": "arxiv",\n      "title": "Retrieval-augmented generation (RAG) addresses these limitations by integrating an external knowledge source, such as a database or search ",\n      "content": "Retrieval-augmented generation (RAG) addresses these limitations by integrating an external knowledge source, such as a database or search ...",\n      "url": "",\n      "year": "",\n      "relevance_score": 1\n    },\n    {\n      "type": "paper",\n      "source": "

In [103]:
import json

# Convert the JSON string to a Python dict and take the value of the "results" key
results = json.loads(data)["results"]

In [104]:
results

[{'type': 'implementation',
  'source': 'github',
  'title': '논문 리뷰와 코드를 준비했습니다 - YouTube · RAG관련 최신 논문 RAPTOR: Recursive Abstractive Processing for Tree-Organized Retrieval(2024) · VidiGo | 비디고 ',
  'content': '논문 리뷰와 코드를 준비했습니다 - YouTube · RAG관련 최신 논문 RAPTOR: Recursive Abstractive Processing for Tree-Organized Retrieval(2024) · VidiGo | 비디고 ...',
  'url': '',
  'year': '2024',
  'relevance_score': 1},
 {'type': 'paper',
  'source': 'arxiv',
  'title': 'Retrieval-augmented generation (RAG) addresses these limitations by integrating an external knowledge source, such as a database or search ',
  'content': 'Retrieval-augmented generation (RAG) addresses these limitations by integrating an external knowledge source, such as a database or search ...',
  'url': '',
  'year': '',
  'relevance_score': 1},
 {'type': 'paper',
  'source': 'arxiv',
  'title': 'To evaluate the risk of multilingual harmful content generation, we augmented ',
  'content': 'To evaluate the risk of multilingual harm

### RAGAgent Implementation

In [105]:
# Cell 1: Import required libraries
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import PromptTemplate
from langchain_chroma import Chroma
from typing import List, Dict, Any
import json

In [110]:
# Cell 2: Define the RAGAgent class and its basic methods
class RAGAgent:
    def __init__(self, model_name="gpt-3.5-turbo"):
        self.llm = ChatOpenAI(model_name=model_name, temperature=0)
        self.embeddings = OpenAIEmbeddings()
        self.setup_prompt_templates()

    def setup_prompt_templates(self):
        self.analysis_prompt = PromptTemplate.from_template(
            """
            Analyze the following research papers related to RAG (Retrieval Augmented Generation) and provide a comprehensive summary and recommendations.
            
            Papers:
            {papers}
            
            Please provide:
            1. Brief summary of each paper
            2. Key innovations and contributions
            3. Potential applications
            4. Recommendations based on user's interests
            
            Format your response as JSON with the following structure:
            {{
                "summaries": [
                    {{"title": "", "summary": "", "key_points": []}}
                ],
                "recommendations": [
                    {{"title": "", "reason": ""}}
                ],
                "overall_analysis": ""
            }}
            
            Original user query: {original_query}
            """
        )

In [112]:
# Cell 3: Data processing methods
def preprocess_papers(
    self, search_results: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Preprocess search results into a form that is easy to analyze."""
    processed_papers = []
    for result in search_results:
        processed_papers.append(
            {
                "title": result["title"],
                "content": result["content"],
                "year": result.get("year", ""),
                "source": result["source"],
                "url": result["url"],
                "relevance_score": result["relevance_score"],
            }
        )
    return processed_papers


RAGAgent.preprocess_papers = preprocess_papers


def create_vector_store(self, papers: List[Dict[str, Any]]):
    """Store the paper contents in a vector store."""
    texts = [paper["content"] for paper in papers]
    metadatas = [
        {
            "title": paper["title"],
            "year": paper["year"],
            "source": paper["source"],
            "url": paper["url"],
        }
        for paper in papers
    ]

    return Chroma.from_texts(
        texts=texts, embedding=self.embeddings, metadatas=metadatas
    )


RAGAgent.create_vector_store = create_vector_store
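
The way `create_vector_store` pairs each text with its metadata can be illustrated without calling an embedding API. The sketch below is only an illustration: `build_store_inputs` is a hypothetical helper that is not part of the notebook, and it assumes that some search results may lack a field or have empty content.

```python
from typing import Any, Dict, List, Tuple


def build_store_inputs(
    papers: List[Dict[str, Any]],
) -> Tuple[List[str], List[Dict[str, str]]]:
    """Hypothetical helper: build parallel text and metadata lists."""
    texts: List[str] = []
    metadatas: List[Dict[str, str]] = []
    for paper in papers:
        content = paper.get("content", "")
        if not content:
            # Skip entries that have nothing to embed.
            continue
        texts.append(content)
        metadatas.append(
            {
                "title": paper.get("title", ""),
                "year": str(paper.get("year", "")),
                "source": paper.get("source", "unknown"),
                "url": paper.get("url", ""),
            }
        )
    return texts, metadatas


texts, metadatas = build_store_inputs(
    [
        {"title": "Recent Advances in RAG", "content": "This paper discusses...", "year": "2024"},
        {"title": "Entry without content", "content": ""},
    ]
)
print(len(texts), len(metadatas), metadatas[0]["source"])
```

Keeping the two lists the same length matters because the vector store matches texts and metadata by position.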

In [113]:
# Cell 5: Add the analyze_and_recommend method
def analyze_and_recommend(
    self, papers: List[Dict[str, Any]], query_info: Dict[str, Any]
) -> Dict[str, Any]:
    """Analyze the papers and generate recommendations."""
    try:
        # Create the vector store (built here but not queried in this method)
        vector_store = self.create_vector_store(papers)

        # Prepare the context for analysis
        papers_context = "\n\n".join(
            [
                f"Title: {paper['title']}\nContent: {paper['content']}\nYear: {paper['year']}\nSource: {paper['source']}"
                for paper in papers
            ]
        )

        # Run the analysis with the LLM
        analysis_chain = self.analysis_prompt | self.llm
        analysis_result = analysis_chain.invoke(
            {"papers": papers_context, "original_query": query_info["original_query"]}
        )

        try:
            # Parse the JSON
            result = json.loads(analysis_result.content)
            result["status"] = "success"
            return result
        except json.JSONDecodeError:
            # If JSON parsing fails, return the raw text
            return {
                "status": "partial_success",
                "raw_analysis": analysis_result.content,
                "error": "JSON parsing failed",
            }

    except Exception as e:
        return {"status": "error", "error_message": str(e)}


# Attach the method to the class
RAGAgent.analyze_and_recommend = analyze_and_recommend
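
Chat models sometimes wrap their JSON in a Markdown code fence or add a sentence around it, which makes `json.loads` fail and sends the result down the `partial_success` branch. A minimal sketch of a more tolerant parser follows. `parse_llm_json` is a hypothetical helper, not part of the notebook, and it assumes the reply contains at most one JSON object.

```python
import json
import re
from typing import Any, Dict, Optional


def parse_llm_json(text: str) -> Optional[Dict[str, Any]]:
    """Hypothetical helper: extract a JSON object from an LLM reply."""
    # Try the raw text first.
    try:
        parsed = json.loads(text)
        return parsed if isinstance(parsed, dict) else None
    except json.JSONDecodeError:
        pass
    # Fall back to the span between the first "{" and the last "}".
    match = re.search(r"\{.*\}", text, flags=re.DOTALL)
    if match is None:
        return None
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


fence = "`" * 3  # built programmatically so the example stays readable
reply = f'{fence}json\n{{"overall_analysis": "ok"}}\n{fence}'
print(parse_llm_json(reply))
print(parse_llm_json("not json at all"))
```

Returning `None` instead of raising lets the caller keep the existing `partial_success` fallback for replies that cannot be recovered.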

In [114]:
# Cell 6: Add the process_papers method
def process_papers(
    self, search_results: List[Dict[str, Any]], query_info: Dict[str, Any]
) -> Dict[str, Any]:
    """Run the full paper-processing pipeline."""
    try:
        # Preprocess the data
        processed_papers = self.preprocess_papers(search_results)

        # Analyze and recommend
        analysis_results = self.analyze_and_recommend(processed_papers, query_info)

        return {
            "status": "success",
            "analysis_results": analysis_results,
            "papers_processed": len(processed_papers),
            "original_query": query_info["original_query"],
        }

    except Exception as e:
        return {
            "status": "error",
            "error_message": str(e),
            "original_query": query_info["original_query"],
        }


# Attach the method to the class
RAGAgent.process_papers = process_papers
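
Note that `process_papers` reports `"status": "success"` whenever no exception is raised, even if the nested `analysis_results` carries `"error"` or `"partial_success"`. One way to surface the inner status is sketched below. `overall_status` is a hypothetical helper, and the rule it applies (unknown or missing statuses count as errors) is an assumption.

```python
from typing import Any, Dict

KNOWN_STATUSES = {"success", "partial_success", "error"}


def overall_status(analysis_results: Dict[str, Any]) -> str:
    """Hypothetical helper: derive the outer status from the inner one."""
    inner = analysis_results.get("status", "error")
    # Known statuses are passed through; anything else is treated as an error.
    return inner if inner in KNOWN_STATUSES else "error"


print(overall_status({"status": "success", "summaries": []}))
print(overall_status({"status": "partial_success", "raw_analysis": "..."}))
print(overall_status({"error_message": "boom"}))
```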

In [115]:
# Cell 7: Test
def test_rag_agent():
    # Simulated SearchAgent results
    search_results = [
        {
            "title": "Recent Advances in RAG",
            "content": "This paper discusses recent advances in Retrieval Augmented Generation...",
            "year": "2024",
            "source": "arxiv",
            "url": "https://example.com",
            "relevance_score": 1,
        },
        # More test data can be added here
    ]

    query_info = {
        "keywords": ["RAG", "paper", "latest"],
        "original_query": "Find RAG related papers at 7 AM and recommend them",
    }

    agent = RAGAgent()
    results = agent.process_papers(search_results, query_info)
    return json.dumps(results, indent=2, ensure_ascii=False)


print(test_rag_agent())

{
  "status": "success",
  "analysis_results": {
    "summaries": [
      {
        "title": "Recent Advances in RAG",
        "summary": "This paper discusses recent advances in Retrieval Augmented Generation...",
        "key_points": [
          "Discusses recent advances in RAG",
          "Provides insights into the latest developments in the field",
          "Highlights key research findings and trends"
        ]
      }
    ],
    "recommendations": [
      {
        "title": "Recent Advances in RAG",
        "reason": "This paper provides a comprehensive overview of the recent advancements in RAG, making it a valuable resource for anyone interested in staying up-to-date with the latest developments in the field."
      }
    ],
    "overall_analysis": "The paper on Recent Advances in RAG is a valuable resource for researchers, practitioners, and anyone interested in the field of Retrieval Augmented Generation. It offers insights into the latest trends, key findings, and potent

### SchedulerAgent Design

In [116]:
# Cell 1: Import required libraries
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List, Callable

import nest_asyncio
import pytz

nest_asyncio.apply()

In [126]:
# Cell 2: SchedulerAgent class with the revised schedule_task method
class SchedulerAgent:
    def __init__(self):
        self.scheduled_tasks: Dict[str, Dict[str, Any]] = {}
        self.task_queue = asyncio.Queue()

    def schedule_task(
        self,
        task_id: str,
        target_time: datetime,
        task_info: Dict[str, Any],
        callback: Optional[Callable] = None,
    ) -> Dict[str, Any]:
        """Schedule a task."""
        try:
            current_time = datetime.now(pytz.utc)
            # Reject target times that are not in the future
            if target_time <= current_time:
                return {
                    "status": "error",
                    "error_message": f"Target time ({target_time}) must be in the future. Current time: {current_time}",
                    "task_id": task_id,
                }

            # Compute the execution time (5 minutes before the target time)
            execution_time = target_time - timedelta(minutes=5)

            # If the execution time has already passed, run immediately by using the current time
            if execution_time <= current_time:
                execution_time = current_time

            print("Task Scheduling Details:")
            print(f"- Current Time: {current_time}")
            print(f"- Execution Time: {execution_time}")
            print(f"- Target Time: {target_time}")

            # Store the task information
            self.scheduled_tasks[task_id] = {
                "task_id": task_id,
                "target_time": target_time,
                "execution_time": execution_time,
                "task_info": task_info,
                "callback": callback,
                "status": "scheduled",
                "result": None,
            }

            return {
                "status": "success",
                "message": "Task scheduled successfully",
                "task_id": task_id,
                "scheduled_time": target_time.isoformat(),
                "execution_time": execution_time.isoformat(),
            }

        except Exception as e:
            return {"status": "error", "error_message": str(e), "task_id": task_id}
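
The timing rule inside `schedule_task` (start five minutes before the target, but never in the past) can be isolated and checked with fixed timestamps. The sketch below uses the standard library's `timezone.utc` instead of `pytz` to stay dependency-free. `compute_execution_time` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime, timedelta, timezone


def compute_execution_time(
    target_time: datetime,
    current_time: datetime,
    lead: timedelta = timedelta(minutes=5),
) -> datetime:
    """Hypothetical helper: start `lead` before the target, but never in the past."""
    if target_time <= current_time:
        raise ValueError("target_time must be in the future")
    return max(target_time - lead, current_time)


now = datetime(2025, 1, 21, 15, 0, tzinfo=timezone.utc)
print(compute_execution_time(now + timedelta(minutes=30), now))  # 5 minutes before the target
print(compute_execution_time(now + timedelta(seconds=30), now))  # clamped to `now`
```

Both arguments need to be timezone-aware, since Python refuses to compare aware and naive `datetime` objects.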

In [127]:
# Cell 3: Task execution methods
async def execute_task(self, task_id: str) -> Dict[str, Any]:
    """Execute a task."""
    try:
        task = self.scheduled_tasks[task_id]

        # Skip tasks that have already completed
        if task["status"] == "completed":
            return {
                "status": "skipped",
                "message": "Task already completed",
                "task_id": task_id,
            }

        # Check whether the execution time has been reached
        current_time = datetime.now(pytz.utc)
        if current_time < task["execution_time"]:
            return {
                "status": "waiting",
                "message": "Not yet execution time",
                "task_id": task_id,
            }

        # Update the status
        task["status"] = "executing"
        print(f"Executing task {task_id} at {current_time}")

        # Run the callback (it must be an async function, since it is awaited)
        if task["callback"]:
            result = await task["callback"](task["task_info"])
            task["result"] = result

        # Mark the task as completed
        task["status"] = "completed"
        task["completed_at"] = datetime.now(pytz.utc)

        return {
            "status": "success",
            "task_id": task_id,
            "execution_result": task["result"],
        }

    except Exception as e:
        if task_id in self.scheduled_tasks:
            self.scheduled_tasks[task_id]["status"] = "failed"
        return {"status": "error", "error_message": str(e), "task_id": task_id}


async def monitor_tasks(self):
    """Monitor scheduled tasks and enqueue those that are due."""
    while True:
        try:
            current_time = datetime.now(pytz.utc)

            # Find tasks that are due
            for task_id, task in list(self.scheduled_tasks.items()):
                if (
                    task["status"] == "scheduled"
                    and current_time >= task["execution_time"]
                    and task_id not in self.task_queue._queue
                ):
                    print(f"Adding task {task_id} to queue at {current_time}")
                    await self.task_queue.put(task_id)

            await asyncio.sleep(1)  # Check once per second

        except Exception as e:
            print(f"Error in monitor_tasks: {e}")
            await asyncio.sleep(1)


async def process_task_queue(self):
    """Process the task queue."""
    while True:
        try:
            # Take the next task from the queue
            task_id = await self.task_queue.get()

            # Execute the task
            await self.execute_task(task_id)

            # Mark the queue item as done
            self.task_queue.task_done()

        except Exception as e:
            print(f"Error in process_task_queue: {e}")

        await asyncio.sleep(1)
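
The duplicate check in `monitor_tasks` reads `task_queue._queue`, which is a private attribute of `asyncio.Queue`. An alternative is to mark a task as queued before enqueueing it, so that the status alone prevents a second enqueue. The sketch below shows that idea in isolation. `MiniScheduler` and the `"queued"` status are hypothetical and not part of the notebook.

```python
import asyncio
from typing import Dict, List


class MiniScheduler:
    """Hypothetical, stripped-down scheduler used only to show the status-based check."""

    def __init__(self) -> None:
        self.status: Dict[str, str] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.executed: List[str] = []

    async def enqueue_due_tasks(self) -> None:
        for task_id, status in list(self.status.items()):
            if status == "scheduled":
                # Flip the status first so a later pass cannot enqueue the task again.
                self.status[task_id] = "queued"
                await self.task_queue.put(task_id)

    async def drain(self) -> None:
        while not self.task_queue.empty():
            task_id = await self.task_queue.get()
            self.executed.append(task_id)
            self.status[task_id] = "completed"
            self.task_queue.task_done()


async def demo_queue() -> List[str]:
    scheduler = MiniScheduler()
    scheduler.status["test_task_1"] = "scheduled"
    await scheduler.enqueue_due_tasks()
    await scheduler.enqueue_due_tasks()  # second pass finds nothing to enqueue
    await scheduler.drain()
    return scheduler.executed


print(asyncio.run(demo_queue()))
```

Inside a notebook cell, `await demo_queue()` can be used in place of `asyncio.run(...)`.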

In [128]:
# Cell 4: Status lookup method
def get_task_status(self, task_id: str) -> Dict[str, Any]:
    """Look up the status of a task."""
    if task_id in self.scheduled_tasks:
        task = self.scheduled_tasks[task_id]
        status_info = {
            "status": "success",
            "task_info": {
                "task_id": task_id,
                "current_status": task["status"],
                "target_time": task["target_time"].isoformat(),
                "execution_time": task["execution_time"].isoformat(),
                "result": task.get("result"),
            },
        }
        # Include the completion time if present
        if "completed_at" in task:
            status_info["task_info"]["completed_at"] = task["completed_at"].isoformat()
        return status_info

    return {"status": "error", "error_message": "Task not found", "task_id": task_id}


# Attach the methods to the class
SchedulerAgent.execute_task = execute_task
SchedulerAgent.monitor_tasks = monitor_tasks
SchedulerAgent.process_task_queue = process_task_queue
SchedulerAgent.get_task_status = get_task_status
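
Because `monitor_tasks` and `process_task_queue` loop forever, whoever starts them is responsible for stopping them. Calling `cancel()` only requests cancellation. Awaiting the tasks afterwards confirms that they have actually stopped. A minimal sketch of such a cleanup step follows; `worker`, `shutdown`, and `demo_shutdown` are hypothetical names.

```python
import asyncio
from typing import List


async def worker() -> None:
    """Stand-in for a long-running loop such as monitor_tasks."""
    while True:
        await asyncio.sleep(0.01)


async def shutdown(tasks: List[asyncio.Task]) -> List[bool]:
    """Cancel the tasks and wait until each one has actually stopped."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects CancelledError instead of raising it here.
    await asyncio.gather(*tasks, return_exceptions=True)
    return [task.cancelled() for task in tasks]


async def demo_shutdown() -> List[bool]:
    tasks = [asyncio.create_task(worker()) for _ in range(2)]
    await asyncio.sleep(0.05)
    return await shutdown(tasks)


print(asyncio.run(demo_shutdown()))
```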

In [129]:
# Cell 5: Test code
async def test_callback(task_info: Dict[str, Any]) -> Dict[str, Any]:
    await asyncio.sleep(2)  # Simulate work
    return {"status": "success", "message": f"Processed task with info: {task_info}"}


def run_scheduler_test():
    async def test_scheduler():
        scheduler = SchedulerAgent()

        # Set the target time to the current time + 30 seconds
        current_time = datetime.now(pytz.utc)
        target_time = current_time + timedelta(seconds=30)

        task_info = {
            "query": "Find RAG related papers",
            "keywords": ["RAG", "paper", "latest"],
        }

        # Schedule the task
        schedule_result = scheduler.schedule_task(
            task_id="test_task_1",
            target_time=target_time,
            task_info=task_info,
            callback=test_callback,
        )

        print("\nSchedule Result:", json.dumps(schedule_result, indent=2))

        # Start monitoring and queue processing
        monitoring_task = asyncio.create_task(scheduler.monitor_tasks())
        queue_processing_task = asyncio.create_task(scheduler.process_task_queue())

        # Check the status periodically
        check_times = 6
        for i in range(check_times):
            await asyncio.sleep(5)  # Every 5 seconds
            status = scheduler.get_task_status("test_task_1")
            print(
                f"\nTask Status at {datetime.now(pytz.utc)} (Check {i+1}/{check_times}):"
            )
            print(json.dumps(status, indent=2))

        # Clean up
        monitoring_task.cancel()
        queue_processing_task.cancel()

    # Run the test
    asyncio.get_event_loop().run_until_complete(test_scheduler())


# Run the test
run_scheduler_test()

Task Scheduling Details:
- Current Time: 2025-01-21 15:04:10.929193+00:00
- Execution Time: 2025-01-21 15:04:10.929193+00:00
- Target Time: 2025-01-21 15:04:40.929116+00:00

Schedule Result: {
  "status": "success",
  "message": "Task scheduled successfully",
  "task_id": "test_task_1",
  "scheduled_time": "2025-01-21T15:04:40.929116+00:00",
  "execution_time": "2025-01-21T15:04:10.929193+00:00"
}
Adding task test_task_1 to queue at 2025-01-21 15:04:10.929547+00:00
Executing task test_task_1 at 2025-01-21 15:04:10.929593+00:00

Task Status at 2025-01-21 15:04:15.930575+00:00 (Check 1/6):
{
  "status": "success",
  "task_info": {
    "task_id": "test_task_1",
    "current_status": "completed",
    "target_time": "2025-01-21T15:04:40.929116+00:00",
    "execution_time": "2025-01-21T15:04:10.929193+00:00",
    "result": {
      "status": "success",
      "message": "Processed task with info: {'query': 'Find RAG related papers', 'keywords': ['RAG', 'paper', 'latest']}"
    },
    "complete

### ResponseAgent Implementation

In [130]:
# Cell 2: Define the ResponseAgent class
class ResponseAgent:
    def __init__(self, model_name="gpt-3.5-turbo"):
        self.llm = ChatOpenAI(model_name=model_name, temperature=0.3)
        self.setup_prompt_templates()

    def setup_prompt_templates(self):
        # Prompt for generating the response
        self.response_prompt = PromptTemplate.from_template(
            """
            Based on the following RAG paper analysis results, generate a response to deliver to the user.
            
            Analysis results:
            {analysis_results}
            
            Please respond in the following format:
            1. Key summary (2-3 lines)
            2. List of recommended papers (in order of relevance)
            3. Main points of each paper
            4. Practical implications
            
            Write the response in the following JSON format:
            {{
                "summary": "overall summary",
                "recommended_papers": [
                    {{"title": "title", "relevance": "relevance", "key_points": ["point 1", "point 2"]}}
                ],
                "practical_implications": ["implication 1", "implication 2"],
                "next_steps": ["recommended action 1", "recommended action 2"]
            }}
            """
        )

In [131]:
# Cell 3: Response generation method
def format_response(
    self, rag_results: Dict[str, Any], query_info: Dict[str, Any]
) -> Dict[str, Any]:
    """Generate a response based on the RAG results."""
    try:
        # Convert the analysis results to a string
        analysis_str = json.dumps(rag_results, ensure_ascii=False, indent=2)

        # Generate the response with the LLM
        response_chain = self.response_prompt | self.llm
        response = response_chain.invoke({"analysis_results": analysis_str})

        try:
            formatted_response = json.loads(response.content)
            return {
                "status": "success",
                "response": formatted_response,
                "original_query": query_info["original_query"],
            }
        except json.JSONDecodeError:
            return {
                "status": "error",
                "error_message": "Response formatting failed",
                "raw_response": response.content,
            }

    except Exception as e:
        return {"status": "error", "error_message": str(e)}


ResponseAgent.format_response = format_response

In [132]:
# Cell 4: Message generation method
def generate_message(self, formatted_response: Dict[str, Any]) -> str:
    """Build the final message to send to the user."""
    try:
        message = []

        # Add the overall summary
        message.append(f"📌 Summary:\n{formatted_response['summary']}\n")

        # Recommended papers
        message.append("📚 Recommended Papers:")
        for paper in formatted_response["recommended_papers"]:
            message.append(f"\nTitle: {paper['title']}")
            message.append(f"Relevance: {paper['relevance']}")
            message.append("Key Points:")
            for point in paper["key_points"]:
                message.append(f"- {point}")

        # Practical implications
        message.append("\n💡 Practical Implications:")
        for implication in formatted_response["practical_implications"]:
            message.append(f"- {implication}")

        # Next steps
        message.append("\n🔜 Recommended Actions:")
        for step in formatted_response["next_steps"]:
            message.append(f"- {step}")

        return "\n".join(message)

    except Exception as e:
        return f"Error while generating the message: {str(e)}"


ResponseAgent.generate_message = generate_message
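
`generate_message` indexes the response dictionary directly, so a missing key only surfaces as a generic error string. A sketch of an explicit check that could run first is shown below. `missing_response_keys` is a hypothetical helper, and its key list is taken from the JSON format requested in the prompt.

```python
from typing import Any, Dict, List

REQUIRED_KEYS = ("summary", "recommended_papers", "practical_implications", "next_steps")


def missing_response_keys(formatted_response: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list the expected top-level keys that are absent."""
    return [key for key in REQUIRED_KEYS if key not in formatted_response]


print(missing_response_keys({"summary": "...", "recommended_papers": []}))
```

An empty list means the message can be built safely. A non-empty list names exactly which parts the model left out.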

In [133]:
# Cell 5: Test code
def test_response_agent():
    # Simulated RAG results
    rag_results = {
        "summaries": [
            {
                "title": "Recent Advances in RAG",
                "summary": "This paper discusses the latest developments in RAG...",
                "key_points": [
                    "Improved retrieval methods",
                    "Better integration with LLMs",
                    "Enhanced performance metrics",
                ],
            }
        ],
        "recommendations": [
            {
                "title": "Recent Advances in RAG",
                "reason": "Comprehensive overview of latest developments",
            }
        ],
        "overall_analysis": "Significant progress in RAG technologies...",
    }

    query_info = {
        "original_query": "Find RAG related papers at 7 AM and recommend them"
    }

    agent = ResponseAgent()

    # Format the response
    formatted_response = agent.format_response(rag_results, query_info)
    print("\nFormatted Response:")
    print(json.dumps(formatted_response, indent=2, ensure_ascii=False))

    # Build the final message
    if formatted_response["status"] == "success":
        message = agent.generate_message(formatted_response["response"])
        print("\nFinal Message:")
        print(message)


# Run the test
test_response_agent()


Formatted Response:
{
  "status": "success",
  "response": {
    "summary": "There have been significant advances in RAG technology; improved retrieval methods, efficient integration with LLMs, and enhanced performance metrics were presented as the main developments.",
    "recommended_papers": [
      {
        "title": "Recent Advances in RAG",
        "relevance": "High",
        "key_points": [
          "Improved retrieval methods",
          "Better integration with LLMs",
          "Enhanced performance metrics"
        ]
      }
    ],
    "practical_implications": [
      "It is important to keep up with advances in RAG technology and consider how they can be applied."
    ],
    "next_steps": [
      "It would be worthwhile to apply RAG technology to real work or research and evaluate its performance."
    ]
  },
  "original_query": "Find RAG related papers at 7 AM and recommend them"
}

Final Message:
📌 Summary:
There have been significant advances in RAG technology; improved retrieval methods, efficient integration with LLMs, and enhanced performance metrics were presented as the main developments.

📚 Recommended Papers:

Title: Recent Advances in RAG
Relevance: High
Key Points:
- Improved retrieval methods
- Better integration with LLMs
- Enhanced performance metrics

💡 Practical Implications:
- It is important to keep up with advances in RAG technology and consider how they can be applied.

## Vector Database Setup

In [140]:
# Cell 2: Define the VectorDB class
import os


class VectorDB:
    def __init__(self, persist_directory="./vector_db"):
        self.embeddings = OpenAIEmbeddings()
        self.persist_directory = persist_directory
        self.ensure_directory()
        self.db = self.initialize_db()

    def ensure_directory(self):
        """Create the directory where the vector DB is stored."""
        if not os.path.exists(self.persist_directory):
            os.makedirs(self.persist_directory)
            print(f"Created directory: {self.persist_directory}")

In [141]:
# Cell 3: Database initialization and management methods
def initialize_db(self) -> Chroma:
    """Initialize ChromaDB."""
    return Chroma(
        persist_directory=self.persist_directory, embedding_function=self.embeddings
    )


# Attach the method to the class
VectorDB.initialize_db = initialize_db



In [142]:
# Cell 4: Add the store_papers method
def store_papers(self, papers: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Store paper information."""
    try:
        # Prepare the paper contents and metadata
        texts = []
        metadatas = []

        for paper in papers:
            texts.append(paper["content"])
            metadatas.append(
                {
                    "title": paper["title"],
                    "source": paper.get("source", "unknown"),
                    "url": paper.get("url", ""),
                    "year": paper.get("year", ""),
                    "stored_at": datetime.now().isoformat(),
                }
            )

        # Save to the vector DB
        self.db.add_texts(texts=texts, metadatas=metadatas)

        return {
            "status": "success",
            "message": f"Stored {len(papers)} papers successfully",
            "stored_count": len(papers),
        }

    except Exception as e:
        return {"status": "error", "error_message": str(e)}


# Attach the method to the class
VectorDB.store_papers = store_papers
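
Each call to `store_papers` adds the papers again, so re-running a cell against the same persist directory grows the collection. One possible remedy is to derive a stable ID per paper and pass it through the `ids` argument of `add_texts`. Whether an existing entry with the same ID is then overwritten depends on the vector store version, so treat that behavior as an assumption. `paper_id` is a hypothetical helper, not part of the notebook.

```python
import hashlib
from typing import Any, Dict


def paper_id(paper: Dict[str, Any]) -> str:
    """Hypothetical helper: stable ID built from the title and URL."""
    key = f"{paper.get('title', '')}|{paper.get('url', '')}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]


first = {"title": "Recent Advances in RAG", "url": "https://example.com/paper1"}
second = {"title": "Improving RAG Performance", "url": "https://example.com/paper2"}

# Identical input gives an identical ID; different papers give different IDs.
print(paper_id(first) == paper_id(dict(first)))
print(paper_id(first) == paper_id(second))
# Possible use inside store_papers (not executed here):
#   self.db.add_texts(texts=texts, metadatas=metadatas, ids=[paper_id(p) for p in papers])
```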

In [143]:
# Cell 5: Add the search method
def search_papers(self, query: str, n_results: int = 5) -> Dict[str, Any]:
    """Search papers by similarity."""
    try:
        results = self.db.similarity_search_with_relevance_scores(query, k=n_results)

        formatted_results = []
        for doc, score in results:
            formatted_results.append(
                {
                    "content": doc.page_content,
                    "metadata": doc.metadata,
                    "relevance_score": score,
                }
            )

        return {"status": "success", "results": formatted_results, "query": query}

    except Exception as e:
        return {"status": "error", "error_message": str(e), "query": query}


# Attach the method to the class
VectorDB.search_papers = search_papers
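
`search_papers` returns every hit together with its relevance score, leaving it to the caller to decide what counts as relevant enough. A small post-filter is sketched below. `filter_by_relevance` is a hypothetical helper, and the 0.75 threshold is an arbitrary assumption that would need tuning for real data.

```python
from typing import Any, Dict, List


def filter_by_relevance(
    results: List[Dict[str, Any]], min_score: float = 0.75
) -> List[Dict[str, Any]]:
    """Hypothetical helper: keep results at or above `min_score`, best first."""
    kept = [r for r in results if r["relevance_score"] >= min_score]
    return sorted(kept, key=lambda r: r["relevance_score"], reverse=True)


sample = [
    {"content": "Recent advances...", "relevance_score": 0.70},
    {"content": "Improving RAG performance...", "relevance_score": 0.84},
]
print([r["content"] for r in filter_by_relevance(sample)])
```

The input shape matches the `results` list built in `search_papers`, so the helper could be applied to `search_result["results"]` directly.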

In [144]:
# Cell 6: Add metadata search and management methods
def get_paper_by_metadata(self, metadata_field: str, value: str) -> Dict[str, Any]:
    """Search papers by metadata."""
    try:
        results = self.db.get(where={metadata_field: value})

        return {
            "status": "success",
            "results": results,
            "search_criteria": {metadata_field: value},
        }

    except Exception as e:
        return {
            "status": "error",
            "error_message": str(e),
            "search_criteria": {metadata_field: value},
        }


def get_collection_stats(self) -> Dict[str, Any]:
    """Retrieve collection statistics."""
    try:
        return {
            "status": "success",
            "stats": {
                "total_documents": len(self.db.get()["ids"]),
                "persist_directory": self.persist_directory,
            },
        }
    except Exception as e:
        return {"status": "error", "error_message": str(e)}


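def clear_database(self) -> Dict[str, Any]:
    """Clear the database."""
    try:
        # Delete the existing collection, then recreate an empty one.
        # Re-initializing alone would reopen the persisted data unchanged.
        self.db.delete_collection()
        self.db = self.initialize_db()
        return {"status": "success", "message": "Database cleared successfully"}
    except Exception as e:
        return {"status": "error", "error_message": str(e)}


# Attach the methods to the class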
VectorDB.get_paper_by_metadata = get_paper_by_metadata
VectorDB.get_collection_stats = get_collection_stats
VectorDB.clear_database = clear_database

In [145]:
# Cell 7: Test
def test_vector_db():
    # Sample paper data for testing
    test_papers = [
        {
            "title": "Recent Advances in RAG",
            "content": "This paper discusses recent advances in Retrieval Augmented Generation...",
            "source": "arxiv",
            "url": "https://example.com/paper1",
            "year": "2024",
        },
        {
            "title": "Improving RAG Performance",
            "content": "A comprehensive study on improving RAG performance in various scenarios...",
            "source": "conference",
            "url": "https://example.com/paper2",
            "year": "2023",
        },
    ]

    # Create a VectorDB instance
    db = VectorDB(persist_directory="./test_vector_db")

    # Store the papers
    store_result = db.store_papers(test_papers)
    print("\nStore Result:")
    print(json.dumps(store_result, indent=2))

    # Search test
    search_result = db.search_papers("RAG implementation techniques")
    print("\nSearch Result:")
    print(json.dumps(search_result, indent=2))

    # Check the statistics
    stats = db.get_collection_stats()
    print("\nCollection Stats:")
    print(json.dumps(stats, indent=2))


# Run the test
test_vector_db()




Store Result:
{
  "status": "success",
  "message": "Stored 2 papers successfully",
  "stored_count": 2
}


Number of requested results 5 is greater than number of elements in index 2, updating n_results = 2



Search Result:
{
  "status": "success",
  "results": [
    {
      "content": "A comprehensive study on improving RAG performance in various scenarios...",
      "metadata": {
        "source": "conference",
        "stored_at": "2025-01-22T00:21:22.263140",
        "title": "Improving RAG Performance",
        "url": "https://example.com/paper2",
        "year": "2023"
      },
      "relevance_score": 0.8446409962394603
    },
    {
      "content": "This paper discusses recent advances in Retrieval Augmented Generation...",
      "metadata": {
        "source": "arxiv",
        "stored_at": "2025-01-22T00:21:22.263134",
        "title": "Recent Advances in RAG",
        "url": "https://example.com/paper1",
        "year": "2024"
      },
      "relevance_score": 0.7008961117884753
    }
  ],
  "query": "RAG implementation techniques"
}

Collection Stats:
{
  "status": "success",
  "stats": {
    "total_documents": 2,
    "persist_directory": "./test_vector_db"
  }
}

