Skip to content

Commit

Permalink
format let chains
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Apr 24, 2023
1 parent da57c30 commit 7cd4dc9
Show file tree
Hide file tree
Showing 55 changed files with 483 additions and 269 deletions.
4 changes: 2 additions & 2 deletions src/common/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
shard.release(handle)
};
// do not deallocate data with holding mutex.
if let Some((key,value)) = data && let Some(listener) = &self.listener {
if let Some((key, value)) = data && let Some(listener) = &self.listener {
listener.on_release(key, value);
}
}
Expand Down Expand Up @@ -819,7 +819,7 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
shard.erase(hash, key)
};
// do not deallocate data with holding mutex.
if let Some((key,value)) = data && let Some(listener) = &self.listener {
if let Some((key, value)) = data && let Some(listener) = &self.listener {
listener.on_release(key, value);
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@ impl VirtualNode {
// be the one that contains RowId, and use a special method to skip the calculation of Hash
// and directly extract the `VirtualNode` from `RowId`.
pub fn compute_chunk(data_chunk: &DataChunk, keys: &[usize]) -> Vec<VirtualNode> {
if let Ok(idx) = keys.iter().exactly_one() &&
let ArrayImpl::Serial(serial_array) = data_chunk.column_at(*idx).array_ref() {

return serial_array.iter()
.map(|serial|extract_vnode_id_from_row_id(serial.unwrap().as_row_id()))
if let Ok(idx) = keys.iter().exactly_one()
&& let ArrayImpl::Serial(serial_array) = data_chunk.column_at(*idx).array_ref()
{
return serial_array
.iter()
.map(|serial| extract_vnode_id_from_row_id(serial.unwrap().as_row_id()))
.collect();
}

Expand Down
20 changes: 14 additions & 6 deletions src/common/src/types/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1322,7 +1322,7 @@ impl Interval {
fn parse_postgres(s: &str) -> Result<Self> {
use DateTimeField::*;
let mut tokens = parse_interval(s)?;
if tokens.len()%2!=0 && let Some(TimeStrToken::Num(_)) = tokens.last() {
if tokens.len() % 2 != 0 && let Some(TimeStrToken::Num(_)) = tokens.last() {
tokens.push(TimeStrToken::TimeUnit(DateTimeField::Second));
}
if tokens.len() % 2 != 0 {
Expand Down Expand Up @@ -1354,22 +1354,30 @@ impl Interval {
}
})()
.and_then(|rhs| result.checked_add(&rhs))
.ok_or_else(|| ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s)))?;
.ok_or_else(|| {
ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s))
})?;
}
(TimeStrToken::Second(second), TimeStrToken::TimeUnit(interval_unit)) => {
result = match interval_unit {
Second => {
// If unsatisfied precision is passed as input, we should not return None (Error).
let usecs = (second.into_inner() * (USECS_PER_SEC as f64)).round() as i64;
// If unsatisfied precision is passed as input, we should not return
// None (Error).
let usecs =
(second.into_inner() * (USECS_PER_SEC as f64)).round() as i64;
Some(Interval::from_month_day_usec(0, 0, usecs))
}
_ => None,
}
.and_then(|rhs| result.checked_add(&rhs))
.ok_or_else(|| ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s)))?;
.ok_or_else(|| {
ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", s))
})?;
}
_ => {
return Err(ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", &s)).into());
return Err(
ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", &s)).into(),
);
}
}
}
Expand Down
11 changes: 4 additions & 7 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ impl JsonParser {
let value: BorrowedValue<'_> = simd_json::to_borrowed_value(&mut payload_mut)
.map_err(|e| RwError::from(ProtocolError(e.to_string())))?;

if let BorrowedValue::Array(ref objects) = value && matches!(op, Op::Insert) {
at_least_one_ok(
if let BorrowedValue::Array(ref objects) = value && matches!(op, Op::Insert) {
at_least_one_ok(
objects
.iter()
.map(|obj| Self::parse_single_value(obj, &mut writer))
Expand All @@ -119,13 +119,10 @@ impl JsonParser {
simd_json_parse_value(
&SourceFormat::Json,
&desc.data_type,
json_object_smart_get_value(&value,desc.name.as_str().into())
json_object_smart_get_value(&value, desc.name.as_str().into()),
)
.map_err(|e| {
tracing::error!(
"failed to process value: {}",
e
);
tracing::error!("failed to process value: {}", e);
e.into()
})
};
Expand Down
9 changes: 6 additions & 3 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,13 @@ impl SourceContext {
return Err(e);
}
let mut err_str = e.inner().to_string();
if let Some(suppressor) = &self.error_suppressor &&
suppressor.lock().suppress_error(&err_str)
if let Some(suppressor) = &self.error_suppressor
&& suppressor.lock().suppress_error(&err_str)
{
err_str = format!("error msg suppressed (due to per-actor error limit: {})", suppressor.lock().max());
err_str = format!(
"error msg suppressed (due to per-actor error limit: {})",
suppressor.lock().max()
);
}
self.metrics
.user_source_error_count
Expand Down
14 changes: 5 additions & 9 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,12 @@ fn generator_from_data_type(
}
_ => {
let kind_key = format!("fields.{}.kind", name);
if let Some(kind) = fields_option_map.get(&kind_key) && kind.as_str() == SEQUENCE_FIELD_KIND {
if let Some(kind) = fields_option_map.get(&kind_key)
&& kind.as_str() == SEQUENCE_FIELD_KIND
{
let start_key = format!("fields.{}.start", name);
let end_key = format!("fields.{}.end", name);
let start_value =
fields_option_map.get(&start_key).map(|s| s.to_string());
let start_value = fields_option_map.get(&start_key).map(|s| s.to_string());
let end_value = fields_option_map.get(&end_key).map(|s| s.to_string());
FieldGeneratorImpl::with_number_sequence(
data_type,
Expand All @@ -299,12 +300,7 @@ fn generator_from_data_type(
let max_key = format!("fields.{}.max", name);
let min_value = fields_option_map.get(&min_key).map(|s| s.to_string());
let max_value = fields_option_map.get(&max_key).map(|s| s.to_string());
FieldGeneratorImpl::with_number_random(
data_type,
min_value,
max_value,
random_seed
)
FieldGeneratorImpl::with_number_random(data_type, min_value, max_value, random_seed)
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/source/google_pubsub/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ impl PubsubSplitReader {
yield chunk;

// Stop if we've approached the stop_offset
if let Some(stop_offset) = self.stop_offset
&& latest_offset >= stop_offset {
if let Some(stop_offset) = self.stop_offset && latest_offset >= stop_offset {
return Ok(());
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl KafkaSplitReader {
#[try_stream(boxed, ok = Vec<SourceMessage>, error = anyhow::Error)]
pub async fn into_data_stream(self) {
if let Some(stop_offset) = self.stop_offset {
if let Some(start_offset) = self.start_offset && (start_offset+1) >= stop_offset {
if let Some(start_offset) = self.start_offset && (start_offset + 1) >= stop_offset {
yield Vec::new();
return Ok(());
} else if stop_offset == 0 {
Expand Down
4 changes: 3 additions & 1 deletion src/expr/src/expr/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ fn lexer(input: &str) -> Vec<Token> {
_ => {
let mut literal = String::new();
literal.push(c);
while let Some(&c) = chars.peek() && !matches!(c, '(' | ')' | ':' | ' ' | '\t' | '\r' | '\n') {
while let Some(&c) = chars.peek()
&& !matches!(c, '(' | ')' | ':' | ' ' | '\t' | '\r' | '\n')
{
literal.push(chars.next().unwrap());
}
Token::Literal(literal)
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ impl TestCaseResult {
if original_test_case.planner_error.is_none() && let Some(ref err) = self.planner_error {
return Err(anyhow!("unexpected planner error: {}", err));
}
if original_test_case.optimizer_error.is_none() && let Some(ref err) = self.optimizer_error {
if original_test_case.optimizer_error.is_none() && let Some(ref err) = self.optimizer_error
{
return Err(anyhow!("unexpected optimizer error: {}", err));
}

Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/binder/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ impl Binder {
}

pub fn bind_cast_inner(&mut self, expr: Expr, data_type: DataType) -> Result<ExprImpl> {
if let Expr::Array(Array {elem: ref expr, ..}) = expr && matches!(&data_type, DataType::List{ .. } ) {
if let Expr::Array(Array { elem: ref expr, .. }) = expr
&& matches!(&data_type, DataType::List { .. })
{
return self.bind_array_cast(expr.clone(), data_type);
}
let lhs = self.bind_expr(expr)?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/binder/expr/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ fn unescape_c_style(s: &str) -> Result<String> {
for _ in 0..len {
if let Some(c) = chars.peek() && c.is_ascii_hexdigit() {
unicode_seq.push(chars.next().unwrap());
}else{
} else {
break;
}
}
Expand Down Expand Up @@ -281,7 +281,7 @@ fn unescape_c_style(s: &str) -> Result<String> {
for _ in 0..2 {
if let Some(c) = chars.peek() && matches!(*c, '0'..='7') {
unicode_seq.push(chars.next().unwrap());
}else{
} else {
break;
}
}
Expand Down
14 changes: 11 additions & 3 deletions src/frontend/src/binder/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,17 @@ impl Binder {
) -> Result<ColumnOrder> {
let order_type = OrderType::from_bools(asc, nulls_first);
let column_index = match expr {
Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => match *index != usize::MAX {
true => *index,
false => return Err(ErrorCode::BindError(format!("ORDER BY \"{}\" is ambiguous", name.real_value())).into()),
Expr::Identifier(name) if let Some(index) = name_to_index.get(&name.real_value()) => {
match *index != usize::MAX {
true => *index,
false => {
return Err(ErrorCode::BindError(format!(
"ORDER BY \"{}\" is ambiguous",
name.real_value()
))
.into())
}
}
}
Expr::Value(Value::Number(number)) => match number.parse::<usize>() {
Ok(index) if 1 <= index && index <= visible_output_num => index - 1,
Expand Down
30 changes: 20 additions & 10 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,11 @@ impl Binder {
if let Some(from_alias) = alias {
original_alias.name = from_alias.name;
let mut alias_iter = from_alias.columns.into_iter();
original_alias.columns = original_alias.columns.into_iter().map(|ident| {
alias_iter
.next()
.unwrap_or(ident)
}).collect();
original_alias.columns = original_alias
.columns
.into_iter()
.map(|ident| alias_iter.next().unwrap_or(ident))
.collect();
}

self.bind_table_to_context(
Expand All @@ -316,11 +316,18 @@ impl Binder {

// Share the CTE.
let input_relation = Relation::Subquery(Box::new(BoundSubquery { query }));
let share_relation = Relation::Share(Box::new(BoundShare { share_id, input: input_relation }));
let share_relation = Relation::Share(Box::new(BoundShare {
share_id,
input: input_relation,
}));
Ok(share_relation)
} else {

self.bind_relation_by_name_inner(schema_name.as_deref(), &table_name, alias, for_system_time_as_of_proctime)
self.bind_relation_by_name_inner(
schema_name.as_deref(),
&table_name,
alias,
for_system_time_as_of_proctime,
)
}
}

Expand Down Expand Up @@ -352,7 +359,9 @@ impl Binder {
err_msg: &str,
) -> Result<Box<InputRef>> {
if let Some(time_col_arg) = arg
&& let Some(ExprImpl::InputRef(time_col)) = self.bind_function_arg(time_col_arg)?.into_iter().next() {
&& let Some(ExprImpl::InputRef(time_col)) =
self.bind_function_arg(time_col_arg)?.into_iter().next()
{
Ok(time_col)
} else {
Err(ErrorCode::BindError(err_msg.to_string()).into())
Expand Down Expand Up @@ -439,7 +448,8 @@ impl Binder {
.get_function_by_name_args(
func_name,
&args.iter().map(|arg| arg.return_type()).collect_vec(),
) && matches!(func.kind, FunctionKind::Table { .. })
)
&& matches!(func.kind, FunctionKind::Table { .. })
{
TableFunction::new_user_defined(func.clone(), args)
} else if let Ok(table_function_type) = TableFunctionType::from_str(func_name) {
Expand Down
Loading

0 comments on commit 7cd4dc9

Please sign in to comment.