<a href="https://colab.research.google.com/gist/gmsharpe/67eaa061d9c37ff447e64ba9bab21298/simple-mcp-example-using-ollama-medium.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Anthropic의 Model Context Protocol (MCP) 사용하기

# 소개
Model Context Protocol을 사용하여 간단한 서버와 클라이언트를 만들어 보겠습니다.

### 종속성 설치

In [None]:
# import sys
# !{sys.executable} -m pip install langchain-openai mcp anyio click httpx uvicorn anyio --quiet

### Azure OpenAI 설정
LangChain의 AzureOpenAI 클래스를 사용하여 Azure OpenAI 서비스에 연결합니다.

In [None]:
from langchain_openai import AzureChatOpenAI
import os
from dotenv import load_dotenv

load_dotenv()

# Azure OpenAI 클라이언트 초기화
llm = AzureChatOpenAI(
    azure_deployment="gpt-4.1",
    api_version="2024-02-15-preview",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
)

In [None]:
# 로깅 추가
import asyncio
import json
import logging
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client

logging.basicConfig(
    level=logging.CRITICAL,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

logging.getLogger('httpcore').setLevel(logging.CRITICAL)
logging.getLogger('httpx').setLevel(logging.CRITICAL)
logging.getLogger('mcp').setLevel(logging.CRITICAL)

logger = logging.getLogger(__name__)

# 간단한 테스트로 연결 확인

In [None]:
from langchain_core.messages import HumanMessage

messages = [HumanMessage(content="오늘 뉴스 알려주세요")]
response = llm.invoke(messages)
print(response.content)

---
현재 아무런 tool 도 사용하지 않고 LLM에 '오늘 뉴스' 를 알려달라고 하면 예전의 정보를 알려줍니다.  
LLM은 학습에 사용된 데이터까지의 정보만 알고있기 때문에, 최신 정보를 알 수 없기 때문입니다.

## Model Context Protocol (MCP)

### MCP 서버 생성 `[server.py]`

In [None]:
%%writefile server.py
import mcp.types as types

from mcp import Tool
from mcp.server.sse import SseServerTransport
from mcp.server import Server

from starlette.applications import Starlette
from starlette.routing import Route

import uvicorn
import httpx
from dotenv import load_dotenv

load_dotenv()


app = Server("mcp-server")
sse = SseServerTransport("/messages")

port = 8010

In [None]:
%%writefile -a server.py

# SSL 경고 무시 (선택사항)
import warnings
warnings.filterwarnings('ignore', message='Unverified HTTPS request')

#### 에이전트의 '도구'로 사용할 `fetch_website` 함수

In [None]:
%%writefile -a server.py

async def fetch_website(
        url: str,
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    headers = {
        "User-Agent": "MCP Test Server (github.com/modelcontextprotocol/python-sdk)"
    }
    async with httpx.AsyncClient(follow_redirects=True, headers=headers, verify=False) as client:
        response = await client.get(url)
        response.raise_for_status()
        return [types.TextContent(type="text", text=response.text)]

In [None]:
%%writefile -a server.py

@app.call_tool()
async def call_tool(
  name: str, arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    if name == "fetch":
        if "url" not in arguments:
            raise ValueError("Missing required argument 'url'")
        return await fetch_website(arguments["url"])
    else:
        raise ValueError(f"Unknown tool '{name}'")

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="fetch",
            description="Fetches a website and returns its content",
            inputSchema={
                "type": "object",
                "required": ["url"],
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "URL to fetch",
                    }
                },
            },
        )
    ]


#### MCP tool로 웹 검색을 하고 그 결과를 스트리밍하기 위한 별도 서버 구성

MCP의 데이터 통신 방식은 크게 두 가지 입니다.  
하나는 stdio로, MCP 서버가 MCP host와 동일한 컴퓨터/환경에서 실행될 때 사용하는 방식입니다.  
또 하나는 SSE (현재는 streamableHTTP 라는 방식으로 업그레이드 되었습니다) 방식으로, MCP 서버가 MCP host와 독립적으로 실행되는 경우입니다.  

이번 예제에서는 SSE 방식을 사용하고, 로컬 환경에 MCP 서버 역할을 하는 별도의 서버를 띄워서 이 서버와 통신하는 방식으로 구현해보도록 하겠습니다.

In [None]:
%%writefile -a server.py

async def handle_sse(request):
    async with sse.connect_sse(
            request.scope, request.receive, request._send
    ) as streams:
        await app.run(
            streams[0], streams[1], app.create_initialization_options()
        )

async def handle_messages(request):
    await sse.handle_post_message(request.scope, request.receive, request._send)

starlette_app = Starlette(
    debug=True,
    routes=[
        Route("/sse", endpoint=handle_sse, methods=["GET"]),
        Route("/messages", endpoint=handle_messages, methods=["POST"]),
    ],
)

# uvicorn으로 서버 실행
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(starlette_app, host="127.0.0.1", port=port, log_level="error", access_log=False)

### 서버 시작

In [None]:
import threading
import subprocess
import time

def run_mcp_server():
    subprocess.Popen(["python", "server.py"])

thread = threading.Thread(target=run_mcp_server)
thread.start()
time.sleep(5)

In [None]:
# MCP에서 반환된 도구 스키마를 Azure OpenAI 호환 스키마로 변환하는 함수 정의
def convert_tool_format(tools):
    """
    도구를 Azure OpenAI에 필요한 형식으로 변환합니다.

    Args:
        tools (list): 도구 객체 리스트

    Returns:
        dict: Azure OpenAI에 필요한 형식의 도구
    """
    converted_tools = []

    for tool in tools:
        converted_tool = {
            'type': 'function',
            'function': {
                'name': tool.name,
                'description': tool.description,
                'parameters': tool.inputSchema
            }
        }

        converted_tools.append(converted_tool)

    return converted_tools

In [None]:
# 연결 테스트
# 서버가 떠있으면 Not found 에러가 반환되어 와야함
!curl 127.0.0.1:8010

In [None]:
import uuid

async def call_tool(session, response, messages):
    # 도구 사용 요청. 도구를 호출하고 결과를 모델에 전송합니다.
    tool_calls = response.tool_calls
    if not tool_calls:
        raise ValueError("응답에서 도구 요청을 찾을 수 없습니다")

    for tool_call in tool_calls:
        tool_name = tool_call["name"]
        tool_args = tool_call["args"]

        print(f"\n[도구 호출] {tool_name} - URL: {tool_args.get('url', 'N/A')}")

        try:
            # MCP 세션을 통해 도구 호출
            tool_response = await session.call_tool(tool_name, tool_args)

            # 응답 길이 확인 (디버깅용)
            response_str = str(tool_response)
            print(f"[도구 응답] 길이: {len(response_str)} 문자")

            # 도구 응답을 예상 형식으로 변환
            tool_result = {
                "toolUseId": tool_name + str(uuid.uuid4()),
                "content": [{"text": response_str}]
            }

        except Exception as err:
            print(f"[오류] 도구 호출 실패: {str(err)}")
            tool_result = {
                "toolUseId": tool_call["id"],
                "content": [{"text": f"오류: {str(err)}"}],
                "status": "error"
            }

        # 도구 결과를 메시지에 추가
        from langchain_core.messages import ToolMessage
        messages.append(ToolMessage(
            content=str({"toolResult": tool_result}),
            tool_call_id=tool_call["id"]
        ))

    return messages

In [None]:
async def converse_using_azure_openai(session, messages, tools):
    iteration = 0
    while True:
        iteration += 1
        converted_tools = convert_tool_format(tools)
        
        print(f"\n{'='*60}")
        print(f"[반복 {iteration}] LLM 호출 중...")
        print(f"{'='*60}")

        # Azure OpenAI에 도구를 바인딩하여 호출
        llm_with_tools = llm.bind_tools(converted_tools)
        response = llm_with_tools.invoke(messages)

        print(f"\n[LLM 응답]")
        if response.content:
            # 응답이 너무 길면 잘라서 표시
            content = response.content
            if len(content) > 500:
                print(f"{content[:500]}... (총 {len(content)}자)")
            else:
                print(content)
        
        if response.tool_calls:
            print(f"[도구 사용 요청] {len(response.tool_calls)}개의 도구 호출")

        messages.append(response)

        if response.tool_calls:
            await call_tool(session, response, messages)
        else:
            print(f"\n{'='*60}")
            print("✓ 완료: 더 이상 도구 사용 요청이 없습니다")
            print(f"{'='*60}\n")
            break

In [None]:
from langchain_core.messages import SystemMessage, HumanMessage

async def complete(message):
    logger.info("세션 시작")
    messages = []

    async with sse_client("http://localhost:8010/sse") as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            try:
                await session.initialize()
                # 세션이 초기화될 수 있도록 잠시 대기
                await asyncio.sleep(1)
                logger.info("세션 초기화 완료")

                # 사용 가능한 도구 목록을 가져오고 직렬화 가능한 형식으로 변환
                tools_result = await session.list_tools()
                tools_list = [{"name": tool.name, "description": tool.description,
                              "inputSchema": tool.inputSchema} for tool in tools_result.tools]
                logger.info("사용 가능한 도구: %s", tools_list)

                system_message = SystemMessage(content=f"당신은 유용한 AI 어시스턴트입니다. 다음 도구를 사용할 수 있습니다: {json.dumps(tools_list, ensure_ascii=True)} 프롬프트(사용자)의 질문에 답하기 위해 필요한 경우 이 도구들을 사용하세요.")
                messages.append(system_message)
                messages.append(message)
            except TypeError as err:
                logger.error("도구 호출 실패 - 오류: %s", str(err))
                tool_result = {
                    "content": [{"text": f"오류: {str(err)}"}],
                    "status": "error"
                }
            await converse_using_azure_openai(session, messages, tools_result.tools)

In [None]:
message = HumanMessage(content="오늘 ytn 뉴스 알려줘")

await complete(message)