In [0]:
from typing import Any, Generator, Optional, Sequence, Union, Annotated, Literal, List, Dict
from pydantic import BaseModel, Field, ValidationError
from databricks.sdk import WorkspaceClient
from databricks_langchain import ChatDatabricks, UCFunctionToolkit, VectorSearchRetrieverTool, DatabricksVectorSearch
from databricks_langchain.genie import GenieAgent
from databricks.vector_search.client import VectorSearchClient
from databricks.sdk.service import sql
from databricks.sdk.service.sql import StatementState

import mlflow
from mlflow.langchain.chat_agent_langgraph import ChatAgentState, ChatAgentToolNode
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import ChatAgentChunk, ChatAgentMessage, ChatAgentResponse, ChatContext

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.language_models import LanguageModelLike
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate, MessagesPlaceholder, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage, AIMessageChunk
from langchain_core.tools import BaseTool, Tool
from langchain_core.output_parsers import JsonOutputParser, StrOutputParser
from langchain.callbacks import LangChainTracer
from langchain_huggingface import HuggingFaceEmbeddings
from sentence_transformers import SentenceTransformer

from langgraph.graph import END, StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt.tool_node import ToolNode

import base64, io, requests, re, json, pandas as pd, math, os, uuid, numpy as np, wave, operator, time, tempfile, concurrent.futures, pytz, threading, random
from google.cloud import vision
from google.oauth2 import service_account
from pypdf import PdfReader
from docx import Document
from datetime import date, datetime
from io import BytesIO
import functools
from math import radians, sin, cos, sqrt, atan2
from langchain_tavily import TavilySearch
from dotenv import load_dotenv

############################################
# databricks sdk client
############################################
load_dotenv()
DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
DATABRICKS_API_TOKEN = os.getenv("DATABRICKS_API_TOKEN")
w = WorkspaceClient(host=DATABRICKS_HOST, token=DATABRICKS_API_TOKEN)

def get_secret(scope: str, key: str) -> str:
    def _maybe_b64decode(s: str) -> str:
        try:
            dec = base64.b64decode(s, validate=True)
            if base64.b64encode(dec).decode("utf-8").strip("=") == s.strip("="):
                return dec.decode("utf-8")
        except Exception as e:
            print(f"failed to decode base64: {e}")
        return s
    try:
        resp = w.secrets.get_secret(scope=scope, key=key)
        return _maybe_b64decode(resp.value)
    except Exception as e:
        raise RuntimeError(f"failed to get secret '{scope}/{key}': {e}")

# カタログとスキーマ
UC_CATALOG = 'hhhd_demo_itec'
UC_SCHEMA = 'commuting_allowance'

# google api key
GOOGLE_API_KEY = get_secret("commute_agent", "GOOGLE_API_KEY")
# 駅すぱあと api key
EKISPERT_API_KEY = get_secret("commute_agent", "EKISPERT_API_KEY")

############################################
# LLM endpoint
############################################
LLM_ENDPOINT_NAME_1 = "databricks-claude-3-7-sonnet"
LLM_ENDPOINT_NAME_2 = "databricks-claude-sonnet-4"
LLM_ENDPOINT_NAME_3 = "databricks-claude-sonnet-4-5"
LLM = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME_3, temperature=0.0, extra_body={"enable_safety_filter": True})

############################################
# LangSmith
############################################
os.environ["LANGCHAIN_API_KEY"] = get_secret("commute_agent", "LANGSMITH_API_KEY")
os.environ["LANGCHAIN_PROJECT"] = get_secret("commute_agent", "LANGSMITH_PROJECT")
tracer = LangChainTracer()

In [0]:
# utils関数
def convert_dates(obj):
  if isinstance(obj, (date, datetime)):
    return obj.isoformat()
  return obj

def safe_json(obj):
  if isinstance(obj, list):
    return [safe_json(item) for item in obj]
  elif isinstance(obj, dict):
    return {k: safe_json(v) for k, v in obj.items()}
  else:
    return convert_dates(obj)
  
def extract_text_from_ocr(image_path):
    try:
        # サービスアカウントキーを指定
        OCR_CREDENTIALS_RAW = get_secret("commute_agent", "OCR_CREDENTIALS")

        # jsonから認証情報を読み込む
        credentials_info = json.loads(OCR_CREDENTIALS_RAW)
        
        # エスケープされた \\n を 改行 \n に変換
        credentials_info["private_key"] = credentials_info["private_key"].replace("\\n", "\n")

        # 認証オブジェクト生成
        credentials = service_account.Credentials.from_service_account_info(credentials_info)
        client = vision.ImageAnnotatorClient(credentials=credentials)

        # OCR対象画像の読み込み
        try:
            with w.dbfs.download(image_path) as f:
                image_bytes = f.read(-1) # すべて読み込む
                image = vision.Image(content=image_bytes)
        except FileNotFoundError:
            return f"{image_path} が見つかりません"

        # OCR実行（日本語含む）
        response = client.text_detection(image=image)
        texts = response.text_annotations

        if response.error.message:
            raise Exception(f"APIエラー: {response.error.message}")
        if texts:
            vision_text = texts[0].description
            return json.dumps({"ocr全文": vision_text.strip()}, ensure_ascii=False)
        else:
            return json.dumps({"ocr全文": "テキストが検出されませんでした。"}, ensure_ascii=False)
    except Exception as e:
        return json.dumps({
        "エラー": f"OCR処理中にエラーが発生しました: {str(e)}"
        }, ensure_ascii=False)

def extract_text_from_pdf(pdf_path):
  try:
    with w.dbfs.download(pdf_path) as f:
      pdf_bytes = f.read(-1)  # 全バイト読み込み
      if not pdf_bytes:
        raise ValueError(f"{pdf_path} は空のファイルです")
  
      # BytesIOでfile-likeオブジェクトにする
      reader = PdfReader(BytesIO(pdf_bytes))
      texts = []
      for page in reader.pages:
        text = page.extract_text()
        if text:
          texts.append(text)
      return "\n".join(texts)
  except Exception as e:
    return f"{pdf_path} の処理に失敗しました: {str(e)}"

def extract_text_from_docx(docx_path):
  with w.dbfs.download(docx_path) as f:
    docx_bytes = f.read(-1)

  with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as tmp:
    tmp.write(docx_bytes)
    tmp_path = tmp.name

  doc = Document(tmp_path)
  text = ""

  # 段落
  for para in doc.paragraphs:
    if para.text.strip():
      text += para.text.strip() + "\n"

  # テーブル
  for table in doc.tables:
    processed_cells = set()
    for row in table.rows:
      row_data = []
      for cell in row.cells:
        cell_text = cell.text.strip()
        if (cell._tc not in processed_cells) and cell_text:
          row_data.append(cell_text)
          processed_cells.add(cell._tc)
        else:
          row_data.append("")
      text += " | ".join(row_data) + "\n"

  return text

def extract_text_from_xlsx(xlsx_path):
  with w.dbfs.download(xlsx_path) as f:
    xlsx_bytes = f.read(-1)

  with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp:
    tmp.write(xlsx_bytes)
    tmp_path = tmp.name

  df = pd.read_excel(tmp_path, engine='openpyxl', sheet_name=None)
  text = ""
  for sheet_name, sheet_df in df.items():
    text += f"### {sheet_name} シート ###\n"
    text += sheet_df.to_string(index=False, header=True) + "\n"
  return text

def extract_text_from_txt(txt_path):
  with w.dbfs.download(txt_path) as f:
    text = f.read(-1).decode("utf-8")  # 文字コードがShift_JISなら "shift_jis" に変更
  return text

def extract_text(file_path):
  ext = os.path.splitext(file_path)[1].lower()
  if ext == ".pdf":
    return extract_text_from_pdf(file_path)
  elif ext == ".docx":
    return extract_text_from_docx(file_path)
  elif ext in [".xls", ".xlsx"]:
    return extract_text_from_xlsx(file_path)
  elif ext == ".txt":
    return extract_text_from_txt(file_path)
  elif ext == ".jpg" or ext == ".jpeg" or ext == ".png":
    return extract_text_from_ocr(file_path)
  else:
    raise ValueError(f"対応していないファイル形式です: {ext}")


In [0]:
############################################
# tools
############################################
# ファイルからテキストを抽出する関数
def extract_text_from_file(file_name: str) -> str:
    file_path = f"/Volumes/hhhd_demo_itec/commuting_allowance/申請書/{file_name}"
    try:
        extracted_text = extract_text(file_path)
        text = f"\n---\n\n### {os.path.basename(file_path)} の申請内容\n\n```\n{extracted_text}\n```\n"
    except Exception as e:
        return f"エラー: {file_path} の処理に失敗しました: {e}"

    if not text.strip():
        return "ファイルの読み取りに失敗した、もしくは内容が空でした。"
    try:
      extract_prompt_template = PromptTemplate.from_template("""
      下記の従業員が提出した「通勤手当申請書データ」から、次のルールに従って申請情報を正しく整理してください。

      【ステップ1：以下の情報を抽出】
      - 社員ID 
      - 申請者名
      - 勤務先住所
      - 自宅住所
      - 起点最寄り駅
      - 終点最寄り駅
      - 申請理由
      - 事象発生日
      - 全線乗車証
      - 職務乗車証
      - 新規申請経路情報（複数の交通機関を利用する場合、乗り換えは徒歩とします）
      - 取消経路情報（記載があれば）
      - 備考

      【ステップ2：結果の出力】
      以下のMarkdown形式で出力してください：

      ---
      ##申請書内容##

      - **社員ID**: 
      - **申請者名**:  
      - **勤務先住所**:  
      - **自宅住所**:  
      - **起点最寄り駅**: 
      - **終点最寄り駅**:
      - **申請理由**:  
      - **事象発生日**:  
      - **全線乗車証**:（阪急 / 阪神 / なし）
      - **職務乗車証**:（OO線 / なし）
      - **新規申請経路**:  （駅A → 「交通機関名 or 徒歩」 → 駅B → 「交通機関名 or 徒歩」 → 駅C）
      - **取消経路**:  （駅A → 「交通機関名 or 徒歩」 → 駅B → 「交通機関名 or 徒歩」 → 駅C）
      - **備考**:  
      ---

      以下が「通勤手当申請書データ」です：

      {input_text}
      """)
      model = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME_2, temperature=0.0, disable_streaming=True)
      extract_fields_chain = extract_prompt_template | model | StrOutputParser()
      llm_text = extract_fields_chain.invoke({"input_text": text})
    except Exception as e:
      return text.strip()
    return llm_text

# Web検索
def web_search_by_tavily(text: str, k: int = 5) ->str:
  os.environ["TAVILY_API_KEY"] = "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"
  tool = TavilySearch(
    max_results=k,
    topic="general",
    # include_answer=False,
    # include_raw_content=False,
    # include_images=False,
    # include_image_descriptions=False,
    # search_depth="basic",
    # time_range="day",
    # include_domains=["nlftp.mlit.go.jp"],
    # exclude_domains=None
  )
  result = tool.invoke(text)
  return json.dumps(result, ensure_ascii=False)

# 歩行距離計算
def calculate_walking_distance(requests_data: str) -> str:
 try:
  data = json.loads(requests_data)
  origin = data.get("from_info")
  destination = data.get("to_info")
  if not origin or not destination:
   return json.dumps({"error": "from_info と to_info は必須です。"})

  url = "https://maps.googleapis.com/maps/api/distancematrix/json"
  params = {
   "origins": origin,
   "destinations": destination,
   "mode": "walking",
   "language": "ja",
   "key": GOOGLE_API_KEY
  }
  response = requests.get(url, params=params)
  result = response.json()

  if result["status"] != "OK":
   return json.dumps({"error": f"Distance Matrix エラー: {result['status']}"})

  info = result["rows"][0]["elements"][0]
  if info["status"] != "OK":
   return json.dumps({"error": f"ルートエラー: {info['status']}"})

  return json.dumps({
   "distance": info["distance"]["text"],
   "duration": info["duration"]["text"]
  }, ensure_ascii=False)

 except Exception as e:
  return json.dumps({"error": str(e)})

# 本日の日付と現在の時刻を取得
def get_current_datetime(requests_data: str = None) -> str:
    jst = pytz.timezone('Asia/Tokyo')
    now = datetime.now(jst)
    weekday_jp = ["月", "火", "水", "木", "金", "土", "日"]
    weekday = weekday_jp[now.weekday()]
    return now.strftime("%Y-%m-%d") + f"({weekday})" + now.strftime("%H:%M:%S")

In [0]:
# 状態管理カスタマイズ　ChatAgentState拡張
class MyState(ChatAgentState):
    # Supervisor-Workerのやりとり回数（再帰防止に使う）
    iteration_count: int

    # 使用されたツール名の集合（重複排除）
    tools_ran: Annotated[set[str], operator.or_]

    # ベクトル検索ツールの呼び出し回数（制限のために使用）
    vector_tool_count: int

    # 処理開始時間
    started_at: datetime

def custom_tool_node(state: dict, tools: Union[ToolNode, Sequence[BaseTool]]) -> dict:
    # ベクトル検索ツール名と最大実行回数
    VECTOR_TOOL_NAMES = {"search_company_regulations", "vector_search"}  # 必要に応じて追加
    MAX_VECTOR_CALLS = 2
    
    # 現在のベクトル検索回数を取得（初期値 0）
    vector_tool_count = state.get("vector_tool_count", 0)
    
    # 最新メッセージを取得し、ツール呼び出し一覧を取り出す
    last_msg = state["messages"][-1]
    tool_calls = last_msg.get("tool_calls", []) if isinstance(last_msg, dict) else getattr(last_msg, "tool_calls", [])
    
    # ベクトル検索制限チェック
    vector_calls_in_this_request = vector_tool_count
    for call in tool_calls:
        # tool_name は function.name にネストされていることに注意
        if isinstance(call, dict):
            tool_name = call.get("function", {}).get("name", "")
            call_id = call.get("id")
        else:
            func = getattr(call, "function", None)
            tool_name = getattr(func, "name", "") if func else ""
            call_id = getattr(call, "id", None)
        if tool_name in VECTOR_TOOL_NAMES:
            if vector_calls_in_this_request >= MAX_VECTOR_CALLS:
                # 呼び出しをスキップしてダミー応答を追加
                dummy_result = {
                    "role": "tool",
                    "content": f"{tool_name}の実行上限（{MAX_VECTOR_CALLS}回）に達したため、検索出来ません。既にある検索結果で答えてください。",
                    "name": tool_name,
                    "id": str(uuid.uuid4()),
                    "tool_call_id": call_id
                }
                state["messages"] = [dummy_result]
                return state
            else:
                vector_calls_in_this_request += 1
    
    # レビュアーツールを使う場合
    last_tool_name = tool_calls[-1].get("function", {}).get("name", "")
    if last_tool_name == "reviewer":
      full_history = state.get("messages", [])
      try:
        history_as_text = json.dumps(full_history, ensure_ascii=False, indent=2)
      except Exception as e:
        tool_results = []
        for call in tool_calls:
          tool_name = call.get("function", {}).get("name", "")
          call_id = call.get("id")
          if call_id:
            tool_results.append(ToolMessage(
              tool_call_id=call_id,
              name=tool_name,
              content=f"会話履歴の変換に失敗したためレビューツールを実行できません: {e}"
            ))
        state["messages"].extend(tool_results)
        return state

      # 対象となるtool_callの arguments を履歴に置き換える
      for call in tool_calls:
        func = call.get("function", {})
        if func.get("name") == "reviewer":
          func["arguments"] = json.dumps({
            "chat_history": history_as_text  # ツール側は chat_history: str を受け取る想定
          }, ensure_ascii=False)
    
    # ツールノードの通常呼び出し
    tool_node = ChatAgentToolNode(tools)
    try:
        tool_result_state = tool_node.invoke(state, config=None)
    except Exception as e:
        tool_results = []
        for call in tool_calls:
            if isinstance(call, dict):
                tool_name = call.get("function", {}).get("name", "")
                call_id = call.get("id")
            else:
                func = getattr(call, "function", None)
                tool_name = getattr(func, "name", "") if func else ""
                call_id = getattr(call, "id", None)
            
            if call_id:
                tool_results.append(ToolMessage(
                    tool_call_id=call_id, 
                    name=tool_name,  # name パラメータを追加
                    content=f"ツール呼び出しに失敗しました: {e}"
                ))
        tool_result_state["messages"].extend(tool_results)
    
    # 使用済みツールの記録
    tools_ran = set(state.get("tools_ran", set()))
    for call in tool_calls:
        if isinstance(call, dict):
            tool_name = call.get("function", {}).get("name", "")
        else:
            func = getattr(call, "function", None)
            tool_name = getattr(func, "name", "") if func else ""
        if tool_name:
            tools_ran.add(tool_name)
    
    # state に記録を追加
    tool_result_state["tools_ran"] = tools_ran
    tool_result_state["vector_tool_count"] = vector_calls_in_this_request
    
    return tool_result_state

#####################
## ワーカーエージェント
#####################
def commute_operations_agent_tool(requests_data: str) -> str:
  try:
    data = json.loads(requests_data)
    questions: List[str] = data.get("questions")
    if not questions or not isinstance(questions, list):
      return json.dumps({"error": "commute_operations_agent_tool: 'questions' はリストである必要があります。"}, ensure_ascii=False)

    if not DATABRICKS_API_TOKEN:
      return json.dumps({"error": "commute_operations_agent_tool: トークンが設定されていません。"}, ensure_ascii=False)

    url = get_secret("commute_agent", "WORKER_AGENT")
    headers = {
      "Authorization": f"Bearer {DATABRICKS_API_TOKEN}",
      "Content-Type": "application/json",
    }

    # 並列数・リトライ・ジッタ設定（必要に応じて調整）
    MAX_CONCURRENCY = min(5, max(2, len(questions)))  # 過剰並列を抑制
    MAX_RETRIES = 4                                    # 429/5xx時に再試行
    BASE_BACKOFF = 0.6                                 # 初回待機秒
    JITTER_MAX = 0.35                                  # 軽いランダム遅延
    REQ_TIMEOUT = (5, 120)                              # (connect, read)

    # スレッドローカルでセッションを保持（各スレッド専用）
    thread_local = threading.local()

    def get_session() -> requests.Session:
      s = getattr(thread_local, "session", None)
      if s is None:
        s = requests.Session()
        # 必要ならHTTPAdapterで接続プール拡張（デフォでも可）
        adapter = requests.adapters.HTTPAdapter(pool_connections=MAX_CONCURRENCY*2, pool_maxsize=MAX_CONCURRENCY*2)
        s.mount("https://", adapter)
        s.mount("http://", adapter)
        thread_local.session = s
      return s

    def parse_retry_after(resp: requests.Response) -> float:
      # Retry-Afterがあれば優先
      ra = resp.headers.get("Retry-After")
      if not ra:
        return 0.0
      try:
        return float(ra)
      except Exception:
        return 0.0

    def send_request(question: str) -> str:
      # 送信前の軽いジッタ（スパイク緩和）
      time.sleep(random.uniform(0, JITTER_MAX))
      payload = {"messages": [{"role": "user", "content": question}]}
      session = get_session()

      for attempt in range(MAX_RETRIES + 1):
        try:
          resp = session.post(url, headers=headers, json=payload, timeout=REQ_TIMEOUT)
          # 成功
          if resp.status_code == 200:
            data = resp.json()
            # 返却構造に依存：messages[-1].content が最終応答である前提
            return data.get("messages", [{}])[-1].get("content", "")
          # レート制限・一時エラーはリトライ
          if resp.status_code in (429, 500, 502, 503, 504):
            retry_after = parse_retry_after(resp)
            if attempt < MAX_RETRIES:
              backoff = retry_after if retry_after > 0 else (BASE_BACKOFF * (2 ** attempt))
              backoff += random.uniform(0, JITTER_MAX)  # ジッタを合成
              time.sleep(backoff)
              continue
          # その他はエラー返却
          return f"[Error {resp.status_code}]: {resp.text}"
        except requests.RequestException as e:
          # ネットワーク例外はバックオフして再試行
          if attempt < MAX_RETRIES:
            backoff = BASE_BACKOFF * (2 ** attempt) + random.uniform(0, JITTER_MAX)
            time.sleep(backoff)
            continue
          return f"[Request error]: {str(e)}"
        except Exception as e:
          return f"[Unexpected error]: {str(e)}"

      return "[Error]: リトライ上限に達しました。"

    # 並列実行（過剰コンテキストスイッチを避けるためmax_workersを制限）
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as executor:
      results = list(executor.map(send_request, questions))

    return json.dumps({"results": results}, ensure_ascii=False)

  except Exception as e:
    return json.dumps({"error": f"commute_operations_agent_tool: {e}"}, ensure_ascii=False)


# 適切な通勤ルートを取得するためのツール
def commute_allowance_route_finder_tool(requests_data: str) -> str:
  try:
    data = json.loads(requests_data)
    from_info = data.get("from_info")
    to_info = data.get("to_info")

    # 都道府県チェック（より寛容なパターン）
    pref_pattern = r"^(北海道|東京都|大阪府|京都府|.+?(都|道|府|県))"
    if not from_info or not to_info:
      return json.dumps({"error": "エラー: 出発地と到着地の住所は必須です。"}, ensure_ascii=False)
    if not re.match(pref_pattern, from_info):
      return json.dumps({"error": "from_info は都道府県から始めてください。"}, ensure_ascii=False)
    if not re.match(pref_pattern, to_info):
      return json.dumps({"error": "to_info は都道府県から始めてください。"}, ensure_ascii=False)

    if not DATABRICKS_API_TOKEN:
      return json.dumps({"error": "commute_operations_agent_tool: トークンが設定されていません。"}, ensure_ascii=False)
    
    url = get_secret("commute_agent", "WORKER_AGENT")
    headers = {
      "Authorization": f"Bearer {DATABRICKS_API_TOKEN}",
      "Content-Type": "application/json",
    }

    def send_request(question):
      payload = {"messages": [{"role": "user", "content": question}]}
      try:
        response = requests.post(url, headers=headers, json=payload)
        if response.status_code == 200:
          return response.json()["messages"][-1]["content"]
        else:
          return f"[Error {response.status_code}]: {response.text}"
      except Exception as e:
        return f"[Request error]: {str(e)}"

    question = f"""
      起点は {from_info}、終点は {to_info} です。
      必ず以下の手順で実行し、ルートを選定してください。

      手順1．通勤ルートの検索
      - 平日8:45に到着可能な通勤ルートを検索すること。

      手順2．通勤ルートの選定
      - 以下のルールで複数のルート(あれば)を選定すること。
        1. 自社線を含むルート。
        2. 早いルート（所要時間が一番短い）。
        3. 安いルート（定期代が一番安い）。
      - 補足: ここでいう「自社線」とは、阪急線および阪神線（いずれも鉄道）を指す。

      手順3．起点最寄り駅の確認と修正
      - 起点から半径1km以内にある最寄り駅を調査すること。
      - 選定したルート上の起点最寄り駅を以下のルールで確認し、不適切な場合は修正すること。
        1. 複数の最寄り駅がある場合、最寄り駅から最寄り駅への移動は不可とし、最寄り駅を修正すること。
        2. 以下の説明例を参考にすること。
          - 修正前（誤）: 起点 → 徒歩 → 駅A → バス → 駅B → 電車 → 駅C → 徒歩 → 終点
          - 修正後（正）: 起点 → 徒歩 → 駅B → 電車 → 駅C → 徒歩 → 終点
            （駅A・駅Bがいずれも起点から1km以内の最寄り駅である場合）
        3.最寄り駅修正した場合は所要時間も合わせて修正すること。
        4.選定した通勤ルートが複数ある場合は全て修正すること。

      手順4．結果出力
      - 選定した通勤ルートが複数がある場合は、複数ルートを出力すること。
      - 結果を以下のフォーマットで出力すること。
        - ルート
          ・所要時間: xx分（乗換x回）
          ・経路: 起点 → 徒歩6分 → A駅(XX線) → 電車4分 → B駅(XX線) → 徒歩6分 → C駅 → バス1分 → D駅 → 徒歩2分 → 終点 (列車待ち合わせ時間x分)
          ・定期料金: 1ヶ月 xx円 / 3ヶ月 xx円 / 6ヶ月 xx円
          ・路線別定期代（あれば）: 詳しい内訳を記載すること
      """

    results = send_request(question)
    return json.dumps({"results": results}, ensure_ascii=False)

  except Exception as e:
    return json.dumps({"error": f"commute_allowance_route_finder_tool: {e}"}, ensure_ascii=False)

# 色々なGenieエージェント
GENIE_SPACE_ID = "01f05e029d691aed8af55fc0cf674622"
genie_agent_description = "社員乗車証保有情報を調べるエージェントです。"
EmployeeCommuterPasses = GenieAgent(
    genie_space_id=GENIE_SPACE_ID,
    genie_agent_name="EmployeeCommuterPasses-Genie",
    description=genie_agent_description,
    client=w
)

GENIES = {
    "EmployeeCommuterPasses-Genie": EmployeeCommuterPasses,
}

def genie_agent_tool(requests_data: str) -> str:
  try:
    data = json.loads(requests_data)
    question = data.get("question")
    genie_name = data.get("genie_name")
    if not question or not isinstance(question, str):
      return json.dumps({"error": "genie_agent_tool: 'question' は文字列で正しく指定する必要があります。"}, ensure_ascii=False)
    if not genie_name or not isinstance(genie_name, str):
      return json.dumps({"error": "genie_agent_tool: 'genie_name' は文字列で正しく指定する必要があります。"}, ensure_ascii=False)

    agent = GENIES.get(genie_name)
    if not agent:
      return json.dumps({"error": f"genie_agent_tool: '{genie_name}' は存在しません。"}, ensure_ascii=False)

    input_text= { 
        "messages": [
            {
                "role": "user",
                "content": question,
            }
        ]
    }
    response = agent.invoke(input_text)
    messages = response.get("messages", [])

    if not messages:
      return json.dumps({"error": "genie_agent_tool: レスポンスに messages が含まれていません。"}, ensure_ascii=False)

    content = messages[0].content
    return json.dumps({"results": content}, ensure_ascii=False)

  except Exception as e:
    return json.dumps({"error": f"genie_agent_tool: {e}"}, ensure_ascii=False)

In [0]:
tools = [
    Tool(
        name="extract_text_from_file",
        func=extract_text_from_file,
        description="""
        申請書名を渡して、申請書内容を取得するツールです。
        入力例: "sample.pdf"、"sample.docx"、"sample.txt"、"sample.xlsx" など
        """
    ),
    Tool(
        name="commute_operations_agent",
        func=commute_operations_agent_tool,
        description="""
        通勤に関わる各種業務処理を行うエージェントです。
        このエージェントは、次のような機能があります：
        - 最寄り駅検索（最寄り駅の一覧と各駅の駅コードが返されます）
        - ルート検索（駅コード、住所、場所の名称などで検索可能）
        - 始発・終電検索
        - バス本数検索
        - 定期代払い戻し計算
        - 前後の列車ダイヤ検索

        入力は JSON 文字列で与えてください。最大五つの質問を一度に処理できます。

        入力形式（JSON文字列）: 
        {
            "questions": [
                "(駅コード)駅から(駅コード)駅までの通勤経路を検索してください",
                "XX駅からYY駅までのバス運行本数を確認してください",
                "XXXX年XX月XX日に購入したXX駅からXX駅までの6ヶ月定期券をXXXX年XX月XX日に払い戻す場合の払い戻し金額を計算してください"
            ]
        }

        出力は、各質問に対する結果の配列（JSON）として返されます。
        """
    ),
    Tool(
        name="calculate_walking_distance",
        func=calculate_walking_distance,
        description="""
        2つの場所間の徒歩による距離と所要時間をAPIを使って算出するツールです。
        入力形式はJSON文字列で、"from_info"と"to_info"にそれぞれの住所や駅名やビル名などを指定してください。
        注意：
            - 駅名やビル名などの名称で検索する場合、同じ名称の駅やビルが複数存在する可能性があるため、出来れば都道府県名や市区町村名を含めてください。
            - 駅名の場合、路線名の付与は必須です。（例）「福島駅」ではなく「福島駅（JR線）」など）
        入力例: '{"from_info": "東京都三鷹市下連雀3丁目10-12", "to_info": ""東京都 吉祥寺駅（JR線）"}'
        """
    ),
    Tool(
        name="get_current_datetime",
        func=get_current_datetime,
        description="本日の日付（曜日）と現在の時刻を取得するツールです。（引数不要）"
    ),
    Tool(
        name="commute_allowance_route_finder",
        func=commute_allowance_route_finder_tool,
        description = """
        JSON形式の文字列で渡された出発地と到着地の「住所」(駅名は不可です)を使って、適切なルートを取得するツールです。

        注意事項:
        - "from_info" は出発地で、必ず都道府県から始まる完全な日本語**住所**（ビル名、マンション名、部屋番号などは不要）で指定してください。
        - "to_info" は到着地で、必ず都道府県から始まる完全な日本語**住所**（ビル名、マンション名、部屋番号などは不要）で指定してください。

        入力例（JSON文字列）: '{"from_info": "大阪府大阪市中央区1-1-1", "to_info": "大阪府大阪市福島区1-2-3"}'
        """
    ) 
]

#####################
## スーパーバイザー
#####################
def create_supervisor_agent(
    model: LanguageModelLike,
    tools: Union[ToolNode, Sequence[BaseTool]]
    ) -> CompiledGraph:
    # モデルにツールをバインド
    model = model.bind_tools(tools)

    # 次の状態を判断する関数
    def should_continue(state: MyState) -> str:
        if state.get("vector_tool_count") == None:
            state["vector_tool_count"] = 0
        if state.get("started_at") == None:
            state["started_at"] = time.time()
        last_msg = state["messages"][-1]
        tools_ran = state.get("tools_ran", set())
        if "tool_calls" in last_msg and last_msg["tool_calls"]:
            return "tools"
        return "end"
    
    def _maybe_b64decode(s: str) -> str:
        try:
            dec = base64.b64decode(s, validate=True)
            if base64.b64encode(dec).decode("utf-8").strip("=") == s.strip("="):
                return dec.decode("utf-8")
        except Exception as e:
            print(f"failed to decode base64: {e}")
        return s

    def get_secret(scope: str, key: str) -> str:
        try:
            load_dotenv()
            DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
            DATABRICKS_API_TOKEN = os.getenv("DATABRICKS_API_TOKEN")
            w = WorkspaceClient(host=DATABRICKS_HOST, token=DATABRICKS_API_TOKEN)
            resp = w.secrets.get_secret(scope=scope, key=key)
            return _maybe_b64decode(resp.value)
        except Exception as e:
            raise RuntimeError(f"failed to get secret '{scope}/{key}': {e}")

    # system prompt を動的に取得
    def preprocessor(state: MyState):
        system_prompt = get_secret("commute_agent", "SUPERVISOR_SYSTEM_PROMPT")
        messages = state["messages"]
        if system_prompt:
            messages = [{"role": "system", "content": system_prompt}] + messages
        return messages

    preprocessor = RunnableLambda(preprocessor)

    # パイプライン定義
    model_runnable = preprocessor | model
    
    def call_model(state: MyState, config: RunnableConfig):
        response = model_runnable.invoke(state, config)
        if response is None:
            return {"messages": []}

        return {"messages": [response]}

    # 新しい状態stateを定義
    workflow = StateGraph(MyState)
    tool_node = functools.partial(custom_tool_node, tools=tools)
    # nodeを追加
    workflow.add_node("supervisor", RunnableLambda(call_model))
    workflow.add_node("tools", RunnableLambda(tool_node))

    ## 状態遷移ルールを定義
    # 入口
    workflow.set_entry_point("supervisor")
    # 条件edge追加
    workflow.add_conditional_edges(
        "supervisor",
        should_continue,
        {
            "tools": "tools",
            "end": END,
        },
    )
    # 一般edge追加
    workflow.add_edge("tools", "supervisor")
    # コンパイル
    return workflow.compile()

# 出力結果の形を整形する
class LangGraphChatAgent(ChatAgent):
    def __init__(self, agent: CompiledStateGraph):
        self.agent = agent

    def predict(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> ChatAgentResponse:
        request = {"messages": self._convert_messages_to_dict(messages)}
        config = {"callbacks": [tracer], "recursion_limit": 30}
        final_messages = []
        for event in agent.stream(request, config=config, stream_mode="updates"):
            for node_data in event.values():
                if isinstance(node_data, dict):
                    final_messages = [
                        ChatAgentMessage(**msg) for msg in node_data.get("messages", [])
                    ]
                elif isinstance(node_data, list):
                    final_messages = [
                        ChatAgentMessage(**msg) for msg in node_data
                    ]
        return ChatAgentResponse(messages=final_messages)

    def predict_stream(
        self,
        messages: list[ChatAgentMessage],
        context: Optional[ChatContext] = None,
        custom_inputs: Optional[dict[str, Any]] = None,
    ) -> Generator[ChatAgentChunk, None, None]:
        request = {"messages": self._convert_messages_to_dict(messages)}
        config = {"callbacks": [tracer], "recursion_limit": 30}
        for event in self.agent.stream(request, config=config, stream_mode="updates"):
            for node_data in event.values():
                yield from (
                    ChatAgentChunk(**{"delta": msg}) for msg in node_data["messages"]
                )

agent = create_supervisor_agent(LLM, tools)
AGENT = LangGraphChatAgent(agent)
mlflow.models.set_model(AGENT)