fix: use per-connection memory pool to prevent WorkingSet growth#696
Conversation
MemoryPool<byte>.Shared wraps ArrayPool<byte>.Shared, a TlsOverPerCoreLockedStacksArrayPool whose per-thread and per-core caches grow to accommodate usage but never shrink. With multiple broker connections each running a dedicated read pump thread, the shared pool's retained set grows continuously — causing ~57 MB/min WorkingSet growth in 3-broker stress tests despite flat GC heap.

Replace MemoryPool<byte>.Shared with a per-connection PipeMemoryPool backed by ArrayPool<byte>.Create(). When a connection is disposed, its dedicated pool and all retained arrays become GC-eligible, preventing cross-connection memory accumulation.

Also cap _cancelledCorrelationIds at 10,000 entries to bound memory from orphaned timed-out requests whose responses never arrive.

Fixes #663
Code Review
This is a well-diagnosed and well-implemented fix. The root cause analysis (TLS caches in ArrayPool<byte>.Shared growing per-thread but never shrinking) is exactly right, and the per-connection pool approach is the correct architectural solution — matching the lifecycle of the memory to the lifecycle of the connection.
Architecture: Correct Approach
Scoping the ArrayPool to the connection is the right call here. The alternative approaches (periodic pool trimming via ArrayPool<byte>.Shared.Trim(), calling MemoryPool<byte>.Shared before connection teardown) would be fragile and wouldn't cleanly bound the retained set. Per-connection isolation is principled.
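For reference, the per-connection pool described in this PR might look roughly like the sketch below. Only the details quoted elsewhere in this review are taken from the PR (ArrayPool<byte>.Create with maxArrayLength = 1_048_576, MaxBufferSize returning int.MaxValue, Interlocked-based idempotent dispose); the default rent size, bucket count, and member shapes are assumptions, not the PR's exact code.

```csharp
using System;
using System.Buffers;
using System.Threading;

// Sketch of a per-connection MemoryPool<byte> backed by a dedicated
// ArrayPool<byte> instance. When the owning connection drops its reference,
// the pool and every array it retains become GC-eligible, unlike the
// TLS/per-core caches behind ArrayPool<byte>.Shared.
public sealed class PipeMemoryPool : MemoryPool<byte>
{
    private const int DefaultBufferSize = 4096; // assumed default rent size

    private readonly ArrayPool<byte> _pool =
        ArrayPool<byte>.Create(maxArrayLength: 1_048_576, maxArraysPerBucket: 4);

    private bool _disposed;

    // Matches MemoryPool<byte>.Shared, which also reports int.MaxValue;
    // System.IO.Pipelines does not use this value for sizing decisions.
    public override int MaxBufferSize => int.MaxValue;

    public override IMemoryOwner<byte> Rent(int minBufferSize = -1)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(PipeMemoryPool));
        int size = minBufferSize == -1 ? DefaultBufferSize : minBufferSize;
        return new PooledMemoryOwner(_pool, _pool.Rent(size), size);
    }

    // Only flips a flag: the backing ArrayPool has no Dispose method and is
    // reclaimed by the GC once nothing references it.
    protected override void Dispose(bool disposing) => _disposed = true;

    private sealed class PooledMemoryOwner : IMemoryOwner<byte>
    {
        private readonly ArrayPool<byte> _owner;
        private readonly int _length;
        private byte[]? _array;

        internal PooledMemoryOwner(ArrayPool<byte> owner, byte[] array, int length)
        {
            _owner = owner;
            _array = array;
            _length = length;
        }

        public Memory<byte> Memory =>
            (_array ?? throw new ObjectDisposedException(nameof(PooledMemoryOwner)))
                .AsMemory(0, _length);

        public void Dispose()
        {
            // First caller wins; double-dispose is a no-op, so the array is
            // never returned to the pool twice.
            byte[]? array = Interlocked.Exchange(ref _array, null);
            if (array is not null)
                _owner.Return(array);
        }
    }
}
```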
Potential Resource Leak on Failed Connection Attempts
In ConnectCoreAsync, _pipeMemoryPool is set early but the method can still throw (e.g., DNS resolution failure, TCP connect timeout). If ConnectCoreAsync throws after line 336 but before the connection is established, _pipeMemoryPool will be non-null but IsConnected will return false. On the next ConnectAsync call, _pipeMemoryPool is overwritten without the old one being disposed:
// ConnectCoreAsync - can throw after this line
_pipeMemoryPool = new PipeMemoryPool(); // OLD pool leaked if we throw below
await _socket.ConnectAsync(endpoint, cancellationToken).ConfigureAwait(false); // throws on failure

The practical impact is limited — PipeMemoryPool.Dispose() only sets a flag; the pool created by ArrayPool<byte>.Create() has no Dispose() method and will eventually be GC'd. But for correctness, consider disposing the existing pool before creating a new one:
_pipeMemoryPool?.Dispose();
_pipeMemoryPool = new PipeMemoryPool();

MaxBufferSize vs. Constructor maxArrayLength
MaxBufferSize returns int.MaxValue while the pool was constructed with maxArrayLength = 1_048_576. This is consistent with how MemoryPool<byte>.Shared behaves (it also returns int.MaxValue), and System.IO.Pipelines doesn't rely on MaxBufferSize for sizing decisions — so it's not harmful. Worth a comment though, since the discrepancy can look like a bug to a future reader.
_cancelledCorrelationIds Cap — Non-Atomic Check
if (!responseReceived && _cancelledCorrelationIds.Count < MaxCancelledCorrelationIds)
{
_cancelledCorrelationIds.TryAdd(correlationId, 0);
}

The Count < Max check and TryAdd are not atomic, so the dictionary can slightly exceed 10,000 entries under high concurrency. For a cap designed to prevent unbounded growth (not a hard correctness limit), this is entirely acceptable — just worth acknowledging it's a soft cap in the comment.
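As a standalone illustration of why this is a soft cap: the constant and the check-then-add shape come from the PR, while the demo driver below is illustrative only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class SoftCapDemo
{
    private const int MaxCancelledCorrelationIds = 10_000;

    private static readonly ConcurrentDictionary<int, byte> _cancelledCorrelationIds = new();

    public static void RecordCancelled(int correlationId)
    {
        // Count and TryAdd are two separate operations: concurrent callers
        // that all observe Count == 9_999 can each add an entry, so the
        // dictionary may briefly exceed the cap by roughly the number of
        // racing threads. Bounded growth, not an exact ceiling.
        if (_cancelledCorrelationIds.Count < MaxCancelledCorrelationIds)
            _cancelledCorrelationIds.TryAdd(correlationId, 0);
    }

    public static int Count => _cancelledCorrelationIds.Count;

    public static void Main()
    {
        // Hammer the cap from many threads with distinct IDs.
        Parallel.For(0, 50_000, RecordCancelled);
        // Lands at the cap, give or take a handful of racy additions.
        Console.WriteLine(Count >= MaxCancelledCorrelationIds);
    }
}
```

Making the cap exact would require a lock or an Interlocked counter around every add; for this use case the racy version is cheaper and good enough.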
PooledMemoryOwner Dispose After Pool Disposal
The disposal ordering comment in DisposeAsync says: "Must happen after pipes are completed so no IMemoryOwner returns to a disposed pool." This is slightly conservative — calling owner.Dispose() after PipeMemoryPool.Dispose() is actually safe because PooledMemoryOwner holds a direct reference to the underlying ArrayPool<byte> instance, which doesn't have a Dispose method. The array would be returned to the ArrayPool instance, which will be GC'd when no longer referenced. Consider updating the comment to be more precise:
"Placed after pipe completion so all outstanding IMemoryOwner objects are returned before the pool is released for GC."
Tests
Good coverage of the critical paths: rent, default size, double-dispose, post-dispose access, large buffers, buffer independence, and Pipe integration. One scenario not covered: disposing a PooledMemoryOwner after the parent PipeMemoryPool has been disposed. Given the implementation is safe (as described above), this would be a confidence test rather than a correctness test — minor omission.
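The uncovered scenario largely reduces to a property of ArrayPool itself: returning an array to a dedicated ArrayPool<byte> instance after the wrapping pool's Dispose() (which only sets a flag) is always legal, because the instance has no Dispose method of its own. A minimal confidence check along those lines — illustrative, not the PR's test code:

```csharp
using System;
using System.Buffers;

static class LateReturnCheck
{
    public static bool ReturnAfterWrapperDisposeIsSafe()
    {
        // Dedicated instance, as a per-connection pool would create
        // (bucket count assumed).
        var pool = ArrayPool<byte>.Create(maxArrayLength: 1_048_576, maxArraysPerBucket: 4);
        byte[] rented = pool.Rent(4096);

        // Imagine the PipeMemoryPool wrapper has already been disposed
        // here. The backing ArrayPool is untouched, so Return still
        // succeeds without throwing...
        pool.Return(rented);

        // ...and the array is genuinely back in the pool: a single-threaded
        // Rent of the same size hands the identical instance out again.
        return ReferenceEquals(rented, pool.Rent(4096));
    }

    public static void Main() =>
        Console.WriteLine(ReturnAfterWrapperDisposeIsSafe());
}
```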
Summary
The fix is sound and production-ready. The per-connection pool isolation cleanly solves the WorkingSet growth problem. The _cancelledCorrelationIds cap is a good defensive addition. The main thing worth addressing before merge is the potential pool leak when ConnectCoreAsync fails partway through — it's low impact given the pool's GC behavior, but it's an easy fix and keeps the resource lifecycle clean.
- Dispose existing PipeMemoryPool before creating new one in ConnectCoreAsync to prevent leak on reconnect
- Add comment explaining MaxBufferSize returns int.MaxValue to match MemoryPool<byte>.Shared behavior
- Document that _cancelledCorrelationIds cap is a soft cap (Count + TryAdd not atomic)
- Clarify disposal ordering comment for PipeMemoryPool in DisposeAsync
Code Review (Follow-up)
All four issues raised in the previous review have been addressed in the follow-up commit. This is now ready to merge.
Previous Feedback — All Addressed
Resource leak on failed reconnect — Fixed. _pipeMemoryPool?.Dispose() is now called before creating a new instance in ConnectCoreAsync, preventing a pool leak when reconnection fails partway through.
MaxBufferSize vs constructor maxArrayLength discrepancy — Documented. The comment now explains that int.MaxValue matches MemoryPool<byte>.Shared behavior and that System.IO.Pipelines does not use MaxBufferSize for sizing decisions.
Non-atomic Count + TryAdd for the cancelled IDs cap — Documented. The comment now explicitly calls this a soft cap and explains the intentional trade-off.
Disposal ordering comment — Clarified. The comment in DisposeAsync now accurately describes why the pool is disposed after pipe completion (outstanding IMemoryOwner<byte> objects returned before pool is released for GC), rather than the old phrasing which incorrectly implied danger in returning to a disposed pool.
Assessment
The implementation is correct and well-structured. PipeMemoryPool properly scopes buffer lifetime to connection lifetime, the PooledMemoryOwner uses Interlocked.Exchange for thread-safe idempotent dispose, and the _cancelledCorrelationIds soft cap correctly prevents unbounded growth without requiring lock-based exactness. The 8 unit tests cover the key behavioral contracts.
Main reverted to original defaults (1 MB / 4 buckets) after PR #696 merged. This PR's fix (4 MB / 32 buckets) is the correct resolution for the idempotent allocation regression.
Summary
MemoryPool<byte>.Shared wraps ArrayPool<byte>.Shared, a TlsOverPerCoreLockedStacksArrayPool whose per-thread (TLS) and per-core caches grow to accommodate usage patterns but never shrink. With multiple broker connections each running a dedicated read pump thread, the shared pool's retained buffer set grows continuously — causing ~57 MB/min WorkingSet growth in 3-broker stress tests despite flat GC heap.

Replace MemoryPool<byte>.Shared with a per-connection PipeMemoryPool backed by ArrayPool<byte>.Create(). When a connection is disposed, its dedicated pool and all retained arrays become GC-eligible, preventing cross-connection memory accumulation.

Cap _cancelledCorrelationIds at 10,000 entries to bound memory from orphaned timed-out requests whose responses never arrive.

Changes

- src/Dekaf/Networking/PipeMemoryPool.cs — new MemoryPool<byte> backed by a dedicated ArrayPool<byte> instance per connection
- src/Dekaf/Networking/KafkaConnection.cs — use PipeMemoryPool for both StreamPipeWriter and input Pipe; dispose on connection teardown; cap _cancelledCorrelationIds
- tests/Dekaf.Tests.Unit/Networking/PipeMemoryPoolTests.cs — unit tests for PipeMemoryPool

Test plan
Fixes #663