# Claude + Genie を使用したエージェントシステム(Mosaic AI Agent Framework)

このノートブックでは、Mosaic AI Agent Framework と [LangGraph](https://blog.langchain.dev/langgraph-multi-agent-workflows/) を使用してマルチエージェントシステムを構築する方法を説明します。ここで、[Genie](https://www.databricks.com/product/ai-bi/genie) はエージェントの 1 つです。またSupervisortとしてDatabricks内で利用可能な[AnthropicのClaude](https://www.databricks.com/jp/blog/anthropic-claude-37-sonnet-now-natively-available-databricks)を採用しております。
このノートブックでは、以下の操作を行います。
1. LangGraph を使用してマルチエージェントシステムを作成します。
1. Databricks の機能との互換性を確保するために、LangGraph エージェントを MLflow の `ChatAgent` でラップします。
1. マルチエージェントシステムの出力を手動でテストします。
1. マルチエージェントシステムのログを取得し、デプロイします。

この例は、[LangGraph ドキュメント - マルチエージェント スーパーバイザーの例](https://github.com/langchain-ai/langgraph/blob/main/docs/docs/tutorials/multi_agent/agent_supervisor.ipynb)を基にしています。

## Claudeとは

Claude 3.7 Sonnetは、Anthropicがこれまでに開発した中で最も高度なAIモデルです。複雑な推論やマルチステップの計画立案、長い文脈を伴う対話、文書やデータの深い理解といった高度なタスクにおいて、業界トップクラスの性能を発揮します。Claude 3.7 Sonnetが、AWS・Azure・GCP上のDatabricksでネイティブに利用可能になりました。推論・計画・エージェントタスクに特化したAnthropicの最先端モデルに、安全かつガバナンスの効いた形でアクセスできます。

## Genie エージェントを使用する理由

マルチエージェントシステムは、それぞれに専門能力を持つ複数のAIエージェントで構成されています。Genieは、そのエージェントの1つであり、ユーザーが自然言語を使用して構造化データとやりとりすることを可能にします。

あらかじめ定義されたクエリのみを実行できるSQL関数とは異なり、Genieはユーザーの質問に答えるために新しいクエリを作成できる柔軟性があります。

## 前提条件

- このノートブック内のすべての「TODO」を処理する。(Genie SPACE ID/ PAT / Modelを保存するCatalog)
- Genie Spaceを作成します。Databricksのドキュメントを参照してください（[AWS](https://docs.databricks.com/aws/genie/set-up) | [Azure](https://learn.microsoft.com/azure/databricks/genie/set-up))。

In [0]:
%pip install -U -qqq mlflow langgraph==0.3.4 databricks-langchain databricks-agents uv
dbutils.library.restartPython()


## マルチエージェントシステムを定義する

LangGraphで、以下のエージェントノードを指示するスーパーバイザーエージェントノードを使用してマルチエージェントシステムを作成します。
- **GenieAgent**: 構造化データに対してクエリーを実行し、推論を行うGenieエージェント。
- **Tool-calling agent**: Unityカタログ機能ツールを呼び出すエージェント。(今回のノートブックでは利用しません)

LangGraph エージェントを `ChatAgent` インターフェイスでラップする

Databricks では、Databricks の AI 機能との互換性を確保し、オープンソース標準を使用して複数ターン型の会話型エージェントの作成を簡略化するために、`ChatAgent` の使用を推奨しています。 

LangGraphChatAgent` クラスは、LangGraph エージェントをラップするために `ChatAgent` インターフェイスを実装します。

MLflowの[ChatAgentドキュメント](https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.ChatAgent)を参照してください。

エージェントコードをファイルに書き込む

以下の単一セルでエージェントコードを定義します。これにより、`%%writefile`マジックコマンドを使用して、エージェントコードをローカルPythonファイルに書き込むことができ、その後のログ取得やデプロイに使用できます。


In [0]:
genie_agent_description = "This agent can answer questions about data. but please use only one"
#code_agent_description = "今回は利用しません"
worker_descriptions = {
    "Genie": genie_agent_description,
    #"Coder": code_agent_description,   #このデモでは、Coderを利用せずGenie のみをAgentとして利用するため外しておきます。
}

formatted_descriptions = "\n".join(
    f"- {name}: {desc}" for name, desc in worker_descriptions.items()
)
print(formatted_descriptions)


In [0]:
%%writefile agent.py
import functools
import os
from typing import Any, Generator, Literal, Optional

import mlflow
from databricks.sdk import WorkspaceClient
from databricks_langchain import (
    ChatDatabricks,
    DatabricksFunctionClient,
    UCFunctionToolkit,
    set_uc_function_client,
)
from databricks_langchain.genie import GenieAgent
from langchain_core.runnables import RunnableLambda
from langgraph.graph import END, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import create_react_agent
from mlflow.langchain.chat_agent_langgraph import ChatAgentState
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
    ChatAgentChunk,
    ChatAgentMessage,
    ChatAgentResponse,
    ChatContext,
)
from pydantic import BaseModel

mlflow.langchain.autolog()

###################################################
## Create a GenieAgent with access to a Genie Space
###################################################

# TODO add GENIE_SPACE_ID and a description for this space
GENIE_SPACE_ID = "01efbd0fe8711ecd80e48dcbc4042f28"
genie_agent_description = "This agent can answer questions about user's inquiry data."

genie_agent = GenieAgent(
    genie_space_id=GENIE_SPACE_ID,
    genie_agent_name="Genie",
    description=genie_agent_description,
    # DB_MODEL_SERVING_HOST_URL is set on an agent endpoints but doesn't exist in the notebook
    client=WorkspaceClient(
        host=os.getenv("DATABRICKS_HOST") or os.getenv("DB_MODEL_SERVING_HOST_URL"),
        token=os.getenv("DATABRICKS_GENIE_PAT"),
    ),
)


############################################
# Define your LLM endpoint and system prompt
############################################

# TODO: Replace with your model serving endpoint, multi-agent Genie works best with GPT 4o and GPT o1 models.
#LLM_ENDPOINT_NAME = "databricks-meta-llama-3-1-405b-instruct"
LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"

assert LLM_ENDPOINT_NAME is not None
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)


############################################################
# Create a code agent
# You can also create agents with access to additional tools
# 今回は利用しないため、スキップしてください
############################################################
client = DatabricksFunctionClient()
set_uc_function_client(client)

tools = []

# TODO if desired, add additional tools and update the description of this agent. 
uc_tool_names = ["system.ai.*"]
uc_toolkit = UCFunctionToolkit(function_names=uc_tool_names)
tools.extend(uc_toolkit.tools)
code_agent_description = (
    "The Coder agent specializes in solving programming challenges, generating code snippets, debugging issues, and explaining complex coding concepts.",
)
code_agent = create_react_agent(llm, tools=tools)

#############################
# Define the supervisor agent
#############################

worker_descriptions = {
    "Genie": genie_agent_description,
    #"Coder": code_agent_description,
}

formatted_descriptions = "\n".join(
    f"- {name}: {desc}" for name, desc in worker_descriptions.items()
)

#system_prompt = f"Decide between routing between the following workers or ending the conversation if an answer is provided. You shouldn't repeat Genie. please finish it after the first worker. \n{formatted_descriptions}"
system_prompt = f"Decide between routing between the following workers or ending the conversation if an answer is provided. Genieに問い合わせる際にはSQLでデータを取得しやすいような質問に変換して投げてください。データ内容から判断するような命令は投げないで、最後にSupvissorが回答してください。またGenieから返ってきたデータを繰り返しGenieに質問として投げないでください。Genieへのルーティングは１っ回だけとし、もしGenieからデータが返ってこない場合、ユーザーに質問を返しFinishを選択してください"


options = ["FINISH"] + list(worker_descriptions.keys())


def supervisor_agent(state):
    class nextNode(BaseModel):
        next_node: Literal[tuple(options)]

    preprocessor = RunnableLambda(
        lambda state: [{"role": "system", "content": system_prompt}] + state["messages"]
    )
    supervisor_chain = preprocessor | llm.with_structured_output(nextNode)
    return supervisor_chain.invoke(state)



#######################################
# Define our multiagent graph structure
#######################################


def agent_node(state, agent, name):
    result = agent.invoke(state)
    return {
        "messages": [
        #"messages": state["messages"] + [
            {
                "role": "assistant",
                "content": result["messages"][-1].content,
                "name": name,
            }
        ]
    }


def final_answer(state):
    system_prompt = "Using only the content in the messages, respond to the user's question using the answer given by the other agents."
    preprocessor = RunnableLambda(
        lambda state: [{"role": "system", "content": system_prompt}] + state["messages"]
    )
    final_answer_chain = preprocessor | llm
    return {"messages": [final_answer_chain.invoke(state)]}


class AgentState(ChatAgentState):
    next_node: str


code_node = functools.partial(agent_node, agent=code_agent, name="Coder")
genie_node = functools.partial(agent_node, agent=genie_agent, name="Genie")

workflow = StateGraph(AgentState)
workflow.add_node("Genie", genie_node)
workflow.add_node("Coder", code_node)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("final_answer", final_answer)

workflow.set_entry_point("supervisor")
# We want our workers to ALWAYS "report back" to the supervisor when done
for worker in worker_descriptions.keys():
    workflow.add_edge(worker, "supervisor")

# Let the supervisor decide which next node to go
workflow.add_conditional_edges(
    "supervisor",
    lambda x: x["next_node"],
    {**{k: k for k in worker_descriptions.keys()}, "FINISH": "final_answer"},
)
workflow.add_edge("final_answer", END)
multi_agent = workflow.compile()




###################################
# Wrap our multi-agent in ChatAgent
###################################


class LangGraphChatAgent(ChatAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        request = {
            "messages": [m.model_dump_compat(exclude_none=True) for m in messages]
        }

        messages = []
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                messages.extend(
                    ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                )
        return ChatAgentResponse(messages=messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        request = {
            "messages": [m.model_dump_compat(exclude_none=True) for m in messages]
        }
        for event in self.agent.stream(request, stream_mode="updates"):
            for node_data in event.values():
                yield from (
                    ChatAgentChunk(**{"delta": msg})
                    for msg in node_data.get("messages", [])
                )


# Create the agent object, and specify it as the agent object to use when
# loading the agent back for inference via mlflow.models.set_model()
AGENT = LangGraphChatAgent(multi_agent)
mlflow.models.set_model(AGENT)

## Test the agent

エージェントとやりとりして、その出力をテストします。このノートブックを `mlflow.langchain.autolog()` と呼ぶので、エージェントが実行する各ステップのトレースを表示できます。

In [0]:
dbutils.library.restartPython()

## Create a Personal Access Token (PAT) as a Databricks secret
Genie Spaceとそのリソースにアクセスするには、PATを作成する必要があります。
- これは、お客様自身の PAT またはシステムプリンシパルの PAT（[AWS](https://docs.databricks.com/aws/en/dev-tools/auth/oauth-m2m) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/dev-tools/auth/oauth-m2m)）のいずれかです。有効期限が切れた場合は、お客様自身でこのトークンを更新する必要があります。
- エンドポイントを提供するモデルに、秘密鍵ベースの環境変数を追加します。（[AWS](https://docs.databricks.com/aws/en/machine-learning/model-serving/store-env-variable-model-serving#add-secrets-based-environment-variables) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/store-env-variable-model-serving#add-secrets-based-environment-variables))。
- 各リソースの適切な権限レベルについては、デプロイドキュメントの表を参照してください。（[AWS](https://docs.databricks.com/aws/en/generative-ai/agent-framework/deploy-agent#automatic-authentication-passthrough) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/agent-framework/deploy-agent#automatic-authentication-passthrough))
  - Genie Spaceに「CAN RUN」権限でプロビジョニング
  - Genie Spaceを動かすSQL Warehouseに「CAN USE」権限でプロビジョニング
  - Unity Catalogのテーブルに「SELECT」権限でプロビジョニング 
  - 基盤となるUnityカタログ関数に対する`EXECUTE`の提供 

In [0]:
import os

# TODO: set secret_scope_name and secret_key_name to access your PAT
secret_scope_name = "<Scope>"
secret_key_name = "<Key>"
os.environ["DATABRICKS_GENIE_PAT"] = dbutils.secrets.get(
scope=secret_scope_name, key=secret_key_name
)
assert os.environ["DATABRICKS_GENIE_PAT"] is not None, (
    "The DATABRICKS_GENIE_PAT was not properly set to the PAT secret"
)

#os.environ["DATABRICKS_GENIE_PAT"] = "<PAT>" #上記Sercetを使わず直接記載する場合

**TODO**: このプレースホルダー `input_example` を、エージェント用のドメイン固有のプロンプトに置き換えます。

In [0]:
from agent import AGENT

input_example = {
    "messages": [
        {
            "role": "user",
            "content": "2024年の月毎の問い合わせ数から傾向を分析して？",
        }
    ]
}
AGENT.predict(input_example)

In [0]:
for event in AGENT.predict_stream(input_example):
  print(event, "-----------\n")

## Log the agent as an MLflow model

`agent.py` ファイルからエージェントをコードとしてログに記録します。 [MLflow - Models from Code](https://mlflow.org/docs/latest/models.html#models-from-code) を参照してください。

### Databricks リソースの自動認証を有効にする
Databricks は、最も一般的な Databricks リソースタイプについては、ログ記録時に事前にエージェントのリソース依存関係を宣言することをサポートし、推奨しています。これにより、エージェントをデプロイした際に自動認証パススルーが有効になります。自動認証パススルーにより、Databricks はエージェントエンドポイント内からこれらのリソース依存関係に安全にアクセスするために、短時間で有効期限が切れる認証情報を自動的にプロビジョニング、ローテーション、および管理します。

自動認証を有効にするには、`mlflow.pyfunc.log_model()` を呼び出す際に依存する Databricks リソースを指定します。
  - **TODO**: Unity Catalog ツールが [ベクトル検索インデックス](docs link) を照会したり、[外部関数](docs link) を利用したりする場合は、依存するベクトル検索インデックスと UC 接続オブジェクトをそれぞれリソースとして含める必要があります。docsを参照（[AWS](https://docs.databricks.com/generative-ai/agent-framework/log-agent.html#specify-resources-for-automatic-authentication-passthrough) | [Azure](https://learn.microsoft.com/azure/databricks/generative-ai/agent-framework/log-agent#resources)）。

In [0]:
# Determine Databricks resources to specify for automatic auth passthrough at deployment time
import mlflow
from agent import GENIE_SPACE_ID, LLM_ENDPOINT_NAME, tools
from databricks_langchain import UnityCatalogTool, VectorSearchRetrieverTool
from mlflow.models.resources import (
    DatabricksFunction,
    DatabricksGenieSpace,
    DatabricksServingEndpoint,
)

# TODO: Manually include underlying resources if needed. See the TODO in the markdown above for more information.
resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
    DatabricksGenieSpace(genie_space_id=GENIE_SPACE_ID),
]
for tool in tools:
    if isinstance(tool, VectorSearchRetrieverTool):
        resources.extend(tool.resources)
    elif isinstance(tool, UnityCatalogTool):
        resources.append(DatabricksFunction(function_name=tool.uc_function_name))

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        artifact_path="agent",
        python_model="agent.py",
        input_example=input_example,
        pip_requirements=[
            "mlflow",
            "langgraph==0.3.4",
            "databricks-langchain",
            "pydantic",
        ],
        resources=resources,
    )

## Pre-deployment agent validation
エージェントを登録して展開する前に、[mlflow.models.predict()](https://mlflow.org/docs/latest/python_api/mlflow.models.html#mlflow.models.predict) API を使用して展開前のチェックを実行します。Databricks のドキュメントを参照してください。 ([AWS](https://docs.databricks.com/en/machine-learning/model-serving/model-serving-debug.html#validate-inputs) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/machine-learning/model-serving/model-serving-debug#before-model-deployment-validation-checks))."

(注意：社内で実行すると、IP ACLに引っかかり失敗しました。ただしこの後のステップでモデルサービングエンドポイントからであれば実行可能です。そのためエラーは無視して次のステップに行ってくさい)

In [0]:
mlflow.models.predict(
    model_uri=f"runs:/{logged_agent_info.run_id}/agent",
    input_data=input_example,
    env_manager="uv",
)

## Register the model to Unity Catalog

UnityカタログにMLflowモデルを登録するために、以下の `catalog`, `schema`, `model_name` を更新します。

In [0]:
mlflow.set_registry_uri("databricks-uc")

# TODO: define the catalog, schema, and model name for your UC model
catalog = "<Catalog>"
schema = "<Schema>"
model_name = "<ModelName>"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"

# register the model to UC
uc_registered_model_info = mlflow.register_model(
    model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)

## Deploy the agent

In [0]:
from databricks import agents

agents.deploy(
    UC_MODEL_NAME,
    uc_registered_model_info.version,
    tags={"endpointSource": "docs"},
    environment_vars={
        "DATABRICKS_GENIE_PAT": f"{{{{secrets/{secret_scope_name}/{secret_key_name}}}}}"
        # "DATABRICKS_GENIE_PAT": "<PAT>"
    },
)

## Next steps

エージェントがデプロイされた後は、AI playgroundでチャットを行い、追加のチェックを行ったり、フィードバックを得るために組織内のSMEと共有したり、本番アプリケーションに組み込んだりすることができます。Databricksのドキュメントを参照してください（[AWS](https://docs.databricks.com/en/generative-ai/deploy-agent.html) | [Azure](https://learn.microsoft.com/en-us/azure/databricks/generative-ai/deploy-agent))。