fix: resolve 8 performance and architecture issues (#77)
(… #43, #48, #66)

N+1 query fixes (#18, #42):
- context.ts: parallelize summary + observation fetches with Promise.all
- profile.ts: parallelize per-session observation loading
- consolidate.ts: parallelize observation fetches across sessions
- auto-forget.ts: parallelize observation loading
- search.ts rebuildIndex: parallelize all session observation fetches

smart-search O(sessions*expandIds) fix (#23):
- findObservation accepts sessionIdHint for O(1) direct lookup
- expandIds accepts {obsId, sessionId} objects for hint passthrough
- fallback session scan parallelized with Promise.all

Viewer proxy refactor (#20, #66):
- Replace ~340 lines of duplicated API handlers with proxyToRestApi
- Remove buildGraphFromData (~200 lines), buildProfileFromRawObs (~90 lines)
- Preserve static HTML serving, WebSocket streaming, auth
- Eliminates graph builder duplication (#66)

buildGraphFromData O(n^3) fix (#24):
- Pre-build edgesBySource/edgesByTarget lookup maps for O(1) checks
- Replace triple-nested edges.some() with hasEdgeBetween() map lookups

Export pagination (#43):
- Add maxSessions (default 100) and offset parameters
- Parallelize per-session observation + profile loading
- Parallelize all independent KV list calls
- Return pagination metadata when hasMore

Health KV probe (#48):
- Add KV connectivity check in collectHealth()
- Write/read _probe key with latency measurement
- Add kvConnectivity field to HealthSnapshot type
📝 Walkthrough

The PR parallelizes per-session observation retrieval across multiple functions, adds pagination to exports, enhances smart-search with optional session hints, probes KV health, and refactors the viewer server from in-process handlers to a REST API proxy.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant Viewer as ViewerServer
    participant Upstream as REST API
    rect rgba(200,230,255,0.5)
        Client->>Viewer: HTTP request (/api/...)
    end
    rect rgba(220,255,220,0.5)
        Viewer->>Upstream: proxyToRestApi -> forward method/headers/body
        Upstream-->>Viewer: upstream response (status, body)
    end
    rect rgba(255,240,200,0.5)
        Viewer-->>Client: proxied response (status, headers, body)
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 11
🧹 Nitpick comments (1)
src/viewer/server.ts (1)
78-78: Prefer explicit `restPort` wiring over `port - 2` inference.

This implicit coupling is fragile if port layouts change. Passing `restPort` directly into `startViewerServer` is safer.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/viewer/server.ts` at line 78, The code currently infers restPort with "const restPort = port - 2"; instead, modify startViewerServer to accept an explicit restPort parameter and remove the "port - 2" calculation: update the startViewerServer function signature to include restPort, pass the computed REST port from the caller instead of deriving it inside server.ts, and update all call sites that construct or start the viewer to supply restPort (ensure any callers that previously passed only port are updated). This makes restPort wiring explicit and removes the fragile implicit coupling.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/functions/auto-forget.ts`:
- Around line 89-95: The current Promise.all over sessions causes unbounded
concurrent kv.list calls (see obsPerSession and sessions.map using kv.list),
which can overload the KV service; change this to a bounded-concurrency approach
— iterate sessions in batches or use a concurrency limiter (e.g., p-limit or a
simple chunked loop) to call kv.list for at most N sessions concurrently (pick a
sensible default like 5-10), collect the results into the same obsPerSession
shape, and preserve the existing .catch(() => [] as CompressedObservation[])
behavior for each call.
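The chunked-loop limiter this comment describes can be sketched as follows. This is a generic helper, not code from the repo; `mapInBatches` is a hypothetical name, and the real call sites would pass `(s) => kv.list(KV.observations(s.id)).catch(() => [])` as the mapper to preserve the existing per-call fallback:

```typescript
// Run an async mapper over items in fixed-size chunks so at most
// `batchSize` calls are in flight at once, preserving input order.
async function mapInBatches<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>,
  batchSize = 10,
): Promise<R[]> {
  const results: R[] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    const chunk = items.slice(i, i + batchSize);
    // Each chunk runs fully before the next one starts.
    results.push(...(await Promise.all(chunk.map(fn))));
  }
  return results;
}
```

A chunked loop is simpler than a true worker pool but can leave capacity idle if one call in a chunk is slow; for this KV workload the simplicity is usually worth it.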
In `@src/functions/consolidate.ts`:
- Around line 81-87: The current call to Promise.all over filtered.map
triggering kv.list (obsPerSession) causes an unbounded parallel KV fetch; change
it to a bounded-concurrency approach by processing sessions in batches or using
a concurrency limiter: iterate over filtered (the array used in filtered.map),
chunk it into groups (e.g., batchSize = 10), and for each chunk call Promise.all
on kv.list(KV.observations(session.id)).catch(...) to collect results, appending
each batch's responses into obsPerSession; alternatively use a p-limit/p-map
style limiter for kv.list calls. Ensure you update the code that references
obsPerSession/CompressedObservation accordingly so behavior is identical but
with controlled parallelism.
In `@src/functions/export-import.ts`:
- Around line 97-99: The current code locally intersects pagination into the
export object, which hides pagination from typed consumers; instead add a shared
ExportPagination interface and make pagination an optional field on the existing
ExportData interface (e.g., export interface ExportPagination { offset:number;
limit:number; total:number; hasMore:boolean } and export interface ExportData {
/* existing fields */ pagination?: ExportPagination }), update the central types
file where ExportData is declared, then remove the local intersection in the
export/import code (stop using ExportData & { pagination?: ... }) and rely on
the updated ExportData import so all consumers see pagination in the contract.
- Around line 33-34: Validate and sanitize the pagination inputs maxSessions and
offset before use: ensure maxSessions is a positive integer (coerce to Number,
default to 100 if NaN or <=0, and optionally clamp to a reasonable upper bound)
and ensure offset is a non-negative integer (coerce to Number, default to 0 if
NaN or <0). Replace the direct nullish coalescing assignment of maxSessions and
offset with this validation logic where those variables are declared so
downstream logic uses safe, predictable pagination values.
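A minimal sketch of the suggested validation (hypothetical `sanitizePagination` helper; the 1-1000 clamp mirrors the bound the follow-up commit in this PR adopts):

```typescript
// Coerce and clamp pagination inputs to safe, predictable values.
function sanitizePagination(input: { maxSessions?: unknown; offset?: unknown }) {
  const rawMax = Number(input.maxSessions);
  const rawOffset = Number(input.offset);
  // maxSessions: positive integer, default 100, clamped to 1000.
  const maxSessions =
    Number.isInteger(rawMax) && rawMax > 0 ? Math.min(rawMax, 1000) : 100;
  // offset: non-negative integer, default 0.
  const offset =
    Number.isInteger(rawOffset) && rawOffset >= 0 ? rawOffset : 0;
  return { maxSessions, offset };
}
```

`Number(undefined)` is `NaN`, so missing fields fall through to the defaults without a separate nullish check.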
- Around line 31-37: The export function currently defaults to paginating
sessions (maxSessions=100, offset=0) so calling mem::export with an empty object
returns only a subset; change it to export the full set by default and only
apply slicing when explicit pagination parameters are provided: retrieve all
sessions via kv.list<Session>(KV.sessions) into allSessions, and if
data?.maxSessions or data?.offset are defined use slice to produce
paginatedSessions, otherwise set paginatedSessions = allSessions; apply the same
change to the other export block around the allSessions/paginatedSessions logic
(the second occurrence at the 124-131 region).
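The "full export by default, slice only when asked" behavior can be sketched like this (hypothetical `selectSessions` helper; the real code would apply it to the result of `kv.list<Session>(KV.sessions)`):

```typescript
// Return all sessions unless the caller explicitly requested pagination.
function selectSessions<T>(
  allSessions: T[],
  data?: { maxSessions?: number; offset?: number },
): { sessions: T[]; paginated: boolean } {
  const explicit = data?.maxSessions !== undefined || data?.offset !== undefined;
  if (!explicit) return { sessions: allSessions, paginated: false };
  const offset = data?.offset ?? 0;
  const limit = data?.maxSessions ?? 100;
  return { sessions: allSessions.slice(offset, offset + limit), paginated: true };
}
```

The `paginated` flag lets the caller decide whether to attach pagination metadata to the export.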
In `@src/functions/search.ts`:
- Around line 23-27: The current rebuild swallows per-session failures by
mapping sessions to kv.list(...) .catch(() => []), and runs them with unbounded
Promise.all, allowing silent partial indexes; change the logic in the rebuild
path that produces obsPerSession (the sessions.map + Promise.all over kv.list
calls targeting KV.observations) to run kv.list in bounded batches (limit
concurrency) and stop swallowing errors: collect failed session ids instead of
returning empty arrays, emit telemetry/log entries for those failures (or
propagate a single error after batching) so rebuild either retries or fails
visibly, and ensure the variable names obsPerSession, sessions.map, kv.list, and
KV.observations are updated accordingly to reflect the new batching and
error-reporting behavior.
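Collecting failed session ids instead of swallowing them can be done with `Promise.allSettled`; a sketch (hypothetical `loadAllOrReport` helper, shown unbounded for brevity and meant to be combined with the batching above):

```typescript
// Load observations per session, recording which sessions failed
// rather than silently substituting empty arrays.
async function loadAllOrReport<T>(
  sessionIds: string[],
  load: (id: string) => Promise<T[]>,
): Promise<{ perSession: T[][]; failedSessions: string[] }> {
  const settled = await Promise.allSettled(sessionIds.map(load));
  const perSession: T[][] = [];
  const failedSessions: string[] = [];
  settled.forEach((result, i) => {
    if (result.status === "fulfilled") perSession.push(result.value);
    else failedSessions.push(sessionIds[i]);
  });
  return { perSession, failedSessions };
}
```

The caller can then log `failedSessions`, retry them, or abort the rebuild if the list is non-empty.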
In `@src/functions/smart-search.ts`:
- Around line 101-109: The current fallback does a parallel kv.get for every
session (sessions from KV.sessions) causing high fan-out; change the logic in
smart-search.ts around the sessions/results/obsId lookup to perform batched or
sequential lookups against KV.observations(session.id) (e.g., process sessions
in small chunks or a sequential for-loop) and stop as soon as a non-null
CompressedObservation is found to return early instead of awaiting Promise.all
across all sessions. Ensure you still type the result as CompressedObservation |
null and preserve existing error handling for individual kv.get calls while
avoiding firing all requests in parallel.
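The chunked scan with early return can be sketched as follows (hypothetical `findFirst` helper; the chunk size of 5 matches what the follow-up commit in this PR settles on):

```typescript
// Scan candidate sessions in small chunks, returning as soon as a
// lookup produces a non-null hit; later chunks are never fired.
async function findFirst<T>(
  sessionIds: string[],
  lookup: (id: string) => Promise<T | null>,
  chunkSize = 5,
): Promise<T | null> {
  for (let i = 0; i < sessionIds.length; i += chunkSize) {
    const chunk = sessionIds.slice(i, i + chunkSize);
    const results = await Promise.all(
      chunk.map((id) => lookup(id).catch(() => null)), // per-call errors stay non-fatal
    );
    const hit = results.find((r) => r !== null);
    if (hit !== undefined && hit !== null) return hit;
  }
  return null;
}
```

This bounds fan-out to `chunkSize` concurrent KV reads while keeping the common case (hit in the first chunk) fast.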
- Around line 31-35: The mapping over raw into items assumes non-string entries
have obsId and sessionId; guard against malformed values (null, {}, etc.) by
validating each entry before accessing entry.obsId: in the map (the items
assignment) treat a branch for strings as before, add a branch that checks entry
is an object and has a string obsId (and optional sessionId), and for any
invalid entries either filter them out or produce a safe fallback (e.g., obsId:
undefined) so you never do entry.obsId on null/{}; update the logic around raw,
entry, obsId, and sessionId accordingly to perform this runtime shape
validation.
In `@src/health/monitor.ts`:
- Line 55: The health snapshot currently assigns raw error text into
kvConnectivity (kvConnectivity = { status: "error", error: err instanceof Error
? err.message : String(err) }), which can leak internal details; update the code
that builds the health snapshot (the kvConnectivity assignment in
src/health/monitor.ts) to return a sanitized error code/message (e.g., status:
"error" and error: "kv_connection_failed" or a short user-safe message) and move
the full err.message/stack into an internal/debug log statement (use the
existing logger or add one) so only the generic message appears in the health
payload while full details remain in logs for debugging.
- Around line 48-56: Wrap the KV probe (the await kv.set(KV.health, "_probe",
...) and await kv.get(KV.health, "_probe")) in a bounded timeout so they cannot
block collectHealth; implement a helper timeout promise (or use AbortController
if kv supports it) and race each KV call (or the combined probe sequence)
against that timeout, and on timeout set kvConnectivity = { status: "error",
error: "timeout", latencyMs: Math.round((performance.now() - kvStart) * 100) /
100 } (or similar) so timeouts are treated as probe failures — update the
existing kvStart/latency calculation and the catch branch to account for timeout
errors and ensure overlapping collectHealth runs cannot be stalled by unbounded
KV I/O.
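The `Promise.race` bound described here can be sketched with a generic helper (hypothetical `withTimeout`; the 5s value matches the follow-up commit). It resolves with a tagged result so the probe's catch branch can record a failure rather than deal with a raw rejection:

```typescript
// Race a promise against a timeout. Resolves with a discriminated
// result; the pending timer is always cleared.
async function withTimeout<T>(
  promise: Promise<T>,
  ms: number,
): Promise<{ ok: true; value: T } | { ok: false; error: "timeout" }> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<{ ok: false; error: "timeout" }>((resolve) => {
    timer = setTimeout(() => resolve({ ok: false, error: "timeout" }), ms);
  });
  try {
    return await Promise.race([
      promise.then((value) => ({ ok: true as const, value })),
      timeout,
    ]);
  } finally {
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

Note that `Promise.race` does not cancel the losing KV call; it only stops `collectHealth` from waiting on it.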
In `@src/viewer/server.ts`:
- Around line 174-178: The upstream fetch call using upstreamUrl (const upstream
= await fetch(...)) has no timeout and can hang; wrap the fetch with an
AbortController: create an AbortController, pass controller.signal to fetch
(alongside method, headers, body), start a timer that calls controller.abort()
after a configured timeout, and clear the timer after fetch completes; handle
the abort (AbortError) where upstream is awaited so the request returns a timely
504/timeout response instead of hanging. Ensure symbols referenced: upstream,
upstreamUrl, method, headers, body, and use a configurable timeout value.
---
Nitpick comments:
In `@src/viewer/server.ts`:
- Line 78: The code currently infers restPort with "const restPort = port - 2";
instead, modify startViewerServer to accept an explicit restPort parameter and
remove the "port - 2" calculation: update the startViewerServer function
signature to include restPort, pass the computed REST port from the caller
instead of deriving it inside server.ts, and update all call sites that
construct or start the viewer to supply restPort (ensure any callers that
previously passed only port are updated). This makes restPort wiring explicit
and removes the fragile implicit coupling.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: fcef3d85-3613-45aa-82d5-14a8a5ec7314
📒 Files selected for processing (11)
- src/functions/auto-forget.ts
- src/functions/consolidate.ts
- src/functions/context.ts
- src/functions/export-import.ts
- src/functions/profile.ts
- src/functions/search.ts
- src/functions/smart-search.ts
- src/health/monitor.ts
- src/index.ts
- src/types.ts
- src/viewer/server.ts
💤 Files with no reviewable changes (1)
- src/index.ts
```ts
const maxSessions = data?.maxSessions ?? 100;
const offset = data?.offset ?? 0;
```
Validate pagination inputs (maxSessions, offset).
Negative or zero values currently pass through and can create empty/incorrect pages or unusable pagination metadata.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/functions/export-import.ts` around lines 33 - 34, Validate and sanitize
the pagination inputs maxSessions and offset before use: ensure maxSessions is a
positive integer (coerce to Number, default to 100 if NaN or <=0, and optionally
clamp to a reasonable upper bound) and ensure offset is a non-negative integer
(coerce to Number, default to 0 if NaN or <0). Replace the direct nullish
coalescing assignment of maxSessions and offset with this validation logic where
those variables are declared so downstream logic uses safe, predictable
pagination values.
```ts
const exportData: ExportData & {
  pagination?: { offset: number; limit: number; total: number; hasMore: boolean };
} = {
```
pagination should be part of the shared ExportData contract.
Adding it only via local intersection makes typed consumers blind to pagination and encourages accidental data loss handling bugs.
Proposed type update (outside this file)

```ts
// src/types.ts
export interface ExportPagination {
  offset: number;
  limit: number;
  total: number;
  hasMore: boolean;
}

export interface ExportData {
  // existing fields...
  pagination?: ExportPagination;
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/functions/export-import.ts` around lines 97 - 99, The current code
locally intersects pagination into the export object, which hides pagination
from typed consumers; instead add a shared ExportPagination interface and make
pagination an optional field on the existing ExportData interface (e.g., export
interface ExportPagination { offset:number; limit:number; total:number;
hasMore:boolean } and export interface ExportData { /* existing fields */
pagination?: ExportPagination }), update the central types file where ExportData
is declared, then remove the local intersection in the export/import code (stop
using ExportData & { pagination?: ... }) and rely on the updated ExportData
import so all consumers see pagination in the contract.
```ts
  await kv.get(KV.health, "_probe");
  kvConnectivity = { status: "ok", latencyMs: Math.round((performance.now() - kvStart) * 100) / 100 };
} catch (err) {
  kvConnectivity = { status: "error", error: err instanceof Error ? err.message : String(err) };
```
Do not expose raw KV exception text in health payload.
Line 55 currently returns err.message directly to the snapshot. Health endpoints are often externally reachable; this can leak backend/internal details. Prefer a sanitized error code/message and keep full details only in internal logs.
🔧 Proposed fix

```diff
- kvConnectivity = { status: "error", error: err instanceof Error ? err.message : String(err) };
+ kvConnectivity = { status: "error", error: "kv_probe_failed" };
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```ts
kvConnectivity = { status: "error", error: "kv_probe_failed" };
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/health/monitor.ts` at line 55, The health snapshot currently assigns raw
error text into kvConnectivity (kvConnectivity = { status: "error", error: err
instanceof Error ? err.message : String(err) }), which can leak internal
details; update the code that builds the health snapshot (the kvConnectivity
assignment in src/health/monitor.ts) to return a sanitized error code/message
(e.g., status: "error" and error: "kv_connection_failed" or a short user-safe
message) and move the full err.message/stack into an internal/debug log
statement (use the existing logger or add one) so only the generic message
appears in the health payload while full details remain in logs for debugging.
…n, timeouts

Bounded concurrency:
- auto-forget.ts: batch kv.list calls in groups of 10
- consolidate.ts: batch kv.list calls in groups of 10
- search.ts: batch rebuild in groups of 10, log failed sessions
- smart-search.ts: sequential batch of 5 with early return

Input validation:
- export-import.ts: validate/clamp maxSessions (1-1000) and offset (>=0)
- export-import.ts: full export by default, paginate only when explicit
- export-import.ts: ExportPagination type moved to types.ts on ExportData
- smart-search.ts: guard against malformed expandIds entries

Security/resilience:
- monitor.ts: sanitize KV probe error to generic "kv_probe_failed"
- monitor.ts: 5s timeout on KV probe via Promise.race
- viewer/server.ts: 10s AbortController timeout on upstream fetch
- viewer/server.ts: accept explicit restPort parameter
Actionable comments posted: 3
♻️ Duplicate comments (2)
src/functions/search.ts (1)
23-43: ⚠️ Potential issue | 🟠 Major

Partial rebuilds still replace the active index when session loads fail.

At line 41 you now warn on failures, but rebuild continues and the cleared index is repopulated with partial data. Prefer atomic rebuild (build into a new index and swap only on success, or fail the rebuild when `failedSessions` is non-empty).

This is the same reliability concern previously raised, now partially mitigated by logging only.

Proposed fix (atomic swap)

```diff
 export async function rebuildIndex(kv: StateKV): Promise<number> {
-  const idx = getSearchIndex()
-  idx.clear()
+  const nextIndex = new SearchIndex()
   const sessions = await kv.list<Session>(KV.sessions)
   if (!sessions.length) return 0
   let count = 0
   const obsPerSession: CompressedObservation[][] = []
   const failedSessions: string[] = []
@@
   if (failedSessions.length > 0) {
     const ctx = getContext()
     ctx.logger.warn('rebuildIndex: failed to load observations for sessions', { failedSessions })
+    throw new Error('rebuildIndex failed: partial session fetch')
   }
   for (const observations of obsPerSession) {
     for (const obs of observations) {
       if (obs.title && obs.narrative) {
-        idx.add(obs)
+        nextIndex.add(obs)
         count++
       }
     }
   }
+  index = nextIndex
   return count
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/functions/search.ts` around lines 23 - 43, The rebuild is writing partial data when some session loads fail; modify the logic to perform an atomic swap by building into a temporary/new index and only replacing the active index when all session loads succeed: collect observations into obsPerSession as you do, but if failedSessions is non-empty after the batch loop, abort the rebuild (log and return/throw) instead of proceeding to repopulate the live index; alternatively create a new index instance, populate it from obsPerSession, and swap it into place only when failedSessions.length === 0 — update references to obsPerSession, failedSessions, getContext()/ctx.logger.warn, KV.observations and kv.list to implement this control flow.

src/functions/smart-search.ts (1)
31-37: ⚠️ Potential issue | 🟡 Minor

Validate `sessionId` type during `expandIds` normalization.

The object branch still trusts `sessionId` via cast, so malformed values can slip through and trigger invalid hinted lookups before fallback.

Proposed hardening

```diff
-  const items = raw.map((entry) => {
-    if (typeof entry === "string") return { obsId: entry, sessionId: undefined as string | undefined };
-    if (entry && typeof entry === "object" && typeof (entry as any).obsId === "string") {
-      return { obsId: (entry as any).obsId, sessionId: (entry as any).sessionId as string | undefined };
-    }
-    return null;
-  }).filter((item): item is NonNullable<typeof item> => item !== null);
+  const items = raw
+    .map((entry) => {
+      if (typeof entry === "string" && entry.trim()) {
+        return { obsId: entry.trim(), sessionId: undefined as string | undefined };
+      }
+      if (
+        entry &&
+        typeof entry === "object" &&
+        typeof (entry as { obsId?: unknown }).obsId === "string" &&
+        ((entry as { sessionId?: unknown }).sessionId === undefined ||
+          typeof (entry as { sessionId?: unknown }).sessionId === "string")
+      ) {
+        const { obsId, sessionId } = entry as { obsId: string; sessionId?: string };
+        if (!obsId.trim()) return null;
+        return { obsId: obsId.trim(), sessionId };
+      }
+      return null;
+    })
+    .filter((item): item is NonNullable<typeof item> => item !== null);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/functions/smart-search.ts` around lines 31 - 37, The normalization in expandIds currently casts sessionId without validating it, allowing malformed non-string sessionId values through; update the object branch in the raw.map that creates items so it only assigns sessionId when typeof (entry as any).sessionId === "string" (otherwise set sessionId to undefined), keeping obsId validation as-is and preserving the existing filter type guard for NonNullable items; this ensures sessionId is a real string before use in hinted lookups.
🧹 Nitpick comments (5)
src/viewer/server.ts (3)
72-79: Function signature now accepts unused parameters.

The `_kv` and `_sdk` parameters are typed as `unknown` and unused. This preserves backward compatibility at call sites but creates dead parameters. Consider removing them from the signature if no longer needed, or document why they're retained.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/viewer/server.ts` around lines 72 - 79, The function startViewerServer currently declares unused parameters _kv and _sdk (typed unknown) which create dead parameters; either remove _kv and _sdk from the startViewerServer signature and update all callers to pass only (port, secret?, restPort?) or, if they must remain for backward compatibility, add a clear comment/JSDoc above startViewerServer explaining they're intentionally retained for ABI stability and mark them as deprecated (and optionally reference a TODO to remove in a future major version), and remove the leading underscores if you opt to keep them documented rather than unused; ensure resolvedRestPort logic remains unchanged and update any types or exported references accordingly.
170-173: DELETE and PATCH methods read body, but DELETE typically has no body.

While HTTP allows DELETE with a body, it's uncommon and some servers ignore it. Including DELETE in the body-reading methods is acceptable for flexibility, but verify that the upstream REST API actually expects bodies on DELETE requests.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/viewer/server.ts` around lines 170 - 173, The code currently reads a request body for methods "POST", "PUT", "DELETE", and "PATCH" which can cause unexpected behavior for DELETE; update the condition that assigns body to only read when a body is actually present (e.g., check headers Content-Length > 0 or Transfer-Encoding is set) instead of unconditionally including "DELETE", by modifying the if that uses method and calling readBody(req) only when the header check passes; reference the variables/methods method, body, and readBody to locate and change this logic.
155-158: Path construction may produce double slashes.

When `pathname` is `/` (root path), the upstream path becomes `/agentmemory/`, which is fine. However, if `pathname` is the empty string `""`, the ternary produces `/agentmemory/` with a trailing slash that might not be intended. The current logic handles most cases correctly, but consider edge cases.

Simplified path construction

```diff
-  const upstreamPath = pathname.startsWith("/agentmemory/")
-    ? pathname
-    : `/agentmemory${pathname.startsWith("/") ? pathname : "/" + pathname}`;
+  const normalizedPath = pathname.startsWith("/") ? pathname : "/" + pathname;
+  const upstreamPath = normalizedPath.startsWith("/agentmemory/") || normalizedPath === "/agentmemory"
+    ? normalizedPath
+    : `/agentmemory${normalizedPath}`;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/viewer/server.ts` around lines 155 - 158, The upstreamPath construction can produce double or unintended trailing slashes when pathname is empty or already has a leading slash; update the logic that sets upstreamPath (the current ternary using pathname and "/agentmemory/") to normalize pathname first: if pathname already starts with "/agentmemory/" keep it, otherwise strip any leading slash from pathname and only append "/"+trimmedPath when trimmedPath is non-empty so the result becomes "/agentmemory" for empty pathname and "/agentmemory/<path>" for non-empty paths, avoiding double or trailing slashes.src/functions/export-import.ts (2)
311-339: Import loops are sequential - similar optimization opportunity.

The import loops (sessions, observations, memories, etc.) process items one at a time. While the `skip` strategy requires checking existence first, the operations could still benefit from batched parallel execution with bounded concurrency for better throughput on large imports.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/functions/export-import.ts` around lines 311 - 339, The import loops for importData.sessions and importData.observations are sequential and should be converted to batched, bounded-concurrency parallel tasks: collect per-item async tasks that perform the existing "skip" existence check against kv.get (for KV.sessions and KV.observations(sessionId)) and then kv.set, but run them through a concurrency limiter (e.g., p-limit or a simple worker queue) instead of awaiting each item inline; update stats using local counters inside each task and aggregate them after Promise.allSettled to avoid race conditions on shared stats, and ensure errors from kv.get/kv.set are caught and logged per task to preserve behavior. Use the existing identifiers importData.sessions, importData.observations, strategy, KV.sessions, KV.observations(sessionId), and stats to locate and replace the sequential loops.
251-309: Replace strategy has sequential deletion - potential performance concern.

The `replace` strategy deletes existing data one item at a time in serial loops (e.g., lines 253-269, 270-308). For large datasets, this could be slow. Consider batching deletions or using `Promise.all` with bounded concurrency, similar to the export optimization.

Example: Batch deletions with bounded concurrency

```ts
// Helper for bounded parallel execution
async function batchProcess<T>(items: T[], fn: (item: T) => Promise<void>, batchSize = 10): Promise<void> {
  for (let i = 0; i < items.length; i += batchSize) {
    await Promise.all(items.slice(i, i + batchSize).map(fn));
  }
}

// Usage in replace strategy:
const existing = await kv.list<Session>(KV.sessions);
await batchProcess(existing, async (session) => {
  await kv.delete(KV.sessions, session.id);
  const obs = await kv.list<CompressedObservation>(KV.observations(session.id)).catch(() => []);
  for (const o of obs) {
    await kv.delete(KV.observations(session.id), o.id);
  }
});
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/functions/export-import.ts` around lines 251 - 309, The replace branch currently deletes items serially (see the block handling KV.sessions, KV.observations, KV.memories, KV.summaries, KV.actions, KV.actionEdges, KV.routines, KV.signals, KV.checkpoints, KV.sentinels, KV.sketches, KV.crystals, KV.facets, KV.graphNodes, KV.graphEdges, KV.semantic, KV.procedural), which is slow for large datasets; introduce a bounded-parallel helper (e.g., batchProcess(items, fn, batchSize)) or use Promise.all with a limited concurrency to perform deletions in batches, update the session loop to delete sessions and their KV.observations in parallel batches, and replace the other single-item loops with batched deletion calls while preserving existing .catch(() => []) list behavior and error handling.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/functions/consolidate.ts`:
- Around line 82-92: The consolidation loop currently swallows errors by
converting failed kv.list calls to empty arrays (using .catch(() => [])), which
hides failed session fetches; update the Promise.all mapping so that failures
from kv.list(KV.observations(s.id)) are not silently dropped: capture the error
(include s.id and the error details) and either log it via the existing logger
or push an explicit failure marker so obsPerSession can reflect the failure, and
at the end of consolidate (or immediately) surface or throw an aggregated error
if any session fetches failed; ensure references to filtered, KV.observations,
CompressedObservation, and obsPerSession are used to locate and update the
behavior.
In `@src/functions/smart-search.ts`:
- Around line 22-25: The smart-search expandIds parameter supports object-form
entries with sessionId but most callers still pass string[] so requests hit the
expensive fallback; update the request contract and callers to accept
Array<string | { obsId: string; sessionId?: string }> (keep backward
compatibility with string entries), modify the code that constructs expandIds in
the API trigger and MCP server call sites (where they currently pass string[])
to include sessionId from compact result objects when available, and ensure
mem::smart-search (the async function with the expandIds param) treats string
entries as legacy obsId-only and object entries by using the provided sessionId
to perform the efficient expansion path.
- Around line 45-51: The current Promise.all over items causes unbounded
parallel calls to findObservation (which performs kv.list/kv.get) and can spike
KV reads; replace the all-at-once pattern with bounded concurrency (e.g., use a
small concurrency limit like 4–8) when invoking findObservation for each {obsId,
sessionId}. Locate the Promise.all(items.map(...)) usage in smart-search.ts and
change it to a concurrency-controlled loop or use a p-limit / async-pool style
helper so only N findObservation calls run concurrently, collecting results into
the same shape ({ obsId, sessionId, observation } or null) before continuing.
Ensure the implementation still preserves order/association of results with
input items.
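A p-limit / async-pool style limiter that keeps results aligned with input order, as the comment requires, can be sketched like this (hypothetical `asyncPool` helper; the default limit of 6 sits in the 4–8 range suggested above):

```typescript
// Worker-pool limiter: up to `limit` tasks run concurrently, and each
// result is written back at its input index so order is preserved.
async function asyncPool<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>,
  limit = 6,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  async function worker(): Promise<void> {
    // `next++` is safe: JS is single-threaded between awaits.
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i]);
    }
  }
  await Promise.all(
    Array.from({ length: Math.min(limit, items.length) }, worker),
  );
  return results;
}
```

Unlike a chunked loop, workers pick up the next item as soon as they finish, so one slow `findObservation` call does not stall the whole batch.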
---
Duplicate comments:
In `@src/functions/search.ts`:
- Around line 23-43: The rebuild is writing partial data when some session loads
fail; modify the logic to perform an atomic swap by building into a
temporary/new index and only replacing the active index when all session loads
succeed: collect observations into obsPerSession as you do, but if
failedSessions is non-empty after the batch loop, abort the rebuild (log and
return/throw) instead of proceeding to repopulate the live index; alternatively
create a new index instance, populate it from obsPerSession, and swap it into
place only when failedSessions.length === 0 — update references to
obsPerSession, failedSessions, getContext()/ctx.logger.warn, KV.observations and
kv.list to implement this control flow.
In `@src/functions/smart-search.ts`:
- Around line 31-37: The normalization in expandIds currently casts sessionId
without validating it, allowing malformed non-string sessionId values through;
update the object branch in the raw.map that creates items so it only assigns
sessionId when typeof (entry as any).sessionId === "string" (otherwise set
sessionId to undefined), keeping obsId validation as-is and preserving the
existing filter type guard for NonNullable items; this ensures sessionId is a
real string before use in hinted lookups.
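A sketch of the validated normalization (the types here are illustrative; the real `expandIds` handling lives inside smart-search.ts):

```typescript
type ExpandEntry = string | { obsId: string; sessionId?: unknown };
type Item = { obsId: string; sessionId?: string };

// Normalize mixed string/object entries; only keep sessionId when it
// is actually a string, so malformed hints never reach the KV lookup.
function normalizeExpandIds(raw: ExpandEntry[]): Item[] {
  return raw
    .map((entry): Item | null => {
      if (typeof entry === "string") return { obsId: entry };
      if (entry && typeof entry.obsId === "string") {
        return {
          obsId: entry.obsId,
          sessionId:
            typeof entry.sessionId === "string" ? entry.sessionId : undefined,
        };
      }
      return null;
    })
    .filter((item): item is Item => item !== null);
}
```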
---
Nitpick comments:
In `@src/functions/export-import.ts`:
- Around line 311-339: The import loops for importData.sessions and
importData.observations are sequential and should be converted to batched,
bounded-concurrency parallel tasks: collect per-item async tasks that perform
the existing "skip" existence check against kv.get (for KV.sessions and
KV.observations(sessionId)) and then kv.set, but run them through a concurrency
limiter (e.g., p-limit or a simple worker queue) instead of awaiting each item
inline; update stats using local counters inside each task and aggregate them
after Promise.allSettled to avoid race conditions on shared stats, and ensure
errors from kv.get/kv.set are caught and logged per task to preserve behavior.
Use the existing identifiers importData.sessions, importData.observations,
strategy, KV.sessions, KV.observations(sessionId), and stats to locate and
replace the sequential loops.
- Around line 251-309: The replace branch currently deletes items serially (see
the block handling KV.sessions, KV.observations, KV.memories, KV.summaries,
KV.actions, KV.actionEdges, KV.routines, KV.signals, KV.checkpoints,
KV.sentinels, KV.sketches, KV.crystals, KV.facets, KV.graphNodes, KV.graphEdges,
KV.semantic, KV.procedural), which is slow for large datasets; introduce a
bounded-parallel helper (e.g., batchProcess(items, fn, batchSize)) or use
Promise.all with a limited concurrency to perform deletions in batches, update
the session loop to delete sessions and their KV.observations in parallel
batches, and replace the other single-item loops with batched deletion calls
while preserving existing .catch(() => []) list behavior and error handling.
In `@src/viewer/server.ts`:
- Around line 72-79: The function startViewerServer currently declares unused
parameters _kv and _sdk (typed unknown) which create dead parameters; either
remove _kv and _sdk from the startViewerServer signature and update all callers
to pass only (port, secret?, restPort?) or, if they must remain for backward
compatibility, add a clear comment/JSDoc above startViewerServer explaining
they're intentionally retained for ABI stability and mark them as deprecated
(and optionally reference a TODO to remove in a future major version), and
remove the leading underscores if you opt to keep them documented rather than
unused; ensure resolvedRestPort logic remains unchanged and update any types or
exported references accordingly.
- Around line 170-173: The code currently reads a request body for methods
"POST", "PUT", "DELETE", and "PATCH" which can cause unexpected behavior for
DELETE; update the condition that assigns body to only read when a body is
actually present (e.g., check headers Content-Length > 0 or Transfer-Encoding is
set) instead of unconditionally including "DELETE", by modifying the if that
uses method and calling readBody(req) only when the header check passes;
reference the variables/methods method, body, and readBody to locate and change
this logic.
- Around line 155-158: The upstreamPath construction can produce double or
unintended trailing slashes when pathname is empty or already has a leading
slash; update the logic that sets upstreamPath (the current ternary using
pathname and "/agentmemory/") to normalize pathname first: if pathname already
starts with "/agentmemory/" keep it, otherwise strip any leading slash from
pathname and only append "/"+trimmedPath when trimmedPath is non-empty so the
result becomes "/agentmemory" for empty pathname and "/agentmemory/<path>" for
non-empty paths, avoiding double or trailing slashes.
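For the body-read guard above, a sketch of the header check (assumes Node's `IncomingMessage`; the `hasRequestBody` name is illustrative):

```typescript
import type { IncomingMessage } from "node:http";

// Read a request body only when the client actually sent one:
// a non-zero Content-Length, or any Transfer-Encoding header.
function hasRequestBody(req: Pick<IncomingMessage, "headers">): boolean {
  const len = req.headers["content-length"];
  if (len !== undefined && Number.parseInt(len, 10) > 0) return true;
  return req.headers["transfer-encoding"] !== undefined;
}
```

The proxy would then call `readBody(req)` only when `hasRequestBody(req)` is true, so a bare DELETE passes through without a body.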
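And for the path normalization, a sketch matching the rules described above (`toUpstreamPath` is an illustrative name, not the actual server.ts function):

```typescript
// Normalize a viewer pathname to a clean upstream path:
//   ""               -> "/agentmemory"
//   "/"              -> "/agentmemory"
//   "sessions"       -> "/agentmemory/sessions"
//   "/agentmemory/x" -> unchanged
function toUpstreamPath(pathname: string): string {
  if (pathname.startsWith("/agentmemory/")) return pathname;
  // Strip leading and trailing slashes before re-prefixing.
  const trimmed = pathname.replace(/^\/+/, "").replace(/\/+$/, "");
  return trimmed.length > 0 ? `/agentmemory/${trimmed}` : "/agentmemory";
}
```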
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6c5bd025-ec9e-4eeb-a279-4ec77c3ed102
📒 Files selected for processing (9)
- src/functions/auto-forget.ts
- src/functions/consolidate.ts
- src/functions/export-import.ts
- src/functions/search.ts
- src/functions/smart-search.ts
- src/health/monitor.ts
- src/index.ts
- src/types.ts
- src/viewer/server.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- src/health/monitor.ts
- src/index.ts
for (let batch = 0; batch < filtered.length; batch += 10) {
  const chunk = filtered.slice(batch, batch + 10);
  const results = await Promise.all(
    chunk.map((s) =>
      kv
        .list<CompressedObservation>(KV.observations(s.id))
        .catch(() => [] as CompressedObservation[]),
    ),
  );
  obsPerSession.push(...results);
}
for (const obs of observations) {
Avoid silently dropping failed session fetches during consolidation.
At Line 88, failures are converted to empty arrays without any warning, so consolidation can quietly produce incomplete output.
Proposed fix
const allObs: Array<CompressedObservation & { sid: string }> = [];
const obsPerSession: CompressedObservation[][] = [];
+const failedSessions: string[] = [];
for (let batch = 0; batch < filtered.length; batch += 10) {
const chunk = filtered.slice(batch, batch + 10);
const results = await Promise.all(
- chunk.map((s) =>
- kv
- .list<CompressedObservation>(KV.observations(s.id))
- .catch(() => [] as CompressedObservation[]),
- ),
+ chunk.map(async (s) => {
+ try {
+ return await kv.list<CompressedObservation>(KV.observations(s.id));
+ } catch {
+ failedSessions.push(s.id);
+ return [] as CompressedObservation[];
+ }
+ }),
);
obsPerSession.push(...results);
}
+if (failedSessions.length > 0) {
+ ctx.logger.warn("consolidate: failed to load observations for sessions", {
+ failedSessionsCount: failedSessions.length,
+ });
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/functions/consolidate.ts` around lines 82 - 92, The consolidation loop
currently swallows errors by converting failed kv.list calls to empty arrays
(using .catch(() => [])), which hides failed session fetches; update the
Promise.all mapping so that failures from kv.list(KV.observations(s.id)) are not
silently dropped: capture the error (include s.id and the error details) and
either log it via the existing logger or push an explicit failure marker so
obsPerSession can reflect the failure, and at the end of consolidate (or
immediately) surface or throw an aggregated error if any session fetches failed;
ensure references to filtered, KV.observations, CompressedObservation, and
obsPerSession are used to locate and update the behavior.
async (data: {
  query?: string;
  expandIds?: Array<string | { obsId: string; sessionId: string }>;
  limit?: number;
sessionId hint path is not effectively reachable from current API surfaces.
This function now supports object-form expandIds, but the existing ingress code paths still submit `string[]` (src/triggers/api.ts:443-462, src/mcp/server.ts:175-199). That leaves most requests on the expensive fallback scan path.
Suggested follow-up
- Keep backward compatibility, but update the request contracts that call
  `mem::smart-search` to also accept: `Array<string | { obsId: string; sessionId?: string }>`
- Then pass through `sessionId` from compact results when clients request expansion.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/functions/smart-search.ts` around lines 22 - 25, The smart-search
expandIds parameter supports object-form entries with sessionId but most callers
still pass string[] so requests hit the expensive fallback; update the request
contract and callers to accept Array<string | { obsId: string; sessionId?:
string }> (keep backward compatibility with string entries), modify the code
that constructs expandIds in the API trigger and MCP server call sites (where
they currently pass string[]) to include sessionId from compact result objects
when available, and ensure mem::smart-search (the async function with the
expandIds param) treats string entries as legacy obsId-only and object entries
by using the provided sessionId to perform the efficient expansion path.
const results = await Promise.all(
  items.map(({ obsId, sessionId }) =>
    findObservation(kv, obsId, sessionId).then((obs) =>
      obs ? { obsId, sessionId: obs.sessionId, observation: obs } : null,
    ),
  ),
);
Unbounded item-level parallelism can still cause KV bursts.
Each findObservation may do its own session scan, and Promise.all across all items multiplies kv.list + kv.get pressure. With 20 expand IDs, this can still spike reads.
Bound item-level concurrency
- const results = await Promise.all(
- items.map(({ obsId, sessionId }) =>
- findObservation(kv, obsId, sessionId).then((obs) =>
- obs ? { obsId, sessionId: obs.sessionId, observation: obs } : null,
- ),
- ),
- );
- for (const r of results) {
- if (r) expanded.push(r);
- }
+ const ITEM_BATCH_SIZE = 5;
+ for (let i = 0; i < items.length; i += ITEM_BATCH_SIZE) {
+ const batch = items.slice(i, i + ITEM_BATCH_SIZE);
+ const results = await Promise.all(
+ batch.map(({ obsId, sessionId }) =>
+ findObservation(kv, obsId, sessionId).then((obs) =>
+ obs ? { obsId, sessionId: obs.sessionId, observation: obs } : null,
+ ),
+ ),
+ );
+ for (const r of results) {
+ if (r) expanded.push(r);
+ }
+  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/functions/smart-search.ts` around lines 45 - 51, The current Promise.all
over items causes unbounded parallel calls to findObservation (which performs
kv.list/kv.get) and can spike KV reads; replace the all-at-once pattern with
bounded concurrency (e.g., use a small concurrency limit like 4–8) when invoking
findObservation for each {obsId, sessionId}. Locate the
Promise.all(items.map(...)) usage in smart-search.ts and change it to a
concurrency-controlled loop or use a p-limit / async-pool style helper so only N
findObservation calls run concurrently, collecting results into the same shape
({ obsId, sessionId, observation } or null) before continuing. Ensure the
implementation still preserves order/association of results with input items.
Summary
Resolves 8 open issues in one batch — net -536 lines of code.
- `Promise.all` parallelization in 5 files
Test plan
- `maxSessions=10&offset=0`
- `kvConnectivity` field
Closes #18 #20 #23 #24 #42 #43 #48 #66
Summary by CodeRabbit
New Features
Performance
Behavior