Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 173 additions & 52 deletions examples/opencode-memory-plugin/openviking-memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ interface TaskResult {
error?: string | null
}

type CommitStartResult =
| { mode: "background"; taskId: string }
| { mode: "completed"; result: CommitResult }

const DEFAULT_CONFIG: OpenVikingConfig = {
endpoint: "http://localhost:1933",
apiKey: "",
Expand Down Expand Up @@ -703,6 +707,100 @@ function clearCommitState(mapping: SessionMapping): void {
mapping.commitStartedAt = undefined
}

let backgroundCommitSupported: boolean | null = null
const COMMIT_TIMEOUT_MS = 180000

async function detectBackgroundCommitSupport(config: OpenVikingConfig): Promise<boolean> {
if (backgroundCommitSupported !== null) {
return backgroundCommitSupported
}

const headers: Record<string, string> = {}
if (config.apiKey) {
headers["X-API-Key"] = config.apiKey
}

try {
const response = await fetch(`${config.endpoint}/api/v1/tasks?limit=1`, {
method: "GET",
headers,
signal: AbortSignal.timeout(3000),
})
backgroundCommitSupported = response.ok
} catch {
backgroundCommitSupported = false
}

log(
"INFO",
"session",
backgroundCommitSupported
? "Detected background commit API support"
: "Detected legacy synchronous commit API",
{ endpoint: config.endpoint },
)
return backgroundCommitSupported
}

async function finalizeCommitSuccess(
mapping: SessionMapping,
opencodeSessionId: string,
config: OpenVikingConfig,
): Promise<void> {
mapping.lastCommitTime = Date.now()
mapping.capturedMessages.clear()
clearCommitState(mapping)
debouncedSaveSessionMap()

await flushPendingMessages(opencodeSessionId, mapping, config)

if (mapping.pendingCleanup) {
sessionMap.delete(opencodeSessionId)
sessionMessageBuffer.delete(opencodeSessionId)
await saveSessionMap()
log("INFO", "session", "Cleaned up session mapping after commit completion", {
openviking_session: mapping.ovSessionId,
opencode_session: opencodeSessionId,
})
}
}

async function runSynchronousCommit(
mapping: SessionMapping,
opencodeSessionId: string,
config: OpenVikingConfig,
abortSignal?: AbortSignal,
): Promise<CommitResult> {
mapping.commitInFlight = true
mapping.commitTaskId = undefined
mapping.commitStartedAt = Date.now()
debouncedSaveSessionMap()

try {
const response = await makeRequest<OpenVikingResponse<CommitResult>>(config, {
method: "POST",
endpoint: `/api/v1/sessions/${mapping.ovSessionId}/commit`,
timeoutMs: Math.max(config.timeoutMs, COMMIT_TIMEOUT_MS),
abortSignal,
})
const result = unwrapResponse(response)

log("INFO", "session", "OpenViking synchronous commit completed", {
openviking_session: mapping.ovSessionId,
opencode_session: opencodeSessionId,
memories_extracted: result?.memories_extracted ?? 0,
archived: result?.archived ?? false,
})

await finalizeCommitSuccess(mapping, opencodeSessionId, config)
return result
} catch (error: any) {
clearCommitState(mapping)
debouncedSaveSessionMap()
throw error
}
}

async function flushPendingMessages(
opencodeSessionId: string,
mapping: SessionMapping,
Expand Down Expand Up @@ -769,16 +867,33 @@ async function startBackgroundCommit(
mapping: SessionMapping,
opencodeSessionId: string,
config: OpenVikingConfig,
): Promise<string | null> {
abortSignal?: AbortSignal,
): Promise<CommitStartResult | null> {
if (mapping.commitInFlight && mapping.commitTaskId) {
return mapping.commitTaskId
return { mode: "background", taskId: mapping.commitTaskId }
}

const supportsBackgroundCommit = await detectBackgroundCommitSupport(config)
if (!supportsBackgroundCommit) {
try {
const result = await runSynchronousCommit(mapping, opencodeSessionId, config, abortSignal)
return { mode: "completed", result }
} catch (error: any) {
log("ERROR", "session", "Failed to run synchronous commit", {
openviking_session: mapping.ovSessionId,
opencode_session: opencodeSessionId,
error: error.message,
})
return null
}
}

try {
const response = await makeRequest<OpenVikingResponse<CommitResult>>(config, {
method: "POST",
endpoint: `/api/v1/sessions/${mapping.ovSessionId}/commit?wait=false`,
timeoutMs: 5000,
abortSignal,
})
const data = unwrapResponse(response)
const taskId = data?.task_id
Expand All @@ -796,7 +911,7 @@ async function startBackgroundCommit(
opencode_session: opencodeSessionId,
task_id: taskId,
})
return taskId
return { mode: "background", taskId }
} catch (error: any) {
if (error.message?.includes("already has a commit in progress")) {
const taskId = await findRunningCommitTaskId(mapping.ovSessionId, config)
Expand All @@ -810,7 +925,24 @@ async function startBackgroundCommit(
opencode_session: opencodeSessionId,
task_id: taskId,
})
return taskId
return { mode: "background", taskId }
}
}

if (
error.message?.includes("Request timeout") ||
error.message?.includes("background task id")
) {
backgroundCommitSupported = false
try {
const result = await runSynchronousCommit(mapping, opencodeSessionId, config, abortSignal)
return { mode: "completed", result }
} catch (fallbackError: any) {
log("ERROR", "session", "Failed to fall back to synchronous commit", {
openviking_session: mapping.ovSessionId,
opencode_session: opencodeSessionId,
error: fallbackError.message,
})
}
}

Expand All @@ -833,17 +965,7 @@ async function pollCommitTaskOnce(
}

if (!mapping.commitTaskId) {
const recoveredTaskId = await findRunningCommitTaskId(mapping.ovSessionId, config)
if (!recoveredTaskId) {
log("ERROR", "session", "Commit marked in-flight without task id; clearing state", {
openviking_session: mapping.ovSessionId,
opencode_session: opencodeSessionId,
})
clearCommitState(mapping)
debouncedSaveSessionMap()
return "unknown"
}
mapping.commitTaskId = recoveredTaskId
return "running"
}

try {
Expand All @@ -870,22 +992,7 @@ async function pollCommitTaskOnce(
archived,
})

mapping.lastCommitTime = Date.now()
mapping.capturedMessages.clear()
clearCommitState(mapping)
debouncedSaveSessionMap()

await flushPendingMessages(opencodeSessionId, mapping, config)

if (mapping.pendingCleanup) {
sessionMap.delete(opencodeSessionId)
sessionMessageBuffer.delete(opencodeSessionId)
await saveSessionMap()
log("INFO", "session", "Cleaned up session mapping after commit completion", {
openviking_session: mapping.ovSessionId,
opencode_session: opencodeSessionId,
})
}
await finalizeCommitSuccess(mapping, opencodeSessionId, config)

return task.status
}
Expand Down Expand Up @@ -936,9 +1043,13 @@ async function waitForCommitCompletion(
throw new Error("Operation aborted")
}

if (!mapping.commitInFlight || !mapping.commitTaskId) {
if (!mapping.commitInFlight) {
return null
}
if (!mapping.commitTaskId) {
await sleep(500, abortSignal)
continue
}

const response = await makeRequest<OpenVikingResponse<TaskResult>>(config, {
method: "GET",
Expand All @@ -952,12 +1063,7 @@ async function waitForCommitCompletion(
const memoriesExtracted = task.result?.memories_extracted ?? 0
const archived = task.result?.archived ?? false

mapping.lastCommitTime = Date.now()
mapping.capturedMessages.clear()
clearCommitState(mapping)
debouncedSaveSessionMap()

await flushPendingMessages(opencodeSessionId, mapping, config)
await finalizeCommitSuccess(mapping, opencodeSessionId, config)

log("INFO", "memcommit", "Background commit completed while waiting", {
openviking_session: mapping.ovSessionId,
Expand Down Expand Up @@ -1551,8 +1657,8 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise<Hooks>
},
async execute(args, context) {
let sessionId = args.session_id
if (!sessionId && context.session?.id) {
const mapping = sessionMap.get(context.session.id)
if (!sessionId && context.sessionID) {
const mapping = sessionMap.get(context.sessionID)
if (mapping) {
sessionId = mapping.ovSessionId
}
Expand All @@ -1561,20 +1667,20 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise<Hooks>
log("INFO", "memcommit", "Committing session", {
requested_session_id: args.session_id,
resolved_session_id: sessionId,
opencode_session_id: context.session?.id,
opencode_session_id: context.sessionID,
})

if (!sessionId) {
return "Error: No OpenViking session is associated with the current OpenCode session. Start or resume a normal OpenCode session first, or pass an explicit session_id."
}

try {
const mapping = context.session?.id ? sessionMap.get(context.session.id) : undefined
const mapping = context.sessionID ? sessionMap.get(context.sessionID) : undefined
const resolvedMapping = mapping?.ovSessionId === sessionId ? mapping : undefined

if (resolvedMapping) {
await flushPendingMessages(
context.session?.id ?? sessionId,
context.sessionID ?? sessionId,
resolvedMapping,
config,
)
Expand All @@ -1583,7 +1689,7 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise<Hooks>
if (resolvedMapping?.commitInFlight) {
const task = await waitForCommitCompletion(
resolvedMapping,
context.session?.id ?? sessionId,
context.sessionID ?? sessionId,
config,
context.abort,
)
Expand Down Expand Up @@ -1612,18 +1718,33 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise<Hooks>
sendingMessages: new Set(),
}

const taskId = await startBackgroundCommit(
const commitStart = await startBackgroundCommit(
tempMapping,
context.session?.id ?? sessionId,
context.sessionID ?? sessionId,
config,
context.abort,
)
if (!taskId) {
if (!commitStart) {
throw new Error("Failed to start background commit")
}

if (commitStart.mode === "completed") {
return JSON.stringify(
{
message: `Memory extraction complete: ${commitStart.result.memories_extracted ?? 0} memories extracted`,
session_id: commitStart.result.session_id ?? sessionId,
status: commitStart.result.status ?? "completed",
memories_extracted: commitStart.result.memories_extracted ?? 0,
archived: commitStart.result.archived ?? false,
},
null,
2,
)
}

const task = await waitForCommitCompletion(
tempMapping,
context.session?.id ?? sessionId,
context.sessionID ?? sessionId,
config,
context.abort,
)
Expand All @@ -1634,7 +1755,7 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise<Hooks>
message: "Commit is still processing in the background",
session_id: sessionId,
status: "accepted",
task_id: taskId,
task_id: commitStart.taskId,
},
null,
2,
Expand Down Expand Up @@ -1693,12 +1814,12 @@ export const OpenVikingMemoryPlugin = async (input: PluginInput): Promise<Hooks>

// Auto-inject session_id if not provided
let sessionId = args.session_id
if (!sessionId && context.session?.id) {
const mapping = sessionMap.get(context.session.id)
if (!sessionId && context.sessionID) {
const mapping = sessionMap.get(context.sessionID)
if (mapping) {
sessionId = mapping.ovSessionId
log("INFO", "memsearch", "Auto-injected session context", {
opencode_session: context.session.id,
opencode_session: context.sessionID,
openviking_session: sessionId,
})
}
Expand Down
Loading