# Prompt & Resource Features

### Resources 참고

```python
@mcp.resource("papers://folders")
def get_available_folders() -> str:
    """
    List all available topic folders in the papers directory.
    
    This resource provides a simple list of all available topic folders.
    """
    folders = []
    
    # 모든 주제 디렉토리 가져오기
    if os.path.exists(PAPER_DIR):
        for topic_dir in os.listdir(PAPER_DIR):
            topic_path = os.path.join(PAPER_DIR, topic_dir)
            if os.path.isdir(topic_path):
                papers_file = os.path.join(topic_path, "papers_info.json")
                if os.path.exists(papers_file):
                    folders.append(topic_dir)
    
    # 간단한 마크다운 리스트 생성
    content = "# Available Topics\n\n"
    if folders:
        for folder in folders:
            content += f"- {folder}\n"
        content += f"\nUse @{folder} to access papers in that topic.\n"
    else:
        content += "No topics found.\n"
    
    return content

@mcp.resource("papers://{topic}")
def get_topic_papers(topic: str) -> str:
    """
    Get detailed information about papers on a specific topic.
    
    Args:
        topic: The research topic to retrieve papers for
    """
    topic_dir = topic.lower().replace(" ", "_")
    papers_file = os.path.join(PAPER_DIR, topic_dir, "papers_info.json")
    
    if not os.path.exists(papers_file):
        return f"# No papers found for topic: {topic}\n\nTry searching for papers on this topic first."
    
    try:
        with open(papers_file, 'r') as f:
            papers_data = json.load(f)
        
        # 논문 상세 정보를 포함한 마크다운 콘텐츠 생성
        content = f"# Papers on {topic.replace('_', ' ').title()}\n\n"
        content += f"Total papers: {len(papers_data)}\n\n"
        
        for paper_id, paper_info in papers_data.items():
            content += f"## {paper_info['title']}\n"
            content += f"- Paper ID: {paper_id}\n"
            content += f"- Authors: {', '.join(paper_info['authors'])}\n"
            content += f"- Published: {paper_info['published']}\n"
            content += f"- PDF URL: [{paper_info['pdf_url']}]({paper_info['pdf_url']})\n\n"
            content += f"### Summary\n{paper_info['summary'][:500]}...\n\n"
            content += "---\n\n"
        
        return content

    except json.JSONDecodeError:
        return f"# Error reading papers data for {topic}\n\nThe papers data file is corrupted."
```

### Prompt Template 참고

```python
@mcp.prompt()
def generate_search_prompt(topic: str, num_papers: int = 5) -> str:
    """Claude가 특정 주제에 대한 학술 논문을 찾고 분석할 수 있도록 프롬프트를 생성한다."""
    return f"""'{topic}'에 관한 학술 논문 {num_papers}편을 search_papers 도구를 사용하여 검색하고 분석하라. 다음 지침을 따르라:
    1. 먼저, search_papers(topic='{topic}', max_results={num_papers}) 명령을 통해 논문을 검색한다.
    2. 검색된 각 논문에 대해 다음 정보를 추출하여 정리한다:
       - 논문 제목
       - 저자
       - 출판일
       - 주요 결과에 대한 간략한 요약
       - 핵심 기여 또는 혁신 내용
       - 사용된 연구 방법론
       - '{topic}'와의 관련성
    
    3. 다음 내용을 포함하는 종합 요약을 제공한다:
       - '{topic}' 분야의 현재 연구 동향 개요
       - 논문 간에 공통적으로 나타나는 주제 및 트렌드
       - 주요한 연구 공백 또는 향후 연구가 필요한 영역
       - 이 분야에서 가장 영향력 있거나 중요한 논문
    
    4. 결과는 명확하고 구조화된 형식으로 정리하며, 읽기 쉽게 제목과 불릿 포인트를 사용한다.
    
    각 논문에 대한 자세한 정보와 함께, '{topic}' 분야의 연구 지형에 대한 고차원적인 통찰도 함께 제공하라."""
```

## Research_Sever

In [None]:
%%writefile mcp_project/research_server.py

import arxiv
import json
import os
from typing import List
from mcp.server.fastmcp import FastMCP

# papers 디렉토리 경로
PAPER_DIR = "../../papers"

# FastMCP 서버 초기화
mcp = FastMCP("research")

@mcp.tool()
def search_papers(topic: str, max_results: int = 5) -> List[str]:
    """
    주제를 기반으로 arXiv에서 논문을 검색하고 해당 정보를 저장합니다.

    Args:
        topic: 검색할 주제
        max_results: 검색할 최대 결과 수 (기본값: 5)

    Returns:
        검색된 논문의 ID 리스트
    """
    client = arxiv.Client()
    search = arxiv.Search(
        query=topic,
        max_results=max_results,
        sort_by=arxiv.SortCriterion.Relevance
    )
    papers = client.results(search)

    path = os.path.join(PAPER_DIR, topic.lower().replace(" ", "_"))
    os.makedirs(path, exist_ok=True)
    file_path = os.path.join(path, "papers_info.json")

    try:
        with open(file_path, "r") as json_file:
            papers_info = json.load(json_file)
    except (FileNotFoundError, json.JSONDecodeError):
        papers_info = {}

    paper_ids = []
    for paper in papers:
        pid = paper.get_short_id()
        paper_ids.append(pid)
        papers_info[pid] = {
            'title': paper.title,
            'authors': [author.name for author in paper.authors],
            'summary': paper.summary,
            'pdf_url': paper.pdf_url,
            'published': str(paper.published.date())
        }

    with open(file_path, "w") as json_file:
        json.dump(papers_info, json_file, indent=2)

    print(f"결과가 다음 위치에 저장되었습니다: {file_path}")
    return paper_ids

@mcp.tool()
def extract_info(paper_id: str) -> str:
    """
    모든 주제 디렉토리에서 특정 논문 ID에 대한 정보를 검색합니다.

    Args:
        paper_id: 검색할 논문의 ID

    Returns:
        논문 정보를 담은 JSON 문자열(찾지 못할 경우 오류 메시지)
    """
    for item in os.listdir(PAPER_DIR):
        item_path = os.path.join(PAPER_DIR, item)
        if os.path.isdir(item_path):
            file_path = os.path.join(item_path, "papers_info.json")
            if os.path.isfile(file_path):
                try:
                    with open(file_path, "r") as json_file:
                        papers_info = json.load(json_file)
                        if paper_id in papers_info:
                            return json.dumps(papers_info[paper_id], indent=2)
                except (FileNotFoundError, json.JSONDecodeError) as e:
                    print(f"{file_path} 파일 읽기 오류: {e}")
                    continue
    return f"{paper_id} 논문에 대한 저장된 정보를 찾을 수 없습니다."

@mcp.resource("papers://folders")
def get_available_folders() -> str:
    """
    papers 디렉토리의 사용 가능한 주제 폴더 목록을 Markdown 형식으로 반환합니다.
    """
    folders = []
    if os.path.exists(PAPER_DIR):
        for topic_dir in os.listdir(PAPER_DIR):
            topic_path = os.path.join(PAPER_DIR, topic_dir)
            if os.path.isdir(topic_path) and os.path.exists(os.path.join(topic_path, "papers_info.json")):
                folders.append(topic_dir)

    content = "# Available Topics\n\n"
    if folders:
        for folder in folders:
            content += f"- {folder}\n"
        content += f"\nUse @<topic> to access papers in that topic.\n"
    else:
        content += "No topics found.\n"
    return content

@mcp.resource("papers://{topic}")
def get_topic_papers(topic: str) -> str:
    """
    특정 주제(topic)에 대한 논문 정보를 Markdown 형식으로 반환합니다.

    Args:
        topic: 조회할 연구 주제
    """
    topic_dir = topic.lower().replace(" ", "_")
    papers_file = os.path.join(PAPER_DIR, topic_dir, "papers_info.json")

    if not os.path.exists(papers_file):
        return f"# No papers found for topic: {topic}\n\nTry searching for papers on this topic first."

    try:
        with open(papers_file, 'r') as f:
            papers_data = json.load(f)
    except json.JSONDecodeError:
        return f"# Error reading papers data for {topic}\n\nThe papers data file is corrupted."

    content = f"# Papers on {topic.replace('_', ' ').title()}\n\n"
    content += f"Total papers: {len(papers_data)}\n\n"
    for pid, info in papers_data.items():
        content += f"## {info['title']}\n"
        content += f"- **Paper ID**: {pid}\n"
        content += f"- **Authors**: {', '.join(info['authors'])}\n"
        content += f"- **Published**: {info['published']}\n"
        content += f"- **PDF URL**: [{info['pdf_url']}]({info['pdf_url']})\n\n"
        content += f"### Summary\n{info['summary'][:500]}...\n\n"
        content += "---\n\n"
    return content

@mcp.prompt()
def generate_search_prompt(topic: str, num_papers: int = 5) -> str:
    """특정 주제에 대한 학술 논문 검색 및 요약을 위해 Claude에게 전달할 한국어 프롬프트를 생성합니다."""
    return f"""다음 절차에 따라 '{topic}' 주제에 대한 {num_papers}개의 학술 논문을 검색하고 정보를 정리하세요:

1. search_papers(topic='{topic}', max_results={num_papers})를 호출하여 논문 ID 목록을 가져옵니다.
2. 각 논문에 대해 아래 정보를 추출합니다:
   - 제목
   - 저자
   - 출판일
   - 주요 발견 요약
   - 기여 내용 또는 혁신점
   - 사용된 연구 방법론
   - '{topic}' 주제와의 관련성

3. 최종 결과물에는 다음 내용을 포함하세요:
   - '{topic}' 분야의 연구 현황 개요
   - 논문 간 공통 주제 및 트렌드
   - 향후 연구를 위한 주요 연구 공백
   - 이 분야에서 영향력 있는 주요 논문 목록

4. 구조화된 헤딩과 불릿 포인트를 사용하여 가독성 좋게 제시하세요."""

if __name__ == "__main__":
    # 서버 실행 (stdio 방식)
    mcp.run(transport='stdio')


## mcp_chatbot

In [9]:
%%writefile mcp_project/mcp_chatbot.py

import os
import json
import asyncio
import nest_asyncio
from dotenv import load_dotenv
from typing import List, Dict, TypedDict
from contextlib import AsyncExitStack

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import OpenAI

# 환경 변수 로드
load_dotenv()

# OpenAI API 클라이언트 초기화
import config
client = OpenAI(api_key=config.API_KEY)

# 도구 정의를 위한 TypedDict
class ToolDefinition(TypedDict):
    name: str
    description: str
    input_schema: dict

class MCP_ChatBot:
    def __init__(self):
        self.exit_stack = AsyncExitStack()
        self.available_tools: List[ToolDefinition] = []
        self.available_prompts: List[Dict] = []
        self.sessions: Dict[str, ClientSession] = {}

    async def connect_to_server(self, server_name: str, server_config: dict) -> None:
        """단일 MCP 서버에 연결하고 도구/프롬프트/리소스를 로드"""
        try:
            params = StdioServerParameters(**server_config)
            read, write = await self.exit_stack.enter_async_context(stdio_client(params))
            session = await self.exit_stack.enter_async_context(ClientSession(read, write))
            await session.initialize()

            # 도구 목록 조회
            tools_resp = await session.list_tools()
            for tool in tools_resp.tools:
                self.sessions[tool.name] = session
                self.available_tools.append({
                    "name": tool.name,
                    "description": tool.description,
                    "input_schema": tool.inputSchema
                })

            # 프롬프트 목록 조회
            prompts_resp = await session.list_prompts()
            if prompts_resp and prompts_resp.prompts:
                for prompt in prompts_resp.prompts:
                    self.sessions[prompt.name] = session
                    self.available_prompts.append({
                        "name": prompt.name,
                        "description": prompt.description,
                        "arguments": prompt.arguments
                    })

            # 리소스 목록 조회
            resources_resp = await session.list_resources()
            if resources_resp and resources_resp.resources:
                for resource in resources_resp.resources:
                    uri = str(resource.uri)
                    self.sessions[uri] = session
        except Exception as e:
            print(f"Error connecting to {server_name}: {e}")

    async def connect_to_servers(self) -> None:
        """설정 파일로부터 모든 MCP 서버에 연결"""
        try:
            with open("server_config.json", "r") as f:
                cfg = json.load(f)
            for name, params in cfg.get("mcpServers", {}).items():
                await self.connect_to_server(name, params)
        except Exception as e:
            print(f"Error loading config: {e}")
            raise

    async def process_query(self, query: str) -> None:
        """사용자 쿼리를 OpenAI로 전송, 도구 호출 및 응답 처리"""
        messages = [{"role": "user", "content": query}]
        functions = [
            {"name": t["name"], "description": t["description"], "parameters": t["input_schema"]}
            for t in self.available_tools
        ]

        while True:
            resp = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                functions=functions,
                function_call="auto",
                max_tokens=2024,
                temperature=0.7
            )
            msg = resp.choices[0].message

            # 함수 호출 요청 처리
            if getattr(msg, "function_call", None):
                name = msg.function_call.name
                args = json.loads(msg.function_call.arguments)
                print(f"Calling tool {name} with args {args}")
                session = self.sessions.get(name)
                if not session:
                    print(f"Tool '{name}' not found.")
                    break
                result = await session.call_tool(name, arguments=args)
                messages.append({"role": "function", "name": name, "content": result.content})
                continue

            # 일반 응답 출력 후 종료
            print(msg.content)
            messages.append({"role": "assistant", "content": msg.content})
            break

    async def get_resource(self, uri: str) -> None:
        """리소스 URI를 통해 MCP 세션에서 콘텐츠 가져오기"""
        session = self.sessions.get(uri)
        if not session and uri.startswith("papers://"):
            for k, s in self.sessions.items():
                if k.startswith("papers://"):
                    session = s
                    uri = k
                    break
        if not session:
            print(f"Resource '{uri}' not found.")
            return
        try:
            result = await session.read_resource(uri=uri)
            if result and result.contents:
                print(f"\nResource: {uri}\n{result.contents[0].text}")
            else:
                print("No content available.")
        except Exception as e:
            print(f"Error: {e}")

    async def list_prompts(self) -> None:
        """사용 가능한 프롬프트 목록 출력"""
        if not self.available_prompts:
            print("No prompts available.")
            return
        print("\nAvailable prompts:")
        for p in self.available_prompts:
            print(f"- {p['name']}: {p['description']}")
            if p['arguments']:
                print("  Arguments:")
                for arg in p['arguments']:
                    # PromptArgument 객체에는 .name 속성만 사용
                    name = getattr(arg, 'name', '')
                    print(f"    - {name}")

    async def execute_prompt(self, prompt_name: str, args: Dict) -> None:
        """지정된 프롬프트 실행 후 결과로 쿼리 처리"""
        session = self.sessions.get(prompt_name)
        if not session:
            print(f"Prompt '{prompt_name}' not found.")
            return
        try:
            result = await session.get_prompt(prompt_name, arguments=args)
            if result and result.messages:
                content = result.messages[0].content
                if isinstance(content, str):
                    text = content
                elif hasattr(content, 'text'):
                    text = content.text
                else:
                    text = " ".join([
                        item.text if hasattr(item, 'text') else str(item)
                        for item in content
                    ])
                print(f"\nExecuting prompt '{prompt_name}'...")
                await self.process_query(text)
        except Exception as e:
            print(f"Error: {e}")

    async def chat_loop(self) -> None:
        """사용자 상호작용 메인 루프"""
        print("\nMCP Chatbot Started!")
        print("Type queries, 'quit', '@folders', '@<topic>', '/prompts', '/prompt <name> <arg=value>'")
        while True:
            q = input("\nQuery: ").strip()
            if not q:
                continue
            if q.lower() == 'quit':
                break
            if q.startswith("@"):
                topic = q[1:]
                uri = "papers://folders" if topic == "folders" else f"papers://{topic}"
                await self.get_resource(uri)
                continue
            if q.startswith("/"):
                parts = q.split()
                cmd = parts[0].lower()
                if cmd == '/prompts':
                    await self.list_prompts()
                elif cmd == '/prompt':
                    if len(parts) < 2:
                        print("Usage: /prompt <name> <arg=value> ...")
                        continue
                    pname = parts[1]
                    args = {}
                    for arg in parts[2:]:
                        if '=' in arg:
                            key, val = arg.split('=', 1)
                            args[key] = val
                    await self.execute_prompt(pname, args)
                else:
                    print(f"Unknown command: {cmd}")
                continue
            await self.process_query(q)

    async def cleanup(self) -> None:
        """리소스 정리"""
        await self.exit_stack.aclose()

async def main() -> None:
    nest_asyncio.apply()
    chatbot = MCP_ChatBot()
    try:
        await chatbot.connect_to_servers()
        await chatbot.chat_loop()
    finally:
        await chatbot.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

Overwriting mcp_project/mcp_chatbot.py
