Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions .fork-features/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"modifiedFiles": [
"packages/opencode/src/session/index.ts",
"packages/opencode/src/session/prompt.ts",
"packages/opencode/src/session/processor.ts",
"packages/opencode/src/tool/registry.ts"
],
"criticalCode": [
Expand All @@ -41,13 +42,24 @@
"parent_session_id",
"reserveTaskSlot",
"getSessionTaskCount",
"MAX_STORED_TASK_RESULTS"
"MAX_STORED_TASK_RESULTS",
"stallDetected",
"lastToolCalls",
"lastActivity",
"isSessionStalled",
"markSessionStalled",
"clearSessionStalled",
"lastTokenTime",
"OPENCODE_STALL_TIMEOUT_MS",
"getStallTimeout",
"LLM stream stalled"
],
"tests": [
"packages/opencode/test/tool/check_task.test.ts",
"packages/opencode/test/tool/list_tasks.test.ts",
"packages/opencode/test/tool/cancel_task.test.ts",
"packages/opencode/test/session/async-tasks.test.ts"
"packages/opencode/test/session/async-tasks.test.ts",
"packages/opencode/test/session/processor-stall.test.ts"
],
"upstreamTracking": {
"relatedPRs": ["anomalyco/opencode#7206"],
Expand All @@ -58,7 +70,14 @@
"task.*concurrency.*slot",
"cancel.*task",
"CancelTaskTool",
"tryCancel"
"tryCancel",
"stallDetected",
"lastToolCalls",
"lastActivity",
"export function isSessionStalled",
"stall.*detector",
"stream.*stall",
"lastTokenTime"
]
}
},
Expand Down
56 changes: 53 additions & 3 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ export namespace SessionProcessor {
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })

const stalledSessions = new Set<string>()

export function isSessionStalled(id: string): boolean {
return stalledSessions.has(id)
}

function markSessionStalled(id: string) {
stalledSessions.add(id)
}

function clearSessionStalled(id: string) {
stalledSessions.delete(id)
}

function getStallTimeout(): number {
const timeout = parseInt(process.env.OPENCODE_STALL_TIMEOUT_MS || "180000", 10)
if (isNaN(timeout) || timeout <= 0) {
throw new Error(`Invalid OPENCODE_STALL_TIMEOUT_MS: must be positive number, got "${process.env.OPENCODE_STALL_TIMEOUT_MS}"`)
}
return timeout
}

export type Info = Awaited<ReturnType<typeof create>>
export type Result = Awaited<ReturnType<Info["process"]>>

Expand Down Expand Up @@ -50,16 +72,24 @@ export namespace SessionProcessor {
try {
let currentText: MessageV2.TextPart | undefined
let reasoningMap: Record<string, MessageV2.ReasoningPart> = {}
let lastTokenTime = Date.now()
const stallTimeout = getStallTimeout()
const stream = await LLM.stream(streamInput)

for await (const value of stream.fullStream) {
input.abort.throwIfAborted()
if (Date.now() - lastTokenTime > stallTimeout) {
log.warn("stall", { sessionID: input.sessionID, elapsed: Date.now() - lastTokenTime })
markSessionStalled(input.sessionID)
throw new Error(`LLM stream stalled: no tokens received for ${Math.round(stallTimeout / 60000)} minutes`)
}
switch (value.type) {
case "start":
SessionStatus.set(input.sessionID, { type: "busy" })
break

case "reasoning-start":
lastTokenTime = Date.now()
if (value.id in reasoningMap) {
continue
}
Expand All @@ -79,6 +109,7 @@ export namespace SessionProcessor {
break

case "reasoning-delta":
lastTokenTime = Date.now()
if (value.id in reasoningMap) {
const part = reasoningMap[value.id]
part.text += value.text
Expand Down Expand Up @@ -132,6 +163,7 @@ export namespace SessionProcessor {
break

case "tool-call": {
lastTokenTime = Date.now()
const match = toolcalls[value.toolCallId]
if (match) {
const part = await Session.updatePart({
Expand Down Expand Up @@ -178,6 +210,7 @@ export namespace SessionProcessor {
break
}
case "tool-result": {
lastTokenTime = Date.now()
const match = toolcalls[value.toolCallId]
if (match && match.state.status === "running") {
await Session.updatePart({
Expand Down Expand Up @@ -218,6 +251,7 @@ export namespace SessionProcessor {
}

case "tool-error": {
lastTokenTime = Date.now()
const match = toolcalls[value.toolCallId]
const errorMsg = value.error instanceof Error ? value.error.message : String(value.error)
if (match && match.state.status === "running") {
Expand Down Expand Up @@ -336,6 +370,7 @@ export namespace SessionProcessor {
break

case "text-delta":
lastTokenTime = Date.now()
if (currentText) {
currentText.text += value.text
if (value.providerMetadata) currentText.metadata = value.providerMetadata
Expand Down Expand Up @@ -411,6 +446,7 @@ export namespace SessionProcessor {
error: input.assistantMessage.error,
})
SessionStatus.set(input.sessionID, { type: "idle" })
clearSessionStalled(input.sessionID)
}
if (snapshot) {
const patch = await Snapshot.patch(snapshot)
Expand Down Expand Up @@ -445,13 +481,27 @@ export namespace SessionProcessor {
}
input.assistantMessage.time.completed = Date.now()
await Session.updateMessage(input.assistantMessage)
if (needsCompaction) return "compact"
if (blocked) return "stop"
if (input.assistantMessage.error) return "stop"
if (needsCompaction) {
clearSessionStalled(input.sessionID)
return "compact"
}
if (blocked) {
clearSessionStalled(input.sessionID)
return "stop"
}
if (input.assistantMessage.error) {
clearSessionStalled(input.sessionID)
return "stop"
}
clearSessionStalled(input.sessionID)
return "continue"
}
},
}
return result
}
}

export function isSessionStalled(id: string): boolean {
return SessionProcessor.isSessionStalled(id)
}
30 changes: 30 additions & 0 deletions packages/opencode/src/tool/check_task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { listBackgroundTasks, getBackgroundTaskResult, getBackgroundTaskMetadata
import { Instance } from "../project/instance"
import { SessionStatus } from "../session/status"
import { MessageV2 } from "../session/message-v2"
import { isSessionStalled } from "../session/processor"

type TaskStatus = "running" | "completed" | "failed" | "not_found" | "cancelled"

Expand All @@ -19,6 +20,13 @@ interface TaskResult {
duration_seconds?: number
started_at?: string
completed_at?: string
stallDetected?: boolean
lastToolCalls?: {
name: string
status: string
time: string
}[]
lastActivity?: string
}

interface CheckTaskMetadata {
Expand All @@ -27,6 +35,10 @@ interface CheckTaskMetadata {
sessionId?: string
}

function hasStartTime(part: MessageV2.ToolPart): part is MessageV2.ToolPart & { state: { time: { start: number } } } {
return part.state.status !== "pending" && "time" in part.state && typeof (part.state as { time: { start: unknown } }).time.start === "number"
}

function checkBackgroundTask(id: string): TaskResult | undefined {
const tasks = listBackgroundTasks()
if (tasks.pending.includes(id)) {
Expand Down Expand Up @@ -84,10 +96,28 @@ async function checkSessionTask(id: string, callerSessionId?: string): Promise<T
const status = SessionStatus.get(id)

if (status.type === "busy") {
const messages = await Session.messages({ sessionID: id, limit: 5 })
const toolParts = messages.flatMap((msg) =>
msg.info.role === "assistant" ? msg.parts.filter((part): part is MessageV2.ToolPart => part.type === "tool") : []
)
const recentTools = toolParts
.filter(hasStartTime)
.slice(-3)
.map((part) => ({
name: part.tool,
status: part.state.status,
time: new Date(part.state.time.start).toISOString(),
}))
const lastActivity = recentTools.length > 0 ? recentTools[recentTools.length - 1].time : new Date().toISOString()
const stallDetected = isSessionStalled(id)

return {
task_id: id,
status: "running",
started_at: started,
stallDetected,
lastToolCalls: recentTools,
lastActivity,
}
}

Expand Down
Loading
Loading