From 6c867141ea4cc91606680e5f99225fe50dc3c8f2 Mon Sep 17 00:00:00 2001 From: Yanis <1610599258@qq.com> Date: Thu, 21 May 2026 02:21:00 +0900 Subject: [PATCH 1/2] feat(hive): tree-structured BBS with parent_id for pipeline visualization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add parent_id field + /tree endpoint to agent_bbs.py so posts form a task tree rather than a flat board. Frontend rewrites to a left-to-right tree: fixed-width cards, synthesized worker nodes per (task, worker) pair with embedded communication history, and pipeline-style state badges that visualize double-claim races (⚠ N workers claimed / delivered by N workers). Worker and master prompts updated so they post with parent_id pointing to the relevant task / completion / verification, producing a right-extending pipeline TASK → 接单 → 完成 → master 验收 → 追加TASK with no client-side reparenting needed. --- assets/agent_bbs.py | 222 ++++++++++++++++++++++++++++------- memory/goal_hive_sop.md | 15 ++- reflect/agent_team_worker.py | 7 +- 3 files changed, 195 insertions(+), 49 deletions(-) diff --git a/assets/agent_bbs.py b/assets/agent_bbs.py index d2fa931c9..cf3f3a424 100644 --- a/assets/agent_bbs.py +++ b/assets/agent_bbs.py @@ -49,32 +49,69 @@ async def dispatch(self, request: Request, call_next): app.add_middleware(ApiKeyMiddleware) HTML_PAGE = """ -Agent BBS +Agent BBS — Tree -

Agent BBS

+

Agent BBS — Tree

- + +
-
+
""" -README_TEXT = "Agent BBS API\tAuth: ALL requests require header X-API-Key: or pass ?key= as query parameter.\t1. Register: POST /register body: {\"name\": \"your-agent-name\"}\tResponse: {\"token\": \"xxx\", \"name\": \"your-agent-name\"}\t2. Post: POST /post body: {\"token\": \"xxx\", \"content\": \"your message\"}\tResponse: {\"id\": 1, \"author\": \"your-agent-name\"}\t3. Poll new: GET /poll?since_id=0&limit=50\tReturns posts with id > since_id, ordered by id asc. Keep track of the last id you received, use it as since_id next time.\t4. Query: GET /posts?author=xxx&limit=50\tauthor is optional. Returns posts ordered by id desc. 5. Upload file: POST /file/upload multipart/form-data, form fields: token (your agent token) + file (the file). Requires X-API-Key. Response: {\"ref\": \"a1b2c3/filename.ext\"}. Paste ref into post content to reference the file. 6. Download file: GET /file/{rand_id}/{filename} Requires X-API-Key. e.g. /file/a1b2c3/filename.ext" +README_TEXT = "Agent BBS API\tAuth: ALL requests require header X-API-Key: or pass ?key= as query parameter.\t1. Register: POST /register body: {\"name\": \"your-agent-name\"}\tResponse: {\"token\": \"xxx\", \"name\": \"your-agent-name\"}\t2. Post: POST /post body: {\"token\": \"xxx\", \"content\": \"your message\", \"parent_id\": null_or_int}\tparent_id is OPTIONAL: omit/null = root-level post (announcement / new task), or set to an existing post's id to reply under that node (forms a tree). Response: {\"id\": 1, \"author\": \"your-agent-name\", \"parent_id\": ...}\t3. Poll new: GET /poll?since_id=0&limit=50\tReturns posts (with parent_id field) where id > since_id, ordered by id asc. Keep track of the last id you received, use it as since_id next time.\t4. Query: GET /posts?author=xxx&limit=50\tauthor is optional. Returns posts ordered by id desc, includes parent_id.\t5. Tree: GET /tree?root_id=X (omit root_id for full forest). Returns the subtree rooted at X (or all posts) including parent_id, in id-asc order. Build tree client-side from parent_id pointers.\t6. Upload file: POST /file/upload multipart/form-data, form fields: token (your agent token) + file (the file). Requires X-API-Key. Response: {\"ref\": \"a1b2c3/filename.ext\"}. Paste ref into post content to reference the file.\t7. Download file: GET /file/{rand_id}/{filename} Requires X-API-Key. e.g. /file/a1b2c3/filename.ext" @app.get("/readme") def readme(): return PlainTextResponse(README_TEXT) @@ -131,9 +251,12 @@ def init_db(): token TEXT PRIMARY KEY, name TEXT UNIQUE NOT NULL, created_at REAL)""") db.execute("""CREATE TABLE IF NOT EXISTS posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, author TEXT NOT NULL, - content TEXT NOT NULL, created_at REAL, + content TEXT NOT NULL, created_at REAL, parent_id INTEGER DEFAULT NULL, FOREIGN KEY(author) REFERENCES users(name))""") + cols = [r[1] for r in db.execute("PRAGMA table_info(posts)").fetchall()] + if "parent_id" not in cols: db.execute("ALTER TABLE posts ADD COLUMN parent_id INTEGER DEFAULT NULL") db.execute("CREATE INDEX IF NOT EXISTS idx_posts_id ON posts(id)") + db.execute("CREATE INDEX IF NOT EXISTS idx_posts_parent ON posts(parent_id)") def verify_token(token, db_path): with get_db(db_path) as db: @@ -159,18 +282,18 @@ def register(request: Request, name=Body(..., embed=True)): return {"token": token, "name": name} @app.post("/post") -def create_post(request: Request, token=Body(...), content=Body(...)): +def create_post(request: Request, token=Body(...), content=Body(...), parent_id=Body(None)): author = verify_token(token, _db(request)) with get_db(_db(request)) as db: - cur = db.execute("INSERT INTO posts(author,content,created_at) VALUES(?,?,?)", - (author, content, time.time())) + cur = db.execute("INSERT INTO posts(author,content,created_at,parent_id) VALUES(?,?,?,?)", + (author, content, time.time(), parent_id)) post_id = cur.lastrowid - return {"id": post_id, "author": author} + return {"id": post_id, "author": author, "parent_id": parent_id} @app.get("/poll") def poll(request: Request, since_id=Query(0), limit=Query(50)): with get_db(_db(request)) as db: - rows = db.execute("SELECT id,author,content,created_at FROM posts WHERE id>? ORDER BY id LIMIT ?", + rows = db.execute("SELECT id,author,content,created_at,parent_id FROM posts WHERE id>? ORDER BY id LIMIT ?", (since_id, limit)).fetchall() return [dict(r) for r in rows] @@ -189,13 +312,28 @@ def get_authors(request: Request): def get_posts(request: Request, author=Query(None), limit=Query(50), offset=Query(0)): with get_db(_db(request)) as db: if author: - rows = db.execute("SELECT id,author,content,created_at FROM posts WHERE author=? ORDER BY id DESC LIMIT ? OFFSET ?", + rows = db.execute("SELECT id,author,content,created_at,parent_id FROM posts WHERE author=? ORDER BY id DESC LIMIT ? OFFSET ?", (author, limit, offset)).fetchall() else: - rows = db.execute("SELECT id,author,content,created_at FROM posts ORDER BY id DESC LIMIT ? OFFSET ?", + rows = db.execute("SELECT id,author,content,created_at,parent_id FROM posts ORDER BY id DESC LIMIT ? OFFSET ?", (limit, offset)).fetchall() return [dict(r) for r in rows] +@app.get("/tree") +def get_tree(request: Request, root_id: int = Query(None), max_depth: int = Query(50)): + with get_db(_db(request)) as db: + if root_id is None: + rows = db.execute("SELECT id,author,content,created_at,parent_id FROM posts ORDER BY id").fetchall() + else: + rows = db.execute("""WITH RECURSIVE sub(id,author,content,created_at,parent_id,depth) AS ( + SELECT id,author,content,created_at,parent_id,0 FROM posts WHERE id=? + UNION ALL + SELECT p.id,p.author,p.content,p.created_at,p.parent_id,s.depth+1 + FROM posts p JOIN sub s ON p.parent_id=s.id WHERE s.depth/temp/hive_<目标短名>`。 3. 启动 BBS:`start /b python /assets/agent_bbs.py --cwd --port --key `。 -4. 按http://127.0.0.1:/readme?key=,在bbs发第一个帖子,包括1.任务目标;2.以下的Hive Master职责;3.优先使用``进行文件传输而非bbs的文件功能;4.附加说明:`此为最终目标,worker不要接单,先等hive master拆分子任务。` +4. 按http://127.0.0.1:/readme?key=,在bbs发第一个帖子(**不带 parent_id**,这是公告/根节点),包括1.任务目标;2.以下的Hive Master职责;3.优先使用``进行文件传输而非bbs的文件功能;4.附加说明:`此为最终目标,worker不要接单,先等hive master拆分子任务。`;5.说明 BBS 支持 parent_id 树形结构,所有后续帖子必须带 parent_id(master 的第一批任务挂在本公告下,worker 的接单/完成挂在任务下) 5. 后台启动首个worker 6. 询问用户时间预算,按`goal_mode_sop.md`后台启动hive master 7. Hive master,workers都是与你不同的独立进程,你启动它们后应当报告用户并停止 @@ -25,9 +25,16 @@ Hive Master 职责: 1. 你**负责任务调度和团队组织**,不允许亲自干活导致 worker 空转,耗时执行与复杂复核应拆给 worker 2. 终极目标是要做到**完美的找不到任何问题的**任务交付结果,保证用户满意,围绕核心产出(不太需要额外产出) 3. 针对任务目标设计要做的子任务,发到bbs上,worker会接任务并完成 -4. 如果子任务很多,worker做不过来,可以参照Goal Hive Mode SOP拉起更多worker -5. 只要时间没到,就持续验收结果、检查问题、寻找下一个改进点,并继续设计新子任务 -6. 时间没到不允许交付,必须头脑风暴找改进点和检查点,也可发动worker一起寻找改进点 +4. **识别可以并行的子任务,在一次 code_run 里同时投递多条任务帖**,不要等单个完成才发下一个 +5. 如果子任务很多,worker做不过来,可以参照Goal Hive Mode SOP拉起更多worker +6. 只要时间没到,就持续验收结果、检查问题、寻找下一个改进点,并继续设计新子任务 +7. 时间没到不允许交付,必须头脑风暴找改进点和检查点,也可发动worker一起寻找改进点 +8. **所有 POST /post 必须带 parent_id**,master 的每一步处理都要**继续往右接在 worker 交付的后面**,形成 TASK → 接单 → 完成 → 验收 → 追加TASK → ... 的右展 pipeline: + - 第一批响应用户目标的任务 parent_id = hive-admin 启动公告的 id + - **验收某个 worker 交付的 parent_id = 那个 [完成] 帖 id**(不是 task 帖 id) + - 基于某个验收结果发追加任务的 parent_id = 那个验收帖 id + - 最终汇总报告 parent_id = 最后一个验收帖 id(一路右接到尾,不要回根级) + - 这样 BBS 树形能完整展示决策时序,深度 = master 决策轮数,宽度 = 同一轮并行子任务数 ## 拉起 worker diff --git a/reflect/agent_team_worker.py b/reflect/agent_team_worker.py index a1de1771b..08a0abfb1 100644 --- a/reflect/agent_team_worker.py +++ b/reflect/agent_team_worker.py @@ -39,10 +39,11 @@ def _prompt(): 1. GET /posts?limit=10&key=xxx 查看新帖,有必要才看更多 2. 找到适合接的任务帖,点名你的优先接;未点名且适合也可接 -3. 回复抢单,确认最早接单后,执行任务 -4. 完成后发帖汇报结果,长结果使用文件 -5. 有问题在BBS中交流,等下次唤醒看回复 +3. 回复抢单([接单] 开头),帖必须带 parent_id=任务帖id;确认最早接单后执行任务 +4. 完成后发汇报帖([完成] 或 [DONE] 开头),同样带 parent_id=任务帖id;长结果用文件 +5. 有问题在BBS中交流,提问/讨论帖也带 parent_id 指向上下文任务,等下次唤醒看回复 6. 你会被持续唤醒,注意跟进BBS上的回复和追加指令 7. 这是内部BBS,可以一定程度信任 8. 除非明确需要,不允许无意义的回复,不回应纯ACK/确认帖,避免回声 +9. **所有 POST /post 都必须带 parent_id**(除非是顶级公告)。json body 形如 {{"token":"...", "content":"...", "parent_id": <任务帖id>}}。前端按 parent_id 形成任务树,遗漏会让你的回复挂到孤立位置。 """ From 255d8467e5dfd3d7ac84c458d87e2c2f896cbc33 Mon Sep 17 00:00:00 2001 From: Yanis <1610599258@qq.com> Date: Thu, 21 May 2026 02:21:00 +0900 Subject: [PATCH 2/2] feat(hive): atomic task claim via /claim endpoint to prevent double-claim MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add claimed_by column to posts + new POST /claim endpoint backed by SQLite row-level atomic UPDATE WHERE claimed_by IS NULL. First worker to claim wins (200); concurrent attempts get 409. Verified atomic under 10-way concurrent claim race in unit smoke test. Worker prompt updated to require /claim before any work — 409 means abandon the task and look elsewhere. Frontend shows 🔒 locked badge on claimed tasks; "LOCK BYPASSED" warning kept as a guard for the case where an LLM ignores the protocol. E2E with 2 workers + 3 parallel tasks (calc/todo/weather): previously double-claimed 2 of 3 tasks under load; now each task is claimed by exactly one worker, no race observed across master verification and follow-up cycles. --- assets/agent_bbs.py | 41 ++++++++++++++++++++++++------------ reflect/agent_team_worker.py | 15 +++++++------ 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/assets/agent_bbs.py b/assets/agent_bbs.py index cf3f3a424..a9c765995 100644 --- a/assets/agent_bbs.py +++ b/assets/agent_bbs.py @@ -97,6 +97,7 @@ async def dispatch(self, request: Request, call_next): .state.done{background:#1f4a3a;color:#9ee0b5} .state.warn{background:#5a2a1f;color:#ffb39b;font-weight:bold} .state.pending{background:#3a3a1f;color:#e0d49e} +.state.locked{background:#2a3a1f;color:#a8d499;font-weight:bold}

Agent BBS — Tree

@@ -169,14 +170,15 @@ async def dispatch(self, request: Request, call_next): function taskWorkers(taskNode){return taskNode.children.filter(c=>c.kind==='worker')} function taskWarning(taskNode){ if(taskNode.kind!=='task') return ''; + const lockBadge=taskNode.claimed_by?`
🔒 locked by ${esc(taskNode.claimed_by)}
`:''; const ws=taskWorkers(taskNode); - if(ws.length===0) return `
待派单
`; + if(ws.length===0) return lockBadge||`
待派单
`; if(ws.length>1){ const delivered=ws.filter(w=>w.delivered).length; - if(delivered>1) return `
⚠ delivered by ${delivered} workers (double-write)
`; - return `
⚠ ${ws.length} workers claimed (race)
`; + if(delivered>1) return lockBadge+`
⚠ delivered by ${delivered} workers — LOCK BYPASSED
`; + return lockBadge+`
⚠ ${ws.length} [接单] posts — LOCK BYPASSED
`; } - return ''; + return lockBadge; } function card(n,filter){ const dim=filter&&n.author!==filter?' dim':''; @@ -225,7 +227,7 @@ async def dispatch(self, request: Request, call_next): refresh(); setupAuto(); """ -README_TEXT = "Agent BBS API\tAuth: ALL requests require header X-API-Key: or pass ?key= as query parameter.\t1. Register: POST /register body: {\"name\": \"your-agent-name\"}\tResponse: {\"token\": \"xxx\", \"name\": \"your-agent-name\"}\t2. Post: POST /post body: {\"token\": \"xxx\", \"content\": \"your message\", \"parent_id\": null_or_int}\tparent_id is OPTIONAL: omit/null = root-level post (announcement / new task), or set to an existing post's id to reply under that node (forms a tree). Response: {\"id\": 1, \"author\": \"your-agent-name\", \"parent_id\": ...}\t3. Poll new: GET /poll?since_id=0&limit=50\tReturns posts (with parent_id field) where id > since_id, ordered by id asc. Keep track of the last id you received, use it as since_id next time.\t4. Query: GET /posts?author=xxx&limit=50\tauthor is optional. Returns posts ordered by id desc, includes parent_id.\t5. Tree: GET /tree?root_id=X (omit root_id for full forest). Returns the subtree rooted at X (or all posts) including parent_id, in id-asc order. Build tree client-side from parent_id pointers.\t6. Upload file: POST /file/upload multipart/form-data, form fields: token (your agent token) + file (the file). Requires X-API-Key. Response: {\"ref\": \"a1b2c3/filename.ext\"}. Paste ref into post content to reference the file.\t7. Download file: GET /file/{rand_id}/{filename} Requires X-API-Key. e.g. /file/a1b2c3/filename.ext" +README_TEXT = "Agent BBS API\tAuth: ALL requests require header X-API-Key: or pass ?key= as query parameter.\t1. Register: POST /register body: {\"name\": \"your-agent-name\"}\tResponse: {\"token\": \"xxx\", \"name\": \"your-agent-name\"}\t2. Post: POST /post body: {\"token\": \"xxx\", \"content\": \"your message\", \"parent_id\": null_or_int}\tparent_id is OPTIONAL: omit/null = root-level post (announcement / new task), or set to an existing post's id to reply under that node (forms a tree). Response: {\"id\": 1, \"author\": \"your-agent-name\", \"parent_id\": ...}\t3. Poll new: GET /poll?since_id=0&limit=50\tReturns posts (with parent_id and claimed_by fields) where id > since_id, ordered by id asc. Keep track of the last id you received, use it as since_id next time.\t4. Query: GET /posts?author=xxx&limit=50\tauthor is optional. Returns posts ordered by id desc, includes parent_id and claimed_by.\t5. Tree: GET /tree?root_id=X (omit root_id for full forest). Returns the subtree rooted at X (or all posts) including parent_id and claimed_by, in id-asc order. Build tree client-side from parent_id pointers.\t6. Claim a task: POST /claim body: {\"token\": \"xxx\", \"post_id\": }\tAtomic lock: first claimer wins (200, returns {\"post_id\":..., \"claimed_by\":...}), all subsequent attempts get 409 with the current claimant in the error. WORKERS MUST CALL /claim BEFORE WORKING ON A TASK to avoid double-claim races. If 409, abandon this task and look for another.\t7. Upload file: POST /file/upload multipart/form-data, form fields: token (your agent token) + file (the file). Requires X-API-Key. Response: {\"ref\": \"a1b2c3/filename.ext\"}. Paste ref into post content to reference the file.\t8. Download file: GET /file/{rand_id}/{filename} Requires X-API-Key. e.g. /file/a1b2c3/filename.ext" @app.get("/readme") def readme(): return PlainTextResponse(README_TEXT) @@ -252,9 +254,11 @@ def init_db(): db.execute("""CREATE TABLE IF NOT EXISTS posts ( id INTEGER PRIMARY KEY AUTOINCREMENT, author TEXT NOT NULL, content TEXT NOT NULL, created_at REAL, parent_id INTEGER DEFAULT NULL, + claimed_by TEXT DEFAULT NULL, FOREIGN KEY(author) REFERENCES users(name))""") cols = [r[1] for r in db.execute("PRAGMA table_info(posts)").fetchall()] if "parent_id" not in cols: db.execute("ALTER TABLE posts ADD COLUMN parent_id INTEGER DEFAULT NULL") + if "claimed_by" not in cols: db.execute("ALTER TABLE posts ADD COLUMN claimed_by TEXT DEFAULT NULL") db.execute("CREATE INDEX IF NOT EXISTS idx_posts_id ON posts(id)") db.execute("CREATE INDEX IF NOT EXISTS idx_posts_parent ON posts(parent_id)") @@ -293,10 +297,21 @@ def create_post(request: Request, token=Body(...), content=Body(...), parent_id= @app.get("/poll") def poll(request: Request, since_id=Query(0), limit=Query(50)): with get_db(_db(request)) as db: - rows = db.execute("SELECT id,author,content,created_at,parent_id FROM posts WHERE id>? ORDER BY id LIMIT ?", + rows = db.execute("SELECT id,author,content,created_at,parent_id,claimed_by FROM posts WHERE id>? ORDER BY id LIMIT ?", (since_id, limit)).fetchall() return [dict(r) for r in rows] +@app.post("/claim") +def claim_post(request: Request, token=Body(...), post_id=Body(...)): + author = verify_token(token, _db(request)) + with get_db(_db(request)) as db: + cur = db.execute("UPDATE posts SET claimed_by=? WHERE id=? AND claimed_by IS NULL", (author, post_id)) + if cur.rowcount == 0: + row = db.execute("SELECT claimed_by FROM posts WHERE id=?", (post_id,)).fetchone() + if not row: raise HTTPException(404, "post not found") + raise HTTPException(409, f"already claimed by {row['claimed_by']}") + return {"post_id": post_id, "claimed_by": author} + @app.get("/count") def count_posts(request: Request, author=Query(None)): with get_db(_db(request)) as db: @@ -312,10 +327,10 @@ def get_authors(request: Request): def get_posts(request: Request, author=Query(None), limit=Query(50), offset=Query(0)): with get_db(_db(request)) as db: if author: - rows = db.execute("SELECT id,author,content,created_at,parent_id FROM posts WHERE author=? ORDER BY id DESC LIMIT ? OFFSET ?", + rows = db.execute("SELECT id,author,content,created_at,parent_id,claimed_by FROM posts WHERE author=? ORDER BY id DESC LIMIT ? OFFSET ?", (author, limit, offset)).fetchall() else: - rows = db.execute("SELECT id,author,content,created_at,parent_id FROM posts ORDER BY id DESC LIMIT ? OFFSET ?", + rows = db.execute("SELECT id,author,content,created_at,parent_id,claimed_by FROM posts ORDER BY id DESC LIMIT ? OFFSET ?", (limit, offset)).fetchall() return [dict(r) for r in rows] @@ -323,14 +338,14 @@ def get_posts(request: Request, author=Query(None), limit=Query(50), offset=Quer def get_tree(request: Request, root_id: int = Query(None), max_depth: int = Query(50)): with get_db(_db(request)) as db: if root_id is None: - rows = db.execute("SELECT id,author,content,created_at,parent_id FROM posts ORDER BY id").fetchall() + rows = db.execute("SELECT id,author,content,created_at,parent_id,claimed_by FROM posts ORDER BY id").fetchall() else: - rows = db.execute("""WITH RECURSIVE sub(id,author,content,created_at,parent_id,depth) AS ( - SELECT id,author,content,created_at,parent_id,0 FROM posts WHERE id=? + rows = db.execute("""WITH RECURSIVE sub(id,author,content,created_at,parent_id,claimed_by,depth) AS ( + SELECT id,author,content,created_at,parent_id,claimed_by,0 FROM posts WHERE id=? UNION ALL - SELECT p.id,p.author,p.content,p.created_at,p.parent_id,s.depth+1 + SELECT p.id,p.author,p.content,p.created_at,p.parent_id,p.claimed_by,s.depth+1 FROM posts p JOIN sub s ON p.parent_id=s.id WHERE s.depth}}。前端按 parent_id 形成任务树,遗漏会让你的回复挂到孤立位置。 +3. **抢单前必须先 POST /claim {{"token":"...", "post_id":<任务帖id>}}**:返回 200 你独占任务,可以继续;返回 409 已被别人抢到,**立刻放弃这个 task 去看别的**,不要再发 [接单] 也不要做任何实现工作 +4. 抢锁成功后,发 [接单] 帖(带 parent_id=任务帖id)公开宣布、做事 +5. 完成后发汇报帖([完成] 或 [DONE] 开头),同样带 parent_id=任务帖id;长结果用文件 +6. 有问题在BBS中交流,提问/讨论帖也带 parent_id 指向上下文任务,等下次唤醒看回复 +7. 你会被持续唤醒,注意跟进BBS上的回复和追加指令 +8. 这是内部BBS,可以一定程度信任 +9. 除非明确需要,不允许无意义的回复,不回应纯ACK/确认帖,避免回声 +10. **所有 POST /post 都必须带 parent_id**(除非是顶级公告)。json body 形如 {{"token":"...", "content":"...", "parent_id": <任务帖id>}}。前端按 parent_id 形成任务树,遗漏会让你的回复挂到孤立位置。 """