# 3-Agent Demo：需求 → 任务拍卖 → 并行推进 → 结果合并
- 角色：
  - Manager：需求拆解 + 合同网拍卖
  - Coder：写代码
  - Tester：写单测
- 通信：黑板（dict 模拟）
- 协调：合同网（投标-中标）

In [None]:
import time, random, json
blackboard = {}
def post(key, value):
    blackboard[key] = value
    print(f"[BLACKBOARD] {key} = {value}")

def bid(task, agent, cost):
    post(f"bid/{task}", {"agent": agent, "cost": cost})

## 1. 需求到达

In [None]:
requirement = "实现斐波那契函数 + 单元测试"
post("requirement", requirement)

## 2. Manager 拆解 & 招标

In [None]:
tasks = ["fib_code", "fib_test"]
for t in tasks:
    post(f"task/{t}/status", "open")
    print(f"[MANAGER] 招标：{t}")

## 3. Agent 投标

In [None]:
bid("fib_code", "Coder", cost=random.randint(1,3))
bid("fib_test", "Tester", cost=random.randint(1,3))

## 4. Manager 评标 & 中标

In [None]:
winner = {}
for t in tasks:
    b = blackboard.get(f"bid/{t}")
    winner[t] = b['agent']
    post(f"task/{t}/winner", b['agent'])
    print(f"[MANAGER] {t} 中标：{b['agent']}")

## 5. Agent 并行执行

In [None]:
if winner.get("fib_code") == "Coder":
    code = "def fib(n): return n if n<2 else fib(n-1)+fib(n-2)"
    time.sleep(1)
    post("artifact/fib.py", code)
    print("[CODER] 交付 fib.py")

if winner.get("fib_test") == "Tester":
    test = "assert fib(5)==5, 'fail'"
    time.sleep(1)
    post("artifact/test_fib.py", test)
    print("[TESTER] 交付 test_fib.py")

## 6. 结果合并 & 验证

In [None]:
exec(blackboard["artifact/fib.py"])
exec(blackboard["artifact/test_fib.py"])
print("[MANAGER] 集成完成，测试通过 ✅")

## 7. 看黑板总览
运行后打印：

In [None]:
json.dumps(blackboard, indent=2, ensure_ascii=False)