fix(ui): add buffer size limits to prevent memory leaks in streaming #1302
opspawn wants to merge 3 commits into kagent-dev:main
Conversation
Pull request overview
This PR adds size limits and truncation logic to the UI’s SSE streaming implementations to prevent unbounded buffer growth (and associated memory leaks) during long-running conversations.
Changes:
- Added per-stream constants for max buffer size, chunk size, and max total message size.
- Tracked total processed bytes, aborting the stream when the 10 MB limit is exceeded.
- Truncated the in-memory buffer when it grows beyond 1 MB to cap memory usage.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| ui/src/lib/a2aClient.ts | Adds buffer/message size limiting in processSSEStream() to cap memory growth while decoding/processing SSE. |
| ui/src/app/a2a/[namespace]/[agentName]/route.ts | Adds equivalent buffer/message size limiting in the proxy route's stream pump() to prevent server-side buffer growth. |
ui/src/lib/a2aClient.ts
Outdated
```ts
const MAX_BUFFER_SIZE = 1024 * 1024; // 1 MB
const CHUNK_SIZE = 16 * 1024; // 16 KB
const MAX_MESSAGE_SIZE = 10 * 1024 * 1024; // 10 MB
```
MAX_BUFFER_SIZE / CHUNK_SIZE are documented in bytes, but buffer.length and buffer.slice(...) operate on UTF-16 code units. The effective limit will vary with non-ASCII input and can exceed the intended memory bound. Consider tracking buffer size in bytes (or rename the constants/comments to make it explicit this is character-based).
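A byte-based accounting could look like this minimal sketch (`bufferByteLength` is a hypothetical helper, not code from this PR; it only assumes the standard `TextEncoder` API):

```typescript
// Hypothetical helper: count the buffer's size in UTF-8 bytes instead of
// UTF-16 code units, so the 1 MB cap matches the intended memory bound.
const encoder = new TextEncoder();

function bufferByteLength(buffer: string): number {
  // encode() produces UTF-8, so multi-byte characters are counted fully;
  // buffer.length counts code units and undercounts them.
  return encoder.encode(buffer).length;
}

// "é" is one UTF-16 code unit but two UTF-8 bytes, so the measures diverge:
const sample = 'é'.repeat(1000);
// sample.length === 1000, bufferByteLength(sample) === 2000
```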
```ts
processedSize += value.length;
if (processedSize > MAX_MESSAGE_SIZE) {
  throw new Error('Message size exceeds maximum allowed limit of 10MB');
}
```
Given the new truncation/size-limit behavior, it would be good to add Jest unit tests for processSSEStream() to cover: parsing multiple events across chunks, ensuring buffer trimming doesn’t drop complete events, and the MAX_MESSAGE_SIZE error path.
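One way to start such tests is a small fixture that replays SSE bytes with arbitrary chunk boundaries (a sketch; the fixture name and the exact `processSSEStream()` signature are assumptions):

```typescript
// Hypothetical test fixture: builds a ReadableStream<Uint8Array> that
// emits the given string chunks in order, so a test can split one SSE
// event across several reads before feeding it to processSSEStream().
function sseStreamFromChunks(chunks: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(encoder.encode(chunk));
      controller.close();
    },
  });
}

// Example: one complete event delivered across two chunk boundaries.
const split = sseStreamFromChunks(['data: {"sta', 'tus":"ok"}\n\n']);
```

A Jest case could then assert that the parser yields exactly one event for `split`, that buffer trimming never drops a complete event, and that a stream larger than `MAX_MESSAGE_SIZE` rejects with the limit error.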
```ts
if (buffer.length > MAX_BUFFER_SIZE) {
  const lines = buffer.split('\n');
  const lastLine = lines.pop() || '';
  buffer = lastLine;
  if (buffer.length > MAX_BUFFER_SIZE) {
```
As in the client implementation, the buffer-size enforcement runs before extracting complete SSE events (\n\n). If buffer temporarily exceeds the limit while containing one or more complete events, this truncation can drop those events and break the proxied stream. Process complete events first, then apply the buffer cap only to the remaining incomplete tail (or fail fast if the tail grows beyond the limit).
```ts
processedSize += value.length;
if (processedSize > MAX_MESSAGE_SIZE) {
  throw new Error('Message size exceeds maximum allowed limit of 10MB');
}
```
Throwing on MAX_MESSAGE_SIZE is caught by the .catch(...), but the backend reader/response body is not cancelled/aborted there. That can leave the upstream connection open until it naturally closes. When this limit triggers, cancel the reader and ideally abort the backend fetch (via an AbortController) before calling controller.error(...).
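A possible shape for that teardown, as a sketch (`pumpWithLimit` and the surrounding wiring are hypothetical names, not the PR's code; it assumes the backend fetch was created with `signal: abortController.signal`):

```typescript
// Hypothetical sketch: on a limit breach, cancel the reader and abort the
// backend fetch before erroring the outgoing stream, so the upstream
// connection is actually released instead of lingering until it closes.
const abortController = new AbortController();
// const res = await fetch(backendUrl, { signal: abortController.signal });

async function pumpWithLimit(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  controller: ReadableStreamDefaultController<Uint8Array>,
  maxMessageSize: number,
): Promise<void> {
  let processedSize = 0;
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) break;
      processedSize += value.length;
      if (processedSize > maxMessageSize) {
        // Tear down upstream first, then fail the downstream stream.
        await reader.cancel('message size limit exceeded');
        abortController.abort();
        controller.error(new Error('Message size exceeds maximum allowed limit of 10MB'));
        return;
      }
      controller.enqueue(value);
    }
    controller.close();
  } catch (err) {
    abortController.abort();
    controller.error(err);
  }
}
```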
ui/src/lib/a2aClient.ts
Outdated
```ts
if (buffer.length > MAX_BUFFER_SIZE) {
  const lines = buffer.split('\n');
  const lastLine = lines.pop() || '';
  buffer = lastLine;
  if (buffer.length > MAX_BUFFER_SIZE) {
```
The buffer-size enforcement runs before the loop that extracts complete SSE events (\n\n). If the chunk pushes buffer over the limit while it already contains one or more complete events, this truncation will discard those unprocessed events and silently lose data. Consider processing all complete events first, then applying the buffer cap only to the remaining incomplete tail (or explicitly error out when the tail exceeds the limit).
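That ordering could be factored as follows (a sketch; `drainEvents`, `capTail`, and `handleEvent` are hypothetical helpers, not the PR's actual code):

```typescript
// Hypothetical helpers: extract every complete SSE event (terminated by
// "\n\n") first, then size-check only the incomplete tail.
function drainEvents(buffer: string): { events: string[]; rest: string } {
  const events: string[] = [];
  let idx: number;
  while ((idx = buffer.indexOf('\n\n')) !== -1) {
    events.push(buffer.slice(0, idx)); // complete event, never dropped
    buffer = buffer.slice(idx + 2);
  }
  return { events, rest: buffer };
}

function capTail(rest: string, maxBufferSize: number): string {
  if (rest.length > maxBufferSize) {
    // A single incomplete event already exceeds the cap: fail fast
    // instead of silently truncating mid-event.
    throw new Error('SSE event exceeds maximum buffer size');
  }
  return rest;
}

// Per chunk: buffer += decoded chunk; then
//   const { events, rest } = drainEvents(buffer);
//   events.forEach(handleEvent);
//   buffer = capTail(rest, MAX_BUFFER_SIZE);
```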
```ts
processedSize += value.length;
if (processedSize > MAX_MESSAGE_SIZE) {
  throw new Error('Message size exceeds maximum allowed limit of 10MB');
}
```
When processedSize exceeds MAX_MESSAGE_SIZE this throws and exits the generator, but the underlying fetch/body may remain open because only reader.releaseLock() runs in finally. To avoid keeping the connection/resources alive (and to make this limit actually stop the stream), cancel the reader/stream (e.g., await reader.cancel(...)) before releasing the lock when exiting due to an error/limit breach.
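One way that cleanup can look in the generator, as a sketch under assumed names (`readWithLimit` is illustrative, not the PR's `processSSEStream()`):

```typescript
// Hypothetical sketch: cancel the reader (which propagates to the body
// stream) before releasing the lock, on every exit path via finally.
async function* readWithLimit(
  stream: ReadableStream<Uint8Array>,
  maxMessageSize: number,
): AsyncGenerator<Uint8Array> {
  const reader = stream.getReader();
  let processedSize = 0;
  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) return;
      processedSize += value.length;
      if (processedSize > maxMessageSize) {
        throw new Error('Message size exceeds maximum allowed limit of 10MB');
      }
      yield value;
    }
  } finally {
    // cancel() resolves harmlessly on an already-closed stream, so this is
    // safe on the normal path too; releaseLock() alone leaves the body open.
    await reader.cancel().catch(() => {});
    reader.releaseLock();
  }
}
```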
Add MAX_BUFFER_SIZE (1 MB), CHUNK_SIZE (16 KB), and MAX_MESSAGE_SIZE (10 MB) limits to SSE streaming buffers in both a2aClient.ts and route.ts to prevent unbounded memory growth during long-running streams.

Closes kagent-dev#466

Signed-off-by: opspawn <agent@opspawn.com>
Signed-off-by: opspawn <opspawn@users.noreply.github.com>
…r on size limit

Process complete SSE events before applying buffer size cap to avoid dropping events mid-stream. Cancel the reader when MAX_MESSAGE_SIZE is exceeded instead of just throwing.

Signed-off-by: opspawn <opspawn@users.noreply.github.com>
- Fix await in non-async function error in route.ts pump() by converting from .then() chaining to async/await
- Add proper reader.cancel() cleanup in both files on error/limit
- Add cleanup helper in route.ts to cancel reader and clear timers
- Clarify byte vs character count in constant comments
- Process complete SSE events before buffer size enforcement (already correct in prior commit, preserved)

Addresses Copilot review comments on PR kagent-dev#1302:
1. Resource cleanup: reader properly cancelled on error paths
2. Buffer logic ordering: events extracted before truncation
3. Byte vs char: constants annotated with measurement unit

Signed-off-by: opspawn <opspawn@users.noreply.github.com>
Force-pushed fe699f7 to e3d098e
Summary
Fixes unbounded buffer growth in SSE streaming code that could cause memory leaks during long-running agent conversations.
Changes
- Added MAX_BUFFER_SIZE (1 MB), CHUNK_SIZE (16 KB), and MAX_MESSAGE_SIZE (10 MB) constants to both streaming implementations

Files Changed
- ui/src/lib/a2aClient.ts: processSSEStream() method
- ui/src/app/a2a/[namespace]/[agentName]/route.ts: pump() function

Closes #466