feat(lib-data-stream-redis): add Micrometer metrics — design doc#70
Merged
Conversation
Spec for adding optional Micrometer-based metrics (backlog gauge, processed counter with outcome tag, processing-time Timer with percentiles) to AbstractMessageStream, behind a compileOnly micrometer-core dependency. Covers multi-app tagging strategy and downstream migration for Wave and Sched. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds optional Micrometer metrics to AbstractMessageStream behind a compileOnly micrometer-core dependency. Consumers opt in by passing a non-null MeterRegistry through the new 2-arg constructor; the existing 1-arg constructor is preserved unchanged. Meters (all tagged with stream=<subclass.name()> and stream_id=<actual key>): - seqera.stream.entries (Gauge) - backlog - seqera.stream.messages (Counter) - outcome=processed|failed|errored - seqera.stream.processing (Timer) - p25/p50/p75/p95/p99 + histogram buckets Version bump 1.3.0 -> 1.4.0. README and changelog updated. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A classloader-safety test (StreamMetricsClassloaderTest) revealed that putting MeterRegistry directly in AbstractMessageStream's constructor signature forces micrometer-core onto consumers' runtime classpath: Java reflection (Class#getDeclaredConstructors, used by Groovy's metaclass machinery, Micronaut, debuggers, etc.) resolves every parameter type even for unused constructors. Push the Micrometer types behind the StreamMetrics interface. AbstractMessageStream now references only StreamMetrics (a neutral interface in io.seqera.data.stream.metrics) and NoopStreamMetrics. Consumers that want Micrometer construct MicrometerStreamMetrics themselves and pass it in. Subclasses without metrics remain loadable on classpaths without micrometer-core, verified by StreamMetricsClassloaderTest using an isolated URLClassLoader. Public API additions under io.seqera.data.stream.metrics: - StreamMetrics (interface) - MicrometerStreamMetrics (Micrometer-backed impl) - NoopStreamMetrics (#INSTANCE) - Outcome (enum: processed | failed | errored | empty) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The original spec assumed HotSpot's lazy descriptor resolution would keep MeterRegistry out of the loaded class set when AbstractMessageStream took it as a Nullable constructor parameter. The dedicated classloader test exposed that this assumption fails under reflection (Class#getDeclaredConstructors, used by Groovy MetaClass, Micronaut, debuggers, etc.) — a single Groovy property access on the Class object triggered NoClassDefFoundError. Update the spec to reflect the actual design: - AbstractMessageStream takes a neutral StreamMetrics interface, not MeterRegistry. Micrometer types live entirely in the new io.seqera.data.stream.metrics package. - §4.2 rewritten with the classloader-safety story (what was wrong, why, and the resolution). - §5.1 / §5.3 updated for StreamMetrics, ConsumerStatus plumbing, and the Outcome default-initialization for definite assignment. - §6.0 documents the WeakReference gotcha that bit the gauge on first run. - §9 lists the actual tests that landed; the deferred Redis-Testcontainers metrics test is documented as deliberately out of scope. - §10 updated with the consumer-side MicrometerStreamMetrics construction. - §11 marks step 1 done. - §12 records resolutions of the deferred items. - §13 risk register tightened around the classloader regression guard. - Module version corrected to 1.4.0 (was 1.6.0 in the draft); status banner updated to "Implemented (PR #70)". Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add CommandQueue(MessageStream, StreamMetrics) so subclasses can pass a MicrometerStreamMetrics through to AbstractMessageStream. The existing 1-arg constructor is preserved unchanged. Required for downstream consumers (sched, wave) that subclass CommandQueue to opt into the lib-data-stream-redis 1.4.0 metrics. Bumps to 0.4.0. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Expand the README's Metrics section with: - Complete tag-by-tag table for the three meters - outcome enum reference (processed / failed / errored) - Real scrape output captured from sched running locally - PromQL recipes for throughput, error rate, percentile latencies, mean and max, and backlog The example output and queries match what was verified end-to-end with the sched backend pointed at the local libseqera composite build. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… delta Drop the ConsumerStatus holder and the 4-arg processMessage overload that was used to bubble the consumer's return value out of the stream.consume lambda. The existing 3-arg processMessage already increments the shared AtomicInteger before invoking the consumer, so the polling loop can derive the outcome from: - count delta zero -> EMPTY (lambda never ran) - count delta nonzero, ret -> PROCESSED or FAILED based on accepted - Throwable caught -> ERRORED Side benefit: when LocalMessageStream swallows a consumer exception internally (it catches Throwable, logs, returns false, re-queues), the new logic classifies it as FAILED instead of the previous EMPTY — more accurate than the holder-based approach. Extract the per-stream consume cycle into a private consumeOne(...) so the processMessages loop body reads as it did pre-metrics. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Spec (§4.1, §5.3, §12.1) rewritten to describe the count-delta outcome detection that actually shipped; ConsumerStatus holder references kept only as the "earlier draft, dropped during implementation" footnote. - README: empty polls phrasing fixed — they are ignored, not "recorded internally". - AbstractMessageStreamMetricsTest: rename misleading test case to match the NoopStreamMetrics path it actually exercises. - MicrometerStreamMetrics.bindBacklog: guard against duplicate registration via putIfAbsent + warn log (Micrometer caches gauges by name+tags, so a silent overwrite of the supplier would otherwise leak). - CommandQueue: log "metrics=enabled|disabled" instead of leaking the StreamMetrics impl class name into ops logs. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Initial PR for adding optional Micrometer/Prometheus metrics to
lib-data-stream-redis. This first commit lands the design spec only — implementation will follow in subsequent commits on this same branch.The spec is at
docs/superpowers/specs/2026-05-13-lib-data-stream-redis-metrics-design.md.Metrics surfaced
seqera.stream.entries— Gauge (backlog per stream)seqera.stream.messages— Counter withoutcome=processed|failed|erroredseqera.stream.processing— Timer withpublishPercentiles(0.25, 0.5, 0.75, 0.95, 0.99)+ histogram bucketsAll tagged with
stream(subclassname()) andstream_id(Redis key, e.g.jobs-pending/v2). App-level segregation (Wave vs Sched) is handled at theMeterRegistryboundary viamicronaut.metrics.tags.application— no library knob needed.Approach
compileOnly "io.micrometer:micrometer-core:1.12.4"— mirrors thelib-jedis-poolprecedent; no runtime dep forced on consumers.AbstractMessageStream(works for bothRedisMessageStreamandLocalMessageStream).MeterRegistry-aware constructor; existing 1-arg constructor preserved → fully backward-compatible.lib-data-stream-redis@1.6.0.What's next on this PR
StreamMetrics/NoopStreamMetricshelper classesAbstractMessageStreamconstructor +addConsumergauge +processMessagestimer/counterConsumerStatustri-state plumbing throughprocessMessageNoClassDefFoundErrorwithout micrometer-core at runtime)Test plan
./gradlew :lib-data-stream-redis:checkpassesSimpleMeterRegistryunit tests assert: gauge tracks length, counter increments per outcome, timer records only on non-EMPTY outcomesAbstractMessageStreamon a classloader withoutmicrometer-core, construct via 1-arg ctor, noNoClassDefFoundError🤖 Generated with Claude Code