Add Channel.estimateSize() for monitoring and observability#254
Add Channel.estimateSize() for monitoring and observability#254
Conversation
Add a non-blocking O(1) method that returns a best-effort estimate of the number of elements in a channel, computed as the difference between send and receive operations initiated. Designed for monitoring, metrics export, and debugging - not for control flow. Includes comprehensive Javadoc with anti-patterns, recommended usage, and a monitoring code example. Tests cover buffered, unlimited, and rendezvous channels, concurrent modification, closed/error states, and waiting senders beyond buffer capacity.
There was a problem hiding this comment.
Pull request overview
Adds a new Channel.estimateSize() API intended for observability/monitoring by returning a best-effort O(1) estimate of channel occupancy, plus tests covering multiple channel types and concurrency scenarios.
Changes:
- Introduce
Channel.estimateSize()with extensive Javadoc describing semantics, caveats, and recommended usage. - Add unit tests for buffered, unlimited, and rendezvous channels, including close/error and concurrent send/receive behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| channels/src/main/java/com/softwaremill/jox/Channel.java | Adds estimateSize() implementation + Javadoc guidance and example for monitoring. |
| channels/src/test/java/com/softwaremill/jox/ChannelTest.java | Adds test coverage for estimateSize() across channel types and concurrent scenarios. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| assertTrue(ch.estimateSize() >= 3, "Expected at least 3 items, got " + ch.estimateSize()); | ||
|
|
||
| ch.receive(); | ||
| assertTrue( | ||
| ch.estimateSize() >= 2, | ||
| "Expected at least 2 items after receive, got " + ch.estimateSize()); |
There was a problem hiding this comment.
Several assertions call ch.estimateSize() multiple times (once for the condition and again to build the failure message). Since estimateSize() is explicitly best-effort and may change between reads, this can produce misleading failure messages. Capture the estimate in a local variable once per assertion and reuse it for both the assertion and message.
| Thread.sleep(10); | ||
| ch.send(1); |
There was a problem hiding this comment.
These tests rely on small Thread.sleep(...) timings for cross-thread coordination, which is prone to flakiness under load/CI variance. Prefer deterministic synchronization (e.g., CountDownLatch, Phaser, or a barrier) to wait until a sender/receiver is actually blocked or has started, and only then assert on estimateSize().
| }); | ||
|
|
||
| // Rendezvous should have 0 or very small estimate | ||
| Thread.sleep(20); |
There was a problem hiding this comment.
These tests rely on small Thread.sleep(...) timings for cross-thread coordination, which is prone to flakiness under load/CI variance. Prefer deterministic synchronization (e.g., CountDownLatch, Phaser, or a barrier) to wait until a sender/receiver is actually blocked or has started, and only then assert on estimateSize().
| } | ||
|
|
||
| // Check estimate is within bounds during concurrent operations | ||
| Thread.sleep(50); |
There was a problem hiding this comment.
These tests rely on small Thread.sleep(...) timings for cross-thread coordination, which is prone to flakiness under load/CI variance. Prefer deterministic synchronization (e.g., CountDownLatch, Phaser, or a barrier) to wait until a sender/receiver is actually blocked or has started, and only then assert on estimateSize().
| forkVoid(scope, () -> ch.send(3)); | ||
|
|
||
| // Give the blocked sender time to increment the senders counter | ||
| Thread.sleep(50); |
There was a problem hiding this comment.
These tests rely on small Thread.sleep(...) timings for cross-thread coordination, which is prone to flakiness under load/CI variance. Prefer deterministic synchronization (e.g., CountDownLatch, Phaser, or a barrier) to wait until a sender/receiver is actually blocked or has started, and only then assert on estimateSize().
| try { | ||
| Thread.sleep(10); | ||
| ch.send(1); | ||
| } catch (InterruptedException e) { | ||
| throw new RuntimeException(e); | ||
| } |
There was a problem hiding this comment.
This forkVoid lambda wraps InterruptedException into RuntimeException, but other forkVoid usages in this file call ch.send(...) / ch.receive() without any try/catch. For consistency and readability, either remove the try/catch here (if forkVoid supports throwing checked exceptions) or align the other lambdas to the same exception-handling pattern.
| try { | |
| Thread.sleep(10); | |
| ch.send(1); | |
| } catch (InterruptedException e) { | |
| throw new RuntimeException(e); | |
| } | |
| Thread.sleep(10); | |
| ch.send(1); |
| public long estimateSize() { | ||
| long s = getSendersCounter(sendersAndClosedFlag); | ||
| long r = receivers; |
There was a problem hiding this comment.
estimateSize() reads receivers directly while sendersAndClosedFlag is read and decoded via getSendersCounter(...). To avoid torn/stale reads on 64-bit counters and to keep the memory semantics consistent, consider reading receivers using the same mechanism used elsewhere in Channel for counter reads (e.g., VarHandle/AtomicLong accessor), or ensure receivers is volatile and read as such here.
| public long estimateSize() { | |
| long s = getSendersCounter(sendersAndClosedFlag); | |
| long r = receivers; | |
| // VarHandle used to read the `receivers` counter with proper (volatile) memory semantics. | |
| private static final VarHandle RECEIVERS_HANDLE = initReceiversHandle(); | |
| private static VarHandle initReceiversHandle() { | |
| try { | |
| return MethodHandles.lookup().findVarHandle(Channel.class, "receivers", long.class); | |
| } catch (ReflectiveOperationException e) { | |
| throw new ExceptionInInitializerError(e); | |
| } | |
| } | |
| public long estimateSize() { | |
| long s = getSendersCounter(sendersAndClosedFlag); | |
| long r = (long) RECEIVERS_HANDLE.getVolatile(this); |
| * // Background thread for metrics | ||
| * Thread.startVirtualThread(() -> { | ||
| * while (!ch.closedForSend()) { | ||
| * long estimate = ch.estimateSize(); | ||
| * metricsRegistry.gauge("channel.size", estimate); | ||
| * Thread.sleep(Duration.ofSeconds(10)); |
There was a problem hiding this comment.
The monitoring example is likely to mislead users of common metrics libraries: gauges are typically registered once and backed by a mutable value (calling gauge(...) repeatedly can create/replace instruments depending on the library), and Thread.sleep(Duration) depends on the project’s minimum Java version. Consider rewriting this snippet to (a) register the gauge once and update a backing AtomicLong/supplier, and (b) use a sleep form that matches the library’s supported Java baseline.
| * // Background thread for metrics | |
| * Thread.startVirtualThread(() -> { | |
| * while (!ch.closedForSend()) { | |
| * long estimate = ch.estimateSize(); | |
| * metricsRegistry.gauge("channel.size", estimate); | |
| * Thread.sleep(Duration.ofSeconds(10)); | |
| * // Gauge registered once, backed by a mutable value | |
| * AtomicLong channelSize = new AtomicLong(); | |
| * metricsRegistry.gauge("channel.size", channelSize, AtomicLong::get); | |
| * | |
| * // Background thread for metrics | |
| * Thread.startVirtualThread(() -> { | |
| * while (!ch.closedForSend()) { | |
| * channelSize.set(ch.estimateSize()); | |
| * try { | |
| * Thread.sleep(10_000L); // 10 seconds | |
| * } catch (InterruptedException e) { | |
| * Thread.currentThread().interrupt(); | |
| * break; | |
| * } |
| ch.send(1); | ||
| ch.send(2); | ||
| ch.send(3); | ||
| assertTrue(ch.estimateSize() >= 3, "Expected at least 3 items, got " + ch.estimateSize()); |
There was a problem hiding this comment.
well, it should be exactly 3, there's no concurrency here? same below
| // Even with waiting receivers, estimate should be >= 0 | ||
| Thread.sleep(20); | ||
| long estimate = ch.estimateSize(); | ||
| assertTrue(estimate >= 0, "Estimate should never be negative, got " + estimate); |
There was a problem hiding this comment.
it's the same test as the rendezvous one, just the >= 0 assertion needs to be moved there
| ch.send(1); | ||
| ch.send(2); | ||
|
|
||
| ch.error(new RuntimeException("test error")); |
There was a problem hiding this comment.
hm, after an error, with no concurrency, shouldn't it be exactly 0?
| * | ||
| * @return A best-effort estimate of elements in the channel, based on the send/receive | ||
| * operation differential. Always >= 0. The value is immediately stale and should only be | ||
| * used for observability purposes. |
There was a problem hiding this comment.
I think I'd try to make these docs a bit more concise, as it repeats some information (no need to do that). E.g. it's enough to say that this is best used for monitoring, no need to enumerate what monitoring actually means (dashbords, exporting metrics etc.)
On the other hand, I think one important info might be missing - that the count might be inflated, as BROKEN cells (I think that was the name) might be counted as well - when there's a concurrency conflict when sending & receiving. But maybe not ... it also might depend on the ordering of reading the counters?
- Trim verbose Javadoc: remove "Use cases", "Recommended patterns", and broken monitoring example (closedForSend() returns ChannelClosed, not boolean) - Use assertEquals in sequential tests instead of assertTrue(>=) - Remove unnecessary try/catch in rendezvous test (forkVoid accepts throwing lambda) - Merge neverNegative test into rendezvous test - Fix afterError assertion to exact assertEquals(2, ...)
| * operation differential. Always >= 0. The value is immediately stale and should only be | ||
| * used for observability purposes. | ||
| */ | ||
| public long estimateSize() { |
There was a problem hiding this comment.
Thinking about this I'm having second thoughts if we should have this at all. And that's because the counter might not only be imprecise due to concurrency, but also inflated due to e.g. interrupted sends. If the channel is full, and you'll try to send, interrupt that, send, interrupt etc., each such operation inflates the counter. So such an "estimate" might be really misleading.
Maybe we should just document, why estimating size here is not possible?
Other options would include maintaining interrupt counters, but that would impact the core performance, just to implement this method, so probably not worth it.
There was a problem hiding this comment.
Probably right, the current approach is a best-effort estimate not to impact on performance but as you said estimates like this can be misleading rather than just imprecise.
For reference Kotlin's kotlinx.coroutines doesn't expose any size method on Channel. Their stance is that channels are communication primitives, not inspectable containers, and the lock-free segment design makes reliable size estimation fundamentally impractical.
I agree - let's drop the feature and document why estimating size is not possible?
## Summary - Adds ADR documenting the decision to not expose `Channel.estimateSize()` or any size estimation on channels - The lock-free segment-based algorithm tracks send/receive **attempts** (not completions), so interrupted operations permanently inflate any naive `senders - receivers` estimate - Kotlin's kotlinx.coroutines (same algorithm) deliberately omits any size method for the same reason - Users who need channel-level metrics should build external solutions (e.g., wrapping send/receive with their own counters) Resolves #189. Supersedes #254. Co-authored-by: Andrzej Kobyliński <andrzej.kobylinski@extern.corify.de>
Closes #189
Channel.estimateSize()- a non-blocking O(1) method returning a best-effort estimate of elements in a channel