# セッション 5 – マルチエージェントオーケストレーター

Foundry Local を使用して、シンプルな2エージェントのパイプライン（リサーチャー -> エディター）をデモンストレーションします。


### 説明: 依存関係のインストール
ローカルモデルへのアクセスとチャット完了に必要な`foundry-local-sdk`と`openai`をインストールします。冪等性があります。


# シナリオ
最小限の2エージェントオーケストレーターパターンを実装します：
- **研究者エージェント**は簡潔な事実を箇条書きで収集します
- **編集者エージェント**は経営層向けの明確な表現に書き換えます

各エージェントごとの共有メモリ、中間出力の順次受け渡し、そしてシンプルなパイプライン関数を示します。さらに役割（例：批評家、検証者）や並列分岐を追加して拡張可能です。

**環境変数:**
- `FOUNDRY_LOCAL_ALIAS` - 使用するデフォルトモデル（デフォルト: phi-4-mini）
- `AGENT_MODEL_PRIMARY` - 主エージェントモデル（ALIASを上書き）
- `AGENT_MODEL_EDITOR` - 編集者エージェントモデル（デフォルトは主エージェントモデル）

**SDKリファレンス:** https://github.com/microsoft/Foundry-Local/tree/main/sdk/python/foundry_local

**動作の仕組み:**
1. **FoundryLocalManager**がFoundry Localサービスを自動的に起動
2. 指定されたモデルをダウンロードしてロード（またはキャッシュ版を使用）
3. OpenAI互換のエンドポイントを提供して対話を可能に
4. 各エージェントが専門的なタスクに異なるモデルを使用可能
5. 内蔵のリトライロジックが一時的な障害を優雅に処理

**主な特徴:**
- ✅ 自動サービス検出と初期化
- ✅ モデルライフサイクル管理（ダウンロード、キャッシュ、ロード）
- ✅ OpenAI SDK互換性で馴染みのあるAPIを提供
- ✅ エージェント専門化のためのマルチモデル対応
- ✅ リトライロジックによる堅牢なエラー処理
- ✅ ローカル推論（クラウドAPI不要）


In [16]:
# Install dependencies
!pip install -q foundry-local-sdk openai

### 説明: コアインポートと型付け
エージェントメッセージの保存に使用するdataclassを導入し、型ヒントでコードの明確性を向上させます。後続のエージェントアクションのためにFoundry LocalマネージャーとOpenAIクライアントをインポートします。


In [17]:
from dataclasses import dataclass, field
from typing import List
import os
from foundry_local import FoundryLocalManager
from openai import OpenAI

### 説明: モデル初期化 (SDKパターン)
Foundry Local Python SDKを使用して、堅牢なモデル管理を実現します:
- **FoundryLocalManager(alias)** - サービスを自動的に開始し、エイリアスでモデルをロード
- **get_model_info(alias)** - エイリアスを具体的なモデルIDに解決
- **manager.endpoint** - OpenAIクライアント用のサービスエンドポイントを提供
- **manager.api_key** - APIキーを提供 (ローカル使用の場合はオプション)
- 異なるエージェント用に別々のモデルをサポート (プライマリ vs エディター)
- 指数バックオフを伴う組み込みのリトライロジックで耐障害性を向上
- サービスが準備完了していることを確認する接続検証

**主要なSDKパターン:**
```python
manager = FoundryLocalManager(alias)
model_info = manager.get_model_info(alias)
client = OpenAI(base_url=manager.endpoint, api_key=manager.api_key)
```

**ライフサイクル管理:**
- マネージャーはグローバルに保存され、適切にクリーンアップされる
- 各エージェントは専門化のために異なるモデルを使用可能
- 自動サービス検出と接続処理
- 障害時の指数バックオフによる優雅なリトライ

これにより、エージェントのオーケストレーションが始まる前に適切な初期化が保証されます。

**参考:** https://github.com/microsoft/Foundry-Local/tree/main/sdk/python/foundry_local


In [18]:
import time

# Environment configuration
PRIMARY_ALIAS = os.getenv('AGENT_MODEL_PRIMARY', os.getenv('FOUNDRY_LOCAL_ALIAS', 'phi-4-mini'))
EDITOR_ALIAS = os.getenv('AGENT_MODEL_EDITOR', PRIMARY_ALIAS)

# Store managers globally for proper lifecycle management
primary_manager = None
editor_manager = None

def init_model(alias: str, max_retries: int = 3):
    """Initialize Foundry Local manager with retry logic.
    
    Args:
        alias: Model alias to initialize
        max_retries: Number of retry attempts with exponential backoff
    
    Returns:
        Tuple of (manager, client, model_id, endpoint)
    """
    delay = 2.0
    last_err = None
    
    for attempt in range(1, max_retries + 1):
        try:
            print(f"[Init] Starting Foundry Local for '{alias}' (attempt {attempt}/{max_retries})...")
            
            # Initialize manager - this starts the service and loads the model
            manager = FoundryLocalManager(alias)
            
            # Get model info to retrieve the actual model ID
            model_info = manager.get_model_info(alias)
            model_id = model_info.id
            
            # Create OpenAI client with manager's endpoint
            client = OpenAI(
                base_url=manager.endpoint,
                api_key=manager.api_key or 'not-needed'
            )
            
            # Verify the connection with a simple test
            models = client.models.list()
            print(f"[OK] Initialized '{alias}' -> {model_id} at {manager.endpoint}")
            
            return manager, client, model_id, manager.endpoint
            
        except Exception as e:
            last_err = e
            if attempt < max_retries:
                print(f"[Retry {attempt}/{max_retries}] Failed to init '{alias}': {e}")
                print(f"[Retry] Waiting {delay:.1f}s before retry...")
                time.sleep(delay)
                delay *= 2
            else:
                print(f"[ERROR] Failed to initialize '{alias}' after {max_retries} attempts")
    
    raise RuntimeError(f"Failed to initialize '{alias}' after {max_retries} attempts: {last_err}")

# Initialize primary model (for researcher)
print(f"\n{'='*80}")
print(f"Initializing Primary Model: {PRIMARY_ALIAS}")
print('='*80)
primary_manager, primary_client, PRIMARY_MODEL_ID, primary_endpoint = init_model(PRIMARY_ALIAS)

# Initialize editor model (may be same as primary)
if EDITOR_ALIAS != PRIMARY_ALIAS:
    print(f"\n{'='*80}")
    print(f"Initializing Editor Model: {EDITOR_ALIAS}")
    print('='*80)
    editor_manager, editor_client, EDITOR_MODEL_ID, editor_endpoint = init_model(EDITOR_ALIAS)
else:
    print(f"\n[Info] Editor using same model as primary")
    editor_manager = primary_manager
    editor_client, EDITOR_MODEL_ID = primary_client, PRIMARY_MODEL_ID
    editor_endpoint = primary_endpoint

print(f"\n{'='*80}")
print(f"[Configuration Summary]")
print('='*80)
print(f"  Primary Agent:")
print(f"    - Alias: {PRIMARY_ALIAS}")
print(f"    - Model: {PRIMARY_MODEL_ID}")
print(f"    - Endpoint: {primary_endpoint}")
print(f"\n  Editor Agent:")
print(f"    - Alias: {EDITOR_ALIAS}")
print(f"    - Model: {EDITOR_MODEL_ID}")
print(f"    - Endpoint: {editor_endpoint}")
print('='*80)



Initializing Primary Model: phi-4-mini
[Init] Starting Foundry Local for 'phi-4-mini' (attempt 1/3)...
[OK] Initialized 'phi-4-mini' -> Phi-4-mini-instruct-cuda-gpu:4 at http://127.0.0.1:59959/v1

Initializing Editor Model: gpt-oss-20b
[Init] Starting Foundry Local for 'gpt-oss-20b' (attempt 1/3)...
[OK] Initialized 'gpt-oss-20b' -> gpt-oss-20b-cuda-gpu:1 at http://127.0.0.1:59959/v1

[Configuration Summary]
  Primary Agent:
    - Alias: phi-4-mini
    - Model: Phi-4-mini-instruct-cuda-gpu:4
    - Endpoint: http://127.0.0.1:59959/v1

  Editor Agent:
    - Alias: gpt-oss-20b
    - Model: gpt-oss-20b-cuda-gpu:1
    - Endpoint: http://127.0.0.1:59959/v1


### 説明: Agent & Memory クラス
軽量なメモリエントリ用の `AgentMsg` と、以下をカプセル化する `Agent` を定義します:
- **システムロール** - エージェントの人格と指示
- **メッセージ履歴** - 会話のコンテキストを保持
- **act() メソッド** - 適切なエラーハンドリングでアクションを実行

エージェントは異なるモデル（プライマリ vs エディター）を使用でき、エージェントごとに独立したコンテキストを維持します。このパターンにより以下が可能になります:
- アクション間でのメモリの永続性
- エージェントごとの柔軟なモデル割り当て
- エラーの分離と回復
- 簡単なチェーン化とオーケストレーション


In [19]:
@dataclass
class AgentMsg:
    role: str
    content: str

@dataclass
class Agent:
    name: str
    system: str
    client: OpenAI = None  # Allow per-agent client assignment
    model_id: str = None   # Allow per-agent model
    memory: List[AgentMsg] = field(default_factory=list)

    def _history(self):
        """Return chat history in OpenAI messages format including system + memory."""
        msgs = [{'role': 'system', 'content': self.system}]
        for m in self.memory[-6:]:  # Keep last 6 messages to avoid context overflow
            msgs.append({'role': m.role, 'content': m.content})
        return msgs

    def act(self, prompt: str, temperature: float = 0.4, max_tokens: int = 300):
        """Send a prompt, store user + assistant messages in memory, and return assistant text.
        
        Args:
            prompt: User input/task for the agent
            temperature: Sampling temperature (0.0-1.0)
            max_tokens: Maximum tokens to generate
        
        Returns:
            Assistant response text
        """
        # Use agent-specific client/model or fall back to primary
        client_to_use = self.client or primary_client
        model_to_use = self.model_id or PRIMARY_MODEL_ID
        
        self.memory.append(AgentMsg('user', prompt))
        
        try:
            # Build messages including system prompt and history
            messages = self._history() + [{'role': 'user', 'content': prompt}]
            
            resp = client_to_use.chat.completions.create(
                model=model_to_use,
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
            )
            
            # Validate response
            if not resp.choices:
                raise RuntimeError("No completion choices returned")
            
            out = resp.choices[0].message.content or ""
            
            if not out:
                raise RuntimeError("Empty response content")
            
        except Exception as e:
            out = f"[ERROR:{self.name}] {type(e).__name__}: {str(e)}"
            print(f"[Agent Error] {self.name}: {type(e).__name__}: {str(e)}")
        
        self.memory.append(AgentMsg('assistant', out))
        return out

print("[INFO] Agent classes initialized with Foundry SDK support")
print(f"[INFO] Using OpenAI SDK version: {OpenAI.__module__}")


[INFO] Agent classes initialized with Foundry SDK support
[INFO] Using OpenAI SDK version: openai


### 説明: オーケストレーションされたパイプライン
2つの専門的なエージェントを作成します:
- **リサーチャー**: 主なモデルを使用し、事実情報を収集
- **エディター**: 別のモデルを使用可能（設定されている場合）、情報を洗練し書き直し

`pipeline` 関数の流れ:
1. リサーチャーが生の情報を収集
2. エディターが経営層向けの完成度の高い出力に仕上げる
3. 中間結果と最終結果の両方を返す

このパターンの利点:
- モデルの専門化（役割ごとに異なるモデルを使用可能）
- 多段階処理による品質向上
- 情報変換の追跡可能性
- エージェントの追加や並列処理への簡単な拡張


In [None]:
# Create specialized agents with optional model assignment
researcher = Agent(
    name='Researcher',
    system='You collect concise factual bullet points.',
    client=primary_client,
    model_id=PRIMARY_MODEL_ID
)

editor = Agent(
    name='Editor',
    system='You rewrite content for clarity and an executive, action-focused tone.',
    client=editor_client,
    model_id=EDITOR_MODEL_ID
)

def pipeline(q: str, verbose: bool = True):
    """Execute multi-agent pipeline: Researcher -> Editor.
    
    Args:
        q: User question/task
        verbose: Print intermediate outputs
    
    Returns:
        Dictionary with research, final outputs, and metadata
    """
    if verbose:
        print(f"[Pipeline] Question: {q}\n")
    
    # Stage 1: Research
    if verbose:
        print("[Stage 1: Research]")
    research = researcher.act(q)
    if verbose:
        print(f"Output: {research[:200]}...\n")
    
    # Stage 2: Editorial refinement
    if verbose:
        print("[Stage 2: Editorial Refinement]")
    rewrite = editor.act(
        f"Rewrite professionally with a 1-sentence executive summary first. "
        f"Improve clarity, keep bullet structure if present. Source:\n{research}"
    )
    if verbose:
        print(f"Output: {rewrite[:200]}...\n")
    
    return {
        'question': q,
        'research': research,
        'final': rewrite,
        'models': {
            'researcher': PRIMARY_MODEL_ID,
            'editor': EDITOR_MODEL_ID
        }
    }

# Execute sample pipeline
print("="*80)
result = pipeline('Explain why edge AI matters for compliance and latency.')
print("="*80)
print("\n[FINAL OUTPUT]")
print(result['final'])
print("\n[METADATA]")
print(f"Models used: {result['models']}")
result

[Pipeline] Question: Explain why edge AI matters for compliance and latency.

[Stage 1: Research]
Output: - **Data Sovereignty**: Edge AI allows data to be processed locally, which can help organizations comply with regional data protection regulations by keeping sensitive information within the borders o...

[Stage 2: Editorial Refinement]


### 説明: パイプラインの実行と結果
コンプライアンスと遅延に関するテーマの質問に対して、マルチエージェントパイプラインを実行し、以下を示します:
- 情報の多段階変換
- エージェントの専門性と協力
- 洗練による出力品質の向上
- トレーサビリティ（中間出力と最終出力の両方を保持）

**結果構造:**
- `question` - ユーザーの元の質問
- `research` - 生の調査結果（事実の箇条書き）
- `final` - 洗練されたエグゼクティブサマリー
- `models` - 各段階で使用されたモデル

**拡張アイデア:**
1. 品質レビューのためのCriticエージェントを追加
2. 異なる側面に対する並列調査エージェントを実装
3. 事実確認のためのVerifierエージェントを追加
4. 異なる複雑さのレベルに応じて異なるモデルを使用
5. 反復的な改善のためのフィードバックループを実装


### 上級編: カスタムエージェント設定

初期化セルを実行する前に環境変数を変更して、エージェントの動作をカスタマイズしてみましょう:

**利用可能なモデル:**
- ターミナルで `foundry model ls` を使用して、利用可能なすべてのモデルを確認できます
- 例: phi-4-mini, phi-3.5-mini, qwen2.5-7b, llama-3.2-3b など


In [None]:
# Example: Use different models for different agents
# Uncomment and modify as needed:

# import os
# os.environ['AGENT_MODEL_PRIMARY'] = 'phi-4-mini'      # Fast, good for research
# os.environ['AGENT_MODEL_EDITOR'] = 'qwen2.5-7b'       # Higher quality for editing

# Then restart the kernel and re-run all cells

# Test with different questions
test_questions = [
    "What are 3 key benefits of using small language models?",
    "How does RAG improve AI accuracy?",
    "Why is local inference important for privacy?"
]

print("Testing pipeline with multiple questions:\n")
for i, q in enumerate(test_questions, 1):
    print(f"\n{'='*80}")
    print(f"Question {i}: {q}")
    print('='*80)
    r = pipeline(q, verbose=False)
    print(f"\n[FINAL]: {r['final'][:300]}...")
    print(f"[Models]: Researcher={r['models']['researcher']}, Editor={r['models']['editor']}")


Testing pipeline with multiple questions:


Question 1: What are 3 key benefits of using small language models?

[FINAL]: <|channel|>analysis<|message|>The user wants a rewrite of the entire block of text. The rewrite should be professional, include a one-sentence executive summary first, improve clarity, keep bullet structure if present. The user has provided a large amount of text. The user wants a rewrite of that te...
[Models]: Researcher=Phi-4-mini-instruct-cuda-gpu:4, Editor=gpt-oss-20b-cuda-gpu:1

Question 2: How does RAG improve AI accuracy?

[FINAL]: <|channel|>final<|message|>**RAG (Retrieval‑Augmented Generation) empowers AI to produce highly accurate, contextually relevant responses by combining a retrieval system with a large language model (LLM).**<|return|>...
[Models]: Researcher=Phi-4-mini-instruct-cuda-gpu:4, Editor=gpt-oss-20b-cuda-gpu:1

Question 3: Why is local inference important for privacy?

[FINAL]: <|channel|>final<|message|>**Local inference—processing data d


---

**免責事項**:  
この文書は、AI翻訳サービス [Co-op Translator](https://github.com/Azure/co-op-translator) を使用して翻訳されています。正確性を追求しておりますが、自動翻訳には誤りや不正確な部分が含まれる可能性があることをご承知ください。元の言語で記載された文書が正式な情報源とみなされるべきです。重要な情報については、専門の人間による翻訳を推奨します。この翻訳の使用に起因する誤解や誤解釈について、当方は一切の責任を負いません。
