diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index c0f09342ecc9..ccb3096e4304 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -270,7 +270,7 @@ macro_rules! impl_common_split_reader_logic { let data_stream = self.into_data_stream(); let data_stream = data_stream - .map_ok(move |data_batch| { + .inspect_ok(move |data_batch| { metrics .partition_input_count .with_label_values(&[&actor_id, &source_id, &split_id]) @@ -286,7 +286,6 @@ macro_rules! impl_common_split_reader_logic { .partition_input_bytes .with_label_values(&[&actor_id, &source_id, &split_id]) .inc_by(sum_bytes); - data_batch }) .boxed(); let parser = diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index 0cd54491fced..a70877164df6 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -151,8 +151,22 @@ impl SplitReader for DatagenSplitReader { // spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE).boxed() match self.parser_config.specific { SpecificParserConfig::Native => { - spawn_data_generation_stream(self.generator.into_native_stream(), BUFFER_SIZE) - .boxed() + let actor_id = self.source_ctx.source_info.actor_id.to_string(); + let source_id = self.source_ctx.source_info.source_id.to_string(); + let split_id = self.split_id.to_string(); + let metrics = self.source_ctx.metrics.clone(); + spawn_data_generation_stream( + self.generator + .into_native_stream() + .inspect_ok(move |chunk_with_states| { + metrics + .partition_input_count + .with_label_values(&[&actor_id, &source_id, &split_id]) + .inc_by(chunk_with_states.chunk.cardinality() as u64); + }), + BUFFER_SIZE, + ) + .boxed() } _ => self.into_chunk_stream(), } diff --git a/src/connector/src/source/nexmark/source/reader.rs b/src/connector/src/source/nexmark/source/reader.rs index 06c252b2eeda..df081acabb83 100644 --- a/src/connector/src/source/nexmark/source/reader.rs +++ b/src/connector/src/source/nexmark/source/reader.rs @@ -16,7 +16,7 @@ use std::time::Duration; use anyhow::Result; use async_trait::async_trait; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; use maplit::hashmap; use nexmark::config::NexmarkConfig; @@ -105,15 +105,30 @@ impl SplitReader for NexmarkSplitReader { } fn into_stream(self) -> BoxSourceWithStateStream { + let actor_id = self.source_ctx.source_info.actor_id.to_string(); + let source_id = self.source_ctx.source_info.source_id.to_string(); + let split_id = self.split_id.clone(); + let metrics = self.source_ctx.metrics.clone(); + // Will buffer at most 4 event chunks. const BUFFER_SIZE: usize = 4; - spawn_data_generation_stream(self.into_chunk_stream(), BUFFER_SIZE).boxed() + spawn_data_generation_stream( + self.into_native_stream() + .inspect_ok(move |chunk_with_states| { + metrics + .partition_input_count + .with_label_values(&[&actor_id, &source_id, &split_id]) + .inc_by(chunk_with_states.chunk.cardinality() as u64); + }), + BUFFER_SIZE, + ) + .boxed() } } impl NexmarkSplitReader { #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] - async fn into_chunk_stream(mut self) { + async fn into_native_stream(mut self) { let start_time = Instant::now(); let start_offset = self.generator.global_offset(); let start_ts = self.generator.timestamp();