From 52a39fd2b550ea9a63c35c2a22224b8dbc5733e0 Mon Sep 17 00:00:00 2001 From: jon-chuang <9093549+jon-chuang@users.noreply.github.com> Date: Thu, 23 Feb 2023 16:09:34 +0800 Subject: [PATCH] feat(stream): `ErrorSuppressor` for user compute errors (#8132) `ErrorSuppressor` for user compute errors Approved-By: fuyufjh Co-Authored-By: jon-chuang Co-Authored-By: jon-chuang <9093549+jon-chuang@users.noreply.github.com> --- src/common/src/config.rs | 8 ++++++ src/common/src/error.rs | 40 +++++++++++++++++++++++++++ src/stream/src/executor/actor.rs | 37 ++++++++++++------------- src/stream/src/task/stream_manager.rs | 1 + 4 files changed, 66 insertions(+), 20 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 8afb1dc8f082..90123abc0974 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -252,6 +252,10 @@ pub struct StreamingConfig { #[serde(default)] pub developer: DeveloperConfig, + + /// Max unique user stream errors per actor + #[serde(default = "default::streaming::unique_user_stream_errors")] + pub unique_user_stream_errors: usize, } impl Default for StreamingConfig { @@ -636,6 +640,10 @@ mod default { pub fn async_stack_trace() -> AsyncStackTraceOption { AsyncStackTraceOption::On } + + pub fn unique_user_stream_errors() -> usize { + 10 + } } pub mod file_cache { diff --git a/src/common/src/error.rs b/src/common/src/error.rs index afb768e7ed25..df2508143137 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -13,9 +13,11 @@ // limitations under the License. use std::backtrace::Backtrace; +use std::collections::HashSet; use std::convert::Infallible; use std::fmt::{Debug, Display, Formatter}; use std::io::Error as IoError; +use std::time::{Duration, SystemTime}; use memcomparable::Error as MemComparableError; use risingwave_pb::ProstFieldNotFound; @@ -29,6 +31,8 @@ use crate::util::value_encoding::error::ValueEncodingError; /// Header used to store serialized [`RwError`] in grpc status. pub const RW_ERROR_GRPC_HEADER: &str = "risingwave-error-bin"; +const ERROR_SUPPRESSOR_RESET_DURATION: Duration = Duration::from_millis(60 * 60 * 1000); // 1h + pub trait Error = std::error::Error + Send + Sync + 'static; pub type BoxedError = Box; @@ -422,6 +426,42 @@ macro_rules! bail { }; } +#[derive(Debug)] +pub struct ErrorSuppressor { + max_unique: usize, + unique: HashSet, + last_reset_time: SystemTime, +} + +impl ErrorSuppressor { + pub fn new(max_unique: usize) -> Self { + Self { + max_unique, + last_reset_time: SystemTime::now(), + unique: Default::default(), + } + } + + pub fn suppress_error(&mut self, error: &str) -> bool { + self.try_reset(); + if self.unique.contains(error) { + false + } else if self.unique.len() < self.max_unique { + self.unique.insert(error.to_string()); + false + } else { + // We have exceeded the capacity. + true + } + } + + fn try_reset(&mut self) { + if self.last_reset_time.elapsed().unwrap() >= ERROR_SUPPRESSOR_RESET_DURATION { + *self = Self::new(self.max_unique) + } + } +} + #[cfg(test)] mod tests { use std::convert::Into; diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index 6c4a941dfa35..d11459369bd0 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -22,6 +21,7 @@ use futures::pin_mut; use hytra::TrAdder; use minitrace::prelude::*; use parking_lot::Mutex; +use risingwave_common::error::ErrorSuppressor; use risingwave_common::util::epoch::EpochPair; use risingwave_expr::ExprError; use tokio_stream::StreamExt; @@ -37,13 +37,11 @@ pub struct ActorContext { pub id: ActorId, pub fragment_id: u32, - // TODO: report errors and prompt the user. - pub errors: Mutex>>, - last_mem_val: Arc, cur_mem_val: Arc, total_mem_val: Arc>, streaming_metrics: Arc, + pub error_suppressor: Arc>, } pub type ActorContextRef = Arc; @@ -53,11 +51,11 @@ impl ActorContext { Arc::new(Self { id, fragment_id: 0, - errors: Default::default(), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val: Arc::new(TrAdder::new()), streaming_metrics: Arc::new(StreamingMetrics::unused()), + error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(10))), }) } @@ -66,35 +64,34 @@ impl ActorContext { fragment_id: u32, total_mem_val: Arc>, streaming_metrics: Arc, + unique_user_errors: usize, ) -> ActorContextRef { Arc::new(Self { id, fragment_id, - errors: Default::default(), cur_mem_val: Arc::new(0.into()), last_mem_val: Arc::new(0.into()), total_mem_val, streaming_metrics, + error_suppressor: Arc::new(Mutex::new(ErrorSuppressor::new(unique_user_errors))), }) } pub fn on_compute_error(&self, err: ExprError, identity: &str) { tracing::error!("Compute error: {}, executor: {identity}", err); let executor_name = identity.split(' ').next().unwrap_or("name_not_found"); - self.streaming_metrics - .user_compute_error_count - .with_label_values(&[ - "ExprError", - &err.to_string(), - executor_name, - &self.fragment_id.to_string(), - ]) - .inc(); - self.errors - .lock() - .entry(identity.to_owned()) - .or_default() - .push(err); + let err_str = err.to_string(); + if !self.error_suppressor.lock().suppress_error(&err_str) { + self.streaming_metrics + .user_compute_error_count + .with_label_values(&[ + "ExprError", + &err_str, + executor_name, + &self.fragment_id.to_string(), + ]) + .inc(); + } } pub fn store_mem_usage(&self, val: usize) { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index d3e5ffd11e0b..54e94f832c42 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -607,6 +607,7 @@ impl LocalStreamManagerCore { actor.fragment_id, self.total_mem_val.clone(), self.streaming_metrics.clone(), + self.config.unique_user_stream_errors, ); let vnode_bitmap = actor .vnode_bitmap