# Langchain Agent with MCP Tools (비동기 버전)

이 노트북은 **Langchain**과 **Databricks**를 활용하여 MCP(Model Context Protocol) 기반 Tool-calling Agent를 구현하는 예제입니다.

## 왜 비동기(Async) 버전인가?

> ⚠️ **중요**: Langchain의 MCP Tool은 **비동기(Async) 방식만 지원**합니다. 따라서 MCP 서버와 연동하는 Agent를 구현할 때는 비동기 패턴을 사용해야 합니다. 이 노트북은 비동기 Agent 구현의 표준 예제입니다.

## 이 노트북에서 다루는 내용

- Langchain + Databricks MCP Client를 활용한 Agent 구현
- MLflow의 `ResponsesAgent`를 사용한 래퍼 클래스 작성
- Agent 출력 테스트 및 스트리밍 처리
- Mosaic AI Agent Evaluation을 통한 평가
- Unity Catalog에 모델 등록 및 배포

## 실행 환경

- Databricks Serverless 또는 DBR 17 이상 클러스터
- Unity Catalog 활성화 필수

## 사전 요구사항

- 이 노트북의 모든 `TODO` 항목을 확인하고 수정하세요.

In [0]:
%pip install -U -qqqq backoff databricks-langchain uv databricks-agents mlflow-skinny[databricks]
dbutils.library.restartPython()

## Agent 코드 정의

아래에서 Agent 코드를 단일 셀에 정의합니다. `%%writefile` 매직 명령어를 사용하여 로컬 Python 파일로 저장하면, 이후 MLflow 로깅 및 배포 시 활용할 수 있습니다.

### 주요 구성 요소
- **ChatDatabricks**: Databricks Foundation Model API 연동
- **DatabricksMCPServer**: MCP 서버 연결 설정
- **create_agent**: Langchain Agent 생성

Agent에 추가할 수 있는 도구 예제는 [공식 문서](https://docs.databricks.com/generative-ai/agent-framework/agent-tool.html)를 참고하세요.

In [0]:
%%writefile mcp_langchain_agent.py
import asyncio
from typing import Optional
from functools import lru_cache

from databricks_langchain import ChatDatabricks
from databricks.sdk import WorkspaceClient
from databricks_langchain import (
    DatabricksMCPServer,
    DatabricksMultiServerMCPClient,
)
from langchain.agents import create_agent

DEFAULT_SYSTEM_PROMPT = """당신은 MCP(Model Context Protocol) 도구를 활용하는 AI 어시스턴트입니다.

## MCP Tool 사용 기준
- API 관련정보를 알고 싶으면 search 툴을 사용하세요.
  ex) "API 인증 방법을 알려주세요"
"""


def create_mcp_agent(
    model_name: str = "databricks-gpt-5-2",
    connection_name: str = "my-mcp-server",
    system_prompt: Optional[str] = None,
):
    """MCP 기반 Langchain Agent 생성 팩토리 함수
    
    Args:
        model_name: Databricks Foundation Model 엔드포인트 이름
        connection_name: Unity Catalog에 등록된 MCP 서버 연결 이름
        system_prompt: 커스텀 시스템 프롬프트 (선택사항)
    
    Returns:
        Langchain Agent 인스턴스
    """
    
    # 모델 설정
    model = ChatDatabricks(endpoint=model_name)
    
    # MCP 클라이언트 설정
    ws = WorkspaceClient()
    host = ws.config.host
    mcp_server_url = f"{host}/api/2.0/mcp/external/{connection_name}"
    
    mcp_server = DatabricksMCPServer(
        name="mcp_server",
        url=mcp_server_url,
        workspace_client=ws,
    )
    client = DatabricksMultiServerMCPClient(servers=[mcp_server])
    
    # Tools 로드 (비동기 - Langchain MCP Tool은 비동기만 지원)
    tools = asyncio.run(client.get_tools())

    for tool in tools:
        tool.handle_tool_error = lambda e: f"Tool error: {e}. 다른 방법을 시도해 주세요."
    
    # Agent 생성
    return create_agent(
        model=model,
        tools=tools,
        system_prompt=system_prompt or DEFAULT_SYSTEM_PROMPT
    )


@lru_cache(maxsize=1)
def get_default_agent():
    """기본 Agent 캐싱 (싱글톤)"""
    return create_mcp_agent()


In [0]:
%%writefile mcp_langchain_wrapper_agent.py
"""
MCP Langchain Agent Wrapper

MLflow ResponsesAgent를 래핑하여 Langchain의 비동기 Agent를 
동기 인터페이스로 변환합니다.

Langchain의 MCP Tool은 비동기만 지원하므로, 이 래퍼 클래스를 통해
MLflow의 predict/predict_stream 인터페이스와 호환되도록 합니다.
"""
from typing import Any, Generator, AsyncGenerator
import asyncio
import json

import mlflow

from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
    to_chat_completions_input,
    create_text_output_item,
    output_to_responses_items_stream,
)

from mcp_langchain_agent import get_default_agent, create_mcp_agent
from uuid import uuid4

from langchain_core.messages.ai import AIMessageChunk
from langchain_core.messages.tool import ToolMessage

class MCPWrappedResponsesAgent(ResponsesAgent):
    """비동기 Langchain Agent를 MLflow ResponsesAgent로 래핑하는 클래스"""

    def __init__(self):
        self.agent = get_default_agent()

    # Make a prediction (single-step) for the agent
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done" or event.type == "error"
        ]
        return ResponsesAgentResponse(output=outputs, custom_outputs=request.custom_inputs)

    async def _predict_stream_async(
        self,
        request: ResponsesAgentRequest,
    ) -> AsyncGenerator[ResponsesAgentStreamEvent, None]:
        cc_msgs = to_chat_completions_input([i.model_dump() for i in request.input])
        # Stream events from the agent graph
        async for event in self.agent.astream(
            {"messages": cc_msgs}, stream_mode=["updates", "messages"]
        ):
            print(f"\n{event=}\n")
            if event[0] == "updates":
                # Stream updated messages from the workflow nodes
                for node_data in event[1].values():
                    if len(node_data.get("messages", [])) > 0:
                        all_messages = []
                        for msg in node_data["messages"]:
                            if isinstance(msg, ToolMessage) and not isinstance(msg.content, str):
                                msg.content = json.dumps(msg.content)
                            all_messages.append(msg)
                        for item in output_to_responses_items_stream(all_messages):
                            yield item
            elif event[0] == "messages":
                # Stream generated text message chunks
                try:
                    chunk = event[1][0]
                    if isinstance(chunk, AIMessageChunk) and (content := chunk.content):
                        print(f"\n{content=}\n")
                        yield ResponsesAgentStreamEvent(
                            **self.create_text_delta(delta=content, item_id=chunk.id),
                        )
                except Exception as e:
                    print(f"DEBUG: Error in messages handling: {e}")

    # Stream predictions for the agent, yielding output as it's generated
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        agen = self._predict_stream_async(request)

        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        ait = agen.__aiter__()

        while True:
            try:
                item = loop.run_until_complete(ait.__anext__())
            except StopAsyncIteration:
                break
            else:
                yield item

# ------------------------------------------------------------------------------
# MLflow model registration
# ------------------------------------------------------------------------------
mlflow.langchain.autolog()
AGENT = MCPWrappedResponsesAgent()
mlflow.models.set_model(AGENT)


## Agent 테스트

Agent와 상호작용하여 출력을 테스트합니다. `ResponsesAgent` 내에서 메서드를 수동으로 추적했기 때문에, Agent가 수행하는 각 단계의 트레이스를 확인할 수 있습니다. Langchain을 통한 LLM 호출은 autologging에 의해 자동으로 추적됩니다.

아래의 예시 입력을 실제 도메인에 맞는 예시로 변경하여 테스트하세요.

In [0]:
dbutils.library.restartPython()

In [0]:
import nest_asyncio
nest_asyncio.apply()

In [0]:
from mcp_langchain_wrapper_agent import AGENT

# 단일 응답 테스트 (비스트리밍)
AGENT.predict(
    {
        "input": [{"role": "user", "content": "사용 가능한 API 목록을 알려주세요."}], 
        "custom_inputs": {"session_id": "test-session-123"}
    },
)

In [0]:
from mcp_langchain_wrapper_agent import AGENT

# 스트리밍 응답 테스트
for chunk in AGENT.predict_stream(
    {
        "input": [{"role": "user", "content": "사용 가능한 API 목록을 알려주세요. 상세 정보와 예제 코드도 포함해주세요."}], 
        "custom_inputs": {"session_id": "test-session-123"}
    },
):
    print("="*60)
    print(chunk.model_dump(exclude_none=True))

### Agent를 MLflow 모델로 로깅

배포 시 자동 인증 패스스루를 위해 Databricks 리소스를 지정합니다.

- **TODO**: Unity Catalog 함수가 [Vector Search 인덱스](https://docs.databricks.com/generative-ai/agent-framework/unstructured-retrieval-tools.html)를 쿼리하거나 [외부 함수](https://docs.databricks.com/generative-ai/agent-framework/external-connection-tools.html)를 활용하는 경우, 해당 Vector Search 인덱스와 UC 연결 객체를 리소스로 포함해야 합니다. 자세한 내용은 [공식 문서](https://docs.databricks.com/generative-ai/agent-framework/log-agent.html#specify-resources-for-automatic-authentication-passthrough)를 참고하세요.

Agent 코드를 Python 파일에서 로깅합니다. [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code) 참고.

In [0]:
# 배포 시 자동 인증 패스스루를 위한 Databricks 리소스 설정
import mlflow
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint, DatabricksUCConnection
from pkg_resources import get_distribution

# TODO: 환경에 맞게 아래 설정값을 수정하세요
LLM_ENDPOINT_NAME = "databricks-gpt-5-2"  # 사용할 LLM 엔드포인트
MCP_SERVER_CONNECTION_NAME = "my-mcp-server"  # Unity Catalog에 등록된 MCP 서버 연결 이름

resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
    DatabricksUCConnection(connection_name=MCP_SERVER_CONNECTION_NAME)
]

# 입력 예시 (모델 시그니처 추론용)
input_example = {
    "input": [
        {
            "role": "user",
            "content": "사용 가능한 API 목록을 알려주세요."
        }
    ],
    "custom_inputs": {
        "session_id": "test-session-123"
    }
}

# MLflow에 Agent 로깅
with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        name="mcp_langchain_agent",
        python_model="mcp_langchain_wrapper_agent.py",
        input_example=input_example,
        pip_requirements=[
            "databricks-langchain",
            "backoff",
            f"databricks-connect=={get_distribution('databricks-connect').version}",
        ],
        code_paths=[
            "mcp_langchain_agent.py",
        ],
        resources=resources,
    )

## [Agent Evaluation](https://docs.databricks.com/mlflow3/genai/eval-monitor)으로 Agent 평가

평가 데이터셋의 요청이나 예상 응답을 수정하고, Agent를 반복 개선하면서 평가를 실행할 수 있습니다. MLflow를 활용하여 계산된 품질 메트릭을 추적합니다.

[사전 정의된 LLM 스코어러](https://docs.databricks.com/mlflow3/genai/eval-monitor/predefined-judge-scorers)를 사용하거나, [커스텀 메트릭](https://docs.databricks.com/mlflow3/genai/eval-monitor/custom-scorers)을 추가하여 Agent를 평가하세요.

In [0]:
import mlflow
from mlflow.genai.scorers import RelevanceToQuery, Safety, RetrievalRelevance, RetrievalGroundedness

# 평가 데이터셋 정의
# TODO: 실제 사용 사례에 맞게 질문과 예상 응답을 수정하세요
eval_dataset = [
    {
        "inputs": {
            "input": [
                {
                    "role": "user",
                    "content": "사용 가능한 API 목록을 알려주세요."
                }
            ]
        },
        "expected_response": None
    },
    {
        "inputs": {
            "input": [
                {
                    "role": "user",
                    "content": "인증 방법을 알려주세요."
                }
            ]
        },
        "expected_response": None
    }
]

# Agent 평가 실행
eval_results = mlflow.genai.evaluate(
    data=eval_dataset,
    predict_fn=lambda input: AGENT.predict({"input": input, "custom_inputs": {"session_id": "evaluation-session"}}),
    scorers=[RelevanceToQuery(), Safety()],  # 필요에 따라 스코어러 추가
)

# MLflow UI에서 평가 결과 확인 (콘솔 출력 참조)

## 배포 전 Agent 검증

Agent를 등록하고 배포하기 전에 [mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API를 통해 사전 배포 검증을 수행합니다. 자세한 내용은 [공식 문서](https://docs.databricks.com/machine-learning/model-serving/model-serving-debug.html#validate-inputs)를 참고하세요.

In [0]:
# 배포 전 검증 - 로깅된 모델이 정상적으로 예측을 수행하는지 확인
mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/mcp_langchain_agent",
    input_data={
        "input": [{"role": "user", "content": "사용 가능한 API 목록을 알려주세요."}], 
        "custom_inputs": {"session_id": "validation-session"}
    },
    env_manager="uv",
)

## Unity Catalog에 모델 등록

아래의 `catalog`, `schema`, `model_name`을 업데이트하여 MLflow 모델을 Unity Catalog에 등록합니다.

In [0]:
mlflow.set_registry_uri("databricks-uc")

# TODO: Unity Catalog 모델의 catalog, schema, model_name을 환경에 맞게 수정하세요
catalog = "ml"
schema = "default"
model_name = "mcp-langchain-agent"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# Unity Catalog에 모델 등록
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

## Agent 배포

Agent를 Databricks Model Serving 엔드포인트로 배포합니다.

In [0]:
from databricks import agents

# 참고: scale_to_zero=True를 전달하면 비용 절감을 위한 스케일 투 제로가 활성화됩니다.
# 프로덕션 워크로드에는 권장되지 않습니다 (스케일 제로 상태에서는 용량이 보장되지 않음).
# 스케일 제로 상태의 엔드포인트는 다시 스케일업하는 동안 응답 시간이 길어질 수 있습니다.
agents.deploy(
    UC_MODEL_NAME, 
    uc_registered_model_info.version, 
    tags={"endpointSource": "local_development"}
)

## 다음 단계

Agent가 배포되면 다음 작업을 수행할 수 있습니다:

- **AI Playground에서 테스트**: 추가 검증을 위해 대화형으로 Agent 테스트
- **피드백 수집**: 조직 내 전문가(SME)와 Agent 공유하여 피드백 수집
- **프로덕션 애플리케이션에 통합**: 실제 서비스에 Agent 임베딩

자세한 내용은 [공식 문서](https://docs.databricks.com/generative-ai/deploy-agent.html)를 참고하세요.