Commit 6933c98

chore: metrics for kafka high watermark and current offset -> lag size

tabVersion committed Jul 5, 2023
1 parent b0d6e35 commit 6933c98

Showing 23 changed files with 237 additions and 54 deletions.
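At a glance: the commit threads a new `SourceEnumeratorContext` through every `SplitEnumerator` so that the Kafka enumerator can report each partition's high watermark as a Prometheus gauge. Paired with the `latest_message_id` gauge reported on the consumer side (not shown in this excerpt), the new Grafana panel exposes consumer lag (high watermark minus current offset) per source, partition, and actor.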
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -699,6 +699,20 @@ def section_streaming(panels):
)
]
),
+        panels.timeseries_count(
+            "Kafka Consumer Lag Size",
+            "Kafka Consumer Lag Size by source_id, partition and actor_id",
+            [
+                panels.target(
+                    f"{metric('high_watermark')}",
+                    "source={{source_id}} partition={{partition}}"
+                ),
+                panels.target(
+                    f"{metric('latest_message_id')}",
+                    "source={{source_id}} partition={{partition}} actor_id={{actor_id}}"
+                )
+            ]
+        ),
panels.timeseries_rowsps(
"Sink Throughput(rows/s)",
"The figure shows the number of rows output by each sink executor actor per second.",
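The panel plots two gauges side by side: `high_watermark` (the broker's latest offset per partition, reported by the split enumerator) and `latest_message_id` (the offset most recently seen by each reader actor). The vertical gap between the two series is the consumer lag; a derived PromQL expression along the lines of `high_watermark - on(source_id, partition) group_right latest_message_id` could plot the lag directly, though the panel added here keeps the raw series separate.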
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/connector/src/macros.rs
@@ -17,9 +17,9 @@ macro_rules! impl_split_enumerator {
    ($({ $variant_name:ident, $split_enumerator_name:ident} ),*) => {
        impl SplitEnumeratorImpl {

-            pub async fn create(properties: ConnectorProperties) -> Result<Self> {
+            pub async fn create(properties: ConnectorProperties, context: SourceEnumeratorContextRef) -> Result<Self> {
                match properties {
-                    $( ConnectorProperties::$variant_name(props) => $split_enumerator_name::new(*props).await.map(Self::$variant_name), )*
+                    $( ConnectorProperties::$variant_name(props) => $split_enumerator_name::new(*props, context).await.map(Self::$variant_name), )*
                    other => Err(anyhow!(
                        "split enumerator type for config {:?} is not supported",
                        other
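The generated `create` now requires a context argument alongside the properties. A minimal sketch of what a call site looks like after this change; the module paths and the `anyhow::Result` alias are assumptions based on the surrounding code, not taken from this diff:

```rust
use std::sync::Arc;

use anyhow::Result;

// Paths are illustrative; real callers live next to these types in-tree.
use crate::source::{ConnectorProperties, SourceEnumeratorContext, SplitEnumeratorImpl};

async fn create_enumerator(properties: ConnectorProperties) -> Result<SplitEnumeratorImpl> {
    // Every enumerator now receives a context carrying its source_id and a
    // handle to the enumerator metrics, even if it ignores it (`_context`).
    let context = Arc::new(SourceEnumeratorContext::default());
    SplitEnumeratorImpl::create(properties, context).await
}
```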
16 changes: 15 additions & 1 deletion src/connector/src/source/base.rs
@@ -57,6 +57,7 @@ use crate::source::kinesis::enumerator::client::KinesisSplitEnumerator;
use crate::source::kinesis::source::reader::KinesisSplitReader;
use crate::source::kinesis::split::KinesisSplit;
use crate::source::kinesis::{KinesisProperties, KINESIS_CONNECTOR};
+use crate::source::monitor::EnumeratorMetrics;
use crate::source::nexmark::source::reader::NexmarkSplitReader;
use crate::source::nexmark::{
NexmarkProperties, NexmarkSplit, NexmarkSplitEnumerator, NEXMARK_CONNECTOR,
@@ -77,11 +78,13 @@ pub trait SplitEnumerator: Sized {
type Split: SplitMetaData + Send + Sync;
type Properties;

-    async fn new(properties: Self::Properties) -> Result<Self>;
+    async fn new(properties: Self::Properties, context: SourceEnumeratorContextRef)
+        -> Result<Self>;
async fn list_splits(&mut self) -> Result<Vec<Self::Split>>;
}

pub type SourceContextRef = Arc<SourceContext>;
+pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;
@@ -101,6 +104,17 @@ impl Default for SourceCtrlOpts {
}
}

+#[derive(Debug, Default)]
+pub struct SourceEnumeratorContext {
+    pub info: SourceEnumeratorInfo,
+    pub metrics: Arc<EnumeratorMetrics>,
+}
+
+#[derive(Clone, Copy, Debug, Default)]
+pub struct SourceEnumeratorInfo {
+    pub source_id: u32,
+}
+
#[derive(Debug, Default)]
pub struct SourceContext {
pub source_info: SourceInfo,
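`SourceEnumeratorContext` bundles the source's identity with a shared metrics handle, so an enumerator can label whatever it reports with its `source_id` without extra plumbing. A minimal construction sketch, assuming the types above are in scope; the `source_id` value is illustrative, and `Default` works because both structs derive it:

```rust
use std::sync::Arc;

fn context_for_source(source_id: u32) -> SourceEnumeratorContextRef {
    Arc::new(SourceEnumeratorContext {
        info: SourceEnumeratorInfo { source_id },
        // In production this handle would point at metrics registered with
        // the process-wide Prometheus registry; Default suffices for tests.
        metrics: Arc::new(EnumeratorMetrics::default()),
    })
}
```

Connectors that report nothing (CDC, datagen, S3, Pub/Sub below) simply bind the new argument as `_context`; only the Kafka enumerator stores it.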
7 changes: 5 additions & 2 deletions src/connector/src/source/cdc/enumerator/mod.rs
@@ -24,7 +24,7 @@ use risingwave_rpc_client::ConnectorClient;
use crate::source::cdc::{
CdcProperties, CdcSplitBase, DebeziumCdcSplit, MySqlCdcSplit, PostgresCdcSplit,
};
-use crate::source::SplitEnumerator;
+use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

pub const DATABASE_SERVERS_KEY: &str = "database.servers";

@@ -41,7 +41,10 @@ impl SplitEnumerator for DebeziumSplitEnumerator {
type Properties = CdcProperties;
type Split = DebeziumCdcSplit;

-    async fn new(props: CdcProperties) -> anyhow::Result<DebeziumSplitEnumerator> {
+    async fn new(
+        props: CdcProperties,
+        _context: SourceEnumeratorContextRef,
+    ) -> anyhow::Result<DebeziumSplitEnumerator> {
tracing::debug!("start validate cdc properties");
let connector_client = ConnectorClient::new(
HostAddr::from_str(&props.connector_node_addr)
7 changes: 5 additions & 2 deletions src/connector/src/source/datagen/enumerator/mod.rs
@@ -15,7 +15,7 @@
use async_trait::async_trait;

use crate::source::datagen::{DatagenProperties, DatagenSplit};
-use crate::source::SplitEnumerator;
+use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct DatagenSplitEnumerator {
@@ -27,7 +27,10 @@ impl SplitEnumerator for DatagenSplitEnumerator {
type Properties = DatagenProperties;
type Split = DatagenSplit;

-    async fn new(properties: DatagenProperties) -> anyhow::Result<DatagenSplitEnumerator> {
+    async fn new(
+        properties: DatagenProperties,
+        _context: SourceEnumeratorContextRef,
+    ) -> anyhow::Result<DatagenSplitEnumerator> {
let split_num = properties.split_num.unwrap_or_else(|| "1".to_string());
let split_num = split_num.parse::<i32>()?;
Ok(Self { split_num })
13 changes: 10 additions & 3 deletions src/connector/src/source/filesystem/s3/enumerator.rs
@@ -21,7 +21,7 @@ use crate::aws_auth::AwsAuthProps;
use crate::aws_utils::{default_conn_config, s3_client};
use crate::source::filesystem::file_common::FsSplit;
use crate::source::filesystem::s3::S3Properties;
-use crate::source::SplitEnumerator;
+use crate::source::{SourceEnumeratorContextRef, SplitEnumerator};

/// Get the prefix from a glob
fn get_prefix(glob: &str) -> String {
@@ -70,7 +70,10 @@ impl SplitEnumerator for S3SplitEnumerator {
type Properties = S3Properties;
type Split = FsSplit;

-    async fn new(properties: Self::Properties) -> anyhow::Result<Self> {
+    async fn new(
+        properties: Self::Properties,
+        _context: SourceEnumeratorContextRef,
+    ) -> anyhow::Result<Self> {
let config = AwsAuthProps::from(&properties);
let sdk_config = config.build_config().await?;
let s3_client = s3_client(&sdk_config, Some(default_conn_config()));
@@ -144,6 +147,7 @@ mod tests {
}

use super::*;
+    use crate::source::SourceEnumeratorContext;
#[tokio::test]
#[ignore]
async fn test_s3_split_enumerator() {
@@ -155,7 +159,10 @@
secret: None,
endpoint_url: None,
};
-        let mut enumerator = S3SplitEnumerator::new(props.clone()).await.unwrap();
+        let mut enumerator =
+            S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
+                .await
+                .unwrap();
let splits = enumerator.list_splits().await.unwrap();
let names = splits.into_iter().map(|split| split.name).collect_vec();
assert_eq!(names.len(), 2);
7 changes: 5 additions & 2 deletions src/connector/src/source/filesystem/s3/source/reader.rs
@@ -232,7 +232,7 @@ mod tests {
use super::*;
use crate::parser::{CommonParserConfig, CsvParserConfig, SpecificParserConfig};
use crate::source::filesystem::{S3Properties, S3SplitEnumerator};
-use crate::source::{SourceColumnDesc, SplitEnumerator};
+use crate::source::{SourceColumnDesc, SourceEnumeratorContext, SplitEnumerator};

#[tokio::test]
#[ignore]
@@ -245,7 +245,10 @@
secret: None,
endpoint_url: None,
};
-        let mut enumerator = S3SplitEnumerator::new(props.clone()).await.unwrap();
+        let mut enumerator =
+            S3SplitEnumerator::new(props.clone(), SourceEnumeratorContext::default().into())
+                .await
+                .unwrap();
let splits = enumerator.list_splits().await.unwrap();
println!("splits {:?}", splits);

6 changes: 5 additions & 1 deletion src/connector/src/source/google_pubsub/enumerator/client.rs
@@ -21,6 +21,7 @@ use google_cloud_pubsub::subscription::{SeekTo, SubscriptionConfig};
use crate::source::base::SplitEnumerator;
use crate::source::google_pubsub::split::PubsubSplit;
use crate::source::google_pubsub::PubsubProperties;
+use crate::source::SourceEnumeratorContextRef;

pub struct PubsubSplitEnumerator {
subscription: String,
@@ -32,7 +33,10 @@ impl SplitEnumerator for PubsubSplitEnumerator {
type Properties = PubsubProperties;
type Split = PubsubSplit;

-    async fn new(properties: Self::Properties) -> anyhow::Result<PubsubSplitEnumerator> {
+    async fn new(
+        properties: Self::Properties,
+        _context: SourceEnumeratorContextRef,
+    ) -> anyhow::Result<PubsubSplitEnumerator> {
let split_count = properties.split_count;
let subscription = properties.subscription.to_owned();

59 changes: 44 additions & 15 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -24,6 +24,7 @@ use rdkafka::{Offset, TopicPartitionList};
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL};
+use crate::source::SourceEnumeratorContextRef;

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
@@ -34,6 +35,7 @@ pub enum KafkaEnumeratorOffset {
}

pub struct KafkaSplitEnumerator {
+    context: SourceEnumeratorContextRef,
broker_address: String,
topic: String,
client: BaseConsumer<PrivateLinkConsumerContext>,
@@ -52,7 +54,10 @@ impl SplitEnumerator for KafkaSplitEnumerator {
type Properties = KafkaProperties;
type Split = KafkaSplit;

-    async fn new(properties: KafkaProperties) -> anyhow::Result<KafkaSplitEnumerator> {
+    async fn new(
+        properties: KafkaProperties,
+        context: SourceEnumeratorContextRef,
+    ) -> anyhow::Result<KafkaSplitEnumerator> {
let mut config = rdkafka::ClientConfig::new();
let common_props = &properties.common;

@@ -88,6 +93,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
config.create_with_context(client_ctx).await?;

Ok(Self {
+            context,
broker_address,
topic,
client,
@@ -104,10 +110,14 @@
self.broker_address, e
))
})?;
+        let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
+        let mut start_offsets = self
+            .fetch_start_offset(topic_partitions.as_ref(), &watermarks)
+            .await?;

-        let mut start_offsets = self.fetch_start_offset(topic_partitions.as_ref()).await?;

-        let mut stop_offsets = self.fetch_stop_offset(topic_partitions.as_ref()).await?;
+        let mut stop_offsets = self
+            .fetch_stop_offset(topic_partitions.as_ref(), &watermarks)
+            .await?;

let ret = topic_partitions
.into_iter()
@@ -124,6 +134,19 @@
}

impl KafkaSplitEnumerator {
+    async fn get_watermarks(&self, partitions: &[i32]) -> KafkaResult<HashMap<i32, (i64, i64)>> {
+        let mut map = HashMap::new();
+        for partition in partitions {
+            let (low, high) = self
+                .client
+                .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
+                .await?;
+            self.report_high_watermark(*partition, high);
+            map.insert(*partition, (low, high));
+        }
+        Ok(map)
+    }
+
pub async fn list_splits_batch(
&mut self,
expect_start_timestamp_millis: Option<i64>,
@@ -149,7 +172,6 @@ impl KafkaSplitEnumerator {
None
};

// println!("Start offset: {:?}", expect_start_offset);
let mut expect_stop_offset = if let Some(ts) = expect_stop_timestamp_millis {
Some(
self.fetch_offset_for_time(topic_partitions.as_ref(), ts)
@@ -158,7 +180,6 @@
} else {
None
};
// println!("Stop offset: {:?}", expect_stop_offset);

// Watermark here has nothing to do with watermark in streaming processing. Watermark
// here means smallest/largest offset available for reading.
@@ -216,17 +237,15 @@
    async fn fetch_stop_offset(
        &self,
        partitions: &[i32],
+        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.stop_offset {
            KafkaEnumeratorOffset::Earliest => unreachable!(),
            KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
-                    let (_, high_watermark) = self
-                        .client
-                        .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
-                        .await?;
-                    map.insert(*partition, Some(high_watermark));
+                    let (_, high_watermark) = watermarks.get(partition).unwrap();
+                    map.insert(*partition, Some(*high_watermark));
                }
                Ok(map)
            }
@@ -243,15 +262,13 @@
    async fn fetch_start_offset(
        &self,
        partitions: &[i32],
+        watermarks: &HashMap<i32, (i64, i64)>,
    ) -> KafkaResult<HashMap<i32, Option<i64>>> {
        match self.start_offset {
            KafkaEnumeratorOffset::Earliest | KafkaEnumeratorOffset::Latest => {
                let mut map = HashMap::new();
                for partition in partitions {
-                    let (low_watermark, high_watermark) = self
-                        .client
-                        .fetch_watermarks(self.topic.as_str(), *partition, self.sync_call_timeout)
-                        .await?;
+                    let (low_watermark, high_watermark) = watermarks.get(partition).unwrap();
                    let offset = match self.start_offset {
                        KafkaEnumeratorOffset::Earliest => low_watermark - 1,
                        KafkaEnumeratorOffset::Latest => high_watermark - 1,
@@ -311,6 +328,18 @@
Ok(result)
}

+    #[inline]
+    fn report_high_watermark(&self, partition: i32, offset: i64) {
+        self.context
+            .metrics
+            .high_watermark
+            .with_label_values(&[
+                &self.context.info.source_id.to_string(),
+                &partition.to_string(),
+            ])
+            .set(offset);
+    }
+
async fn fetch_topic_partition(&self) -> anyhow::Result<Vec<i32>> {
// for now, we only support one topic
let metadata = self
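Two things change in the Kafka enumerator. First, `list_splits` now fetches the low/high watermarks once per pass via `get_watermarks` and hands the cached map to both `fetch_start_offset` and `fetch_stop_offset`, each of which previously called `fetch_watermarks` against the broker per partition on its own. Second, the same pass reports every partition's high watermark through the new gauge. A sketch of what the metric behind `report_high_watermark` might look like; the real `EnumeratorMetrics` lives in `src/connector/src/source/monitor`, and its exact metric name, help text, and registration may differ:

```rust
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry};

#[derive(Debug)]
pub struct EnumeratorMetrics {
    // Labeled by source_id and partition, matching the label values
    // passed in report_high_watermark().
    pub high_watermark: IntGaugeVec,
}

impl EnumeratorMetrics {
    pub fn new(registry: &Registry) -> Self {
        let high_watermark = register_int_gauge_vec_with_registry!(
            "high_watermark",
            "Latest available offset (high watermark) per source partition",
            &["source_id", "partition"],
            registry
        )
        .unwrap();
        Self { high_watermark }
    }
}
```

A gauge (rather than a counter) fits here because the watermark is a point-in-time value that the enumerator overwrites on every tick.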
6 changes: 2 additions & 4 deletions src/connector/src/source/kafka/source/message.rs
@@ -26,7 +26,7 @@ pub struct KafkaMeta {
}

impl SourceMessage {
-    pub fn from_kafka_message_upsert(message: BorrowedMessage<'_>) -> Self {
+    pub fn from_kafka_message_upsert(message: &BorrowedMessage<'_>) -> Self {
let encoded = bincode::serialize(&UpsertMessage {
primary_key: message.key().unwrap_or_default().into(),
record: message.payload().unwrap_or_default().into(),
@@ -42,10 +42,8 @@ impl SourceMessage {
}),
}
}
-}

-impl<'a> From<BorrowedMessage<'a>> for SourceMessage {
-    fn from(message: BorrowedMessage<'a>) -> Self {
+    pub fn from_kafka_message(message: &BorrowedMessage<'_>) -> Self {
SourceMessage {
// TODO(TaoWu): Possible performance improvement: avoid memory copying here.
payload: message.payload().map(|p| p.to_vec()),
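The constructors now borrow the message instead of consuming it, and the `From` impl becomes an explicit method. That matters for the lag metric: the reader can convert a message into a `SourceMessage` and still read its offset afterwards. A sketch of the pattern, assuming the surrounding reader and the `latest_message_id` update, which are not shown in this diff:

```rust
use rdkafka::message::{BorrowedMessage, Message};

use crate::source::base::SourceMessage;

// Convert without consuming: the borrowed message stays usable, so its
// offset can still feed a per-partition "latest consumed offset" gauge.
fn convert_and_track(message: &BorrowedMessage<'_>) -> (SourceMessage, i64) {
    let source_message = SourceMessage::from_kafka_message(message);
    (source_message, message.offset())
}
```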
