feat(stream): source_error_count reporting to prometheus #7877

Merged
merged 20 commits on Feb 22, 2023
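In short, this PR splits user-facing error metrics into compute and source variants: the existing Grafana panel is renamed to "Compute Errors by Type" over `user_compute_error_count`, and the connector source parsers now report every message that fails to parse (the message is still skipped with a warning) to a new `user_source_error_count` counter via a `SourceErrorContext`, surfaced in a dedicated "Source Errors by Type" panel.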
16 changes: 13 additions & 3 deletions grafana/risingwave-dashboard.dashboard.py
@@ -1461,18 +1461,28 @@ def section_streaming_errors(outer_panels):
panels = outer_panels.sub_panel()
return [
outer_panels.row_collapsed(
"Streaming Errors",
"User Streaming Errors",
[
panels.timeseries_count(
"User Errors by Type",
"Compute Errors by Type",
"",
[
panels.target(
f"sum({metric('user_error_count')}) by (error_type, error_msg, fragment_id, executor_name)",
f"sum({metric('user_compute_error_count')}) by (error_type, error_msg, fragment_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: fragment_id={{fragment_id}})",
),
],
),
panels.timeseries_count(
"Source Errors by Type",
"",
[
panels.target(
f"sum({metric('user_source_error_count')}) by (error_type, error_msg, fragment_id, table_id, executor_name)",
"{{error_type}}: {{error_msg}} ({{executor_name}}: table_id={{table_id}}, fragment_id={{fragment_id}})",
),
],
),
],
),
]
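The new panel assumes the compute nodes expose a `user_source_error_count` counter with exactly these labels. As a rough illustration only (not the PR's actual metrics code; the function name and registration flow are assumptions), such a counter could be defined with the `prometheus` crate like this:

```rust
// Sketch only: a counter matching the labels queried by the panel above.
use prometheus::{IntCounterVec, Opts, Registry};

pub fn register_user_source_error_count(registry: &Registry) -> IntCounterVec {
    let counter = IntCounterVec::new(
        Opts::new(
            "user_source_error_count",
            "Errors reported by connector source parsers",
        ),
        // Same label set as the `sum(...) by (...)` clause in the panel above.
        &["error_type", "error_msg", "fragment_id", "table_id", "executor_name"],
    )
    .expect("valid metric definition");
    registry
        .register(Box::new(counter.clone()))
        .expect("metric registered once");
    counter
}
```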
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/batch/src/executor/source.rs
@@ -161,7 +161,7 @@ impl SourceExecutor {
Some(vec![self.split]),
self.column_ids,
self.metrics,
SourceInfo::new(u32::MAX, self.source_id),
SourceInfo::new(u32::MAX, self.source_id, u32::MAX),
)
.await?;
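Note: the batch source executor has no streaming fragment, so the new third argument to `SourceInfo::new` (the fragment id, judging by how `source_info.fragment_id` is read in `macros.rs` below) is filled with the `u32::MAX` placeholder, mirroring the first argument.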

8 changes: 7 additions & 1 deletion src/connector/src/macros.rs
@@ -182,6 +182,7 @@ macro_rules! impl_common_parser_logic {
if let Err(e) = self.parse_inner(content.as_ref(), builder.row_writer())
.await
{
self.error_ctx.report_stream_source_error(&e);
tracing::warn!("message parsing failed {}, skipping", e.to_string());
continue;
}
@@ -241,6 +242,11 @@ macro_rules! impl_common_split_reader_logic {
let source_id = self.source_info.source_id.to_string();
let split_id = self.split_id.clone();
let metrics = self.metrics.clone();
let error_ctx = $crate::source::base::SourceErrorContext::new(
self.source_info.source_id.table_id,
self.source_info.fragment_id,
self.metrics.clone()
);

let data_stream = self.into_data_stream();

@@ -265,7 +271,7 @@
})
.boxed();
let parser =
$crate::parser::ByteStreamSourceParserImpl::create(parser_config)?;
$crate::parser::ByteStreamSourceParserImpl::create(parser_config, error_ctx)?;
#[for_await]
for msg_batch in parser.into_stream(data_stream) {
yield msg_batch?;
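These two hunks are where the new metric gets wired in: the split-reader macro builds a `SourceErrorContext` from the source's table id, fragment id, and metrics handle and passes it to the parser, and the common parser loop calls `report_stream_source_error` on every message that fails to parse before skipping it. The per-parser hunks below (Avro, Canal JSON, CSV, Debezium Avro) are mechanical: each parser stores the context in a new `error_ctx` field, accepts it as an extra constructor argument, and the tests pass `SourceErrorContext::for_test()`. A minimal sketch of what such a context might look like follows; only `new`, `for_test`, and `report_stream_source_error` are taken from the diff, while the fields and the counter type are assumptions:

```rust
// Sketch under assumptions: wraps a handle to a `user_source_error_count`-style counter.
use std::sync::Arc;

use prometheus::IntCounterVec;

#[derive(Clone)]
pub struct SourceErrorContext {
    table_id: u32,
    fragment_id: u32,
    // Assumed handle to the shared error counter; `None` in tests.
    error_count: Option<Arc<IntCounterVec>>,
}

impl SourceErrorContext {
    pub fn new(table_id: u32, fragment_id: u32, error_count: Arc<IntCounterVec>) -> Self {
        Self {
            table_id,
            fragment_id,
            error_count: Some(error_count),
        }
    }

    /// Used by parser unit tests; reports nowhere.
    pub fn for_test() -> Self {
        Self {
            table_id: 0,
            fragment_id: 0,
            error_count: None,
        }
    }

    pub fn report_stream_source_error(&self, e: &impl std::fmt::Display) {
        if let Some(counter) = &self.error_count {
            // Label values in the same order the counter was registered with:
            // error_type, error_msg, fragment_id, table_id, executor_name.
            let labels = [
                "SourceError".to_string(),
                e.to_string(),
                self.fragment_id.to_string(),
                self.table_id.to_string(),
                "SourceExecutor".to_string(),
            ];
            let label_refs: Vec<&str> = labels.iter().map(|s| s.as_str()).collect();
            counter.with_label_values(&label_refs).inc();
        }
    }
}
```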
14 changes: 10 additions & 4 deletions src/connector/src/parser/avro/parser.rs
@@ -31,7 +31,7 @@ use crate::parser::avro::util::avro_field_to_column_desc;
use crate::parser::schema_registry::{extract_schema_id, Client};
use crate::parser::util::get_kafka_topic;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::source::SourceColumnDesc;
use crate::source::{SourceColumnDesc, SourceErrorContext};

impl_common_parser_logic!(AvroParser);

@@ -40,6 +40,7 @@ pub struct AvroParser {
schema: Arc<Schema>,
schema_resolver: Option<Arc<ConfluentSchemaResolver>>,
rw_columns: Vec<SourceColumnDesc>,
error_ctx: SourceErrorContext,
}

#[derive(Debug, Clone)]
@@ -104,7 +105,11 @@ impl AvroParserConfig {

// confluent_wire_format, kafka only, subject-name: "${topic-name}-value"
impl AvroParser {
pub fn new(rw_columns: Vec<SourceColumnDesc>, config: AvroParserConfig) -> Result<Self> {
pub fn new(
rw_columns: Vec<SourceColumnDesc>,
config: AvroParserConfig,
error_ctx: SourceErrorContext,
) -> Result<Self> {
let AvroParserConfig {
schema,
schema_resolver,
@@ -113,6 +118,7 @@ impl AvroParser {
schema,
schema_resolver,
rw_columns,
error_ctx,
})
}

@@ -196,7 +202,7 @@ mod test {
};
use crate::parser::avro::util::unix_epoch_days;
use crate::parser::SourceStreamChunkBuilder;
use crate::source::SourceColumnDesc;
use crate::source::{SourceColumnDesc, SourceErrorContext};

fn test_data_path(file_name: &str) -> String {
let curr_dir = env::current_dir().unwrap().into_os_string();
@@ -254,7 +260,7 @@

async fn new_avro_parser_from_local(file_name: &str) -> error::Result<AvroParser> {
let conf = new_avro_conf_from_local(file_name).await?;
AvroParser::new(Vec::default(), conf)
AvroParser::new(Vec::default(), conf, SourceErrorContext::for_test())
}

#[tokio::test]
14 changes: 9 additions & 5 deletions src/connector/src/parser/canal/simd_json_parser.rs
@@ -28,7 +28,7 @@ use simd_json::{BorrowedValue, StaticNode, ValueAccess};
use crate::parser::canal::operators::*;
use crate::parser::util::at_least_one_ok;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::source::SourceColumnDesc;
use crate::source::{SourceColumnDesc, SourceErrorContext};
use crate::{ensure_rust_type, ensure_str, impl_common_parser_logic};

const AFTER: &str = "data";
@@ -40,11 +40,15 @@ impl_common_parser_logic!(CanalJsonParser);
#[derive(Debug)]
pub struct CanalJsonParser {
pub(crate) rw_columns: Vec<SourceColumnDesc>,
error_ctx: SourceErrorContext,
}

impl CanalJsonParser {
pub fn new(rw_columns: Vec<SourceColumnDesc>) -> Result<Self> {
Ok(Self { rw_columns })
pub fn new(rw_columns: Vec<SourceColumnDesc>, error_ctx: SourceErrorContext) -> Result<Self> {
Ok(Self {
rw_columns,
error_ctx,
})
}

#[allow(clippy::unused_async)]
@@ -255,7 +259,7 @@ mod tests {
SourceColumnDesc::simple("win_rate", DataType::Float64, 5.into()),
];

let parser = CanalJsonParser::new(descs.clone()).unwrap();
let parser = CanalJsonParser::new(descs.clone(), SourceErrorContext::for_test()).unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);

@@ -332,7 +336,7 @@ mod tests {
SourceColumnDesc::simple("v2", DataType::Int32, 1.into()),
];

let parser = CanalJsonParser::new(descs.clone()).unwrap();
let parser = CanalJsonParser::new(descs.clone(), SourceErrorContext::for_test()).unwrap();

let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 2);

14 changes: 10 additions & 4 deletions src/connector/src/parser/csv_parser.rs
@@ -28,7 +28,7 @@ use crate::parser::{
BoxSourceWithStateStream, ByteStreamSourceParser, SourceColumnDesc, SourceStreamChunkBuilder,
SourceStreamChunkRowWriter, StreamChunkWithState, WriteGuard,
};
use crate::source::{BoxSourceStream, SplitId};
use crate::source::{BoxSourceStream, SourceErrorContext, SplitId};

macro_rules! to_rust_type {
($v:ident, $t:ty) => {
@@ -53,10 +53,15 @@ pub struct CsvParser {
output_cursor: usize,
ends: Vec<usize>,
ends_cursor: usize,
error_ctx: SourceErrorContext,
}

impl CsvParser {
pub fn new(rw_columns: Vec<SourceColumnDesc>, parser_config: CsvParserConfig) -> Result<Self> {
pub fn new(
rw_columns: Vec<SourceColumnDesc>,
parser_config: CsvParserConfig,
error_ctx: SourceErrorContext,
) -> Result<Self> {
let CsvParserConfig {
delimiter,
has_header,
@@ -70,6 +75,7 @@ impl CsvParser {
output_cursor: 0,
ends: vec![0],
ends_cursor: 1,
error_ctx,
})
}

@@ -316,7 +322,7 @@ mod tests {
delimiter: b',',
has_header: true,
};
let parser = CsvParser::new(descs, config).unwrap();
let parser = CsvParser::new(descs, config, SourceErrorContext::for_test()).unwrap();
let data = b"
name,age
pite,20
@@ -341,7 +347,7 @@
delimiter: b',',
has_header: true,
};
let parser = CsvParser::new(descs, config).unwrap();
let parser = CsvParser::new(descs, config, SourceErrorContext::for_test()).unwrap();
let data = b"
name,age
pite,20
8 changes: 6 additions & 2 deletions src/connector/src/parser/debezium/avro_parser.rs
@@ -35,7 +35,7 @@ use crate::parser::schema_registry::{extract_schema_id, Client};
use crate::parser::schema_resolver::ConfluentSchemaResolver;
use crate::parser::util::get_kafka_topic;
use crate::parser::{SourceStreamChunkRowWriter, WriteGuard};
use crate::source::SourceColumnDesc;
use crate::source::{SourceColumnDesc, SourceErrorContext};

const BEFORE: &str = "before";
const AFTER: &str = "after";
@@ -51,6 +51,7 @@ pub struct DebeziumAvroParser {
inner_schema: Arc<Schema>,
schema_resolver: Arc<ConfluentSchemaResolver>,
rw_columns: Vec<SourceColumnDesc>,
error_ctx: SourceErrorContext,
}

#[derive(Debug, Clone)]
@@ -167,6 +168,7 @@ impl DebeziumAvroParser {
pub fn new(
rw_columns: Vec<SourceColumnDesc>,
config: DebeziumAvroParserConfig,
error_ctx: SourceErrorContext,
) -> Result<Self> {
let DebeziumAvroParserConfig {
outer_schema,
@@ -179,6 +181,7 @@
inner_schema,
schema_resolver,
rw_columns,
error_ctx,
})
}

@@ -435,7 +438,8 @@ mod tests {
.map(|c| SourceColumnDesc::from(&c))
.collect_vec();

let parser = DebeziumAvroParser::new(columns.clone(), config)?;
let parser =
DebeziumAvroParser::new(columns.clone(), config, SourceErrorContext::for_test())?;
let [(op, row)]: [_; 1] = parse_one(parser, columns, DEBEZIUM_AVRO_DATA)
.await
.try_into()