# 概要
本ノートブックでは、Semantic Kernel の Agent Framework と Process Framework を用いて、マルチエージェントを構築するサンプルコードを紹介します。

# 初期設定

ライブラリのインストール

In [1]:
!pip install semantic-kernel==1.27.2 \
    python-dotenv==1.0.1 \
    azure-ai-projects==1.0.0b7 --quiet

環境変数の読み込み

In [2]:
import os
from dotenv import load_dotenv

# 環境変数の読み込み
load_dotenv(override=True)
AZURE_OPENAI_API_KEY=os.getenv("AZURE_OPENAI_API_KEY")
AZURE_DEPLOYMENT_NAME=os.getenv("AZURE_DEPLOYMENT_NAME")
AZURE_OPENAI_ENDPOINT=os.getenv("AZURE_OPENAI_ENDPOINT")
AZURE_OPENAI_API_VERSION=os.getenv("AZURE_OPENAI_API_VERSION")

ユーティリティ関数の定義

In [3]:
import datetime
import json
from semantic_kernel.contents.function_call_content import FunctionCallContent
from semantic_kernel.contents.function_result_content import FunctionResultContent
from semantic_kernel.contents.text_content import TextContent


async def print_thread_message_details(thread: str):
    """
    スレッドのメッセージ詳細を表示します。

    Args:
        thread (str): スレッドのインスタンス
    """
    async for message in thread.get_messages():
        print("-----")

        for item in message.items:
            if isinstance(item, FunctionCallContent):
                print(f"[Function Calling] by {message.ai_model_id}")
                print(f" - Function Name : {item.name}")
                print(f" - Arguments     : {item.arguments}")

            elif isinstance(item, FunctionResultContent):
                print(f"[Function Result]")
                # 文字列のデコード変換
                if isinstance(item.result, str):
                    try:
                        decoded = json.loads(item.result)
                        print(f" - Result        : {decoded}") # デコード成功時は変換後の値を表示
                    except json.JSONDecodeError:
                        print(f" - Result        : {item.result}")  # デコード失敗時はそのまま
                else:
                    print(f" - Result        : {item.result}")

            elif isinstance(item, TextContent):
                if message.name:
                    print(f"[Agent Response] from {message.ai_model_id}")
                else:
                    print("[User Message]")
                print(f" - Content       : {item.text}")

            else:
                print(f"[Unknown Item Type] ({type(item).__name__})")
                print(f" - Raw Item      : {item}")


def log_with_timestamp(message: str) -> None:
    """
    現在時刻付きでメッセージを標準出力にログとして表示します。

    Args:
        message (str): 出力するログメッセージ。
    """
    timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
    print(f"[{timestamp}] {message}")

# シングルエージェント

## プラグイン定義

In [4]:
import json
from typing import Annotated
from semantic_kernel.functions import kernel_function


class UserPlugin:
    @kernel_function(
        name="get_plan",
        description="指定されたユーザーIDから契約情報を取得します",
    )
    def get_plan(
        self, 
        user_id: Annotated[int, "ユーザーID（4桁の数字）"]
    ) -> Annotated[str, "ユーザーの契約プラン名（例: '安心保障プラン'）"]:
        
        sample_user = [
            {"id": 1234, "name": "佐藤太郎", "email": "sato@example.com", "plan": "安心保障プラン"},
            {"id": 2345, "name": "鈴木花子", "email": "hanako.suzuki@example.com", "plan": "総合生活サポートプラン"},
            {"id": 3456, "name": "田中一郎", "email": "ichiro.tanaka@example.com", "plan": "シンプルプラン"},
        ]

        for user in sample_user:
            if user["id"] == user_id:
                return json.dumps(user["plan"])
        return "該当するユーザーが見つかりませんでした。"


## エージェントの作成

In [5]:
from semantic_kernel.agents import (
    ChatCompletionAgent,
    ChatHistoryAgentThread,
)
from semantic_kernel.connectors.ai.open_ai import (
    AzureChatCompletion,
)

# Azure ChatCompletionのインスタンスを作成
azure_chat_completion = AzureChatCompletion(
    api_key=AZURE_OPENAI_API_KEY,
    deployment_name=AZURE_DEPLOYMENT_NAME,
    endpoint=AZURE_OPENAI_ENDPOINT,
)

# エージェントを作成
agent = ChatCompletionAgent(
    service=azure_chat_completion,
    name="EmployeeAssistant",
    instructions=(
        "あなたはコールセンターで働く従業員向けサポートAIアシスタントです。"
        "従業員の質問に答えたり、サポートを提供したりします。"
    ),
    plugins=[UserPlugin()]
)

## 動作確認

In [6]:
# スレッドを作成
thread = ChatHistoryAgentThread()

# レスポンスを取得
response = await agent.get_response(
    messages=["ユーザー ID 3456 のお客様の保険の加入状況を教えて。"],
    thread=thread,
)

print(response)

ユーザー ID 3456のお客様は、シンプルプランに加入されています。何か他にお手伝いできることがあれば教えてください。


In [7]:
await print_thread_message_details(thread)

-----
[User Message]
 - Content       : ユーザー ID 3456 のお客様の保険の加入状況を教えて。
-----
[Function Calling] by gpt-4o-mini
 - Function Name : UserPlugin-get_plan
 - Arguments     : {"user_id":3456}
-----
[Function Result]
 - Result        : シンプルプラン
-----
[Agent Response] from gpt-4o-mini
 - Content       : ユーザー ID 3456のお客様は、シンプルプランに加入されています。何か他にお手伝いできることがあれば教えてください。


In [9]:
# エージェントの詳細を表示（関数のメタデータを確認）
formatted_json = json.dumps(agent.model_dump(), indent=2, ensure_ascii=False, default=str)
print(formatted_json)

{
  "arguments": null,
  "description": null,
  "id": "3fae41e9-dafb-4bab-8983-4d80616f8b4c",
  "instructions": "あなたはコールセンターで働く従業員向けサポートAIアシスタントです。従業員の質問に答えたり、サポートを提供したりします。",
  "kernel": {
    "retry_mechanism": {},
    "services": {
      "gpt-4o-mini": {
        "ai_model_id": "gpt-4o-mini",
        "service_id": "gpt-4o-mini"
      }
    },
    "ai_service_selector": "<semantic_kernel.services.ai_service_selector.AIServiceSelector object at 0x000002447B265010>",
    "plugins": {
      "UserPlugin": {
        "name": "UserPlugin",
        "description": null,
        "functions": {
          "get_plan": {
            "metadata": {
              "name": "get_plan",
              "plugin_name": "UserPlugin",
              "description": "指定されたユーザーIDから契約情報を取得します",
              "parameters": [
                {
                  "name": "user_id",
                  "description": "ユーザーID（4桁の数字）",
                  "default_value": null,
                  "type_": "int",
               

# マルチエージェント

## 協力型

![cooperative_arch](images/cooperative_arch.png)

### プラグイン定義

In [10]:
import json
from typing import Annotated
import datetime
from semantic_kernel.functions import kernel_function


class TimeWeatherPlugin:

    @kernel_function(
        name="fetch_current_datetime",
        description="現在の時刻を JSON 文字列として取得します。オプションでフォーマットを指定できます。",
    )
    def fetch_current_datetime(
        self,
        format: Annotated[str, "現在の時刻を返す形式（例: '%Y/%m/%d %H:%M'）。未指定時はデフォルト形式。"] = "",
    ) -> Annotated[str, "現在の時刻を含む JSON 文字列（例: {'current_time': '2023-10-01 12:00:00'}）"]:

        time_format = format or "%Y-%m-%d %H:%M:%S"
        current_time = datetime.datetime.now().strftime(time_format)
        return json.dumps({"current_time": current_time})


    @kernel_function(
        name="fetch_weather",
        description="指定された場所の天気情報を取得します。",
    )
    def fetch_weather(
        self, 
        location: Annotated[str, "天気情報を取得する都市名（例: Tokyo, New York, London）"]
    ) -> Annotated[str, "天気情報を含む JSON 文字列（例: {'weather': 'Sunny, 25°C'}）"]:

        # ダミーの天気データを使用
        dummy_weather_data = {
            "New York": "Sunny, 25°C",
            "London": "Cloudy, 18°C",
            "Tokyo": "Rainy, 22°C"
        }
        weather = dummy_weather_data.get(
            location, "Weather data not available for this location."
        )

        return json.dumps({"weather": weather})


class ConvertTemperaturePlugin:

    @kernel_function(
        name="convert_temperature",
        description="温度を摂氏から華氏に変換します。",
    )
    def convert_temperature(
        self,
        celsius: Annotated[float, "摂氏温度（例: 25.0）"]
    ) -> Annotated[str, "華氏温度を含む JSON 文字列（例: {'fahrenheit': 77.0}）"]:

        fahrenheit = (celsius * 9 / 5) + 32

        return json.dumps({"fahrenheit": fahrenheit})


class SendEmailPlugin:

    @kernel_function(
        name="send_email",
        description="指定の件名と本文を含むメールを宛先に送信します。",
    )
    def send_email(
        self,
        recipient: Annotated[str, "メールの宛先アドレス（例: user@example.com）"],
        subject: Annotated[str, "メールの件名"],
        body: Annotated[str, "メールの本文"]
    ) -> Annotated[str, "完了通知を含む文字列（例: {'message': Email successfully sent to xxx@example.com.}）"]:
        print(f"Sending email to {recipient}...")
        print(f"Subject: {subject}")
        print(f"Body:\n{body}")
        return json.dumps({"message": f"Email successfully sent to {recipient}."})

### マルチエージェントの作成

In [11]:
from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.connectors.ai.open_ai import OpenAIChatPromptExecutionSettings
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.functions.kernel_arguments import KernelArguments

# Azure Chat Completion の初期化
azure_chat_completion = AzureChatCompletion(
    api_key=AZURE_OPENAI_API_KEY,
    deployment_name=AZURE_DEPLOYMENT_NAME,
    endpoint=AZURE_OPENAI_ENDPOINT,
)

# 専門エージェントの作成
time_weather_agent = ChatCompletionAgent(
    service=azure_chat_completion, 
    name="TimeWeatherAgent", 
    instructions="あなたは時間と天気のクエリの専門的なエージェントです。",
    plugins=[TimeWeatherPlugin()]
)

temperature_agent = ChatCompletionAgent(
    service=azure_chat_completion, 
    name="TemperatureAgent", 
    instructions="あなたは温度変換の専門的なエージェントです。",
    plugins=[ConvertTemperaturePlugin()]
)

send_email_agent = ChatCompletionAgent(
    service=azure_chat_completion, 
    name="SendEmailAgent", 
    instructions="あなたは電子メールを送信するための専門のエージェントです。",
    plugins=[SendEmailPlugin()]
)

# 振り分けを行うトリアージエージェントを作成
triage_agent = ChatCompletionAgent(
    service=azure_chat_completion, 
    name="TriageAgent",
    instructions=(
        "ユーザーのリクエストを評価し、適切なエージェント（TimeWeatherAgent, TemperatureAgent, SendEmailAgent ）"
        "に転送して、適切なサポートを提供します。エージェントからの情報を含め、ユーザーに完全な回答を提供します。"
        "元のユーザーリクエストが完全に処理されたどうか確認してください。"
    ),
    plugins=[time_weather_agent, temperature_agent, send_email_agent],
)

In [14]:
# Triage エージェントの詳細を表示（各専門エージェントがどのように定義されているのか確認）
formatted_json = json.dumps(triage_agent.model_dump(), indent=2, ensure_ascii=False, default=str)
print(formatted_json)

{
  "arguments": null,
  "description": null,
  "id": "945d5428-0a78-47af-9bb0-0c5e175be08a",
  "instructions": "ユーザーのリクエストを評価し、適切なエージェント（TimeWeatherAgent, TemperatureAgent, SendEmailAgent ）に転送して、適切なサポートを提供します。エージェントからの情報を含め、ユーザーに完全な回答を提供します。元のユーザーリクエストが完全に処理されたどうか確認してください。",
  "kernel": {
    "retry_mechanism": {},
    "services": {
      "gpt-4o-mini": {
        "ai_model_id": "gpt-4o-mini",
        "service_id": "gpt-4o-mini"
      }
    },
    "ai_service_selector": "<semantic_kernel.services.ai_service_selector.AIServiceSelector object at 0x000002447B887230>",
    "plugins": {
      "TimeWeatherAgent": {
        "name": "TimeWeatherAgent",
        "description": null,
        "functions": {
          "TimeWeatherAgent": {
            "metadata": {
              "name": "TimeWeatherAgent",
              "plugin_name": "TimeWeatherAgent",
              "description": "あなたは時間と天気のクエリの専門的なエージェントです。",
              "parameters": [
                {
                  "name": "messages",


### 動作確認

In [None]:
thread = ChatHistoryAgentThread()

user_input = (
    "はじめに、現在の時刻を '%Y-%m-%d %H:%M:%S' 形式で、また Tokyo の天気を教えてください。"
    "次に、Tokyo の気温を華氏に変換してください。"
    "最後に、結果の概要を記載したメールをサンプル受信者に送信してください。"
)


arguments = KernelArguments(
    settings = OpenAIChatPromptExecutionSettings(
        function_choice_behavior=FunctionChoiceBehavior.Auto(),
        parallel_tool_calls=False # 並列関数呼び出しを無効に設定
    )
)

response = await triage_agent.get_response(
    messages=user_input,
    thread=thread,
    arguments=arguments,
)

Sending email to sample-recipient@example.com...
Subject: Tokyo の現在の情報
Body:
サンプル受信者様,

以下は、Tokyo の現在の情報です：

- 現在の時刻: 2025-04-16 08:22:54
- 天気: 雨
- 気温: 22°C (華氏 71.6°F)

よろしくお願いします。


In [16]:
await print_thread_message_details(thread)

-----
[User Message]
 - Content       : はじめに、現在の時刻を '%Y-%m-%d %H:%M:%S' 形式で、また Tokyo の天気を教えてください。次に、Tokyo の気温を華氏に変換してください。最後に、結果の概要を記載したメールをサンプル受信者に送信してください。
-----
[Function Calling] by gpt-4o-mini
 - Function Name : TimeWeatherAgent-TimeWeatherAgent
 - Arguments     : {"messages":"現在の時刻を '%Y-%m-%d %H:%M:%S' 形式で教えてください。また、Tokyo の天気も教えてください。"}
-----
[Function Result]
 - Result        : 現在の時刻は **2025-04-16 08:22:54** です。また、東京の天気は **雨** で、気温は **22°C** です。
-----
[Function Calling] by gpt-4o-mini
 - Function Name : TemperatureAgent-TemperatureAgent
 - Arguments     : {"messages":"Tokyo の気温 22°C を華氏に変換してください。"}
-----
[Function Result]
 - Result        : Tokyo の気温 22°C は、華氏で 71.6°F に相当します。
-----
[Function Calling] by gpt-4o-mini
 - Function Name : SendEmailAgent-SendEmailAgent
 - Arguments     : {"messages":"サンプル受信者様,\n\n以下は、Tokyo の現在の情報です：\n\n- 現在の時刻: 2025-04-16 08:22:54\n- 天気: 雨\n- 気温: 22°C (華氏 71.6°F)\n\nよろしくお願いします。"}
-----
[Function Result]
 - Result        : メールは無事に受信者に送信されました。何か他にお手伝いでき

## 競争型

![competitive_arch](images/competitive_arch.png)

ユーザーのリクエストを受け取った後の処理フロー

1. エージェント選択（Selection Strategy）
2. 選ばれたエージェントを実行
3. 結果を会話履歴に記録
4. 終了判定（Termination Strategy）
5. End or 終了条件を満たすまで繰り返し

### 各エージェントの作成

In [17]:
from semantic_kernel.agents import  ChatCompletionAgent


REVIEWER_NAME = "Reviewer"
WRITER_NAME = "Writer"


# Azure ChatCompletionのインスタンスを作成
azure_chat_completion = AzureChatCompletion(
    api_key=AZURE_OPENAI_API_KEY,
    deployment_name=AZURE_DEPLOYMENT_NAME,
    endpoint=AZURE_OPENAI_ENDPOINT,
)


# レビュアーエージェントを作成
agent_reviewer = ChatCompletionAgent(
    service=azure_chat_completion,
    name=REVIEWER_NAME,
    instructions="""
あなたの役割は、ユーザーが提示するキーワード、要点、草稿（マークダウン形式）に対して、
追加すべき情報や観点を具体的に提案するレビュワーです。

初期入力がキーワードのみであっても、読者にとって有益な情報を盛り込むために必要なトピック、
構成要素、視点などを具体的に提示してください。

制約：
- 記事の直接修正や文章生成は行わないでください。
- 提案はリスト形式で簡潔に提示してください。
"""
)


# ライターエージェントを作成
agent_writer = ChatCompletionAgent(
    service=azure_chat_completion,
    name=WRITER_NAME,
    instructions="""
あなたの役割は、レビュワーが提案した内容に基づき、
マークダウン形式で記事を段階的に肉付けしていくライターです。

制約：
- 提案された内容をすべて反映し、マークダウン形式で執筆してください。
- 記事本文以外の出力は含めないでください。
"""
)

### 選択戦略・終了戦略で使うプロンプトの定義

In [18]:
from semantic_kernel.functions import KernelFunctionFromPrompt


# 選択戦略で使うプロンプトを定義
selection_function = KernelFunctionFromPrompt(
    function_name="selection", 
    prompt=f"""
提供された直前の発言内容を確認し、次に発言すべき参加者を判断してください。  
出力には、選んだ参加者の名前のみを記述し、理由や補足説明は一切記載しないでください。  
また、同じ参加者が続けて発言することはできません。

選択可能な参加者：  
- {REVIEWER_NAME}  
- {WRITER_NAME}  

ルール：  
- 入力がユーザーによる初回キーワード（例：「Azure AI Agent Service」など）の場合 → {REVIEWER_NAME} を選んでください。  
- 発言者が {REVIEWER_NAME} の場合 → 次は {WRITER_NAME} を選んでください。  
- 発言者が {WRITER_NAME} の場合 → 次は {REVIEWER_NAME} を選んでください。  

直前の発言：  
{{{{$lastmessage}}}}
"""
)


# 終了戦略で使うプロンプトを定義
termination_keyword = "yes"

termination_function = KernelFunctionFromPrompt(
    function_name="termination", 
    prompt=f"""
以下の回答を確認し、その内容が適切かどうかを判断してください。  
適切である場合は、説明や補足なしで「{termination_keyword}」という単語のみを出力してください。

判定基準：  
- 回答に具体的な修正提案が含まれている場合 → 不適切です。  
- 回答に修正提案が含まれていない場合 → 適切です。  

確認対象の回答：  
{{{{$lastmessage}}}}
"""
)


### マルチエージェントの作成

In [19]:
from semantic_kernel import Kernel
from semantic_kernel.agents import AgentGroupChat
from semantic_kernel.agents.strategies import (
    KernelFunctionSelectionStrategy,
    KernelFunctionTerminationStrategy,
)
from semantic_kernel.contents import ChatHistoryTruncationReducer


# Kernel を作成
kernel = Kernel(services=azure_chat_completion)


# エージェントの会話履歴管理（直近 4 つのメッセージのみを保持するよう指定）
history_reducer = ChatHistoryTruncationReducer(target_count=4)


# AgentGroupChat を初期化
chat = AgentGroupChat(

    # ディスカッションに参加するエージェントのリスト
    agents=[agent_reviewer, agent_writer],

    # 選択戦略
    selection_strategy=KernelFunctionSelectionStrategy(
        initial_agent=agent_reviewer,
        function=selection_function,
        kernel=kernel,
        result_parser=lambda result: str(result.value[0]).strip() if result.value[0] is not None else WRITER_NAME,
        history_variable_name="lastmessage",
        history_reducer=history_reducer,
    ),

    # 終了戦略
    termination_strategy=KernelFunctionTerminationStrategy(
        agents=[agent_reviewer],
        function=termination_function,
        kernel=kernel,
        result_parser=lambda result: termination_keyword in str(result.value[0]).lower(),
        history_variable_name="lastmessage",
        maximum_iterations=8, # 最大 8 回のやり取りを許可
        history_reducer=history_reducer,
    ),
)

### 動作確認

In [20]:
user_input = ("生成 AI によるマルチエージェントのユースケースについて")

print(f"=== User ===")
print(user_input)

await chat.add_chat_message(message=user_input)

try:
    turn = 1
    async for response in chat.invoke():
        if response is None or not response.name:
            continue
        print(f"\n=== TURN {turn} ({response.name.upper()}) ===")
        print(response.content)
        turn += 1
except Exception as e:
    print(f"Error during chat invocation: {e}")

=== User ===
生成 AI によるマルチエージェントのユースケースについて

=== TURN 1 (REVIEWER) ===
生成AIによるマルチエージェントのユースケースに関する記事のための追加情報や観点の提案は以下の通りです：

1. **マルチエージェントシステムの定義**  
   - マルチエージェントシステム（MAS）の基本概念の説明。

2. **生成AIの基本概念**  
   - 生成AIが何であるか、どのように機能するかの概要。

3. **ユースケースの具体例**  
   - 産業別（医療、金融、エンターテインメントなど）における具体的なユースケースを挙げる。
   - ゲーム開発におけるNPC（ノンプレイヤーキャラクター）の生成。

4. **コミュニケーションと協調**  
   - 複数のエージェントがどのように相互作用し、協力してタスクを達成するかの事例。

5. **シミュレーションと最適化**  
   - 複雑なシステム（都市交通、物流など）のシミュレーションでの利用方法。

6. **AIエージェントの自律性**  
   - エージェントがどのように独立して意思決定を行うかのメカニズム。

7. **倫理的考慮**  
   - マルチエージェントシステムにおける倫理的な問題、例えば責任の所在やバイアスの問題。

8. **今後の展望**  
   - マルチエージェントと生成AIの未来の可能性や進化についての考察。

9. **技術的課題**  
   - 整合性、スケーラビリティ、通信の信頼性といった技術的課題を説明。

10. **実装事例の紹介**  
    - 企業やプロジェクトでの成功例を挙げ、成果や学びをシェア。

これらの要素を含めることで、読者にとってより深く理解できる内容になるでしょう。

=== TURN 2 (WRITER) ===
# 生成AIによるマルチエージェントのユースケース

## マルチエージェントシステムの定義
マルチエージェントシステム（MAS）は、複数のエージェントが相互に作用し、協力または競争しながらタスクを遂行するシステムです。各エージェントは独立して行動し、異なる目標を持つ場合がありますが、全体として一つの目的を達成することを目指します。

## 生成AIの基本

## ワークフロー型

![workflow_arch](images/workflow_arch.png)

### プラグインの定義

In [29]:
import json
import time
from typing import Annotated
from semantic_kernel.functions import kernel_function

# IT 初期セットアッププラグイン
class ITSetupPlugin:

    @kernel_function(
        name="pc_setup",
        description="PC の手配を行います。",
    )
    def pc_setup(
        self,
        user_id: Annotated[str, "PC を手配する対象ユーザーの ID"],
    ) -> Annotated[str, "PC セットアップ結果を含む JSON 文字列（例: {'user_id': 'u123', 'pc_setup': 'completed'}）"]:

        print("--- PC Setup ---")
        print(f"- Requested PC setup for user ID: {user_id}")
        time.sleep(1)
        print("- Preparing PC...")
        time.sleep(2)
        print("- PC setup completed.")
        print("---------------")

        data = {
            "user_id": user_id,
            "pc_setup": "completed"
        }
        return json.dumps(data)

    @kernel_function(
        name="create_mail_address",
        description="メールアドレスの新規作成を行います。",
    )
    def create_mail_address(
        self,
        user_id: Annotated[str, "メールアドレスを作成する対象ユーザーの ID"],
    ) -> Annotated[str, "メールアドレス作成結果を含む JSON 文字列（例: {'user_id': 'u123', 'mail_address': 'completed'}）"]:

        print("--- Mail Address Setup ---")
        print(f"- Requested mail address setup for user ID: {user_id}")
        time.sleep(1)
        print("- Preparing mail address...")
        time.sleep(2)
        print("- Mail address setup completed.")
        print("--------------------------")

        data = {
            "user_id": user_id,
            "mail_address": "completed"
        }
        return json.dumps(data)

# 入社書類準備プラグイン
class OnboardingDocumentsPlugin:

    @kernel_function(
        name="prepeare_onboarding_documents",
        description="入社書類の準備を行います。",
    )
    def prepeare_onboarding_documents(
        self,
        user_id: Annotated[str, "入社書類を準備する対象ユーザーの ID"],
    ) -> Annotated[str, "書類準備結果を含む JSON 文字列（例: {'user_id': 'u123', 'onboarding_documents': 'completed'}）"]:

        print("--- Onboarding Documents ---")
        print(f"- Requested onboarding documents for user ID: {user_id}")
        time.sleep(1)
        print("- Preparing onboarding documents...")
        time.sleep(2)
        print("- Onboarding documents preparation completed.")
        print("----------------------------")

        data = {
            "user_id": user_id,
            "onboarding_documents": "completed"
        }
        return json.dumps(data)


# ミーティング調整プラグイン
class ScheduleMeetingPlugin:

    @kernel_function(
        name="schedule_manager_meeting",
        description="マネージャーとの 1on1 ミーティングをスケジュールします。",
    )
    def schedule_manager_meeting(
        self,
        user_id: Annotated[str, "ミーティング対象のユーザー ID"],
        manager_id: Annotated[str, "ミーティング相手となるマネージャーの ID"],
    ) -> Annotated[str, "ミーティングスケジュール情報を含む JSON 文字列（例: {'user_id': 'u123', 'manager_id': 'm456', 'meeting_date': '2026-04-15', 'status': 'scheduled'}）"]:

        print("--- Schedule 1on1 Meeting ---")
        print(f"- Scheduling 1on1 meeting for user ID: {user_id} with manager ID: {manager_id}")
        time.sleep(1)
        print("- Preparing meeting...")
        time.sleep(2)
        print("- Meeting scheduled.")
        print("----------------------------")

        data = {
            "user_id": user_id,
            "manager_id": manager_id,
            "meeting_date": "2026-04-15",
            "status": "scheduled"
        }
        return json.dumps(data)


### 各エージェントの作成

In [30]:
from semantic_kernel.agents import ChatCompletionAgent, ChatHistoryAgentThread
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion


# Azure ChatCompletion のインスタンスを作成
azure_chat_completion = AzureChatCompletion(
    api_key=AZURE_OPENAI_API_KEY,
    deployment_name=AZURE_DEPLOYMENT_NAME,
    endpoint=AZURE_OPENAI_ENDPOINT,
)

# IT セットアップエージェント
it_setup_agent = ChatCompletionAgent(
    service=azure_chat_completion, 
    name="ITSetupAgent", 
    instructions="あなたは IT 関連（PCやメールアドレス）の初期セットアップを行うエージェントです。",
    plugins=[ITSetupPlugin()]
)

# 入社書類準備エージェント
onboarding_documents_agent = ChatCompletionAgent(
    service=azure_chat_completion, 
    name="OnboardingDocumentsAgent", 
    instructions="あなたは入社書類の準備を行うエージェントです。",
    plugins=[OnboardingDocumentsPlugin()]
)

# ミーティング調整エージェント
schedule_meeting_agent = ChatCompletionAgent(
    service=azure_chat_completion, 
    name="ScheduleMeetingAgent", 
    instructions="あなたは新入社員とマネージャーの 1on1 のスケジュールを設定するエージェントです。",
    plugins=[ScheduleMeetingPlugin()]
)

### 各ステップと状態を定義

In [31]:
import asyncio
from enum import Enum

from semantic_kernel.functions import kernel_function
from semantic_kernel.processes.kernel_process import KernelProcessStep, KernelProcessStepContext


complete_keyword = "OK"
thread = ChatHistoryAgentThread()


class Events(Enum):
    Start = "Start"
    GotUserInfo = "GotUserInfo"
    ITReady = "ITReady"
    DocsReady = "DocsReady"
    OneOnOneReady = "OneOnOneReady"
    NotifyReady = "NotifyReady"
    Completed = "Completed"


# 入社準備の状態を管理するクラス
class SharedState:
    _state = {}

    @classmethod
    def set(cls, key: str, value: bool) -> None:
        cls._state[key] = value

    @classmethod
    def all_ready(cls, required_keys: list[str]) -> bool:
        return all(cls._state.get(k) for k in required_keys)


# 各ステップを定義
class StartStep(KernelProcessStep):
    @kernel_function()
    async def run(self, context: KernelProcessStepContext, input: str):
        await asyncio.sleep(1)

        payload = json.loads(input)
        user_id = payload["employee_id"]
        
        print(f"Input received: {user_id}")
        await context.emit_event(Events.GotUserInfo, data=user_id)

class GetUserInfoStep(KernelProcessStep):
    """ユーザーの入社情報を取得するステップ"""
    @kernel_function()
    async def get_user_info(self, context: KernelProcessStepContext, input: str):
        user_id = str(input)
        log_with_timestamp("--- GetUserInfoStep ---")
        time.sleep(1)
        print(f"Getting user info...")
        user_data = {
            "user_id": user_id,
            "name": "佐藤花子",
            "hire_date": "2026-04-01",
            "manager": {
                "name": "山田太郎",
                "id": "5678"
            }
        }
        print("Got it.")
        print(f"User data: {user_data}")
        print("---------------------")

        await context.emit_event(Events.ITReady, data=user_data)
        await context.emit_event(Events.DocsReady, data=user_data)
        await context.emit_event(Events.OneOnOneReady, data=user_data)

class ITStep(KernelProcessStep):
    """IT 機器手配を行うステップ"""
    @kernel_function()
    async def prepare_it(self, context: KernelProcessStepContext, data: dict):
        log_with_timestamp("--- ITStep ---")

        # IT 機器手配エージェントにタスクを依頼
        prompt = f"""
        新入社員のIT機器の手配を行ってください。
        完了したら、説明や補足なしで「{complete_keyword}」という単語のみを出力してください。
        
        # IT 機器手配
        1. PC の手配
        2. メールアドレスの新規作成

        # 入社情報
        - ユーザー ID: {data["user_id"]}
        """
        response = await it_setup_agent.get_response(
            messages=prompt,
            thread=thread,
        )
        
        # 処理が正常に完了してたら check_ready 関数を呼び出す
        if response.content.content.strip() == complete_keyword:
            await check_ready(context, "ITStep", data)
        else:
            raise RuntimeError("IT機器手配に失敗したため、プロセスを中断します。")
            

class DocsStep(KernelProcessStep):
    """入社書類準備を行うステップ"""
    @kernel_function()
    async def prepare_docs(self, context: KernelProcessStepContext, data: dict):
        log_with_timestamp("--- DocsStep ---")

        # 入社書類準備エージェントにタスクを依頼
        prompt = f"""
        新入社員の入社書類を準備してください。
        完了したら「{complete_keyword}」という単語のみを出力してください。

        # 入社情報
        - ユーザー ID: {data["user_id"]}
        - 氏名: {data["name"]}
        - 入社日: {data["hire_date"]}
        """
        response = await onboarding_documents_agent.get_response(
            messages=prompt,
            thread=thread,
        )

        # 処理が正常に完了してたら check_ready 関数を呼び出す
        if response.content.content.strip() == complete_keyword:
            await check_ready(context, "DocsStep", data)
        else:
            raise RuntimeError("入社書類準備に失敗したため、プロセスを中断します。")


class OneOnOneStep(KernelProcessStep):
    """1on1 ミーティングのスケジュールを設定するステップ"""
    @kernel_function()
    async def setup_1on1(self, context: KernelProcessStepContext, data: dict):
        log_with_timestamp("--- OneOnOneStep ---")

        # ミーティング調整エージェントにタスクを依頼
        prompt = f"""
        新入社員とマネージャーの 1on1 ミーティングをスケジュールしてください。
        完了したら「{complete_keyword}」という単語のみを出力してください。

        # 入社情報
        - ユーザー ID: {data["user_id"]}
        - マネージャー ID: {data["manager"]["id"]}
        - マネージャー氏名: {data["manager"]["name"]}
        """
        response = await schedule_meeting_agent.get_response(
            messages=prompt,
            thread=thread,
        )

        # 処理が正常に完了してたら check_ready 関数を呼び出す
        if response.content.content.strip() == complete_keyword:
            await check_ready(context, "OneOnOneStep", data)
        else:
            raise RuntimeError("1on1 設定に失敗したため、プロセスを中断します。")


async def check_ready(context: KernelProcessStepContext, step_name: dict, data: dict):
    """
    入社準備が完了したかを確認し、すべての準備が整った場合に NotifyStep イベントを発火します。
    
    Args:
        context (KernelProcessStepContext): ステップのコンテキスト
        step_name (str): 現在のステップ名
        data (dict): 入社準備に必要なデータ
    """
    SharedState.set(step_name, True)
    preparation_steps = ["ITStep", "DocsStep", "OneOnOneStep"]
    if SharedState.all_ready(preparation_steps):
        await context.emit_event(Events.NotifyReady, data=data)


class NotifyStep(KernelProcessStep):
    """入社準備完了通知を行うステップ"""
    @kernel_function()
    async def send_mail(self, context: KernelProcessStepContext, data: dict):
        log_with_timestamp("--- NotifyStep ---")

        # 件名・本文の構成（任意でフォーマット調整可）
        subject = "【入社準備完了のお知らせ】"
        body = (
            f"{data['name']}さん、\n\n"
            f"以下の入社準備が完了しました。\n"
            f"- ユーザーID: {data['user_id']}\n"
            f"- 入社日: {data['hire_date']}\n"
            f"- 担当マネージャー: {data['manager']['name']}（ID: {data['manager']['id']}）\n\n"
            f"安心してご入社ください！\n\n"
            f"人事部より"
        )

        # ダミー送信処理（実際は print）
        print("=== メール送信処理 ===")
        print(f"宛先: {data['name']} <dummy@example.com>")
        print(f"件名: {subject}")
        print(f"本文:\n{body}")
        print("======================")

        await asyncio.sleep(1)
        await context.emit_event(Events.Completed)



### 動作確認

In [32]:
from semantic_kernel.kernel import Kernel
from semantic_kernel.processes import ProcessBuilder
from semantic_kernel.processes.local_runtime.local_kernel_process import start
from semantic_kernel.processes.local_runtime.local_event import KernelProcessEvent


async def main(input: str):
    builder = ProcessBuilder("OnboardingProcess")

    # --- ステップ定義 ---
    start_step = builder.add_step(step_type=StartStep)
    get_user_info_step = builder.add_step(step_type=GetUserInfoStep)
    it_step = builder.add_step(step_type=ITStep)
    docs_step = builder.add_step(step_type=DocsStep)
    oneonone_step = builder.add_step(step_type=OneOnOneStep)
    notify_step = builder.add_step(step_type=NotifyStep)

    # --- イベントルーティング定義 ---

    # プロセス起動時の入力 → StartStep に渡す
    builder.on_input_event(Events.Start).send_event_to(start_step, parameter_name="input")

    # StartStep 完了 → GetUserInfoStep を起動
    start_step.on_event(Events.GotUserInfo).send_event_to(get_user_info_step, parameter_name="input")

    # GetUserInfoStep の完了 → 各準備ステップ（並列）に起動
    get_user_info_step.on_event(Events.ITReady).send_event_to(it_step, parameter_name="data")
    get_user_info_step.on_event(Events.DocsReady).send_event_to(docs_step, parameter_name="data")
    get_user_info_step.on_event(Events.OneOnOneReady).send_event_to(oneonone_step, parameter_name="data")

    # 各準備ステップの完了 → NotifyStep を実行
    it_step.on_event(Events.NotifyReady).send_event_to(notify_step, parameter_name="data")
    docs_step.on_event(Events.NotifyReady).send_event_to(notify_step, parameter_name="data")
    oneonone_step.on_event(Events.NotifyReady).send_event_to(notify_step, parameter_name="data")

    # NotifyStep 完了 → プロセスを終了
    notify_step.on_event(Events.Completed).stop_process()

    # --- プロセスのビルドと実行 ---
    kernel_process = builder.build()
    await start(
        process=kernel_process,
        kernel=Kernel(),
        initial_event=KernelProcessEvent(id=Events.Start, data=input),
    )


imput = {"employee_id": "1234"}
await main(json.dumps(imput))

Input received: 1234
[08:31:17.100] --- GetUserInfoStep ---
Getting user info...
Got it.
User data: {'user_id': '1234', 'name': '佐藤花子', 'hire_date': '2026-04-01', 'manager': {'name': '山田太郎', 'id': '5678'}}
---------------------
[08:31:18.105] --- ITStep ---
[08:31:18.109] --- DocsStep ---
[08:31:18.113] --- OneOnOneStep ---
--- Schedule 1on1 Meeting ---
- Scheduling 1on1 meeting for user ID: 1234 with manager ID: 5678
- Preparing meeting...
- Meeting scheduled.
----------------------------
--- PC Setup ---
- Requested PC setup for user ID: 1234
- Preparing PC...
- PC setup completed.
---------------
--- Mail Address Setup ---
- Requested mail address setup for user ID: 1234
- Preparing mail address...
- Mail address setup completed.
--------------------------
--- Onboarding Documents ---
- Requested onboarding documents for user ID: 1234
- Preparing onboarding documents...
- Onboarding documents preparation completed.
----------------------------
[08:31:32.609] --- NotifyStep ---
=== メ