<a href="https://colab.research.google.com/github/m10k1/anthropic-cookbook/blob/main/orchestrator_workers.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install anthropic

Collecting anthropic
  Downloading anthropic-0.42.0-py3-none-any.whl.metadata (23 kB)
Downloading anthropic-0.42.0-py3-none-any.whl (203 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m203.4/203.4 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: anthropic
Successfully installed anthropic-0.42.0


In [2]:
from google.colab import drive
mount_path = '/content/drive'
drive.mount(mount_path)

import os
import sys
sys.path.append(os.path.join(mount_path,'MyDrive/python'))


Mounted at /content/drive


In [3]:
from google.colab import userdata
os.environ["ANTHROPIC_API_KEY"] = userdata.get('ANTHROPIC_API_KEY')

In [7]:
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Optional
from util import llm_call, extract_xml

# Orchestrator-Workers Workflow

このワークフローでは中央LLMが動的にタスクをブレイクダウンして、タスクをワーカーLLMへ委譲します。そしてその結果を合成します。

## このワークフローの使いどころ

このワークフローは、必要なサブタスクが予測できない複雑なタスクに適している。

単純な並列化との決定的な違いは、その柔軟性です。サブタスクは事前に定義されているのではなく、特定の入力に基づいてオーケストレーターが決定します。


![Orchestrator-workers](https://www.anthropic.com/_next/image?url=https%3A%2F%2Fwww-cdn.anthropic.com%2Fimages%2F4zrzovbb%2Fwebsite%2F8985fc683fae4780fb34eab1365ab78c7e51bc8e-2401x1000.png&w=3840&q=75)



結果のXMLを解析するためのパーサー
タスクの配列を返す


In [8]:
def parse_tasks(tasks_xml: str) -> List[Dict]:
    """Parse XML tasks into a list of task dictionaries."""
    tasks = []
    current_task = {}

    for line in tasks_xml.split('\n'):
        line = line.strip()
        if not line:
            continue

        if line.startswith("<task>"):
            current_task = {}
        elif line.startswith("<type>"):
            current_task["type"] = line[6:-7].strip()
        elif line.startswith("<description>"):
            current_task["description"] = line[12:-13].strip()
        elif line.startswith("</task>"):
            if "description" in current_task:
                if "type" not in current_task:
                    current_task["type"] = "default"
                tasks.append(current_task)

    return tasks

In [9]:
class FlexibleOrchestrator:
    """Break down tasks and run them in parallel using worker LLMs."""

    def __init__(
        self,
        orchestrator_prompt: str,
        worker_prompt: str,
    ):
        """
        プロンプトのテンプレートを初期化する
        """
        self.orchestrator_prompt = orchestrator_prompt
        self.worker_prompt = worker_prompt

    def _format_prompt(self, template: str, **kwargs) -> str:
        """
        テンプレートに値を流し込んでプロンプトを作成
        """
        try:
            return template.format(**kwargs)
        except KeyError as e:
            raise ValueError(f"Missing required prompt variable: {e}")

    def process(self, task: str, context: Optional[Dict] = None) -> Dict:
        """
        ブレイクダウンしてタスクを処理する。
        サブタスクは並列に実行される。
        Process task by breaking it down and running subtasks in parallel.

        """
        context = context or {}

        # Step 1: Get orchestrator response
        # オーケストレイターのレスポンスを取得

        orchestrator_input = self._format_prompt(
            self.orchestrator_prompt,
            task=task,
            **context
        )
        orchestrator_response = llm_call(orchestrator_input)

        # オーケストレイターの応答を処理
        analysis = extract_xml(orchestrator_response, "analysis")
        tasks_xml = extract_xml(orchestrator_response, "tasks")
        tasks = parse_tasks(tasks_xml)

        print("\n=== ORCHESTRATOR OUTPUT ===")
        print(f"\nANALYSIS:\n{analysis}")
        print(f"\nTASKS:\n{tasks}")

        # Step 2: タスクそれぞれを処理
        worker_results = []
        for task_info in tasks:
            worker_input = self._format_prompt(
                self.worker_prompt,
                original_task=task,
                task_type=task_info['type'],
                task_description=task_info['description'],
                **context
            )

            worker_response = llm_call(worker_input)
            result = extract_xml(worker_response, "response")

            worker_results.append({
                "type": task_info["type"],
                "description": task_info["description"],
                "result": result
            })

            print(f"\n=== WORKER RESULT ({task_info['type']}) ===\n{result}\n")

        return {
            "analysis": analysis,
            "worker_results": worker_results,
        }

## 使用例： マーケティングバリエーション生成

プロンプ ： オーケストレーター

```
このタスクを分析し、2-3の明確なアプローチに分けなさい：

タスク {タスク｝

この形式で回答を返してください：

<analysis>
タスクに対するあなたの理解と、どのバリエーションが価値があるかを説明してください。
各アプローチがタスクの異なる側面にどのように役立つかに注目してください。
</analysis>

<tasks>
    <task>
    <type>フォーマル</type>
      <description>仕様を強調した正確で技術的なバージョンを書く</description>。
      </task>
    <task>
      <type>会話型</type>
      <description>読者とつながる、魅力的でフレンドリーなバージョンを書く</description>
    </task>
</tasks>
```

In [10]:
ORCHESTRATOR_PROMPT = """
Analyze this task and break it down into 2-3 distinct approaches:

Task: {task}

Return your response in this format:

<analysis>
Explain your understanding of the task and which variations would be valuable.
Focus on how each approach serves different aspects of the task.
</analysis>

<tasks>
    <task>
    <type>formal</type>
    <description>Write a precise, technical version that emphasizes specifications</description>
    </task>
    <task>
    <type>conversational</type>
    <description>Write an engaging, friendly version that connects with readers</description>
    </task>
</tasks>
"""



ワーカープロンプトの中身
```
次に基づいてコンテンツを生成する：
タスク： {original_task｝
スタイル {task_type}
ガイドライン {task_description}

この形式で応答を返します：

<response>
指定されたスタイルを維持し、要件に完全に対応した内容をここに記述してください。
</response>
```

In [11]:
WORKER_PROMPT = """
Generate content based on:
Task: {original_task}
Style: {task_type}
Guidelines: {task_description}

Return your response in this format:

<response>
Your content here, maintaining the specified style and fully addressing requirements.
</response>
"""

プロンプトの実行
```
ここでは、タスクを"新しい環境にやさしい水筒の商品説明を書く"
コンテキストは
対象オーディエンス： 環境に敏感なミレニアル世代
キーとなる特長: プラスチックフリー、断熱、永久保証
```

In [12]:
orchestrator = FlexibleOrchestrator(
    orchestrator_prompt=ORCHESTRATOR_PROMPT,
    worker_prompt=WORKER_PROMPT,
)

results = orchestrator.process(
    task="Write a product description for a new eco-friendly water bottle",
    context={
        "target_audience": "environmentally conscious millennials",
        "key_features": ["plastic-free", "insulated", "lifetime warranty"]
    }
)


=== ORCHESTRATOR OUTPUT ===

ANALYSIS:

This task requires creating marketing copy for an eco-friendly water bottle, which presents multiple angles for effective communication. The key challenge is balancing environmental benefits with practical features while maintaining appeal to different consumer segments.

Key variations would be valuable because:
1. Technical buyers need specific details about materials and environmental impact
2. Lifestyle-focused consumers respond better to emotional benefits and storytelling
3. Different tones can target distinct market segments while promoting the same core product

The technical approach serves those who make purchase decisions based on specifications and measurable impact, while the conversational approach connects with those who buy based on lifestyle alignment and emotional resonance.


TASKS:
[{'type': 'formal', 'description': '>Create a specification-focused description highlighting material composition, environmental certifications, c

In [None]:
results

## 出力結果


***形式的な説明 (formal):** 素材の構成、環境認証、容量の測定、数値で示される環境への影響（例：「年間で削減されるペットボトルの本数」）を強調した仕様重視の説明を作成する。製造プロセスやリサイクル能力についての技術的な詳細を含める。

**会話調の説明 (conversational):** ライフスタイルの利点、環境保護への感情的な結びつき、日常的な使用シナリオを強調したナラティブスタイルの説明を開発する。エコ意識のあるライフスタイルにどのように適合し、環境への貢献についてユーザーに良い気持ちを与えるかに焦点を当てる。

**ハイブリッド (hybrid):** 感情的な訴求と重要な仕様をバランスよく組み合わせたアプローチで説明を作成する。技術的な特徴を説明するために関連性のある例を使用し、統計的な影響とストーリー要素を含む。



