Fixed memory leaks in topic reader when creating a large number of instances in sequence #547
Conversation
Force-pushed from 2868d02 to 8b2c440
Pull request overview
This PR addresses memory leaks in the topic reader that occur when creating and destroying reader instances frequently. The fixes focus on proper cleanup of resources including abort signals, event listeners, message buffers, partition sessions, and queue items.
Key Changes:
- Replaced `AbortSignal.any()` with manual signal merging and proper listener cleanup to prevent memory accumulation (a sketch of this pattern follows the list)
- Added a `reset()` method to AsyncPriorityQueue and enhanced `dispose()` to properly clear queue state
- Enhanced resource cleanup in `destroy()` methods to clear partition sessions, message buffers, and pending commits
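For illustration, the manual signal-merging pattern from the first bullet might look roughly like this; the helper name `mergeSignals` and its exact shape are assumptions, not code from the PR:

```ts
// Sketch of manual signal merging with explicit listener cleanup.
function mergeSignals(...signals: AbortSignal[]): { signal: AbortSignal; cleanup: () => void } {
  let controller = new AbortController()
  let onAbort = () => controller.abort()
  for (let signal of signals) {
    if (signal.aborted) {
      controller.abort()
      break
    }
    signal.addEventListener('abort', onAbort, { once: true })
  }
  // Calling cleanup() when the reader goes away removes the listeners from the
  // long-lived parent signals, so short-lived readers stop accumulating there.
  let cleanup = () => {
    for (let signal of signals) {
      signal.removeEventListener('abort', onAbort)
    }
  }
  return { signal: controller.signal, cleanup }
}
```

The important part is that `cleanup()` runs on teardown (the `_read.ts` row below describes doing this in a finally block), so each short-lived reader detaches from the parent signal instead of leaving listeners behind.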
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| packages/topic/vitest.config.ts | Increased test timeout to 15 seconds for integration tests |
| packages/topic/tests/memory-leak.test.ts | Added comprehensive memory leak test that creates/destroys 50,000 readers to validate stable memory consumption |
| packages/topic/src/reader/types.ts | Extended TopicReader interface to support both sync and async disposal patterns |
| packages/topic/src/reader/index.ts | Added stream promise tracking, enhanced cleanup in destroy methods, implemented disposal symbols for both sync and async cleanup |
| packages/topic/src/reader/_shared.ts | Updated documentation for background token refresher parameter |
| packages/topic/src/reader/_read.ts | Replaced AbortSignal.any() with manual signal merging and added proper event listener cleanup in finally block |
| packages/topic/src/reader/_consume_stream_tx.ts | Updated retry logic to call close() and reset() on queue for clean state on reconnection |
| packages/topic/src/reader/_consume_stream.ts | Updated retry logic and fixed signal checking to use retry signal instead of controller signal |
| packages/topic/src/queue.ts | Added reset() method to clear queue state and enhanced dispose() to prevent memory leaks |
    state.outgoingQueue.close()
    state.outgoingQueue.reset()
Copilot AI · Dec 8, 2025
Potential race condition: `close()` and `reset()` are called sequentially on `state.outgoingQueue` at the start of each retry. However, the queue is shared state that could be accessed by concurrent operations (e.g., the read generator trying to send read requests, or the background token refresher). This could lead to:
- `push()` being called between `close()` and `reset()`, which would throw a "Queue closed" error
- Iterator operations happening during the reset
Consider either:
- Using a new queue instance for each retry instead of resetting the existing one (a rough sketch of this option follows), or
- Adding proper synchronization to prevent concurrent access during the close/reset sequence
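For illustration, the first option could look roughly like this; the minimal queue interface and the `state` shape here are assumptions, not code from the PR:

```ts
// Sketch: swap in a fresh queue on every retry so the close()/reset() window disappears.
interface QueueLike<T> {
  push(item: T, priority?: number): void
  close(): void
  dispose(): void
}

function swapQueueForRetry<T>(
  state: { outgoingQueue: QueueLike<T> },
  makeQueue: () => QueueLike<T>,
) {
  let staleQueue = state.outgoingQueue
  // Producers that read the queue through `state` pick up the fresh instance,
  state.outgoingQueue = makeQueue()
  // while the stale one is closed and disposed, so any late push() fails fast
  // instead of landing in a queue that nothing drains anymore.
  staleQueue.close()
  staleQueue.dispose()
}
```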
    state.outgoingQueue.close()
    state.outgoingQueue.reset()
Copilot AI · Dec 8, 2025
Potential race condition: Same issue as in _consume_stream.ts - close() and reset() are called sequentially on a shared queue that could be accessed concurrently by the read generator or background operations. This could cause "Queue closed" errors or iterator issues during the reset sequence.
packages/topic/src/reader/index.ts (Outdated)
    state.controller.abort(reason)
    state.outgoingQueue.close()
    state.outgoingQueue.dispose()
    state.readOffsets.clear()
    state.partitionSessions.clear()
    state.buffer.length = 0
    state.disposed = true
Copilot AI · Dec 8, 2025
The order of cleanup operations in destroy() is inconsistent with the regular TopicReader. In TopicReader.destroy(), state.disposed is set to true before calling state.controller.abort(), while here it's set after.
Setting disposed = true first is safer because it prevents re-entry issues - if any code checks state.disposed during the abort process, it will correctly see that the reader is already being disposed. This should match the pattern used in the regular reader for consistency.
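For reference, the ordering the comment suggests could look like this; a sketch built from the fields shown in the diff above, with an assumed minimal state shape, not the actual reader code:

```ts
// Assumed minimal shape, for illustration only.
interface ReaderState {
  disposed: boolean
  controller: AbortController
  outgoingQueue: { close(): void; dispose(): void }
  readOffsets: Map<unknown, unknown>
  partitionSessions: Map<unknown, unknown>
  buffer: unknown[]
}

function destroy(state: ReaderState, reason?: Error) {
  if (state.disposed) return // guard added for illustration
  // Mark disposed first so anything that checks state.disposed while the abort
  // propagates already sees the reader as shutting down.
  state.disposed = true
  state.controller.abort(reason)
  state.outgoingQueue.close()
  state.outgoingQueue.dispose()
  state.readOffsets.clear()
  state.partitionSessions.clear()
  state.buffer.length = 0
}
```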
packages/topic/src/reader/index.ts (Outdated)
    // Wait for the stream consumption to finish (with timeout)
    if (streamPromise) {
      try {
        await Promise.race([
          streamPromise,
          new Promise<void>((resolve) =>
            setTimeout(resolve, 1000)
          ), // 1s timeout
        ])
      } catch {
        // Ignore errors from stream - it's already being closed
      }
    }
Copilot AI · Dec 8, 2025
[nitpick] The async disposal waits for close() to complete but then also waits for streamPromise with a timeout. However, close() internally calls destroy() at the end (line 138), which aborts the controller signal that _consume_stream is waiting on. This means that streamPromise should complete quickly after close() returns.
The additional wait for streamPromise with a 1-second timeout may be unnecessary since the stream should already be finishing due to the abort signal. Consider either:
- Removing the streamPromise wait since destroy was already called, or
- Adding a comment explaining why this additional wait is needed despite close() calling destroy()
Suggested change: replace the `streamPromise` wait block above with:

    // No need to wait for streamPromise: close() already calls destroy(), which aborts the stream.
packages/topic/src/reader/types.ts (Outdated)
    disposed: boolean

    // Data structures
    // Note: outgoingQueue is mutable because it needs to be recreated on each retry
Copilot AI · Dec 8, 2025
The comment states "outgoingQueue is mutable because it needs to be recreated on each retry" but the field is marked as readonly and the implementation doesn't recreate it - it calls close() and reset() on the same instance instead.
Either the comment should be updated to reflect the actual implementation, or if the queue truly should be recreated on retry (which might be safer to avoid shared state issues), the implementation should be changed to match the comment.
Suggested change: replace the comment with:

    // Note: outgoingQueue is reset (not recreated) on each retry
packages/topic/src/queue.ts (Outdated)
    reset() {
      dbg.log('resetting queue, clearing %d items', this.heap.length)
      this.heap.length = 0
      this.closed = false
Copilot AI · Dec 8, 2025
The reset() method doesn't clear pending operations (pendingShift and pendingResume). When called after close(), there may be pending promises that were created during the next() call that will never be resolved. This could cause memory leaks if callers are waiting on these promises.
Consider adding cleanup for these pending operations:
    reset() {
      dbg.log('resetting queue, clearing %d items', this.heap.length)
      this.heap.length = 0
      this.closed = false
      // Clean up any pending operations
      if (this.pendingShift) {
        this.pendingShift({ value: undefined as any, done: true })
        delete this.pendingShift
      }
      if (this.pendingResume) {
        this.pendingResume()
        delete this.pendingResume
      }
    }

Suggested change: after `this.closed = false`, add:

    // Clean up any pending operations
    if (this.pendingShift) {
      dbg.log('resolving pending shift with done: true (reset)')
      let resolve = this.pendingShift
      delete this.pendingShift
      resolve({ value: undefined as any, done: true })
    }
    if (this.pendingResume) {
      dbg.log('resolving pending resume (reset)')
      let resolve = this.pendingResume
      delete this.pendingResume
      resolve()
    }
    if (messageCount >= limit) {
      ctx.buffer.unshift(response) // Put the response back to the front of the buffer
      break
    }
Copilot AI · Dec 8, 2025
The condition `messageCount >= limit` is always false.
Suggested change: remove these lines.
    if (messageCount >= limit) {
      batch.messageData.unshift(msg) // Put the message back to the front of the batch
      break
    }
Copilot AI · Dec 8, 2025
The condition `messageCount >= limit` is always false.
Suggested change: remove these lines.
Signed-off-by: Vladislav Polyakov <polRk@ydb.tech>
Force-pushed from cd2154f to 9e9fe6e
What
This PR addresses several memory leaks in the topic reader implementation that could cause excessive memory consumption when creating and destroying readers frequently.
Why
These changes fix memory leaks that could cause applications using the topic reader to consume increasing amounts of memory over time, especially in scenarios where readers are frequently created and destroyed. The fixes ensure proper cleanup of resources including:
- Abort signal event listeners
- Message buffers
- Partition sessions
- Pending commits and queued items
Changes
Testing
The memory leak test validates that memory consumption remains stable even after creating and destroying 50,000 readers.
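For context, a test of that kind generally follows this shape; a sketch only, where the reader factory and the growth threshold are assumptions, and only the 50,000 create/destroy cycles come from the description above:

```ts
import { test, expect } from 'vitest'

// Hypothetical stand-in for however the test constructs a reader.
declare function createTopicReader(): { close(): Promise<void> }

test('creating and destroying readers keeps the heap stable', async () => {
  globalThis.gc?.() // only has an effect under node --expose-gc
  let baseline = process.memoryUsage().heapUsed

  for (let i = 0; i < 50_000; i++) {
    let reader = createTopicReader()
    await reader.close()
  }

  globalThis.gc?.()
  let growth = process.memoryUsage().heapUsed - baseline
  expect(growth).toBeLessThan(50 * 1024 * 1024) // bounded growth, not an exact zero
})
```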
Checklist