diff --git a/src/common/src/cache.rs b/src/common/src/cache.rs
index bcdb8f84eac2..52a45f0c959a 100644
--- a/src/common/src/cache.rs
+++ b/src/common/src/cache.rs
@@ -757,7 +757,7 @@ impl LruCache { shard.release(handle) }; // do not deallocate data with holding mutex. - if let Some((key,value)) = data && let Some(listener) = &self.listener { + if let Some((key, value)) = data && let Some(listener) = &self.listener { listener.on_release(key, value); } }
@@ -819,7 +819,7 @@ impl LruCache { shard.erase(hash, key) }; // do not deallocate data with holding mutex. - if let Some((key,value)) = data && let Some(listener) = &self.listener { + if let Some((key, value)) = data && let Some(listener) = &self.listener { listener.on_release(key, value); } }
diff --git a/src/common/src/hash/consistent_hash/vnode.rs b/src/common/src/hash/consistent_hash/vnode.rs
index 79f62f922111..f2cbb144dd16 100644
--- a/src/common/src/hash/consistent_hash/vnode.rs
+++ b/src/common/src/hash/consistent_hash/vnode.rs
@@ -113,11 +113,12 @@ impl VirtualNode { // be the one that contains RowId, and use a special method to skip the calculation of Hash // and directly extract the `VirtualNode` from `RowId`. pub fn compute_chunk(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> { - if let Ok(idx) = keys.iter().exactly_one() && - let ArrayImpl::Serial(serial_array) = data_chunk.column_at(*idx).array_ref() { - - return serial_array.iter() - .map(|serial|extract_vnode_id_from_row_id(serial.unwrap().as_row_id())) + if let Ok(idx) = keys.iter().exactly_one() + && let ArrayImpl::Serial(serial_array) = data_chunk.column_at(*idx).array_ref() + { + return serial_array + .iter() + .map(|serial| extract_vnode_id_from_row_id(serial.unwrap().as_row_id())) .collect(); }
diff --git a/src/common/src/types/interval.rs b/src/common/src/types/interval.rs
index 9ed45da6ccc1..2fb6fd98315e 100644
--- a/src/common/src/types/interval.rs
+++ b/src/common/src/types/interval.rs
@@ -1324,7 +1324,7 @@ impl Interval { fn parse_postgres(s: &str) -> Result<Self> { use DateTimeField::*; let mut tokens = parse_interval(s)?; - if tokens.len()%2!=0 && let Some(TimeStrToken::Num(_)) = tokens.last() { + if tokens.len() % 2 != 0 && let Some(TimeStrToken::Num(_)) = tokens.last() { tokens.push(TimeStrToken::TimeUnit(DateTimeField::Second)); } if tokens.len() % 2 != 0 {
@@ -1356,22 +1356,30 @@ impl Interval { } })() .and_then(|rhs| result.checked_add(&rhs)) - .ok_or_else(|| ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s)))?; + .ok_or_else(|| { + ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s)) + })?; } (TimeStrToken::Second(second), TimeStrToken::TimeUnit(interval_unit)) => { result = match interval_unit { Second => { - // If unsatisfied precision is passed as input, we should not return None (Error). - let usecs = (second.into_inner() * (USECS_PER_SEC as f64)).round_ties_even() as i64; + // If unsatisfied precision is passed as input, we should not return + // None (Error). + let usecs = (second.into_inner() * (USECS_PER_SEC as f64)) + .round_ties_even() as i64; Some(Interval::from_month_day_usec(0, 0, usecs)) } _ => None, } .and_then(|rhs| result.checked_add(&rhs)) - .ok_or_else(|| ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s)))?; + .ok_or_else(|| { + ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s)) + })?; } _ => { - return Err(ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", &s)).into()); + return Err( + ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", &s)).into(), + ); } } }
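Note: the interval hunk above converts fractional seconds to whole microseconds with banker's rounding, so input with more precision than one microsecond is accepted rather than rejected. A minimal sketch of that conversion, assuming Rust 1.77+ where `f64::round_ties_even` is stable (the `seconds_to_usecs` helper is hypothetical, for illustration only):

    const USECS_PER_SEC: i64 = 1_000_000;

    /// Convert fractional seconds to microseconds, rounding half-way
    /// cases to the nearest even integer (banker's rounding).
    fn seconds_to_usecs(second: f64) -> i64 {
        (second * USECS_PER_SEC as f64).round_ties_even() as i64
    }

    fn main() {
        // Half-way cases round to even instead of always rounding up.
        assert_eq!((0.5_f64).round_ties_even(), 0.0);
        assert_eq!((1.5_f64).round_ties_even(), 2.0);
        assert_eq!((2.5_f64).round_ties_even(), 2.0);
        // Sub-microsecond precision is absorbed instead of erroring.
        assert_eq!(seconds_to_usecs(1.25), 1_250_000);
    }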
diff --git a/src/connector/src/parser/debezium/mongo_json_parser.rs b/src/connector/src/parser/debezium/mongo_json_parser.rs
index 875549b38e65..81d43a658d7a 100644
--- a/src/connector/src/parser/debezium/mongo_json_parser.rs
+++ b/src/connector/src/parser/debezium/mongo_json_parser.rs
@@ -77,32 +77,30 @@ fn parse_bson_value( )), _ => Err(RwError::from(ProtocolError(format!( "Can not convert bson {:?} to {:?}", - id_field, - id_type + id_field, id_type ))))?, }, DataType::Int32 => { - if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberInt"){ - let int_str = obj["$numberInt"].as_str().unwrap_or_default(); - Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default())) - } else { - Err(RwError::from(ProtocolError(format!( - "Can not convert bson {:?} to {:?}", - id_field, - id_type - ))))? + if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberInt") { + let int_str = obj["$numberInt"].as_str().unwrap_or_default(); + Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default())) + } else { + Err(RwError::from(ProtocolError(format!( + "Can not convert bson {:?} to {:?}", + id_field, id_type + ))))? } } DataType::Int64 => { - if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberLong") { - let int_str = obj["$numberLong"].as_str().unwrap_or_default(); - Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default())) - } else { - Err(RwError::from(ProtocolError(format!( - "Can not convert bson {:?} to {:?}", - id_field, - id_type - ))))? + if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberLong") + { + let int_str = obj["$numberLong"].as_str().unwrap_or_default(); + Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default())) + } else { + Err(RwError::from(ProtocolError(format!( + "Can not convert bson {:?} to {:?}", + id_field, id_type + ))))? } } _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs
index ab1d2326ddd8..abf787a8a898 100644
--- a/src/connector/src/parser/json_parser.rs
+++ b/src/connector/src/parser/json_parser.rs
@@ -107,8 +107,8 @@ impl JsonParser { let value: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload_mut) .map_err(|e| RwError::from(ProtocolError(e.to_string())))?; - if let BorrowedValue::Array(ref objects) = value && matches!(op, Op::Insert) { - at_least_one_ok( + if let BorrowedValue::Array(ref objects) = value && matches!(op, Op::Insert) { + at_least_one_ok( objects .iter() .map(|obj| Self::parse_single_value(obj, &mut writer))
@@ -119,13 +119,10 @@ simd_json_parse_value( &SourceFormat::Json, &desc.data_type, - json_object_smart_get_value(&value,desc.name.as_str().into()) + json_object_smart_get_value(&value, desc.name.as_str().into()), ) .map_err(|e| { - tracing::error!( - "failed to process value: {}", - e - ); + tracing::error!("failed to process value: {}", e); e.into() }) };
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 5dfc8f0ddeac..45c5a98339ea 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -116,10 +116,13 @@ impl SourceContext { return Err(e); } let mut err_str = e.inner().to_string(); - if let Some(suppressor) = &self.error_suppressor && - suppressor.lock().suppress_error(&err_str) + if let Some(suppressor) = &self.error_suppressor + && suppressor.lock().suppress_error(&err_str) { - err_str = format!("error msg suppressed (due to per-actor error limit: {})", suppressor.lock().max()); + err_str = format!( + "error msg suppressed (due to per-actor error limit: {})", + suppressor.lock().max() + ); } self.metrics .user_source_error_count
diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs
index 98038f087f7b..834beda202ff 100644
--- a/src/connector/src/source/datagen/source/reader.rs
+++ b/src/connector/src/source/datagen/source/reader.rs
@@ -280,11 +280,12 @@ fn generator_from_data_type( } _ => { let kind_key = format!("fields.{}.kind", name); - if let Some(kind) = fields_option_map.get(&kind_key) && kind.as_str() == SEQUENCE_FIELD_KIND { + if let Some(kind) = fields_option_map.get(&kind_key) + && kind.as_str() == SEQUENCE_FIELD_KIND + { let start_key = format!("fields.{}.start", name); let end_key = format!("fields.{}.end", name); - let start_value = - fields_option_map.get(&start_key).map(|s| s.to_string()); + let start_value = fields_option_map.get(&start_key).map(|s| s.to_string()); let end_value = fields_option_map.get(&end_key).map(|s| s.to_string()); FieldGeneratorImpl::with_number_sequence( data_type,
@@ -299,12 +300,7 @@ let max_key = format!("fields.{}.max", name); let min_value = fields_option_map.get(&min_key).map(|s| s.to_string()); let max_value = fields_option_map.get(&max_key).map(|s| s.to_string()); - FieldGeneratorImpl::with_number_random( - data_type, - min_value, - max_value, - random_seed - ) + FieldGeneratorImpl::with_number_random(data_type, min_value, max_value, random_seed) } } }
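Note: the datagen hunk above looks up per-field generator options under keys of the form `fields.<name>.<prop>`. A toy sketch of that lookup convention (a plain `HashMap` and `println!` stand in for the connector's real option map and `FieldGeneratorImpl`, which are not shown here):

    use std::collections::HashMap;

    fn main() {
        let mut opts: HashMap<String, String> = HashMap::new();
        opts.insert("fields.v1.kind".into(), "sequence".into());
        opts.insert("fields.v1.start".into(), "1".into());
        opts.insert("fields.v1.end".into(), "10".into());

        let name = "v1";
        // A `sequence` kind selects a sequence generator with start/end;
        // anything else falls back to a random generator with min/max.
        if opts.get(&format!("fields.{}.kind", name)).map(|s| s.as_str()) == Some("sequence") {
            let start = opts.get(&format!("fields.{}.start", name));
            let end = opts.get(&format!("fields.{}.end", name));
            println!("sequence generator over {:?}..={:?}", start, end);
        } else {
            let min = opts.get(&format!("fields.{}.min", name));
            let max = opts.get(&format!("fields.{}.max", name));
            println!("random generator over {:?}..={:?}", min, max);
        }
    }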
diff --git a/src/connector/src/source/google_pubsub/source/reader.rs b/src/connector/src/source/google_pubsub/source/reader.rs
index f89719b0df7d..57059182794e 100644
--- a/src/connector/src/source/google_pubsub/source/reader.rs
+++ b/src/connector/src/source/google_pubsub/source/reader.rs
@@ -100,8 +100,7 @@ impl PubsubSplitReader { yield chunk; // Stop if we've approached the stop_offset - if let Some(stop_offset) = self.stop_offset - && latest_offset >= stop_offset { + if let Some(stop_offset) = self.stop_offset && latest_offset >= stop_offset { return Ok(()); } }
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index 4dccb7b1ec54..217162b847f1 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -152,7 +152,7 @@ impl KafkaSplitReader { #[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)] pub async fn into_data_stream(self) { if let Some(stop_offset) = self.stop_offset { - if let Some(start_offset) = self.start_offset && (start_offset+1) >= stop_offset { + if let Some(start_offset) = self.start_offset && (start_offset + 1) >= stop_offset { yield Vec::new(); return Ok(()); } else if stop_offset == 0 {
diff --git a/src/expr/src/expr/build.rs b/src/expr/src/expr/build.rs
index 5c2e9f94cde5..035cbdc67ca3 100644
--- a/src/expr/src/expr/build.rs
+++ b/src/expr/src/expr/build.rs
@@ -242,7 +242,9 @@ fn lexer(input: &str) -> Vec<Token> { _ => { let mut literal = String::new(); literal.push(c); - while let Some(&c) = chars.peek() && !matches!(c, '(' | ')' | ':' | ' ' | '\t' | '\r' | '\n') { + while let Some(&c) = chars.peek() + && !matches!(c, '(' | ')' | ':' | ' ' | '\t' | '\r' | '\n') + { literal.push(chars.next().unwrap()); } Token::Literal(literal)
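Note: nearly every hunk in this PR reflows a `let`-chain, i.e. an `if let` or `while let` combined with `&&`. A self-contained sketch of the pattern the lexer hunk above uses, assuming a nightly toolchain with `#![feature(let_chains)]` (the feature was unstable when this PR was written; on a recent stable toolchain the attribute can be dropped):

    #![feature(let_chains)] // nightly-only at the time of this PR

    fn main() {
        let mut chars = "abc(def".chars().peekable();
        let mut literal = String::new();
        // Both the pattern match and the boolean guard must hold for the
        // body to run; `peek()` returning `None` also ends the loop.
        while let Some(&c) = chars.peek()
            && !matches!(c, '(' | ')' | ':' | ' ' | '\t' | '\r' | '\n')
        {
            literal.push(c);
            chars.next();
        }
        assert_eq!(literal, "abc");
    }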
diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs
index dec52aa6f8e2..a8718a102f07 100644
--- a/src/frontend/planner_test/src/lib.rs
+++ b/src/frontend/planner_test/src/lib.rs
@@ -202,7 +202,8 @@ impl TestCaseResult { if original_test_case.planner_error.is_none() && let Some(ref err) = self.planner_error { return Err(anyhow!("unexpected planner error: {}", err)); } - if original_test_case.optimizer_error.is_none() && let Some(ref err) = self.optimizer_error { + if original_test_case.optimizer_error.is_none() && let Some(ref err) = self.optimizer_error + { return Err(anyhow!("unexpected optimizer error: {}", err)); }
diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs
index 0fec6cf5073d..29d815b87905 100644
--- a/src/frontend/src/binder/expr/mod.rs
+++ b/src/frontend/src/binder/expr/mod.rs
@@ -493,7 +493,9 @@ impl Binder { } pub fn bind_cast_inner(&mut self, expr: Expr, data_type: DataType) -> Result<ExprImpl> { - if let Expr::Array(Array {elem: ref expr, ..}) = expr && matches!(&data_type, DataType::List{ .. } ) { + if let Expr::Array(Array { elem: ref expr, .. }) = expr + && matches!(&data_type, DataType::List { .. }) + { return self.bind_array_cast(expr.clone(), data_type); } let lhs = self.bind_expr_inner(expr)?;
diff --git a/src/frontend/src/binder/expr/value.rs b/src/frontend/src/binder/expr/value.rs
index 1f7cabe8bc0b..0db6e2d899ef 100644
--- a/src/frontend/src/binder/expr/value.rs
+++ b/src/frontend/src/binder/expr/value.rs
@@ -241,7 +241,7 @@ fn unescape_c_style(s: &str) -> Result<String> { for _ in 0..len { if let Some(c) = chars.peek() && c.is_ascii_hexdigit() { unicode_seq.push(chars.next().unwrap()); - }else{ + } else { break; } }
@@ -285,7 +285,7 @@ fn unescape_c_style(s: &str) -> Result<String> { for _ in 0..2 { if let Some(c) = chars.peek() && matches!(*c, '0'..='7') { unicode_seq.push(chars.next().unwrap()); - }else{ + } else { break; } }
diff --git a/src/frontend/src/binder/query.rs b/src/frontend/src/binder/query.rs
index 9381ecdd7422..6f9235dc52b0 100644
--- a/src/frontend/src/binder/query.rs
+++ b/src/frontend/src/binder/query.rs
@@ -236,9 +236,17 @@ impl Binder { ) -> Result { let order_type = OrderType::from_bools(asc, nulls_first); let column_index = match expr { - Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => match *index != usize::MAX { - true => *index, - false => return Err(ErrorCode::BindError(format!("ORDER BY \"{}\" is ambiguous", name.real_value())).into()), + Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => { + match *index != usize::MAX { + true => *index, + false => { + return Err(ErrorCode::BindError(format!( + "ORDER BY \"{}\" is ambiguous", + name.real_value() + )) + .into()) + } + } } Expr::Value(Value::Number(number)) => match number.parse::<usize>() { Ok(index) if 1 <= index && index <= visible_output_num => index - 1,
diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs
index 2a7496b9706d..0ac9ae9b9c81 100644
--- a/src/frontend/src/binder/relation/mod.rs
+++ b/src/frontend/src/binder/relation/mod.rs
@@ -296,11 +296,11 @@ impl Binder { if let Some(from_alias) = alias { original_alias.name = from_alias.name; let mut alias_iter = from_alias.columns.into_iter(); - original_alias.columns = original_alias.columns.into_iter().map(|ident| { - alias_iter - .next() - .unwrap_or(ident) - }).collect(); + original_alias.columns = original_alias + .columns + .into_iter() + .map(|ident| alias_iter.next().unwrap_or(ident)) + .collect(); } self.bind_table_to_context(
@@ -316,11 +316,18 @@ // Share the CTE.
let input_relation = Relation::Subquery(Box::new(BoundSubquery { query })); - let share_relation = Relation::Share(Box::new(BoundShare { share_id, input: input_relation })); + let share_relation = Relation::Share(Box::new(BoundShare { + share_id, + input: input_relation, + })); Ok(share_relation) } else { - - self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, for_system_time_as_of_proctime) + self.bind_relation_by_name_inner( + schema_name.as_deref(), + &table_name, + alias, + for_system_time_as_of_proctime, + ) } }
@@ -352,7 +359,9 @@ impl Binder { err_msg: &str, ) -> Result<InputRef> { if let Some(time_col_arg) = arg - && let Some(ExprImpl::InputRef(time_col)) = self.bind_function_arg(time_col_arg)?.into_iter().next() { + && let Some(ExprImpl::InputRef(time_col)) = + self.bind_function_arg(time_col_arg)?.into_iter().next() + { Ok(time_col) } else { Err(ErrorCode::BindError(err_msg.to_string()).into()) }
@@ -439,7 +448,8 @@ impl Binder { .get_function_by_name_args( func_name, &args.iter().map(|arg| arg.return_type()).collect_vec(), - ) && matches!(func.kind, FunctionKind::Table { .. }) + ) + && matches!(func.kind, FunctionKind::Table { .. }) { TableFunction::new_user_defined(func.clone(), args) } else if let Ok(table_function_type) = TableFunctionType::from_str(func_name) {
diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs
index 0b518623c5c2..0bae1dbd7fb1 100644
--- a/src/frontend/src/expr/mod.rs
+++ b/src/frontend/src/expr/mod.rs
@@ -550,7 +550,8 @@ impl ExprImpl { pub fn as_eq_cond(&self) -> Option<(InputRef, InputRef)> { if let ExprImpl::FunctionCall(function_call) = self && function_call.get_expr_type() == ExprType::Equal - && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) = function_call.clone().decompose_as_binary() + && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) = + function_call.clone().decompose_as_binary() { if x.index() < y.index() { Some((*x, *y))
@@ -565,7 +566,8 @@ pub fn as_is_not_distinct_from_cond(&self) -> Option<(InputRef, InputRef)> { if let ExprImpl::FunctionCall(function_call) = self && function_call.get_expr_type() == ExprType::IsNotDistinctFrom - && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) = function_call.clone().decompose_as_binary() + && let (_, ExprImpl::InputRef(x), ExprImpl::InputRef(y)) = + function_call.clone().decompose_as_binary() { if x.index() < y.index() { Some((*x, *y))
@@ -740,8 +742,9 @@ } pub fn as_eq_const(&self) -> Option<(InputRef, ExprImpl)> { - if let ExprImpl::FunctionCall(function_call) = self && - function_call.get_expr_type() == ExprType::Equal{ + if let ExprImpl::FunctionCall(function_call) = self + && function_call.get_expr_type() == ExprType::Equal + { match function_call.clone().decompose_as_binary() { (_, ExprImpl::InputRef(x), y) if y.is_const() => Some((*x, y)), (_, x, ExprImpl::InputRef(y)) if x.is_const() => Some((*y, x)),
@@ -753,8 +756,9 @@ } pub fn as_eq_correlated_input_ref(&self) -> Option<(InputRef, CorrelatedInputRef)> { - if let ExprImpl::FunctionCall(function_call) = self && - function_call.get_expr_type() == ExprType::Equal{ + if let ExprImpl::FunctionCall(function_call) = self + && function_call.get_expr_type() == ExprType::Equal + { match function_call.clone().decompose_as_binary() { (_, ExprImpl::InputRef(x), ExprImpl::CorrelatedInputRef(y)) => Some((*x, *y)), (_, ExprImpl::CorrelatedInputRef(x), ExprImpl::InputRef(y)) => Some((*y, *x)),
@@ -766,8 +770,9 @@ } pub fn as_is_null(&self) ->
Option<InputRef> { - if let ExprImpl::FunctionCall(function_call) = self && - function_call.get_expr_type() == ExprType::IsNull{ + if let ExprImpl::FunctionCall(function_call) = self + && function_call.get_expr_type() == ExprType::IsNull + { match function_call.clone().decompose_as_unary() { (_, ExprImpl::InputRef(x)) => Some(*x), _ => None, }
@@ -811,28 +816,32 @@ } pub fn as_in_const_list(&self) -> Option<(InputRef, Vec<ExprImpl>)> { - if let ExprImpl::FunctionCall(function_call) = self && - function_call.get_expr_type() == ExprType::In { + if let ExprImpl::FunctionCall(function_call) = self + && function_call.get_expr_type() == ExprType::In + { let mut inputs = function_call.inputs().iter().cloned(); - let input_ref= match inputs.next().unwrap() { + let input_ref = match inputs.next().unwrap() { ExprImpl::InputRef(i) => *i, - _ => { return None } + _ => return None, }; - let list: Vec<_> = inputs.map(|expr|{ - // Non constant IN will be bound to OR - assert!(expr.is_const()); - expr - }).collect(); - - Some((input_ref, list)) + let list: Vec<_> = inputs + .map(|expr| { + // Non constant IN will be bound to OR + assert!(expr.is_const()); + expr + }) + .collect(); + + Some((input_ref, list)) } else { None } } pub fn as_or_disjunctions(&self) -> Option<Vec<ExprImpl>> { - if let ExprImpl::FunctionCall(function_call) = self && - function_call.get_expr_type() == ExprType::Or { + if let ExprImpl::FunctionCall(function_call) = self + && function_call.get_expr_type() == ExprType::Or + { Some(to_disjunctions(self.clone())) } else { None
diff --git a/src/frontend/src/handler/extended_handle.rs b/src/frontend/src/handler/extended_handle.rs
index 3675b302e8e2..563058a941e2 100644
--- a/src/frontend/src/handler/extended_handle.rs
+++ b/src/frontend/src/handler/extended_handle.rs
@@ -68,11 +68,10 @@ pub fn handle_parse( Statement::Query(_) | Statement::Insert { .. } | Statement::Delete { .. } - | Statement::Update { .. } => query::handle_parse(handler_args, statement, specific_param_types), - Statement::CreateView { - query, - .. - } => { + | Statement::Update { .. } => { + query::handle_parse(handler_args, statement, specific_param_types) + } + Statement::CreateView { query, .. } => { if have_parameter_in_query(query) { return Err(ErrorCode::NotImplemented( "CREATE VIEW with parameters".to_string(), None.into(),
@@ -82,15 +81,13 @@ } Ok(PrepareStatement::PureStatement(statement)) } - Statement::CreateTable { - query, - .. - } => { + Statement::CreateTable { query, ..
} => { if let Some(query) = query && have_parameter_in_query(query) { Err(ErrorCode::NotImplemented( "CREATE TABLE AS SELECT with parameters".to_string(), None.into(), - ).into()) + ) + .into()) } else { Ok(PrepareStatement::PureStatement(statement)) } }
@@ -100,7 +97,8 @@ Err(ErrorCode::NotImplemented( "CREATE SINK AS SELECT with parameters".to_string(), None.into(), - ).into()) + ) + .into()) } else { Ok(PrepareStatement::PureStatement(statement)) }
diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
index 0acf25cc5756..60283d54fae9 100644
--- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -252,17 +252,15 @@ impl ToBatchPb for BatchSeqScan { impl ToLocalBatch for BatchSeqScan { fn to_local(&self) -> Result<PlanRef> { - let dist = - if self.logical.is_sys_table() { + let dist = if self.logical.is_sys_table() { Distribution::Single } else if let Some(distribution_key) = self.logical.distribution_key() - && !distribution_key.is_empty() { - Distribution::UpstreamHashShard( - distribution_key, - self.logical.table_desc().table_id, - ) + && !distribution_key.is_empty() + { + Distribution::UpstreamHashShard(distribution_key, self.logical.table_desc().table_id) } else { - // NOTE(kwannoel): This is a hack to force an exchange to always be inserted before scan. + // NOTE(kwannoel): This is a hack to force an exchange to always be inserted before + // scan. Distribution::SomeShard }; Ok(Self::new_inner(self.logical.clone(), dist, self.scan_ranges.clone()).into())
diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs
index 8fb9efaaac94..2a56063cb524 100644
--- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs
@@ -583,7 +583,11 @@ impl LogicalMultiJoin { let r_tree = std::mem::take(&mut merge_node.join_tree); let new_height = usize::max(l_tree.height, r_tree.height) + 1; - if let Some(min_height) = optimized_bushy_tree.as_ref().map(|(t, _)| t.join_tree.height) && min_height < new_height { + if let Some(min_height) = optimized_bushy_tree + .as_ref() + .map(|(t, _)| t.join_tree.height) + && min_height < new_height + { continue; }
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 3801469693fb..df658cd3a21e 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -491,7 +491,10 @@ impl ToStream for LogicalSource { self.wrap_with_optional_generated_columns_stream_proj()? }; - if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty() && !self.core.for_table{ + if let Some(catalog) = self.source_catalog() + && !catalog.watermark_descs.is_empty() + && !self.core.for_table + { plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into(); }
diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs
index 1497ab74e59a..c5950a4a00a4 100644
--- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs
+++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs
@@ -81,8 +81,15 @@ impl PlanVisitor for CardinalityVisitor { // We don't have UNIQUE key now.
So we hack here to support some complex queries on // system tables. - if let Some(scan) = input.as_logical_scan() && scan.is_sys_table() && scan.table_name() == PG_NAMESPACE_TABLE_NAME { - let nspname = scan.output_col_idx().iter().find(|i| scan.table_desc().columns[**i].name == "nspname").unwrap(); + if let Some(scan) = input.as_logical_scan() + && scan.is_sys_table() + && scan.table_name() == PG_NAMESPACE_TABLE_NAME + { + let nspname = scan + .output_col_idx() + .iter() + .find(|i| scan.table_desc().columns[**i].name == "nspname") + .unwrap(); unique_keys.push([*nspname].into_iter().collect()); }
diff --git a/src/frontend/src/optimizer/plan_visitor/execution_mode_decider.rs b/src/frontend/src/optimizer/plan_visitor/execution_mode_decider.rs
index 85175a3b42be..f1f3d4edfdc7 100644
--- a/src/frontend/src/optimizer/plan_visitor/execution_mode_decider.rs
+++ b/src/frontend/src/optimizer/plan_visitor/execution_mode_decider.rs
@@ -59,7 +59,8 @@ impl PlanVisitor for ExecutionModeDecider { fn visit_batch_limit(&mut self, batch_limit: &BatchLimit) -> bool { if let Some(batch_seq_scan) = batch_limit.input().as_batch_seq_scan() && batch_seq_scan.scan_ranges().is_empty() - && batch_limit.limit() + batch_limit.offset() < 100{ + && batch_limit.limit() + batch_limit.offset() < 100 + { true } else { self.visit(batch_limit.input())
diff --git a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
index 89baf9dc000c..4758b5907d27 100644
--- a/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_delta_join_rule.rs
@@ -105,9 +105,13 @@ impl Rule for IndexDeltaJoinRule { // Primary table is also an index. let primary_table = table_scan.logical(); if let Some(primary_table_distribution_key) = primary_table.distribution_key() - && primary_table_distribution_key == join_indices { + && primary_table_distribution_key == join_indices + { // Check join key is prefix of primary table order key - let primary_table_order_key_prefix = primary_table.table_desc().pk.iter() + let primary_table_order_key_prefix = primary_table + .table_desc() + .pk + .iter() .map(|x| x.column_index) .take(primary_table_distribution_key.len()) .collect_vec();
@@ -118,7 +122,11 @@ if chain_type != table_scan.chain_type() { Some( - StreamTableScan::new_with_chain_type(table_scan.logical().clone(), chain_type).into() + StreamTableScan::new_with_chain_type( + table_scan.logical().clone(), + chain_type, + ) + .into(), ) } else { Some(table_scan.clone().into())
diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs
index fa62308f88de..98b58faf340f 100644
--- a/src/frontend/src/optimizer/rule/index_selection_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs
@@ -392,9 +392,9 @@ impl IndexSelectionRule { let mut result = vec![]; for expr in conjunctions { // it's OR clause!
- if let ExprImpl::FunctionCall(function_call) = expr && - function_call.get_expr_type() == ExprType::Or { - + if let ExprImpl::FunctionCall(function_call) = expr + && function_call.get_expr_type() == ExprType::Or + { let mut index_to_be_merged = vec![]; let disjunctions = to_disjunctions(expr.clone());
@@ -406,10 +406,16 @@ for (column_index, expr) in iter { let mut index_paths = vec![]; let conjunctions = to_conjunctions(expr); - index_paths.extend(self.gen_index_path(column_index, &conjunctions, logical_scan).into_iter()); + index_paths.extend( + self.gen_index_path(column_index, &conjunctions, logical_scan) + .into_iter(), + ); // complex condition, recursively gen paths if conjunctions.len() > 1 { - index_paths.extend(self.gen_paths(&conjunctions, logical_scan, primary_table_row_size).into_iter()); + index_paths.extend( + self.gen_paths(&conjunctions, logical_scan, primary_table_row_size) + .into_iter(), + ); } match self.choose_min_cost_path(&index_paths, primary_table_row_size) {
@@ -417,8 +423,8 @@ // One arm of OR clause can't use index, bail out index_to_be_merged.clear(); break; - }, - Some((path, _)) => index_to_be_merged.push(path) + } + Some((path, _)) => index_to_be_merged.push(path), } }
@@ -812,16 +818,18 @@ impl<'a> TableScanIoEstimator<'a> { // Equal for (i, expr) in conjunctions.iter().enumerate() { if let Some((input_ref, _const_expr)) = expr.as_eq_const() - && input_ref.index == column_idx { - conjunctions.remove(i); - return MatchItem::Equal; + && input_ref.index == column_idx + { + conjunctions.remove(i); + return MatchItem::Equal; } } // In for (i, expr) in conjunctions.iter().enumerate() { if let Some((input_ref, in_const_list)) = expr.as_in_const_list() - && input_ref.index == column_idx { + && input_ref.index == column_idx + { conjunctions.remove(i); return MatchItem::In(in_const_list.len()); }
@@ -834,12 +842,13 @@ while i < conjunctions.len() { let expr = &conjunctions[i]; if let Some((input_ref, op, _const_expr)) = expr.as_comparison_const() - && input_ref.index == column_idx { + && input_ref.index == column_idx + { conjunctions.remove(i); match op { ExprType::LessThan | ExprType::LessThanOrEqual => right_side_bound = true, ExprType::GreaterThan | ExprType::GreaterThanOrEqual => left_side_bound = true, - _ => unreachable!() + _ => unreachable!(), }; } else { i += 1;
diff --git a/src/frontend/src/optimizer/rule/join_project_transpose_rule.rs b/src/frontend/src/optimizer/rule/join_project_transpose_rule.rs
index e79520dda509..399ade01e53b 100644
--- a/src/frontend/src/optimizer/rule/join_project_transpose_rule.rs
+++ b/src/frontend/src/optimizer/rule/join_project_transpose_rule.rs
@@ -51,10 +51,22 @@ impl Rule for JoinProjectTransposeRule { let mut has_new_right: bool = false; // prepare for pull up left child.
- let new_left = if let Some(project) = left.as_logical_project() && left_input_index_on_condition.iter().all(|index| project.exprs()[*index].as_input_ref().is_some()) && join_type != JoinType::RightAnti && join_type != JoinType::RightSemi && join_type != JoinType::RightOuter && join_type != JoinType::FullOuter { - let (exprs,child) = project.clone().decompose(); - - old_i2new_i = old_i2new_i.union(&join.i2l_col_mapping_ignore_join_type().composite(&project.o2i_col_mapping())); + let new_left = if let Some(project) = left.as_logical_project() + && left_input_index_on_condition + .iter() + .all(|index| project.exprs()[*index].as_input_ref().is_some()) + && join_type != JoinType::RightAnti + && join_type != JoinType::RightSemi + && join_type != JoinType::RightOuter + && join_type != JoinType::FullOuter + { + let (exprs, child) = project.clone().decompose(); + + old_i2new_i = old_i2new_i.union( + &join + .i2l_col_mapping_ignore_join_type() + .composite(&project.o2i_col_mapping()), + ); full_proj_exprs.extend(exprs);
@@ -65,29 +77,65 @@ old_i2new_i = old_i2new_i.union(&join.i2l_col_mapping_ignore_join_type()); for i in 0..left_output_len { - full_proj_exprs.push(InputRef{index:i,data_type:left.schema().data_types()[i].clone()}.into()); + full_proj_exprs.push( + InputRef { + index: i, + data_type: left.schema().data_types()[i].clone(), + } + .into(), + ); } left }; // prepare for pull up right child. - let new_right = if let Some(project) = right.as_logical_project() && right_input_index_on_condition.iter().all(|index| project.exprs()[*index].as_input_ref().is_some()) && join_type != JoinType::LeftAnti && join_type != JoinType::LeftSemi && join_type != JoinType::LeftOuter && join_type != JoinType::FullOuter { - let (exprs,child) = project.clone().decompose(); - - old_i2new_i = old_i2new_i.union(&join.i2r_col_mapping_ignore_join_type().composite(&project.o2i_col_mapping()).clone_with_offset(new_left.schema().len())); - - let mut index_writer = IndexRewriter::new(ColIndexMapping::identity(child.schema().len()).clone_with_offset(new_left.schema().len())); - full_proj_exprs.extend(exprs.into_iter().map(|expr| index_writer.rewrite_expr(expr))); + let new_right = if let Some(project) = right.as_logical_project() + && right_input_index_on_condition + .iter() + .all(|index| project.exprs()[*index].as_input_ref().is_some()) + && join_type != JoinType::LeftAnti + && join_type != JoinType::LeftSemi + && join_type != JoinType::LeftOuter + && join_type != JoinType::FullOuter + { + let (exprs, child) = project.clone().decompose(); + + old_i2new_i = old_i2new_i.union( + &join + .i2r_col_mapping_ignore_join_type() + .composite(&project.o2i_col_mapping()) + .clone_with_offset(new_left.schema().len()), + ); + + let mut index_writer = IndexRewriter::new( + ColIndexMapping::identity(child.schema().len()) + .clone_with_offset(new_left.schema().len()), + ); + full_proj_exprs.extend( + exprs + .into_iter() + .map(|expr| index_writer.rewrite_expr(expr)), + ); has_new_right = true; child } else { - old_i2new_i = old_i2new_i.union(&join.i2r_col_mapping_ignore_join_type().clone_with_offset(new_left.schema().len())); + old_i2new_i = old_i2new_i.union( + &join + .i2r_col_mapping_ignore_join_type() + .clone_with_offset(new_left.schema().len()), + ); for i in 0..right_output_len { - full_proj_exprs.push(InputRef{index:i+new_left.schema().len(),data_type:right.schema().data_types()[i].clone()}.into()); + full_proj_exprs.push( + InputRef { + index: i + new_left.schema().len(),
+ data_type: right.schema().data_types()[i].clone(), + } + .into(), + ); } right
diff --git a/src/frontend/src/optimizer/rule/pull_up_hop_rule.rs b/src/frontend/src/optimizer/rule/pull_up_hop_rule.rs
index d29599bb5fe1..87459f69899e 100644
--- a/src/frontend/src/optimizer/rule/pull_up_hop_rule.rs
+++ b/src/frontend/src/optimizer/rule/pull_up_hop_rule.rs
@@ -51,56 +51,85 @@ impl Rule for PullUpHopRule { let mut pull_up_left = false; let mut pull_up_right = false; - let (new_left, - left_time_col, - left_window_slide, - left_window_size, - left_window_offset, - ) = if let Some(hop) = left.as_logical_hop_window() && left_input_index_on_condition.iter().all(|&index| hop.output_window_start_col_idx().map_or(true, |v|index!=v) && hop.output_window_end_col_idx().map_or(true, |v|index!=v)) && join_type != JoinType::RightAnti && join_type != JoinType::RightSemi && join_type != JoinType::RightOuter && join_type != JoinType::FullOuter { - let (input, - time_col, - window_slide, - window_size, - window_offset, - _) = hop.clone().into_parts(); - - old_i2new_i = old_i2new_i.union(&join.i2l_col_mapping_ignore_join_type().composite(&hop.o2i_col_mapping())); - left_output_pos.iter().for_each(|&pos| { - output_index[pos] = hop.output2internal_col_mapping().map(output_index[pos]); - }); - pull_up_left = true; - (input,Some(time_col),Some(window_slide),Some(window_size),Some(window_offset)) - } else { - old_i2new_i = old_i2new_i.union(&join.i2l_col_mapping_ignore_join_type()); - - (left,None,None,None,None) - }; - - let (new_right, - right_time_col, - right_window_slide, - right_window_size, - right_window_offset - ) = if let Some(hop) = right.as_logical_hop_window() && right_input_index_on_condition.iter().all(|&index| hop.output_window_start_col_idx().map_or(true, |v|index!=v) && hop.output_window_end_col_idx().map_or(true, |v|index!=v)) && join_type != JoinType::LeftAnti && join_type != JoinType::LeftSemi && join_type != JoinType::LeftOuter && join_type != JoinType::FullOuter { - let (input, - time_col, - window_slide, - window_size, - window_offset, - _) = hop.clone().into_parts(); - - old_i2new_i = old_i2new_i.union(&join.i2r_col_mapping_ignore_join_type().composite(&hop.o2i_col_mapping()).clone_with_offset(new_left.schema().len())); - - right_output_pos.iter().for_each(|&pos| { - output_index[pos] = hop.output2internal_col_mapping().map(output_index[pos]); - }); - pull_up_right = true; - (input,Some(time_col),Some(window_slide),Some(window_size),Some(window_offset)) - } else { - old_i2new_i = old_i2new_i.union(&join.i2r_col_mapping_ignore_join_type().clone_with_offset(new_left.schema().len())); + let (new_left, left_time_col, left_window_slide, left_window_size, left_window_offset) = + if let Some(hop) = left.as_logical_hop_window() + && left_input_index_on_condition.iter().all(|&index| { + hop.output_window_start_col_idx() + .map_or(true, |v| index != v) + && hop.output_window_end_col_idx().map_or(true, |v| index != v) + }) + && join_type != JoinType::RightAnti + && join_type != JoinType::RightSemi + && join_type != JoinType::RightOuter + && join_type != JoinType::FullOuter + { + let (input, time_col, window_slide, window_size, window_offset, _) = + hop.clone().into_parts(); + + old_i2new_i = old_i2new_i.union( + &join + .i2l_col_mapping_ignore_join_type() + .composite(&hop.o2i_col_mapping()), + ); + left_output_pos.iter().for_each(|&pos| { + output_index[pos] = hop.output2internal_col_mapping().map(output_index[pos]); + }); + pull_up_left = true; + ( + input, + Some(time_col), + Some(window_slide),
+ Some(window_size), + Some(window_offset), + ) + } else { + old_i2new_i = old_i2new_i.union(&join.i2l_col_mapping_ignore_join_type()); + + (left, None, None, None, None) + }; + + let (new_right, right_time_col, right_window_slide, right_window_size, right_window_offset) = + if let Some(hop) = right.as_logical_hop_window() + && right_input_index_on_condition.iter().all(|&index| { + hop.output_window_start_col_idx() + .map_or(true, |v| index != v) + && hop.output_window_end_col_idx().map_or(true, |v| index != v) + }) + && join_type != JoinType::LeftAnti + && join_type != JoinType::LeftSemi + && join_type != JoinType::LeftOuter + && join_type != JoinType::FullOuter + { + let (input, time_col, window_slide, window_size, window_offset, _) = + hop.clone().into_parts(); + + old_i2new_i = old_i2new_i.union( + &join + .i2r_col_mapping_ignore_join_type() + .composite(&hop.o2i_col_mapping()) + .clone_with_offset(new_left.schema().len()), + ); - right_output_pos.iter().for_each(|&pos| { + right_output_pos.iter().for_each(|&pos| { output_index[pos] = hop.output2internal_col_mapping().map(output_index[pos]); }); + pull_up_right = true; + ( + input, + Some(time_col), + Some(window_slide), + Some(window_size), + Some(window_offset), + ) + } else { + old_i2new_i = old_i2new_i.union( + &join + .i2r_col_mapping_ignore_join_type() + .clone_with_offset(new_left.schema().len()), + ); + + (right, None, None, None, None) + }; if !pull_up_left && !pull_up_right { return None;
diff --git a/src/frontend/src/optimizer/rule/rewrite_like_expr_rule.rs b/src/frontend/src/optimizer/rule/rewrite_like_expr_rule.rs
index 79427a0e77b7..6db893a93178 100644
--- a/src/frontend/src/optimizer/rule/rewrite_like_expr_rule.rs
+++ b/src/frontend/src/optimizer/rule/rewrite_like_expr_rule.rs
@@ -56,7 +56,9 @@ impl ExprVisitor for HasLikeExprVisitor { fn visit_function_call(&mut self, func_call: &FunctionCall) -> bool { if func_call.get_expr_type() == ExprType::Like - && let (_, ExprImpl::InputRef(_), ExprImpl::Literal(_)) = func_call.clone().decompose_as_binary() { + && let (_, ExprImpl::InputRef(_), ExprImpl::Literal(_)) = + func_call.clone().decompose_as_binary() + { true } else { func_call
diff --git a/src/frontend/src/planner/query.rs b/src/frontend/src/planner/query.rs
index 782fa42127ee..1e89959f3e31 100644
--- a/src/frontend/src/planner/query.rs
+++ b/src/frontend/src/planner/query.rs
@@ -56,7 +56,7 @@ impl Planner { } let mut out_fields = FixedBitSet::with_capacity(plan.schema().len()); out_fields.insert_range(..plan.schema().len() - extra_order_exprs_len); - if let Some(field) = plan.schema().fields.get(0) && field.name == "projected_row_id" { + if let Some(field) = plan.schema().fields.get(0) && field.name == "projected_row_id" { // Do not output projected_row_id hidden column. out_fields.set(0, false); }
diff --git a/src/frontend/src/planner/select.rs b/src/frontend/src/planner/select.rs
index 6384373ade97..efda99b8d7ff 100644
--- a/src/frontend/src/planner/select.rs
+++ b/src/frontend/src/planner/select.rs
@@ -184,10 +184,10 @@ impl Planner { if let BoundDistinct::Distinct = distinct { let fields = root.schema().fields(); - let group_key = if let Some(field) = fields.get(0) && field.name == "projected_row_id" { + let group_key = if let Some(field) = fields.get(0) && field.name == "projected_row_id" { // Do not group by projected_row_id hidden column.
(1..fields.len()).collect() - }else { + } else { (0..fields.len()).collect() }; root = Agg::new(vec![], group_key, root).into();
diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs
index 1c5922af2f9f..640870b4547b 100644
--- a/src/frontend/src/scheduler/distributed/query_manager.rs
+++ b/src/frontend/src/scheduler/distributed/query_manager.rs
@@ -158,11 +158,14 @@ impl QueryManager { query: Query, pinned_snapshot: PinnedHummockSnapshot, ) -> SchedulerResult { - if let Some(query_limit) = self.disrtibuted_query_limit && self.query_metrics.running_query_num.get() as u64 == query_limit { + if let Some(query_limit) = self.disrtibuted_query_limit + && self.query_metrics.running_query_num.get() as u64 == query_limit + { self.query_metrics.rejected_query_counter.inc(); - return Err( - crate::scheduler::SchedulerError::QueryReachLimit(QueryMode::Distributed, query_limit) - ) + return Err(crate::scheduler::SchedulerError::QueryReachLimit( + QueryMode::Distributed, + query_limit, + )); } let query_id = query.query_id.clone(); let query_execution = Arc::new(QueryExecution::new(query, context.session().id()));
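Note: the query-manager hunk above rejects a new distributed query as soon as the running-query gauge reaches the configured limit. A stripped-down sketch of that admission check (plain integers replace the metrics registry and scheduler error types, which are not part of this sketch):

    struct QueryManager {
        distributed_query_limit: Option<u64>,
        running_query_num: u64,
    }

    impl QueryManager {
        /// Admit a query unless the running count has hit the limit.
        fn try_admit(&mut self) -> Result<(), String> {
            if let Some(limit) = self.distributed_query_limit {
                if self.running_query_num >= limit {
                    return Err(format!("query reached limit: {}", limit));
                }
            }
            self.running_query_num += 1;
            Ok(())
        }
    }

    fn main() {
        let mut qm = QueryManager {
            distributed_query_limit: Some(1),
            running_query_num: 0,
        };
        assert!(qm.try_admit().is_ok());
        assert!(qm.try_admit().is_err()); // second query is rejected
    }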
diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs
index 669a07fe2acf..80d11a248aa9 100644
--- a/src/frontend/src/scheduler/distributed/stage.rs
+++ b/src/frontend/src/scheduler/distributed/stage.rs
@@ -326,14 +326,18 @@ impl StageRunner { ) -> SchedulerResult<()> { let mut futures = vec![]; - if let Some(table_scan_info) = self.stage.table_scan_info.as_ref() && let Some(vnode_bitmaps) = table_scan_info.partitions() { + if let Some(table_scan_info) = self.stage.table_scan_info.as_ref() + && let Some(vnode_bitmaps) = table_scan_info.partitions() + { // If the stage has table scan nodes, we create tasks according to the data distribution // and partition of the table. // We let each task read one partition by setting the `vnode_ranges` of the scan node in // the task. // We schedule the task to the worker node that owns the data partition. let parallel_unit_ids = vnode_bitmaps.keys().cloned().collect_vec(); - let workers = self.worker_node_manager.get_workers_by_parallel_unit_ids(&parallel_unit_ids)?; + let workers = self + .worker_node_manager + .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?; for (i, (parallel_unit_id, worker)) in parallel_unit_ids .into_iter()
@@ -346,7 +350,8 @@ task_id: i as u32, }; let vnode_ranges = vnode_bitmaps[&parallel_unit_id].clone(); - let plan_fragment = self.create_plan_fragment(i as u32, Some(PartitionInfo::Table(vnode_ranges))); + let plan_fragment = + self.create_plan_fragment(i as u32, Some(PartitionInfo::Table(vnode_ranges))); futures.push(self.schedule_task(task_id, plan_fragment, Some(worker))); } } else if let Some(source_info) = self.stage.source_info.as_ref() {
@@ -356,12 +361,13 @@ stage_id: self.stage.id, task_id: id as u32, }; - let plan_fragment = self.create_plan_fragment(id as u32, Some(PartitionInfo::Source(split.clone()))); - let worker = self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?; + let plan_fragment = self + .create_plan_fragment(id as u32, Some(PartitionInfo::Source(split.clone()))); + let worker = + self.choose_worker(&plan_fragment, id as u32, self.stage.dml_table_id)?; futures.push(self.schedule_task(task_id, plan_fragment, worker)); } - } - else { + } else { for id in 0..self.stage.parallelism.unwrap() { let task_id = TaskIdPb { query_id: self.stage.query_id.id.clone(),
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index 9de86ff0e293..71899d2d1c1f 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -234,13 +234,18 @@ impl LocalQueryExecution { }; assert!(sources.is_empty()); - if let Some(table_scan_info) = second_stage.table_scan_info.clone() && let Some(vnode_bitmaps) = table_scan_info.partitions() { + if let Some(table_scan_info) = second_stage.table_scan_info.clone() + && let Some(vnode_bitmaps) = table_scan_info.partitions() + { // Similar to the distributed case (StageRunner::schedule_tasks). // Set `vnode_ranges` of the scan node in `local_execute_plan` of each // `exchange_source`.
let (parallel_unit_ids, vnode_bitmaps): (Vec<_>, Vec<_>) = vnode_bitmaps.clone().into_iter().unzip(); - let workers = self.front_env.worker_node_manager().get_workers_by_parallel_unit_ids(&parallel_unit_ids)?; + let workers = self + .front_env + .worker_node_manager() + .get_workers_by_parallel_unit_ids(&parallel_unit_ids)?; for (idx, (worker_node, partition)) in (workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate()
@@ -276,7 +281,7 @@ sources.push(exchange_source); } } else if let Some(source_info) = &second_stage.source_info { - for (id,split) in source_info.split_info().unwrap().iter().enumerate() { + for (id, split) in source_info.split_info().unwrap().iter().enumerate() { let second_stage_plan_node = self.convert_plan_node( &second_stage.root, &mut None,
@@ -309,8 +314,7 @@ }; sources.push(exchange_source); } - } - else { + } else { let second_stage_plan_node = self.convert_plan_node(&second_stage.root, &mut None, None)?; let second_stage_plan_fragment = PlanFragment {
diff --git a/src/frontend/src/utils/stream_graph_formatter.rs b/src/frontend/src/utils/stream_graph_formatter.rs
index 844c0578a43a..c74a2de7fb9a 100644
--- a/src/frontend/src/utils/stream_graph_formatter.rs
+++ b/src/frontend/src/utils/stream_graph_formatter.rs
@@ -173,7 +173,10 @@ impl StreamGraphFormatter { self.pretty_add_table(inner.get_result_table().unwrap()), )); fields.push(("state tables", self.call_states(&inner.agg_call_states))); - fields.push(("distinct tables", self.distinct_tables(node, inner.get_distinct_dedup_tables()))); + fields.push(( + "distinct tables", + self.distinct_tables(node, inner.get_distinct_dedup_tables()), + )); } stream_node::NodeBody::HashAgg(inner) => { fields.push((
@@ -181,8 +184,11 @@ self.pretty_add_table(inner.get_result_table().unwrap()), )); fields.push(("state tables", self.call_states(&inner.agg_call_states))); - fields.push(("distinct tables", self.distinct_tables(node, inner.get_distinct_dedup_tables()))); - }, + fields.push(( + "distinct tables", + self.distinct_tables(node, inner.get_distinct_dedup_tables()), + )); + } stream_node::NodeBody::HashJoin(node) => { fields.push(( "left table", self.pretty_add_table(node.get_left_table().unwrap()), )); fields.push(( "right table", self.pretty_add_table(node.get_right_table().unwrap()), )); if let Some(tb) = &node.left_degree_table { - fields.push(( - "left degree table", - self.pretty_add_table(tb), - )); + fields.push(("left degree table", self.pretty_add_table(tb))); } if let Some(tb) = &node.right_degree_table { - fields.push(( - "right degree table", - self.pretty_add_table(tb), - )); + fields.push(("right degree table", self.pretty_add_table(tb))); } } - stream_node::NodeBody::TopN(node) =>{ + stream_node::NodeBody::TopN(node) => { fields.push(( "state table", self.pretty_add_table(node.get_table().unwrap()),
@@ -235,10 +235,7 @@ } stream_node::NodeBody::GroupTopN(node) => { let table = self.pretty_add_table(node.get_table().unwrap()); - fields.push(( - "state table", - table, - )); + fields.push(("state table", table)); } stream_node::NodeBody::AppendOnlyGroupTopN(node) => { fields.push((
@@ -258,7 +255,7 @@ self.pretty_add_table(node.get_state_table().unwrap()), )); } - _ => {}, + _ => {} }; if self.verbose {
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index 39c6f3f2a734..e9c870a0a14d 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -107,12 +107,15 @@ impl BarrierScheduler { pub async fn try_cancel_scheduled_create(&self, table_id: TableId) -> bool { let mut queue = self.inner.queue.write().await; if let Some(idx) = queue.iter().position(|scheduled| { - if let Command::CreateStreamingJob {table_fragments, ..} = &scheduled.command - && table_fragments.table_id() == table_id { - true - } else { - false - } + if let Command::CreateStreamingJob { + table_fragments, .. + } = &scheduled.command + && table_fragments.table_id() == table_id + { + true + } else { + false + } }) { queue.remove(idx).unwrap(); true
diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs
index 13cee545eafb..17e44ce937be 100644
--- a/src/meta/src/manager/catalog/fragment.rs
+++ b/src/meta/src/manager/catalog/fragment.rs
@@ -331,7 +331,7 @@ where let stream_node = actor.nodes.as_mut().unwrap(); visit_stream_node(stream_node, |body| { if let NodeBody::Merge(m) = body - && m.upstream_fragment_id == merge_update.upstream_fragment_id + && m.upstream_fragment_id == merge_update.upstream_fragment_id { m.upstream_fragment_id = merge_update.new_upstream_fragment_id.unwrap(); m.upstream_actor_id = merge_update.added_upstream_actor_id.clone();
diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs
index 74c0a8d09dee..99a1ea4f655e 100644
--- a/src/meta/src/rpc/metrics.rs
+++ b/src/meta/src/rpc/metrics.rs
@@ -525,14 +525,19 @@ pub async fn start_worker_info_monitor( .with_label_values(&[(worker_type.as_str_name())]) .set(worker_num as i64); } - if let Some(client) = &election_client && let Ok(meta_members) = client.get_members().await { + if let Some(client) = &election_client + && let Ok(meta_members) = client.get_members().await + { meta_metrics .worker_num .with_label_values(&[WorkerType::Meta.as_str_name()]) .set(meta_members.len() as i64); meta_members.into_iter().for_each(|m| { - let role = if m.is_leader {"leader"} else {"follower"}; - meta_metrics.meta_type.with_label_values(&[&m.id, role]).set(1); + let role = if m.is_leader { "leader" } else { "follower" }; + meta_metrics + .meta_type + .with_label_values(&[&m.id, role]) + .set(1); }); } }
diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs
index e57ec8512a0f..70452557d22f 100644
--- a/src/meta/src/rpc/server.rs
+++ b/src/meta/src/rpc/server.rs
@@ -461,7 +461,9 @@ pub async fn start_service_as_election_leader( )); let mut aws_cli = None; - if let Some(my_vpc_id) = &env.opts.vpc_id && let Some(security_group_id) = &env.opts.security_group_id { + if let Some(my_vpc_id) = &env.opts.vpc_id + && let Some(security_group_id) = &env.opts.security_group_id + { let cli = AwsEc2Client::new(my_vpc_id, security_group_id).await; aws_cli = Some(cli); }
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index 4d3b793bb8a4..8f9ff9bdd17a 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -114,10 +114,15 @@ impl CreatingStreamingJobInfo { async fn cancel_jobs(&self, job_ids: Vec<TableId>) { let mut jobs = self.streaming_jobs.lock().await; for job_id in job_ids { - if let Some(job) = jobs.get_mut(&job_id) && let Some(shutdown_tx) = job.shutdown_tx.take() { - let _ = shutdown_tx.send(CreatingState::Canceling).await.inspect_err(|_| { - tracing::warn!("failed to send canceling state"); - }); + if let Some(job) = jobs.get_mut(&job_id) + && let Some(shutdown_tx) = job.shutdown_tx.take() + { + let _ = shutdown_tx + .send(CreatingState::Canceling) + .await + .inspect_err(|_| {
+ tracing::warn!("failed to send canceling state"); + }); } } }
diff --git a/src/object_store/src/object/disk.rs b/src/object_store/src/object/disk.rs
index 67cd42505a0e..79dba81775a5 100644
--- a/src/object_store/src/object/disk.rs
+++ b/src/object_store/src/object/disk.rs
@@ -294,8 +294,7 @@ impl ObjectStore for DiskObjectStore { // `DiskObjectStore::delete()` behaves the same way as `S3ObjectStore::delete()`. if let Err(e) = &result && e.kind() == ErrorKind::NotFound { Ok(()) - } - else { + } else { result.map_err(|e| ObjectError::disk(format!("failed to delete {}", path), e)) } }
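Note: the disk.rs hunk above treats `NotFound` as success, so deleting an already-missing object is idempotent, matching the stated S3 semantics. A tiny sketch of that error-tolerance pattern (the `delete_idempotent` helper is hypothetical):

    use std::io::{Error, ErrorKind};

    fn delete_idempotent(result: std::io::Result<()>) -> std::io::Result<()> {
        match result {
            // Deleting a missing object is fine: the end state is identical.
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
            other => other,
        }
    }

    fn main() {
        assert!(delete_idempotent(Err(Error::new(ErrorKind::NotFound, "gone"))).is_ok());
        assert!(delete_idempotent(Err(Error::new(ErrorKind::PermissionDenied, "no"))).is_err());
    }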
diff --git a/src/risedevtool/src/task/compactor_service.rs b/src/risedevtool/src/task/compactor_service.rs
index 5a5bbc361ebb..435b3b49b406 100644
--- a/src/risedevtool/src/task/compactor_service.rs
+++ b/src/risedevtool/src/task/compactor_service.rs
@@ -35,7 +35,9 @@ impl CompactorService { let prefix_bin = env::var("PREFIX_BIN")?; if let Ok(x) = env::var("ENABLE_ALL_IN_ONE") && x == "true" { - Ok(Command::new(Path::new(&prefix_bin).join("risingwave").join("compactor"))) + Ok(Command::new( + Path::new(&prefix_bin).join("risingwave").join("compactor"), + )) } else { Ok(Command::new(Path::new(&prefix_bin).join("compactor"))) }
diff --git a/src/risedevtool/src/task/compute_node_service.rs b/src/risedevtool/src/task/compute_node_service.rs
index ecac39d704eb..52e699a7573b 100644
--- a/src/risedevtool/src/task/compute_node_service.rs
+++ b/src/risedevtool/src/task/compute_node_service.rs
@@ -35,7 +35,11 @@ impl ComputeNodeService { let prefix_bin = env::var("PREFIX_BIN")?; if let Ok(x) = env::var("ENABLE_ALL_IN_ONE") && x == "true" { - Ok(Command::new(Path::new(&prefix_bin).join("risingwave").join("compute-node"))) + Ok(Command::new( + Path::new(&prefix_bin) + .join("risingwave") + .join("compute-node"), + )) } else { Ok(Command::new(Path::new(&prefix_bin).join("compute-node"))) }
diff --git a/src/risedevtool/src/task/frontend_service.rs b/src/risedevtool/src/task/frontend_service.rs
index 59f7a6609aa5..cf917c9d6419 100644
--- a/src/risedevtool/src/task/frontend_service.rs
+++ b/src/risedevtool/src/task/frontend_service.rs
@@ -36,7 +36,11 @@ impl FrontendService { let prefix_bin = env::var("PREFIX_BIN")?; if let Ok(x) = env::var("ENABLE_ALL_IN_ONE") && x == "true" { - Ok(Command::new(Path::new(&prefix_bin).join("risingwave").join("frontend-node"))) + Ok(Command::new( + Path::new(&prefix_bin) + .join("risingwave") + .join("frontend-node"), + )) } else { Ok(Command::new(Path::new(&prefix_bin).join("frontend"))) }
diff --git a/src/risedevtool/src/task/meta_node_service.rs b/src/risedevtool/src/task/meta_node_service.rs
index 8d3e635a67b1..40bb6c0ad8d2 100644
--- a/src/risedevtool/src/task/meta_node_service.rs
+++ b/src/risedevtool/src/task/meta_node_service.rs
@@ -36,7 +36,9 @@ impl MetaNodeService { let prefix_bin = env::var("PREFIX_BIN")?; if let Ok(x) = env::var("ENABLE_ALL_IN_ONE") && x == "true" { - Ok(Command::new(Path::new(&prefix_bin).join("risingwave").join("meta-node"))) + Ok(Command::new( + Path::new(&prefix_bin).join("risingwave").join("meta-node"), + )) } else { Ok(Command::new(Path::new(&prefix_bin).join("meta-node"))) }
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index 98003c1b07e9..3d69ac90b5b5 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -1600,7 +1600,9 @@ impl Parser { loop { assert!(self.index > 0); self.index -= 1; - if let Some(token) = self.tokens.get(self.index) && let Token::Whitespace(_) = token.token { + if let Some(token) = self.tokens.get(self.index) + && let Token::Whitespace(_) = token.token + { continue; } return;
@@ -1986,7 +1988,9 @@ impl Parser { // parse: [ argname ] argtype let mut name = None; let mut data_type = self.parse_data_type()?; - if let DataType::Custom(n) = &data_type && !matches!(self.peek_token().token, Token::Comma | Token::RParen) { + if let DataType::Custom(n) = &data_type + && !matches!(self.peek_token().token, Token::Comma | Token::RParen) + { // the first token is actually a name name = Some(n.0[0].clone()); data_type = self.parse_data_type()?;
diff --git a/src/storage/src/hummock/iterator/delete_range_iterator.rs b/src/storage/src/hummock/iterator/delete_range_iterator.rs
index d1708a78cc9b..f3ee1e8cbdf7 100644
--- a/src/storage/src/hummock/iterator/delete_range_iterator.rs
+++ b/src/storage/src/hummock/iterator/delete_range_iterator.rs
@@ -252,7 +252,10 @@ impl DeleteRangeIterator for ForwardMergeRangeIterator { fn next(&mut self) { self.tmp_buffer .push(self.heap.pop().expect("no inner iter")); - while let Some(node) = self.heap.peek() && node.is_valid() && node.next_extended_user_key() == self.tmp_buffer[0].next_extended_user_key() { + while let Some(node) = self.heap.peek() + && node.is_valid() + && node.next_extended_user_key() == self.tmp_buffer[0].next_extended_user_key() + { self.tmp_buffer.push(self.heap.pop().unwrap()); } for node in &self.tmp_buffer {
diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs
index 9e2478a8102b..38a1e1da9dc2 100644
--- a/src/storage/src/hummock/mod.rs
+++ b/src/storage/src/hummock/mod.rs
@@ -362,9 +362,14 @@ pub async fn get_from_sstable_info( // Bloom filter key is the distribution key, which is no need to be the prefix of pk, and do not // contain `TablePrefix` and `VnodePrefix`. - if let Some(hash) = dist_key_hash && !hit_sstable_bloom_filter(sstable.value(), hash, local_stats) { + if let Some(hash) = dist_key_hash + && !hit_sstable_bloom_filter(sstable.value(), hash, local_stats) + { if !read_options.ignore_range_tombstone { - let delete_epoch = get_min_delete_range_epoch_from_sstable(sstable.value().as_ref(), full_key.user_key); + let delete_epoch = get_min_delete_range_epoch_from_sstable( + sstable.value().as_ref(), + full_key.user_key, + ); if delete_epoch <= full_key.epoch { return Ok(Some((HummockValue::Delete, delete_epoch))); } }
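Note: the hummock hunk above is the point-get early exit. When the bloom filter rules the key out, the reader can skip the data blocks entirely, but it must still consult the SST's range tombstones unless the caller opted out. A condensed sketch of that control flow (booleans and plain epoch integers stand in for the real sstable handles, which are not shown here):

    fn get(bloom_may_contain: bool, min_delete_epoch: u64, key_epoch: u64, ignore_tombstone: bool) -> Option<&'static str> {
        if !bloom_may_contain {
            // The key itself is absent, but a range delete at or below
            // the read epoch may still shadow it.
            if !ignore_tombstone && min_delete_epoch <= key_epoch {
                return Some("range-tombstone delete");
            }
            return None; // definitely not in this SST
        }
        Some("search data blocks")
    }

    fn main() {
        assert_eq!(get(false, 5, 10, false), Some("range-tombstone delete"));
        assert_eq!(get(false, 20, 10, false), None);
        assert_eq!(get(true, 20, 10, false), Some("search data blocks"));
    }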
diff --git a/src/storage/src/hummock/sstable/delete_range_aggregator.rs b/src/storage/src/hummock/sstable/delete_range_aggregator.rs
index 335197d99efc..e33be3dec456 100644
--- a/src/storage/src/hummock/sstable/delete_range_aggregator.rs
+++ b/src/storage/src/hummock/sstable/delete_range_aggregator.rs
@@ -311,7 +311,9 @@ impl CompactionDeleteRangeIterator { epoch: HummockEpoch, ) -> HummockEpoch { let target_extended_user_key = PointRange::from_user_key(target_user_key, false); - while let Some((extended_user_key, ..)) = self.events.events.get(self.seek_idx) && extended_user_key.as_ref().le(&target_extended_user_key) { + while let Some((extended_user_key, ..)) = self.events.events.get(self.seek_idx) + && extended_user_key.as_ref().le(&target_extended_user_key) + { self.apply(self.seek_idx); self.seek_idx += 1; }
diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs
index 4d9018653345..5c48bbff6512 100644
--- a/src/storage/src/hummock/sstable_store.rs
+++ b/src/storage/src/hummock/sstable_store.rs
@@ -242,10 +242,11 @@ impl SstableStore { let use_tiered_cache = !matches!(policy, CachePolicy::Disable); async move { - if use_tiered_cache && let Some(holder) = tiered_cache - .get(&(object_id, block_index as u64)) - .await - .map_err(HummockError::tiered_cache)? + if use_tiered_cache + && let Some(holder) = tiered_cache + .get(&(object_id, block_index as u64)) + .await + .map_err(HummockError::tiered_cache)? { // TODO(MrCroxx): `into_owned()` may perform buffer copy, eliminate it later. return Ok(holder.into_owned());
diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index 1d901ee4226c..6370a13ee000 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -913,7 +913,9 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor { return; } let elem_to_compare_with_middle = - if let Some(low_last) = self.low.last_entry() - && cache_key <= *low_last.key() { + if let Some(low_last) = self.low.last_entry() && cache_key <= *low_last.key() { // Take the last element of `cache.low` and insert input row to it. let low_last = low_last.remove_entry(); self.low.insert(cache_key, (&row).into());
@@ -434,7 +433,8 @@ impl TopNCacheTrait for TopNCache { // insert 0 -> [0,0,0] if self.middle.len() - num_ties + 1 >= self.limit { while let Some(middle_last) = self.middle.last_entry() - && middle_last.key().0 == middle_last_order_by.clone() { + && middle_last.key().0 == middle_last_order_by.clone() + { let middle_last = middle_last.remove_entry(); res_ops.push(Op::Delete); res_rows.push(middle_last.1.clone());
@@ -591,8 +591,7 @@ impl AppendOnlyTopNCacheTrait for TopNCache { } let elem_to_insert_into_middle = - if let Some(low_last) = self.low.last_entry() - && &cache_key <= low_last.key() { + if let Some(low_last) = self.low.last_entry() && &cache_key <= low_last.key() { // Take the last element of `cache.low` and insert input row to it.
@@ -591,8 +591,7 @@ impl AppendOnlyTopNCacheTrait for TopNCache {
         }

         let elem_to_insert_into_middle =
-            if let Some(low_last) = self.low.last_entry()
-                && &cache_key <= low_last.key() {
+            if let Some(low_last) = self.low.last_entry() && &cache_key <= low_last.key() {
                 // Take the last element of `cache.low` and insert input row to it.
                 let low_last = low_last.remove_entry();
                 self.low.insert(cache_key, row_ref.into());
@@ -678,11 +677,13 @@ impl AppendOnlyTopNCacheTrait for TopNCache {
             // insert 0 -> [0,0,0]
             if self.middle.len() - num_ties + 1 >= self.limit {
                 while let Some(middle_last) = self.middle.last_entry()
-                    && &middle_last.key().0 == middle_last_order_by {
+                    && &middle_last.key().0 == middle_last_order_by
+                {
                     let middle_last = middle_last.remove_entry();
                     res_ops.push(Op::Delete);
                     res_rows.push(middle_last.1.clone());
-                    managed_state.delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
+                    managed_state
+                        .delete(row_deserializer.deserialize(middle_last.1.row.as_ref())?);
                 }
             }
diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs
index 4ba3c2c75a5e..cc67c298ecf7 100644
--- a/src/stream/src/executor/top_n/top_n_state.rs
+++ b/src/stream/src/executor/top_n/top_n_state.rs
@@ -221,7 +221,9 @@ impl ManagedTopNState {
         );
         while !topn_cache.is_high_cache_full() && let Some(item) = state_table_iter.next().await {
             let topn_row = self.get_topn_row(item?, group_key.len());
-            topn_cache.high.insert(topn_row.cache_key, (&topn_row.row).into());
+            topn_cache
+                .high
+                .insert(topn_row.cache_key, (&topn_row.row).into());
         }
         if WITH_TIES && topn_cache.is_high_cache_full() {
             let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone();
diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs
index 78bca4fa0ae3..a5a126f3029f 100644
--- a/src/stream/src/task/barrier_manager/managed_state.rs
+++ b/src/stream/src/task/barrier_manager/managed_state.rs
@@ -172,13 +172,16 @@ impl ManagedBarrierState {
                 ref remaining_actors,
                 ref mut collect_notifier,
             } => {
-                if remaining_actors.contains(&actor_id) && let Some(collect_notifier) = collect_notifier.take() && collect_notifier
+                if remaining_actors.contains(&actor_id)
+                    && let Some(collect_notifier) = collect_notifier.take()
+                    && collect_notifier
                         .send(Err(anyhow!(format!(
                             "Actor {actor_id} exit unexpectedly: {:?}",
                             err
                         ))
                         .into()))
-                        .is_err() {
+                        .is_err()
+                {
                     warn!("failed to notify actor {} exit: {:?}", actor_id, err);
                 }
             }
diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs
index b3c0e59179a5..81061188f64e 100644
--- a/src/stream/src/task/stream_manager.rs
+++ b/src/stream/src/task/stream_manager.rs
@@ -769,7 +769,7 @@ impl LocalStreamManagerCore {
         let mut actor_infos = self.context.actor_infos.write();
         for actor in new_actor_infos {
             let ret = actor_infos.insert(actor.get_actor_id(), actor.clone());
-            if let Some(prev_actor) = ret && actor != &prev_actor{
+            if let Some(prev_actor) = ret && actor != &prev_actor {
                 bail!(
                     "actor info mismatch when broadcasting {}",
                     actor.get_actor_id()
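The managed_state.rs hunk above is the most involved case in this patch: a three-operand chain made of a plain boolean, a `let` binding, and a multi-line method-call operand. Operands of a let-chain evaluate left to right and short-circuit, so `collect_notifier.take()` runs only for actors in `remaining_actors`; the formatter may re-indent the operands but can never reorder them. A hedged sketch of the same shape (names and channel type are invented, not taken from the codebase):

    #![feature(let_chains)]
    use std::sync::mpsc::Sender;

    /// Notify at most once (the `take()` consumes the notifier), and only for
    /// a tracked actor; log when the receiving side is already gone.
    fn notify_exit(tracked: &[u32], actor_id: u32, notifier: &mut Option<Sender<String>>) {
        if tracked.contains(&actor_id)
            && let Some(tx) = notifier.take()
            && tx.send(format!("actor {actor_id} exited")).is_err()
        {
            eprintln!("failed to notify actor {actor_id} exit");
        }
    }

With std's mpsc, `send` only errors once the receiver has been dropped, which mirrors how the hunk above treats a dead notifier as a condition to warn about rather than a hard failure.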
diff --git a/src/tests/sqlsmith/src/runner.rs b/src/tests/sqlsmith/src/runner.rs
index 7877bcf360c4..259e4b7d2804 100644
--- a/src/tests/sqlsmith/src/runner.rs
+++ b/src/tests/sqlsmith/src/runner.rs
@@ -422,9 +422,7 @@ fn validate_response<_Row>(response: PgResult<_Row>) -> Result {
         Ok(_) => Ok(0),
         Err(e) => {
             // Permit runtime errors conservatively.
-            if let Some(e) = e.as_db_error()
-                && is_permissible_error(&e.to_string())
-            {
+            if let Some(e) = e.as_db_error() && is_permissible_error(&e.to_string()) {
                 tracing::info!("[SKIPPED ERROR]: {:#?}", e);
                 return Ok(1);
             }
diff --git a/src/tests/sqlsmith/src/sql_gen/expr.rs b/src/tests/sqlsmith/src/sql_gen/expr.rs
index 0c71d590dfda..18770d5e1065 100644
--- a/src/tests/sqlsmith/src/sql_gen/expr.rs
+++ b/src/tests/sqlsmith/src/sql_gen/expr.rs
@@ -350,9 +350,11 @@ impl<'a, R: Rng> SqlGenerator<'a, R> {
             .iter()
             .map(|t| {
                 if let Some(from_tys) = IMPLICIT_CAST_TABLE.get(t)
-                    && can_implicit_cast && self.flip_coin() {
+                    && can_implicit_cast
+                    && self.flip_coin()
+                {
                     let from_ty = &from_tys.choose(&mut self.rng).unwrap().from_type;
-                    self.gen_implicit_cast(from_ty, context)
+                    self.gen_implicit_cast(from_ty, context)
                 } else {
                     self.gen_expr(t, context)
                 }
diff --git a/src/tests/sqlsmith/src/sql_gen/types.rs b/src/tests/sqlsmith/src/sql_gen/types.rs
index 5799144d9da3..27febaf84a95 100644
--- a/src/tests/sqlsmith/src/sql_gen/types.rs
+++ b/src/tests/sqlsmith/src/sql_gen/types.rs
@@ -96,7 +96,8 @@ impl TryFrom<RwCastSig> for CastSig {

     fn try_from(value: RwCastSig) -> Result<Self, Self::Error> {
         if let Some(from_type) = data_type_name_to_ast_data_type(&value.from_type)
-            && let Some(to_type) = data_type_name_to_ast_data_type(&value.to_type) {
+            && let Some(to_type) = data_type_name_to_ast_data_type(&value.to_type)
+        {
             Ok(CastSig {
                 from_type,
                 to_type,
@@ -120,8 +121,13 @@ impl TryFrom<&RwFuncSig> for FuncSig {
     type Error = String;

     fn try_from(value: &RwFuncSig) -> Result<Self, Self::Error> {
-        if let Some(inputs_type) = value.inputs_type.iter().map(data_type_name_to_ast_data_type).collect()
-            && let Some(ret_type) = data_type_name_to_ast_data_type(&value.ret_type) {
+        if let Some(inputs_type) = value
+            .inputs_type
+            .iter()
+            .map(data_type_name_to_ast_data_type)
+            .collect()
+            && let Some(ret_type) = data_type_name_to_ast_data_type(&value.ret_type)
+        {
             Ok(FuncSig {
                 inputs_type,
                 ret_type,
@@ -145,8 +151,13 @@ impl TryFrom<&RwAggFuncSig> for AggFuncSig {
     type Error = String;

     fn try_from(value: &RwAggFuncSig) -> Result<Self, Self::Error> {
-        if let Some(inputs_type) = value.inputs_type.iter().map(data_type_name_to_ast_data_type).collect()
-            && let Some(ret_type) = data_type_name_to_ast_data_type(&value.ret_type) {
+        if let Some(inputs_type) = value
+            .inputs_type
+            .iter()
+            .map(data_type_name_to_ast_data_type)
+            .collect()
+            && let Some(ret_type) = data_type_name_to_ast_data_type(&value.ret_type)
+        {
             Ok(AggFuncSig {
                 inputs_type,
                 ret_type,
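A final note on the types.rs hunks just above: they pair a `collect()` from an `Iterator<Item = Option<T>>` into `Option<Vec<T>>` (which yields `None` if any element is `None`) with a second `let` operand, so a signature converts only when every input type and the return type convert. A standalone sketch of that pattern (the `signature` function and the `u32` target type are invented for illustration; only the shape matches the `TryFrom` impls above):

    #![feature(let_chains)]

    /// Converts every input plus the return type, failing the whole signature
    /// if any single conversion fails.
    fn signature(inputs: &[&str], ret: &str) -> Option<(Vec<u32>, u32)> {
        if let Some(inputs) = inputs
            .iter()
            .map(|s| s.parse::<u32>().ok())
            .collect::<Option<Vec<_>>>()
            && let Some(ret) = ret.parse::<u32>().ok()
        {
            Some((inputs, ret))
        } else {
            None
        }
    }

For example, `signature(&["1", "2"], "3")` yields `Some((vec![1, 2], 3))`, while any unparsable element collapses the whole result to `None`.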