# Databricks の Mosaic AI Agent Framework を利用した AI エージェントの構築
ここからは、Databricks の Mosaic AI Agent Framework を利用した AI エージェントの構築を行います。

### Mosaic AI Agent Framework を利用した AI エージェント作成の流れ

### LangGraph カスタムスキーマエージェントノートブック
このノートブックは、Mosaic AI Agent Frameworkと互換性のあるLangGraph AIエージェントの作成を行います。  
LangGraph AIエージェントと Mosaic AI の Agent Framework の互換性を確保のために、Mosaic AI Agent Framework が定める「スキーマ要件」に準拠するエージェントの作成が必要です。
 
 詳細は([AWS](https://docs.databricks.com/aws/ja/generative-ai/agent-framework/create-chat-model) | [Azure](https://learn.microsoft.com/ja-jp/azure/databricks/generative-ai/agent-framework/create-chat-model)) を参照してください。

### Model-as-code notebook
Mosaic AI Agent Framework では、MLflow の**モデルをコードとして記述する**開発ワークフローを使用しており、これには次の 2 つのノートブックが必要です。
- エージェントノートブック：エージェントのロジックを定義するノートブック（このノートブック）
- ドライバノートブック：エージェントをログに記録、登録、デプロイするノートブック

Model-as-code の詳細については、 MLflow の Models as code ガイド(https://mlflow.org/docs/latest/model/models-from-code.html)を参照してください。  

**_NOTE:_** このノートブックはLangGraphを使用しますが、Mosaic AI Agent FrameworkはLlamaIndexなどの他のエージェントオーサリングフレームワークと互換性があります。

## 環境準備

In [0]:
%run ./00_config

In [0]:
# 本ノートブックで利用するスキーマをセット
schema_name = f"05_vector_search_index_for_{user_name}"
spark.sql(f"USE {catalog_name}.{schema_name}")

必要なライブラリのインストール

In [0]:
%pip install -U -qqqq mlflow langchain langgraph-checkpoint langchain_core langgraph pydantic databricks-langchain
dbutils.library.restartPython()

MLflow による自動ロギングを有効化

In [0]:
import mlflow
mlflow.langchain.autolog()

### Parse LangGraph output
LangGraphの出力メッセージをMosaic AIエージェントフレームワークの推奨出力スキーマに変換するためのヘルパーメソッドを定義します。wrap_outputヘルパーは、カスタム出力を含む追加の`custom_outputs`フィールドとともに、チャット補完と互換性のあるメッセージを返します。

In [0]:
from typing import Iterator, Dict, Any
from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    ToolMessage,
    MessageLikeRepresentation,
)
from mlflow.types.llm import ChatCompletionRequest, ChatCompletionResponse, ChatChoice, ChatMessage
from random import randint
from dataclasses import asdict
import logging

import json

# You can add additional fields to the return object below
def create_flexible_chat_completion_response(content: str, id: int = 0) -> Dict:
    return asdict(ChatCompletionResponse(
        choices=[ChatChoice(message=ChatMessage(role="assistant", content=content))],
        custom_outputs={
            "id": id
        },
    ))

def wrap_output(stream: Iterator[MessageLikeRepresentation]) -> Iterator[Dict]:
    """
    Process and yield formatted outputs from the message stream.
    The invoke and stream langchain functions produce different output formats.
    This function handles both cases.
    """
    for event in stream:
        # the agent was called with invoke()
        if "messages" in event:
            output_content = ""
            for msg in event["messages"]:
                output_content += msg.content
            # Note: you can pass additional fields from your LangGraph nodes to the output here
            yield create_flexible_chat_completion_response(content=output_content, id=randint(100000000, 10000000000))
        # the agent was called with stream()
        else:
            for node in event:
                for key, messages in event[node].items():
                    if isinstance(messages, list):
                        for msg in messages:
                            # Note: you can pass additional fields from your LangGraph nodes to the output here
                            yield create_flexible_chat_completion_response(content=msg.content, id=randint(100000000, 10000000000))
                    else:
                        logging.warning(f"Unexpected value {messages} for key {key}. Expected a list of `MessageLikeRepresentation`'s")
                        yield create_flexible_chat_completion_response(content=str(messages))

## Create the agent
LangGraphの[`create_react_agent` 関数](https://langchain-ai.github.io/langgraph/how-tos/create-react-agent/#usage) を使用して、シンプルなグラフを作成します。  
カスタマイズする場合は、[LangGraph - Quick Start](https://langchain-ai.github.io/langgraph/tutorials/introduction/)の手順に従って、独自のLangGraphエージェントを作成できます。

### Databricks Vector Srarchを利用したRAGのretrieve

05_vector_search_index で作成したVector Search を AI Agentが利用できるように、ツールの定義を行います。  

参考：    
[DatabricksVectorSearch インスタンスの作成](https://python.langchain.com/docs/integrations/vectorstores/databricks_vector_search/)

#### 1. RAGを実行するUnity Catalog Functionの作成
DatabricksVectorSearch を Agent利用するために、Unity Catalog 関数を使用して取得ツールを作成します。  
[Unity Catalog 関数](https://docs.databricks.com/aws/ja/generative-ai/agent-framework/create-custom-tool)によって、カスタム AI エージェントツールを作成することができます。

非構造化データを取得するための AI エージェント ツールを作成する方法としては、[非構造化リトリーバー](https://docs.databricks.com/aws/ja/generative-ai/agent-framework/unstructured-retrieval-tools)を使用する方法もあります。


In [0]:
# Unity Catalog Functionにvector_searchを行う関数を作成
query = f"""
CREATE OR REPLACE FUNCTION {catalog_name}.{schema_name}.product_docs_vector_search (
  -- The agent uses this comment to determine how to generate the query string parameter.
  query STRING
  COMMENT 'The query string for searching our product documentation.'
) RETURNS TABLE
-- The agent uses this comment to determine when to call this tool. It describes the types of documents and information contained within the index.
COMMENT 'Executes a search on product documentation to retrieve text documents most relevant to the input query.' RETURN
SELECT
  id as id,
  map('url', url, 'content', content) as metadata
FROM
  vector_search(
    -- Specify your Vector Search index name here
    index => '{catalog_name}.{schema_name}.product_documentation_vs_index',
    query => query,
    num_results => 5
  )
  """
spark.sql(query)

In [0]:
# 作成した関数の結果を確認
query = f"""
select * 
from {catalog_name}.{schema_name}.product_docs_vector_search('バッテリー交換');
"""
spark.sql(query).display()

### SQL AI Functionの作成
非構造化データを検索するためのツールと同様に、構造化データを検索するツールを作成します。  
先ほどと同様、Agentがツールを利用するために、[Unity Catalog 関数](https://docs.databricks.com/aws/ja/generative-ai/agent-framework/create-custom-tool)を使用します。

ここでは、顧客からの問い合わせを管理する case テーブルを参照し、処理中の件数が多いPC製品 TOP3 を抽出する SQL関数を定義し、ツールとしてAgentが利用可能にします。

なお、AI エージェント ツールAIエージェントツールの構築については、[こちらのドキュメント](https://docs.databricks.com/aws/ja/generative-ai/agent-framework/agent-tool)も参考にしてください。

#### 2. UDFを実行するUnity Catalog Functionの作成

In [0]:
# 利用するスキーマの変更(Genie利用時にセットしたデータを利用)
schema3_name = f"03_data_analysis_by_gen_ai_for_{user_name}"
# spark.sql(f"USE {catalog_name}.{schema3_name}")

In [0]:
# 処理中の問い合わせの多い製品top3と件数を表示するSQL関数を作成

query = f"""
CREATE OR REPLACE FUNCTION
{catalog_name}.{schema3_name}.product_with_many_inquiries()
returns table(Name STRING, in_progress_count INT)
LANGUAGE SQL
-- Make sure to add a comment so that your AI understands what it does
COMMENT 'This function returns the top 3 product names with long term cases in 処理中.'
return
(SELECT
  `product2`.`Name`,
  COUNT(`case`.`Id`) AS `in_progress_count`
FROM
  {catalog_name}.{schema3_name}.`case`
    INNER JOIN
      {catalog_name}.{schema3_name}.`product2`
      ON `case`.`Product2Id__c` = `product2`.`Id`
WHERE
  `case`.`Status` = '処理中'
  AND `product2`.`Name` ILIKE '%PC%'
GROUP BY
  `product2`.`Name`
ORDER BY
  `in_progress_count` DESC
LIMIT 3)
"""

spark.sql(query)

### AI Functionの作成
作成した2つの Unity Catalog Function を ツールとして利用できるようにします。

In [0]:
func_name = [
    f"{catalog_name}.{schema3_name}.product_with_many_inquiries",
    f"{catalog_name}.{schema_name}.product_docs_vector_search"
             ]

In [0]:
from unitycatalog.ai.langchain.toolkit import UCFunctionToolkit
from unitycatalog.ai.core.databricks import DatabricksFunctionClient

In [0]:
# Databricks Function クライアントの作成
client = DatabricksFunctionClient()

# UCFunctionToolkit を利用して、Unity Catalog 関数をツールとして登録
toolkit = UCFunctionToolkit(
    function_names=func_name,
    client=client
)

tools = toolkit.tools



In [0]:
# tools = [tool,vs_tool]
tools

### Agent作成

Agentの作成を行います。  
LangGraphを使った AI Agentの作成の詳細は[LangGraohの公式ドキュメント](https://langchain-ai.github.io/langgraph/tutorials/introduction/)で確認可能です。

In [0]:
from typing import Annotated

# Databricks向けのLLMチャットインターフェースをインポート
from databricks_langchain import ChatDatabricks

from langchain_core.messages import BaseMessage
from typing_extensions import TypedDict

from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition


# 状態を表すTypedDictを定義。このStateは、会話の状態を保持します。
class State(TypedDict):
    messages: Annotated[list, add_messages]

graph_builder = StateGraph(State)

# DatabricksのLLMチャットエンジンをエンドポイント(AOAIのgpt-4o)を指定して初期化
llm = ChatDatabricks(endpoint="openhack-gpt-4o")
# 定義済みのツールと連携させたLLMインスタンスを作成
llm_with_tools = llm.bind_tools(tools)

# チャットボットの処理を定義する関数
def chatbot(state: State):
    # 現在の状態のメッセージをLLMに渡して応答を生成し、メッセージリストとして返す
    return {"messages": [llm_with_tools.invoke(state["messages"])]}

In [0]:
### グラフ定義を使ったシステム全体のワークフローを構築 ###
# ノードの追加
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=tools)
graph_builder.add_node("tools", tool_node)
# エッジ(遷移)の設定
graph_builder.add_conditional_edges(
    "chatbot",
    tools_condition,
)
# ツール処理後に常にチャットボットに戻る遷移を設定
graph_builder.add_edge("tools", "chatbot")
# エントリーポイントの指定(システムの開始はチャットボット)
graph_builder.set_entry_point("chatbot")
graph = graph_builder.compile()

In [0]:
from IPython.display import Image, display

try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

In [0]:
# Agentがuser_inputを受け取り、リアルタイムで応答する処理
def stream_graph_updates(user_input: str):
    for event in graph.stream({"messages": [{"role": "user", "content": user_input}]}):
        for value in event.values():
            print("Assistant:", value["messages"][-1].content)

In [0]:
# # 以下のコメントアウト部分を有効化して実行すると、ユーザ入力を受け付ける形でテストができます。

# while True:
#     try:
#         user_input = input("User: ")
#         if user_input.lower() in ["quit", "exit", "q"]:
#             print("Goodbye!")
#             break

#         stream_graph_updates(user_input)
#     except:
#         # fallback if input() is not available
#         user_input = "What do you know about LangGraph?"
#         print("User: " + user_input)
#         stream_graph_updates(user_input)
#         break

# # 質問例：問い合わせの多い製品の修理手順を教えてください
# # "quit", "exit", "q" で終了させてください

### Agent の生成と 出力ラップ

In [0]:

from langchain_core.runnables import RunnableGenerator
from langgraph.prebuilt import create_react_agent

# create_react_agent 関数を使い、ツールと連携したLLMエージェントを生成
agent_with_raw_output = create_react_agent(llm_with_tools, tools)
# エージェントの生出力をwrap_output関数で整形
agent = agent_with_raw_output| RunnableGenerator(wrap_output)

## エージェントのテスト
エージェントとやりとりして、その出力をテストします。  
このノートブックで `mlflow.langchain.autolog()` を呼び出しているので、エージェントが実行する各ステップのトレースを表示できます。  
プレースホルダー入力を、「問い合わせの多い製品の修理手順を教えてください」など今回の例に合わせて実行してください。

In [0]:
# TODO: replace this placeholder input example with an appropriate domain-specific example for your agent
input_messages = [ChatMessage(role="user", content="問い合わせの多い製品の修理手順を教えてください")]
input_example = asdict(ChatCompletionRequest(messages=input_messages))

for event in agent.stream(input_example):
    print(event, "---" * 20 + "\n")
# for event in agent(input_example):
#     print(event, "---" * 20 + "\n")

In [0]:
mlflow.models.set_model(agent)

## Next steps
このエージェントのログ、登録、デプロイします。07_ai-agent_driver のドライバーノートブックを参照してください。  
 ドライバーノートブックの詳細は[こちら](https://learn.microsoft.com/ja-jp/azure/databricks/generative-ai/agent-framework/author-agent#langchain-custom-schemas)もご確認ください。