In [None]:
# 仮想環境内でパッケージをインストール
import sys
!{sys.executable} -m pip install semantic-kernel azure-ai-projects

In [None]:
from dotenv import load_dotenv

load_dotenv('../concierge-agent/.env')

ユーティリティ関数

In [None]:
import datetime
import json
from semantic_kernel.contents.function_call_content import FunctionCallContent
from semantic_kernel.contents.function_result_content import FunctionResultContent
from semantic_kernel.contents.text_content import TextContent


async def print_thread_message_details(thread: str):
    """
    スレッドのメッセージ詳細を表示します。

    Args:
        thread (str): スレッドのインスタンス
    """
    async for message in thread.get_messages():
        print("-----")

        for item in message.items:
            if isinstance(item, FunctionCallContent):
                print(f"[Function Calling] by {message.ai_model_id}")
                print(f" - Function Name : {item.name}")
                print(f" - Arguments     : {item.arguments}")

            elif isinstance(item, FunctionResultContent):
                print(f"[Function Result]")
                # 文字列のデコード変換
                if isinstance(item.result, str):
                    try:
                        decoded = json.loads(item.result)
                        print(f" - Result        : {decoded}") # デコード成功時は変換後の値を表示
                    except json.JSONDecodeError:
                        print(f" - Result        : {item.result}")  # デコード失敗時はそのまま
                else:
                    print(f" - Result        : {item.result}")

            elif isinstance(item, TextContent):
                if message.name:
                    print(f"[Agent Response] from {message.ai_model_id}")
                else:
                    print("[User Message]")
                print(f" - Content       : {item.text}")

            else:
                print(f"[Unknown Item Type] ({type(item).__name__})")
                print(f" - Raw Item      : {item}")


def log_with_timestamp(message: str) -> None:
    """
    現在時刻付きでメッセージを標準出力にログとして表示します。

    Args:
        message (str): 出力するログメッセージ。
    """
    timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
    print(f"[{timestamp}] {message}")


In [None]:
from datetime import datetime, timedelta
import os
from typing import Annotated
from dotenv import load_dotenv
from semantic_kernel.agents import ChatCompletionAgent
from semantic_kernel.functions import kernel_function
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel import Kernel
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.agents.models import BingGroundingTool

load_dotenv()

# --- プラグイン定義 ---
class SearchPlugin:
    @kernel_function(description="Bing 検索結果を要約")
    def web_search(self, query: str) -> str:
        project_client = AIProjectClient(
            endpoint=os.environ["PROJECT_CONNECTION_STRING"],
            credential=DefaultAzureCredential(),  # Use Azure Default Credential for authentication
        )
        conn_id = os.environ["BING_CONNECTION_NAME"]  # Ensure the BING_CONNECTION_NAME environment variable is set

        # Initialize the Bing Grounding tool
        bing = BingGroundingTool(connection_id=conn_id)

        with project_client:
            # Create an agent with the Bing Grounding tool
            agent = project_client.agents.create_agent(
                model=os.environ["AGENT_MODEL_DEPLOYMENT_NAME"],  # Model deployment name
                name="my-agent",  # Name of the agent
                instructions="You are a helpful agent",  # Instructions for the agent
                tools=bing.definitions,  # Attach the Bing Grounding tool
            )
            print(f"Created agent, ID: {agent.id}")
            thread = project_client.agents.threads.create()
            print(f"Created thread, ID: {thread.id}")
            message = project_client.agents.messages.create(
                thread_id=thread.id,
                role="user",  # Role of the message sender
                content=query,  # Message content
            )
            run = project_client.agents.runs.create_and_process(thread_id=thread.id, agent_id=agent.id)
            messages = project_client.agents.messages.list(thread_id=thread.id)
            for message in messages:
                print(f"Role: {message.role}, Content: {message.content}")
                return message.content[-1].text.value  # Return the content of the last message
        # TODO: 失敗した場合のエラーハンドリング
        return f"検索結果: {query} に関する情報"

    @kernel_function(description="今日の日付をYYYY-MM-DD形式で返します。")
    def get_today(self) -> Annotated[str, "YYYY-MM-DD形式でフォーマットされた現在の日付。"]:
        return datetime.now().strftime("%Y-%m-%d")

    @kernel_function(description="日数のオフセットに基づいて相対的な日付を返します。")
    def get_relative_date(self, days_offset: Annotated[int, "今日に追加する日数。"]) -> Annotated[str, "今日からオフセットされた日付をYYYY-MM-DD形式で。"]:
        return (datetime.now() + timedelta(days=days_offset)).strftime("%Y-%m-%d")
    
# --- エージェント設定 ---
def create_agents():
    # Kernelの作成
    kernel = Kernel()
    
    # Azure OpenAI サービスの追加
    chat_service = AzureChatCompletion(
        deployment_name=os.environ["AGENT_MODEL_DEPLOYMENT_NAME"],
        endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        api_key=os.environ["AZURE_OPENAI_API_KEY"],
        api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    )
    kernel.add_service(chat_service)
    
    # プラグインの追加
    kernel.add_plugin(SearchPlugin(), plugin_name="search")
    
    # リサーチャーエージェント
    researcher = ChatCompletionAgent(
        id="default",
        kernel=kernel,
        name="ResearcherAgent",
        instructions="""
        あなたは市場調査の専門家です。与えられたクエリに対して検索を行い、
        結果を日本語で要約してください。
        """,
    )
    
    # ライターエージェント
    writer = ChatCompletionAgent(
        id="default",
        kernel=kernel,
        name="WriterAgent",
        instructions="""
        あなたは技術ライターです。リサーチ結果を受け取り、
        マークダウン形式のレポートを作成してください。
        """,
    )
    
    # コーディネーターエージェント
    coordinator = ChatCompletionAgent(
        id="default",
        kernel=kernel,
        name="CoordinatorAgent",
        instructions="""
        あなたはエージェントコーディネーターです。
        ユーザーのクエリに対して、リサーチが必要かどうかを判断し、
        必要に応じて適切なエージェントに作業を依頼してください。
        """,
        plugins=[researcher, writer],
    )
    
    return coordinator

# --- 対話処理 ---
async def handle_conversation():
    coordinator = create_agents()
    
    print("🤖 マルチエージェントシステムが起動しました")
    print("終了するには 'exit' と入力してください\n")

    while True:
        user_input = input("User> ")
        if user_input.lower() in ["exit", "quit", "終了"]:
            break

        from semantic_kernel.agents import ChatHistoryAgentThread
        thread = ChatHistoryAgentThread()
        response = await coordinator.get_response(
            messages=user_input,
            thread=thread,
        )
        await print_thread_message_details(thread)
        print(f"CoordinatorAgent> {response.content}")
    
    print("セッション終了")

# 非同期関数を実行
await handle_conversation()