[da-vinci][server] Add OTel metrics to KafkaConsumerServiceStats#2604
Conversation
There was a problem hiding this comment.
Pull request overview
Adds OpenTelemetry (OTel) instrumentation for KafkaConsumerServiceStats as part of the server-side OTel migration, including a new consumer-pool-action dimension and associated metric entity definitions/tests.
Changes:
- Introduced
KafkaConsumerServiceOtelMetricEntitywith 13 OTel metric definitions and wired it into server metric entity registration. - Refactored
KafkaConsumerServiceStatsto use the joint Tehuti+OTel MetricEntityState APIs (including an OTel-only partition assignment histogram and an enum-dimensioned “consumer action” metric). - Added a new OTel dimension enum (
VeniceConsumerPoolAction) + dimension registry updates and extensive unit/fixture tests validating OTel and Tehuti behavior.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensions.java | Adds VENICE_CONSUMER_POOL_ACTION dimension constant. |
| internal/venice-client-common/src/main/java/com/linkedin/venice/stats/dimensions/VeniceConsumerPoolAction.java | New dimension enum for subscribe vs update-assignment action tagging. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceMetricsDimensionsTest.java | Extends dimension-name format tests to cover the new dimension. |
| internal/venice-client-common/src/test/java/com/linkedin/venice/stats/dimensions/VeniceConsumerPoolActionTest.java | Fixture-based test validating dimension enum values/mapping. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceOtelMetricEntity.java | Defines OTel metric entities for KafkaConsumerServiceStats. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ServerMetricEntity.java | Registers KafkaConsumerServiceOtelMetricEntity in server entity list. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceStats.java | Main joint Tehuti+OTel refactor; adds MetricEntityState fields and recording paths. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/AggKafkaConsumerServiceStats.java | Threads Venice cluster name through stats supplier; adds total-only OTel partition recording helper. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java | Passes cluster name into agg stats; records per-consumer partition counts to the new OTel histogram. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ServerMetricEntityTest.java | Updates expected server metric entity count after registering the new entity class. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/KafkaConsumerServiceOtelMetricEntityTest.java | Fixture test validating the new metric entity definitions (name/type/unit/dims). |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/KafkaConsumerServiceStatsOtelTest.java | End-to-end tests validating OTel exports + Tehuti regression + isolation behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/KafkaConsumerServiceTehutiMetricNameTest.java | Ensures Tehuti metric-name enum maps to the expected legacy sensor names. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
ASYNC_COUNTER_FOR_HIGH_PERF_CASES registers an ObservableCounter callback per instance. For pollCountOtel and pollNonEmptyCountOtel (total-only metrics), per-store callbacks would always report zero. Pass null as otelRepository on per-store instances to avoid registering redundant callbacks, matching the same pattern used for the AsyncGauge metric. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
sushantmane
left a comment
There was a problem hiding this comment.
Thanks, @m-nagarajan!
Problem Statement
KafkaConsumerServiceStatshas 17 Tehuti sensors tracking PubSub consumer pool behavior (poll latency, record counts, partition assignments, idle time, etc.) but no OpenTelemetry instrumentation.Key challenges:
AggKafkaConsumerServiceStats.recordTotal*().max_elapsed_time_since_last_successful_polluses aLongSuppliercallback — Tehuti-only, intentionally excluded from OTel (redundant withidle_time/POLL_TIME_SINCE_LAST_SUCCESS).delegate_subscribe_latencyandupdate_current_assignment_latencyconsolidate into one OTel metric differentiated by aVeniceConsumerPoolActiondimension.Solution
Migrated 16 of 17 Tehuti sensors to 12 OTel metrics using the joint Tehuti+OTel API pattern, integrated directly into
KafkaConsumerServiceStats(no separate OtelStats class). One Tehuti sensor (max_elapsed_time_since_last_successful_pollAsyncGauge) is intentionally Tehuti-only.OTel Metric Mapping (12 OTel metrics from 17 Tehuti sensors)
bytes_per_pollingestion.pubsub.consumer.poll.bytesconsumer_poll_result_numingestion.pubsub.consumer.poll.record_countconsumer_poll_requestingestion.pubsub.consumer.poll.countconsumer_poll_request_latencyingestion.pubsub.consumer.poll.timeconsumer_poll_non_zero_result_numingestion.pubsub.consumer.poll.non_empty_countconsumer_poll_erroringestion.pubsub.consumer.poll.error_countconsumer_records_producing_to_write_buffer_latencyingestion.pubsub.consumer.produce_to_write_buffer_timedetected_deleted_topic_numingestion.pubsub.consumer.topic.detected_deleted_countdetected_no_running_ingestion_topic_partition_numingestion.pubsub.consumer.orphan_subscription_countdelegate_subscribe_latency+update_current_assignment_latencyingestion.pubsub.consumer.pool_action.timeidle_timeingestion.pubsub.consumer.poll.time_since_last_successingestion.pubsub.consumer.partition_assignment.countmax_elapsed_time_since_last_successful_pollKey design decisions
Shared OTel instrument for consumer actions (10): Two
MetricEntityStateOneEnum<VeniceConsumerPoolAction>fields share the same OTel metric entity, each binding a different Tehuti sensor. OTel deduplicates by metric name; recordings are differentiated by the SUBSCRIBE/UPDATE_ASSIGNMENT dimension value.Asymmetric partition histogram (12): OTel-only
MetricEntityStateBase(4-arg factory) records raw per-consumer partition counts. Tehuti keeps its 4 pre-computed gauges unchanged. NewrecordTotalPartitionAssignmentForOtel()method onAggKafkaConsumerServiceStatscalled fromKafkaConsumerService.recordPartitionsPerConsumerSensor().ASYNC_COUNTER per-store suppression (3, 5): Per-store instances pass
totalOnlyOtelRepo=null, suppressing OTel callbacks that would produce misleading data points (pool-wide values tagged with per-store attributes). Per-store Tehuti shares the total'sLongAdderRateGaugesensor.No
isTotalStats(true)on OpenTelemetryMetricsSetup: Unlike per-store-aggregated classes, this class has metrics that are only recorded on the total instance, so OTel must remain enabled on total.New dimension enum:
VeniceConsumerPoolAction(SUBSCRIBE, UPDATE_ASSIGNMENT) implementingVeniceDimensionInterface, with correspondingVENICE_CONSUMER_POOL_ACTIONentry inVeniceMetricsDimensions.Extracted shared
registerPerStoreAndTotal(MetricEntityState): Moved from a private method inServerHttpRequestStatsto a protected method inAbstractVeniceStats, reused by bothServerHttpRequestStatsandKafkaConsumerServiceStats.VENICE_STORE_NAME removed from total-only metrics (3–9, 11, 12): Total-only metrics are always recorded on the total instance where
storeNameis an opaque identifier (e.g.,total.kafka_consumer_service_for_<region>), not an actual store name. Including VENICE_STORE_NAME would emit a misleading dimension value. Only the 2 per-store metrics (1, 2) retain VENICE_STORE_NAME. The production code builds a separateclusterOnlyDimensionsMap(backed byCollections.unmodifiableMap) for these metrics.Files changed
New files (6):
KafkaConsumerServiceOtelMetricEntity.java— 12 OTel metric entity definitionsVeniceConsumerPoolAction.java— dimension enumKafkaConsumerServiceStatsOtelTest.java— 22 test methodsKafkaConsumerServiceOtelMetricEntityTest.java— ModuleMetricEntityTestFixtureKafkaConsumerServiceTehutiMetricNameTest.java— TehutiMetricNameEnumTestFixtureVeniceConsumerPoolActionTest.java— VeniceDimensionTestFixtureModified files (10):
KafkaConsumerServiceStats.java— added MetricEntityState fields, joint API recordingAggKafkaConsumerServiceStats.java— addedrecordTotalPartitionAssignmentForOtel()KafkaConsumerService.java— calls new OTel partition recording methodServerMetricEntity.java— registeredKafkaConsumerServiceOtelMetricEntityServerMetricEntityTest.java— updated count (87 → 99)VeniceMetricsDimensions.java— addedVENICE_CONSUMER_POOL_ACTIONVeniceMetricsDimensionsTest.java— added switch cases for new dimensionAbstractVeniceStats.java— extracted sharedregisterPerStoreAndTotal(MetricEntityState)methodAbstractVeniceStatsTest.java— test for extractedregisterPerStoreAndTotalmethodServerHttpRequestStats.java— removed duplicate privateregisterPerStoreAndTotalmethodCode changes
VeniceMetricsConfig.emitOtelMetrics()(existing config, default: disabled). No new config flags introduced.Concurrency-Specific Checks
Both reviewer and PR author to verify
synchronized,RWLock) are used where needed. No new synchronization needed — all state is final or uses existing thread-safe patterns.ConcurrentHashMap,CopyOnWriteArrayList). No new collections — existing patterns preserved.How was this PR tested?
KafkaConsumerServiceStatsOtelTest(22 test methods) covering:InMemoryMetricReaderMetricsRepositoryKafkaConsumerServiceOtelMetricEntityTest,KafkaConsumerServiceTehutiMetricNameTest, andVeniceConsumerPoolActionTest(enum validation fixtures).ServerMetricEntityTestcount updated (87 → 99).Does this PR introduce any user-facing or breaking changes?