diff --git a/apps/chatgpt/src/index.ts b/apps/chatgpt/src/index.ts index df15d58..4607b51 100644 --- a/apps/chatgpt/src/index.ts +++ b/apps/chatgpt/src/index.ts @@ -24,13 +24,15 @@ import { runTask, checkTask, listSessions, + listTasks, + getSession, agentBlurb, agentDescriptor, searchMemory, getMemory, listCollections, } from "@clawconnect/core"; -import type { AgentEntry, AgentRegistry, CheckMode } from "@clawconnect/core"; +import type { AgentEntry, AgentRegistry, CheckMode, TaskSummary } from "@clawconnect/core"; // Widget UI is temporarily disabled to keep the surface focused on // run_task / check_task. Re-enable by restoring the widget imports and @@ -144,6 +146,14 @@ function resolveScope(url: URL): Scope { return { allowedIds: allowed, defaultId, serverName }; } +/** Non-terminal task statuses — tasks that still need attention. */ +const ACTIVE_STATUSES: ReadonlySet = new Set([ + "queued", + "running", + "blocked", + "needs-human", +]); + const AGENTS_BY_ID = new Map(registry.agents.map((a) => [a.id, a])); function blurbsFor(ids: string[]): string { @@ -296,6 +306,64 @@ Pass the jobId returned by run_task. Available agents: ${list}.`, inputSchema: { type: "object", properties: {} }, annotations: { title: "List Collections", readOnlyHint: true, idempotentHint: true, openWorldHint: false }, }, + { + name: "list_tasks", + description: `List manager-friendly task summaries across agents. This is task-level coordination (what needs attention), not low-level session debugging.`, + inputSchema: { + type: "object", + properties: { + view: { + type: "string", + enum: ["active", "all"], + description: 'Optional preset. "active" returns non-terminal tasks (queued, running, blocked, needs-human) that still need attention.', + }, + }, + }, + annotations: { title: "List Tasks", readOnlyHint: true, idempotentHint: true, openWorldHint: false }, + }, + { + name: "get_task", + description: `Inspect a task by taskId/jobId with a detail preset controlling which fields are returned.`, + inputSchema: { + type: "object", + properties: { + taskId: { type: "string", description: "Task identifier (same as jobId in v1)" }, + detail: { + type: "string", + enum: ["core", "summary", "updates", "artifacts", "diagnostics", "full", "fullWithDiagnostics"], + description: + 'Detail preset. Omit for summary. core=ids+status only; summary=+summary; updates=+logs; artifacts=+artifacts; diagnostics=+error info; full=core+summary+updates+artifacts; fullWithDiagnostics=full+diagnostics', + }, + mode: { + type: "string", + enum: ["poll", "wait"], + description: 'Uses check semantics: "wait" blocks up to timeout; "poll" returns on updates', + }, + }, + required: ["taskId"], + }, + annotations: { title: "Get Task", readOnlyHint: true, idempotentHint: true, openWorldHint: false }, + }, + { + name: "get_session", + description: `Inspect one session for debugging ("what exactly happened?"). Use mode="snapshot" for current state, "events" for bounded event retrieval, or "tail" for cursor-based tailing.`, + inputSchema: { + type: "object", + properties: { + sessionId: { type: "string", description: "Session key to inspect" }, + mode: { + type: "string", + enum: ["snapshot", "events", "tail"], + description: "Inspection mode: snapshot (default), events, or tail", + }, + limit: { type: "number", description: "Max events to return for events/tail modes (1–200)" }, + after: { type: "number", description: "Zero-based event cursor; for tail mode use returned nextAfter" }, + agent: { ...agentProp, description: `${agentProp.description} Usually inferred from sessionId.` }, + }, + required: ["sessionId"], + }, + annotations: { title: "Get Session", readOnlyHint: true, idempotentHint: true, openWorldHint: false }, + }, ]; } @@ -515,6 +583,96 @@ const server = createServer(async (req, res) => { content: [{ type: "text", text: JSON.stringify({ collections }) }], structuredContent: { collections }, }); + } else if (name === "list_tasks") { + const tasks = listTasks(pool); + const scoped = tasks.filter((t) => !t.agent || scope.allowedIds.includes(t.agent)); + const view = typeof args.view === "string" ? args.view : undefined; + const filtered = view === "active" ? scoped.filter((t) => ACTIVE_STATUSES.has(t.status)) : scoped; + respond({ + content: [{ type: "text", text: JSON.stringify({ tasks: filtered }) }], + structuredContent: { tasks: filtered }, + }); + } else if (name === "get_task") { + const taskId = typeof args.taskId === "string" ? args.taskId : ""; + const detail = typeof args.detail === "string" ? args.detail : undefined; + const mode = (typeof args.mode === "string" ? args.mode : undefined) as CheckMode | undefined; + const result = await checkTask(pool, { jobId: taskId, mode: mode ?? "wait" }); + + if (!result.found) { + respond({ + content: [{ type: "text", text: "Task not found. The server may have restarted." }], + structuredContent: { taskId, status: "error", error: "Task not found." }, + isError: true, + }); + } else if (result.snapshot.agent && !scope.allowedIds.includes(result.snapshot.agent)) { + respond({ + content: [{ type: "text", text: "Task not found." }], + structuredContent: { taskId, status: "error", error: "Task not found." }, + isError: true, + }); + } else { + const s = result.snapshot; + const d = detail ?? "summary"; + const has = (field: string) => d === field || d === "full" || d === "fullWithDiagnostics"; + const payload: Record = { + taskId: s.jobId, + jobId: s.jobId, + sessionKey: s.sessionKey, + agent: s.agent, + status: s.status, + startedAt: s.startedAt, + lastEventAt: s.lastEventAt, + }; + if (d === "summary" || has("summary")) { + payload.summary = s.summary; + } + if (has("updates")) { + payload.updates = s.logs; + } + if (has("artifacts")) { + payload.artifacts = s.artifacts; + } + if (d === "diagnostics" || d === "fullWithDiagnostics") { + payload.diagnostics = { error: s.error, errorInfo: s.errorInfo, continuationState: s.continuationState }; + } + respond({ + content: [{ type: "text", text: JSON.stringify(payload) }], + structuredContent: payload, + ...(result.isError ? { isError: true } : {}), + }); + } + } else if (name === "get_session") { + const sessionId = typeof args.sessionId === "string" ? args.sessionId : ""; + const sessionMode = typeof args.mode === "string" ? args.mode : undefined; + const limit = args.limit !== undefined ? Number(args.limit) : undefined; + const after = args.after !== undefined ? Number(args.after) : undefined; + const sessionAgent = typeof args.agent === "string" && args.agent ? args.agent : undefined; + + if (sessionAgent && !scope.allowedIds.includes(sessionAgent)) { + respond({ + content: [{ type: "text", text: `Agent "${sessionAgent}" is not available on this connection.` }], + isError: true, + }); + } else { + const result = getSession(pool, { sessionId, mode: sessionMode as any, limit, after, agent: sessionAgent }); + if (!result.found) { + respond({ + content: [{ type: "text", text: "Session not found." }], + structuredContent: { sessionId, found: false }, + isError: true, + }); + } else if (result.agent && !scope.allowedIds.includes(result.agent)) { + respond({ + content: [{ type: "text", text: "Session not found." }], + isError: true, + }); + } else { + respond({ + content: [{ type: "text", text: JSON.stringify(result) }], + structuredContent: result, + }); + } + } } else { respondError(-32601, `Unknown tool: ${name}`); } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 9ed0a98..b852a59 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -7,7 +7,7 @@ export { searchMemory, getMemory, listCollections, DEFAULT_QMD_URL } from "./mem export type { MemorySearchHit, MemorySearchResult, GetMemoryResult, SearchMemoryOpts, CollectionListing } from "./memory.ts"; export { classifyError } from "./errors.ts"; export { emptyArtifacts, processEvent, extractPatternsFromSummary, deriveNextStep } from "./artifacts.ts"; -export { runTask, checkTask, listSessions } from "./tools.ts"; +export { runTask, checkTask, listSessions, listTasks, getSession } from "./tools.ts"; export type { Artifacts, CheckMode, @@ -23,5 +23,9 @@ export type { JobStatus, LogEntry, RunTaskResult, + SessionInspectMode, + SessionInspectResult, TaskInput, + TaskSummary, + TaskStatus, } from "./types.ts"; diff --git a/packages/core/src/tools.ts b/packages/core/src/tools.ts index 6b2c650..36c7bc1 100644 --- a/packages/core/src/tools.ts +++ b/packages/core/src/tools.ts @@ -4,6 +4,9 @@ import type { CheckTaskResult, ContinuationState, RunTaskResult, + SessionInspectMode, + SessionInspectResult, + TaskSummary, TaskInput, } from "./types.ts"; @@ -18,12 +21,52 @@ export function runTask(pool: GatewayPool, input: TaskInput): RunTaskResult { pool.rememberJob(job.jobId, entry.agent.id); return { jobId: job.jobId, + taskId: job.jobId, sessionKey: job.sessionKey, status: "running", agent: entry.agent.id, }; } +function mapTaskStatus(status: string): TaskSummary["status"] { + if (status === "running") return "running"; + if (status === "completed" || status === "completed_no_summary") return "done"; + if (status === "needs-human") return "needs-human"; + if (status === "blocked") return "blocked"; + if (status === "queued") return "queued"; + return "failed"; +} + +function deriveTaskStatus(job: { status: string; error?: string; artifacts: { needsHumanDecision: boolean } }): TaskSummary["status"] { + if (job.status === "running") return "running"; + if (job.status === "completed" || job.status === "completed_no_summary") return "done"; + if (job.artifacts.needsHumanDecision) return "needs-human"; + if (job.error?.includes("session busy")) return "blocked"; + return mapTaskStatus(job.status); +} + +export function listTasks(pool: GatewayPool): TaskSummary[] { + const items: TaskSummary[] = []; + for (const entry of pool.allEntries()) { + for (const session of entry.sessions.listSessions()) { + const job = entry.sessions.getLatestJobForSession(session.sessionKey); + if (!job) continue; + items.push({ + taskId: job.jobId, + jobId: job.jobId, + sessionKey: job.sessionKey, + agent: entry.agent.id, + status: deriveTaskStatus(job), + startedAt: job.startedAt, + lastEventAt: job.lastEventAt, + summary: job.summary, + error: job.error, + }); + } + } + return items; +} + function notFound(): CheckTaskResult { return { found: false }; } @@ -71,3 +114,39 @@ export function listSessions(pool: GatewayPool): ContinuationState[] { return all; } +export function getSession( + pool: GatewayPool, + opts: { sessionId: string; mode?: SessionInspectMode; limit?: number; after?: number; agent?: string }, +): SessionInspectResult { + let entry = opts.agent ? pool.forAgent(opts.agent) : pool.forSession(opts.sessionId); + if (!entry) { + for (const candidate of pool.allEntries()) { + if (candidate.sessions.getSessionState(opts.sessionId)) { + entry = candidate; + break; + } + } + } + if (!entry) return { found: false }; + const job = entry.sessions.getLatestJobForSession(opts.sessionId); + if (!job) return { found: false }; + + const mode = opts.mode ?? "snapshot"; + const limit = Math.max(1, Math.min(200, opts.limit ?? 50)); + const after = Math.max(0, opts.after ?? 0); + const events = job.logs.slice(after, after + limit); + + return { + found: true, + sessionKey: job.sessionKey, + agent: entry.agent.id, + jobId: job.jobId, + status: job.status, + startedAt: job.startedAt, + lastEventAt: job.lastEventAt, + summary: job.summary, + error: job.error, + ...(mode === "snapshot" ? {} : { events }), + ...(mode === "tail" ? { nextAfter: after + events.length } : {}), + }; +} diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 7491d43..1e1366d 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -37,6 +37,7 @@ export type ErrorInfo = { export type LogEntry = { ts: number; type: string; text: string }; export type JobStatus = "running" | "completed" | "completed_no_summary" | "error"; +export type TaskStatus = "queued" | "running" | "blocked" | "needs-human" | "done" | "failed"; export type Job = { jobId: string; @@ -111,12 +112,43 @@ export type CheckMode = "poll" | "wait"; export type RunTaskResult = { jobId: string; + taskId?: string; sessionKey: string; status: "running"; /** ClawConnect agent alias the task was dispatched to. */ agent?: string; }; +export type TaskSummary = { + taskId: string; + jobId: string; + sessionKey: string; + agent?: string; + status: TaskStatus; + startedAt: number; + lastEventAt: number; + summary?: string; + error?: string; +}; + +export type SessionInspectMode = "snapshot" | "events" | "tail"; + +export type SessionInspectResult = + | { found: false } + | { + found: true; + sessionKey: string; + agent?: string; + jobId: string; + status: JobStatus; + startedAt: number; + lastEventAt: number; + summary?: string; + error?: string; + events?: LogEntry[]; + nextAfter?: number; + }; + export type CheckTaskOpts = { jobId?: string; sessionKey?: string; diff --git a/packages/mcp/src/server.ts b/packages/mcp/src/server.ts index 8f4e3e6..18e529f 100644 --- a/packages/mcp/src/server.ts +++ b/packages/mcp/src/server.ts @@ -4,6 +4,8 @@ import { GatewayPool, runTask, checkTask, + getSession, + listTasks, listSessions, agentBlurb, agentDescriptor, @@ -17,8 +19,22 @@ import type { CheckTaskResult, ContinuationState, RunTaskResult, + TaskSummary, + SessionInspectResult, } from "@clawconnect/core"; +/** Non-terminal task statuses — tasks that still need attention. */ +const ACTIVE_STATUSES: ReadonlySet = new Set([ + "queued", + "running", + "blocked", + "needs-human", +]); + +function isActiveTaskStatus(status: TaskSummary["status"]): boolean { + return ACTIVE_STATUSES.has(status); +} + // ── Provider config ───────────────────────────────────────────────────────── type McpToolResponse = { @@ -36,6 +52,8 @@ export type ProviderConfig = { formatRunTask?: (result: RunTaskResult) => McpToolResponse; formatCheckTask?: (result: CheckTaskResult) => McpToolResponse; formatListSessions?: (result: ContinuationState[]) => McpToolResponse; + formatListTasks?: (result: TaskSummary[]) => McpToolResponse; + formatGetSession?: (result: SessionInspectResult) => McpToolResponse; }; // ── Default formatters (optimized for agentic use / Claude Code) ──────────── @@ -129,6 +147,17 @@ function defaultFormatListSessions(result: ContinuationState[]): McpToolResponse }; } +function defaultFormatListTasks(result: TaskSummary[]): McpToolResponse { + return { + content: [{ type: "text", text: JSON.stringify({ tasks: result }) }], + }; +} + +function defaultFormatGetSession(result: SessionInspectResult): McpToolResponse { + if (!result.found) return { content: [{ type: "text", text: "Session not found." }], isError: true }; + return { content: [{ type: "text", text: JSON.stringify(result) }] }; +} + // ── Server factory ────────────────────────────────────────────────────────── export function createMcpServer(config: { registry: AgentRegistry; provider?: ProviderConfig }) { @@ -144,6 +173,8 @@ export function createMcpServer(config: { registry: AgentRegistry; provider?: Pr const fmtRun = provider.formatRunTask ?? defaultFormatRunTask; const fmtCheck = provider.formatCheckTask ?? defaultFormatCheckTask; const fmtList = provider.formatListSessions ?? defaultFormatListSessions; + const fmtListTasks = provider.formatListTasks ?? defaultFormatListTasks; + const fmtGetSession = provider.formatGetSession ?? defaultFormatGetSession; const agentIds = config.registry.agents.map((a) => a.id); const agentBlurbs = config.registry.agents.map(agentBlurb).join("; "); @@ -208,6 +239,84 @@ Pass the jobId returned by run_task. Available agents: ${agentList}.`, }, ); + server.tool( + "list_tasks", + `List manager-friendly task summaries across agents. This is task-level coordination (what needs attention), not low-level session debugging.`, + { + view: z.enum(["active", "all"]).optional().describe('Optional preset. "active" returns non-terminal tasks (queued, running, blocked, needs-human) that still need attention.'), + }, + { readOnlyHint: true, idempotentHint: true, openWorldHint: false }, + async ({ view }) => { + const tasks = listTasks(pool); + const filtered = view === "active" ? tasks.filter((t) => isActiveTaskStatus(t.status)) : tasks; + return fmtListTasks(filtered); + }, + ); + + server.tool( + "get_task", + `Inspect a task by taskId/jobId with a detail preset controlling which fields are returned.`, + { + taskId: z.string().describe("Task identifier (same as jobId in v1)"), + detail: z + .enum(["core", "summary", "updates", "artifacts", "diagnostics", "full", "fullWithDiagnostics"]) + .optional() + .describe( + "Detail preset. Omit for summary. core=ids+status only; summary=+summary; updates=+logs; artifacts=+artifacts; diagnostics=+error info; full=core+summary+updates+artifacts; fullWithDiagnostics=full+diagnostics", + ), + mode: z.enum(["poll", "wait"]).optional().describe('Uses check semantics: "wait" blocks up to timeout; "poll" returns on updates'), + }, + { readOnlyHint: true, idempotentHint: true, openWorldHint: false }, + async ({ taskId, detail, mode }) => { + const result = await checkTask(pool, { jobId: taskId, mode: (mode as CheckMode) ?? defaultMode }); + if (!result.found) return defaultFormatCheckTask(result); + const snapshot = result.snapshot; + const d = detail ?? "summary"; + const has = (field: string) => d === field || d === "full" || d === "fullWithDiagnostics"; + return { + content: [ + { + type: "text", + text: JSON.stringify({ + taskId: snapshot.jobId, + jobId: snapshot.jobId, + sessionKey: snapshot.sessionKey, + agent: snapshot.agent, + status: snapshot.status, + startedAt: snapshot.startedAt, + lastEventAt: snapshot.lastEventAt, + summary: d === "summary" || has("summary") ? snapshot.summary : undefined, + updates: has("updates") ? snapshot.logs : undefined, + artifacts: has("artifacts") ? snapshot.artifacts : undefined, + diagnostics: + d === "diagnostics" || d === "fullWithDiagnostics" + ? { error: snapshot.error, errorInfo: snapshot.errorInfo, continuationState: snapshot.continuationState } + : undefined, + }), + }, + ], + ...(result.isError ? { isError: true } : {}), + }; + }, + ); + + server.tool( + "get_session", + `Inspect one session for debugging ("what exactly happened?"). Use mode="snapshot" for current state, "events" for bounded event retrieval, or "tail" for cursor-based tailing.`, + { + sessionId: z.string().describe("Session key to inspect"), + mode: z.enum(["snapshot", "events", "tail"]).optional(), + limit: z.number().int().positive().max(200).optional().describe("Max events to return for events/tail modes"), + after: z.number().int().nonnegative().optional().describe("Zero-based event cursor; for tail mode use returned nextAfter"), + agent: agentEnum.optional().describe(`${agentDescription} Usually inferred from sessionId.`), + }, + { readOnlyHint: true, idempotentHint: true, openWorldHint: false }, + async ({ sessionId, mode, limit, after, agent }) => { + const result = getSession(pool, { sessionId, mode, limit, after, agent }); + return fmtGetSession(result); + }, + ); + server.tool( "list_sessions", `List active OpenClaw sessions across configured agents. Shows agent, session keys, last job status, and recommended next steps. Available agents: ${agentList}.`,