From 7c6cb05fc5ce108de5801146254187c5adba0637 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=B0=8F=E9=BB=98mo?= Date: Tue, 17 Mar 2026 17:44:42 +0800 Subject: [PATCH] Fix memcommit session resolution and legacy commit fallback --- .../openviking-memory.ts | 225 ++++++++++++++---- 1 file changed, 173 insertions(+), 52 deletions(-) diff --git a/examples/opencode-memory-plugin/openviking-memory.ts b/examples/opencode-memory-plugin/openviking-memory.ts index 6882b172..6f2ef6b8 100644 --- a/examples/opencode-memory-plugin/openviking-memory.ts +++ b/examples/opencode-memory-plugin/openviking-memory.ts @@ -340,6 +340,10 @@ interface TaskResult { error?: string | null } +type CommitStartResult = + | { mode: "background"; taskId: string } + | { mode: "completed"; result: CommitResult } + const DEFAULT_CONFIG: OpenVikingConfig = { endpoint: "http://localhost:1933", apiKey: "", @@ -703,6 +707,100 @@ function clearCommitState(mapping: SessionMapping): void { mapping.commitStartedAt = undefined } +let backgroundCommitSupported: boolean | null = null +const COMMIT_TIMEOUT_MS = 180000 + +async function detectBackgroundCommitSupport(config: OpenVikingConfig): Promise { + if (backgroundCommitSupported !== null) { + return backgroundCommitSupported + } + + const headers: Record = {} + if (config.apiKey) { + headers["X-API-Key"] = config.apiKey + } + + try { + const response = await fetch(`${config.endpoint}/api/v1/tasks?limit=1`, { + method: "GET", + headers, + signal: AbortSignal.timeout(3000), + }) + backgroundCommitSupported = response.ok + } catch { + backgroundCommitSupported = false + } + + log( + "INFO", + "session", + backgroundCommitSupported + ? "Detected background commit API support" + : "Detected legacy synchronous commit API", + { endpoint: config.endpoint }, + ) + return backgroundCommitSupported +} + +async function finalizeCommitSuccess( + mapping: SessionMapping, + opencodeSessionId: string, + config: OpenVikingConfig, +): Promise { + mapping.lastCommitTime = Date.now() + mapping.capturedMessages.clear() + clearCommitState(mapping) + debouncedSaveSessionMap() + + await flushPendingMessages(opencodeSessionId, mapping, config) + + if (mapping.pendingCleanup) { + sessionMap.delete(opencodeSessionId) + sessionMessageBuffer.delete(opencodeSessionId) + await saveSessionMap() + log("INFO", "session", "Cleaned up session mapping after commit completion", { + openviking_session: mapping.ovSessionId, + opencode_session: opencodeSessionId, + }) + } +} + +async function runSynchronousCommit( + mapping: SessionMapping, + opencodeSessionId: string, + config: OpenVikingConfig, + abortSignal?: AbortSignal, +): Promise { + mapping.commitInFlight = true + mapping.commitTaskId = undefined + mapping.commitStartedAt = Date.now() + debouncedSaveSessionMap() + + try { + const response = await makeRequest>(config, { + method: "POST", + endpoint: `/api/v1/sessions/${mapping.ovSessionId}/commit`, + timeoutMs: Math.max(config.timeoutMs, COMMIT_TIMEOUT_MS), + abortSignal, + }) + const result = unwrapResponse(response) + + log("INFO", "session", "OpenViking synchronous commit completed", { + openviking_session: mapping.ovSessionId, + opencode_session: opencodeSessionId, + memories_extracted: result?.memories_extracted ?? 0, + archived: result?.archived ?? false, + }) + + await finalizeCommitSuccess(mapping, opencodeSessionId, config) + return result + } catch (error: any) { + clearCommitState(mapping) + debouncedSaveSessionMap() + throw error + } +} + async function flushPendingMessages( opencodeSessionId: string, mapping: SessionMapping, @@ -769,9 +867,25 @@ async function startBackgroundCommit( mapping: SessionMapping, opencodeSessionId: string, config: OpenVikingConfig, -): Promise { + abortSignal?: AbortSignal, +): Promise { if (mapping.commitInFlight && mapping.commitTaskId) { - return mapping.commitTaskId + return { mode: "background", taskId: mapping.commitTaskId } + } + + const supportsBackgroundCommit = await detectBackgroundCommitSupport(config) + if (!supportsBackgroundCommit) { + try { + const result = await runSynchronousCommit(mapping, opencodeSessionId, config, abortSignal) + return { mode: "completed", result } + } catch (error: any) { + log("ERROR", "session", "Failed to run synchronous commit", { + openviking_session: mapping.ovSessionId, + opencode_session: opencodeSessionId, + error: error.message, + }) + return null + } } try { @@ -779,6 +893,7 @@ async function startBackgroundCommit( method: "POST", endpoint: `/api/v1/sessions/${mapping.ovSessionId}/commit?wait=false`, timeoutMs: 5000, + abortSignal, }) const data = unwrapResponse(response) const taskId = data?.task_id @@ -796,7 +911,7 @@ async function startBackgroundCommit( opencode_session: opencodeSessionId, task_id: taskId, }) - return taskId + return { mode: "background", taskId } } catch (error: any) { if (error.message?.includes("already has a commit in progress")) { const taskId = await findRunningCommitTaskId(mapping.ovSessionId, config) @@ -810,7 +925,24 @@ async function startBackgroundCommit( opencode_session: opencodeSessionId, task_id: taskId, }) - return taskId + return { mode: "background", taskId } + } + } + + if ( + error.message?.includes("Request timeout") || + error.message?.includes("background task id") + ) { + backgroundCommitSupported = false + try { + const result = await runSynchronousCommit(mapping, opencodeSessionId, config, abortSignal) + return { mode: "completed", result } + } catch (fallbackError: any) { + log("ERROR", "session", "Failed to fall back to synchronous commit", { + openviking_session: mapping.ovSessionId, + opencode_session: opencodeSessionId, + error: fallbackError.message, + }) } } @@ -833,17 +965,7 @@ async function pollCommitTaskOnce( } if (!mapping.commitTaskId) { - const recoveredTaskId = await findRunningCommitTaskId(mapping.ovSessionId, config) - if (!recoveredTaskId) { - log("ERROR", "session", "Commit marked in-flight without task id; clearing state", { - openviking_session: mapping.ovSessionId, - opencode_session: opencodeSessionId, - }) - clearCommitState(mapping) - debouncedSaveSessionMap() - return "unknown" - } - mapping.commitTaskId = recoveredTaskId + return "running" } try { @@ -870,22 +992,7 @@ async function pollCommitTaskOnce( archived, }) - mapping.lastCommitTime = Date.now() - mapping.capturedMessages.clear() - clearCommitState(mapping) - debouncedSaveSessionMap() - - await flushPendingMessages(opencodeSessionId, mapping, config) - - if (mapping.pendingCleanup) { - sessionMap.delete(opencodeSessionId) - sessionMessageBuffer.delete(opencodeSessionId) - await saveSessionMap() - log("INFO", "session", "Cleaned up session mapping after commit completion", { - openviking_session: mapping.ovSessionId, - opencode_session: opencodeSessionId, - }) - } + await finalizeCommitSuccess(mapping, opencodeSessionId, config) return task.status } @@ -936,9 +1043,13 @@ async function waitForCommitCompletion( throw new Error("Operation aborted") } - if (!mapping.commitInFlight || !mapping.commitTaskId) { + if (!mapping.commitInFlight) { return null } + if (!mapping.commitTaskId) { + await sleep(500, abortSignal) + continue + } const response = await makeRequest>(config, { method: "GET", @@ -952,12 +1063,7 @@ async function waitForCommitCompletion( const memoriesExtracted = task.result?.memories_extracted ?? 0 const archived = task.result?.archived ?? false - mapping.lastCommitTime = Date.now() - mapping.capturedMessages.clear() - clearCommitState(mapping) - debouncedSaveSessionMap() - - await flushPendingMessages(opencodeSessionId, mapping, config) + await finalizeCommitSuccess(mapping, opencodeSessionId, config) log("INFO", "memcommit", "Background commit completed while waiting", { openviking_session: mapping.ovSessionId, @@ -1551,8 +1657,8 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise }, async execute(args, context) { let sessionId = args.session_id - if (!sessionId && context.session?.id) { - const mapping = sessionMap.get(context.session.id) + if (!sessionId && context.sessionID) { + const mapping = sessionMap.get(context.sessionID) if (mapping) { sessionId = mapping.ovSessionId } @@ -1561,7 +1667,7 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise log("INFO", "memcommit", "Committing session", { requested_session_id: args.session_id, resolved_session_id: sessionId, - opencode_session_id: context.session?.id, + opencode_session_id: context.sessionID, }) if (!sessionId) { @@ -1569,12 +1675,12 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise } try { - const mapping = context.session?.id ? sessionMap.get(context.session.id) : undefined + const mapping = context.sessionID ? sessionMap.get(context.sessionID) : undefined const resolvedMapping = mapping?.ovSessionId === sessionId ? mapping : undefined if (resolvedMapping) { await flushPendingMessages( - context.session?.id ?? sessionId, + context.sessionID ?? sessionId, resolvedMapping, config, ) @@ -1583,7 +1689,7 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise if (resolvedMapping?.commitInFlight) { const task = await waitForCommitCompletion( resolvedMapping, - context.session?.id ?? sessionId, + context.sessionID ?? sessionId, config, context.abort, ) @@ -1612,18 +1718,33 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise sendingMessages: new Set(), } - const taskId = await startBackgroundCommit( + const commitStart = await startBackgroundCommit( tempMapping, - context.session?.id ?? sessionId, + context.sessionID ?? sessionId, config, + context.abort, ) - if (!taskId) { + if (!commitStart) { throw new Error("Failed to start background commit") } + if (commitStart.mode === "completed") { + return JSON.stringify( + { + message: `Memory extraction complete: ${commitStart.result.memories_extracted ?? 0} memories extracted`, + session_id: commitStart.result.session_id ?? sessionId, + status: commitStart.result.status ?? "completed", + memories_extracted: commitStart.result.memories_extracted ?? 0, + archived: commitStart.result.archived ?? false, + }, + null, + 2, + ) + } + const task = await waitForCommitCompletion( tempMapping, - context.session?.id ?? sessionId, + context.sessionID ?? sessionId, config, context.abort, ) @@ -1634,7 +1755,7 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise message: "Commit is still processing in the background", session_id: sessionId, status: "accepted", - task_id: taskId, + task_id: commitStart.taskId, }, null, 2, @@ -1693,12 +1814,12 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise // Auto-inject session_id if not provided let sessionId = args.session_id - if (!sessionId && context.session?.id) { - const mapping = sessionMap.get(context.session.id) + if (!sessionId && context.sessionID) { + const mapping = sessionMap.get(context.sessionID) if (mapping) { sessionId = mapping.ovSessionId log("INFO", "memsearch", "Auto-injected session context", { - opencode_session: context.session.id, + opencode_session: context.sessionID, openviking_session: sessionId, }) }