New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
coprocessor: Add batch aggregate function FIRST #4771
Changes from 11 commits
b08c791
2920306
1e248f4
9d2b66c
8af2b20
d3edf13
b5433c8
fcca6d0
719787e
998de19
3fa479d
76bb4ca
2c39951
e00cfdf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. | ||
|
||
use std::marker::PhantomData; | ||
|
||
use cop_codegen::AggrFunction; | ||
use cop_datatype::EvalType; | ||
use tipb::expression::{Expr, ExprType, FieldType}; | ||
|
||
use crate::coprocessor::codec::data_type::*; | ||
use crate::coprocessor::codec::mysql::Tz; | ||
use crate::coprocessor::dag::expr::EvalContext; | ||
use crate::coprocessor::dag::rpn_expr::{RpnExpression, RpnExpressionBuilder}; | ||
use crate::coprocessor::Result; | ||
|
||
/// The parser for FIRST aggregate function. | ||
pub struct AggrFnDefinitionParserFirst; | ||
|
||
impl super::AggrDefinitionParser for AggrFnDefinitionParserFirst { | ||
fn check_supported(&self, aggr_def: &Expr) -> Result<()> { | ||
assert_eq!(aggr_def.get_tp(), ExprType::First); | ||
super::util::check_aggr_exp_supported_one_child(aggr_def) | ||
} | ||
|
||
fn parse( | ||
&self, | ||
mut aggr_def: Expr, | ||
time_zone: &Tz, | ||
src_schema: &[FieldType], | ||
out_schema: &mut Vec<FieldType>, | ||
out_exp: &mut Vec<RpnExpression>, | ||
) -> Result<Box<dyn super::AggrFunction>> { | ||
use cop_datatype::FieldTypeAccessor; | ||
use std::convert::TryFrom; | ||
assert_eq!(aggr_def.get_tp(), ExprType::First); | ||
let child = aggr_def.take_children().into_iter().next().unwrap(); | ||
let eval_type = EvalType::try_from(child.get_field_type().tp()).unwrap(); | ||
|
||
// FIRST outputs one column with the same type as its child | ||
out_schema.push(aggr_def.take_field_type()); | ||
|
||
// FIRST doesn't need to cast, so using the expression directly. | ||
out_exp.push(RpnExpressionBuilder::build_from_expr_tree( | ||
child, | ||
time_zone, | ||
src_schema.len(), | ||
)?); | ||
|
||
match_template_evaluable! { | ||
TT, match eval_type { | ||
EvalType::TT => Ok(Box::new(AggrFnFirst::<TT>::new())) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// The FIRST aggregate function. | ||
#[derive(Debug, AggrFunction)] | ||
#[aggr_function(state = AggrFnStateFirst::<T>::new())] | ||
pub struct AggrFnFirst<T>(PhantomData<T>) | ||
where | ||
T: Evaluable, | ||
VectorValue: VectorValueExt<T>; | ||
|
||
impl<T> AggrFnFirst<T> | ||
where | ||
T: Evaluable, | ||
VectorValue: VectorValueExt<T>, | ||
{ | ||
fn new() -> Self { | ||
AggrFnFirst(PhantomData) | ||
} | ||
} | ||
|
||
/// The state of the FIRST aggregate function. | ||
#[derive(Debug)] | ||
pub enum AggrFnStateFirst<T> | ||
where | ||
T: Evaluable, | ||
VectorValue: VectorValueExt<T>, | ||
{ | ||
Empty, | ||
Valued(Option<T>), | ||
} | ||
|
||
impl<T> AggrFnStateFirst<T> | ||
where | ||
T: Evaluable, | ||
VectorValue: VectorValueExt<T>, | ||
{ | ||
pub fn new() -> Self { | ||
AggrFnStateFirst::Empty | ||
} | ||
} | ||
|
||
// Here we manually implement `AggrFunctionStateUpdatePartial` instead of implementing | ||
// `ConcreteAggrFunctionState` so that `update_repeat` and `update_vector` can be faster. | ||
impl<T> super::AggrFunctionStateUpdatePartial<T> for AggrFnStateFirst<T> | ||
where | ||
T: Evaluable, | ||
VectorValue: VectorValueExt<T>, | ||
{ | ||
#[inline] | ||
fn update(&mut self, _ctx: &mut EvalContext, value: &Option<T>) -> Result<()> { | ||
if let AggrFnStateFirst::Empty = self { | ||
// TODO: avoid this clone | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😕 There's no way to avoid the clone since value is an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we cannot avoid it unless we change the parameter type. But we do not always need the clone, for example, we can actually move out the value if it is owned in the RpnStackNode. To accomplish that, we need a big change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But for an aggregate function, it's rare we need to clone the value. FIRST is quite a rarely used aggregate function too. (I know only one case like this: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. We can ignore the clone cost, since it is called only once for every group, which is fine. |
||
*self = AggrFnStateFirst::Valued(value.as_ref().cloned()); | ||
} | ||
Ok(()) | ||
} | ||
|
||
#[inline] | ||
fn update_repeat( | ||
&mut self, | ||
ctx: &mut EvalContext, | ||
value: &Option<T>, | ||
repeat_times: usize, | ||
) -> Result<()> { | ||
assert!(repeat_times > 0); | ||
self.update(ctx, value) | ||
} | ||
|
||
#[inline] | ||
fn update_vector(&mut self, ctx: &mut EvalContext, values: &[Option<T>]) -> Result<()> { | ||
if let Some(v) = values.first() { | ||
self.update(ctx, v)?; | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
// In order to make `AggrFnStateFirst` satisfy the `AggrFunctionState` trait, we default impl all | ||
// `AggrFunctionStateUpdatePartial` of `Evaluable` for all `AggrFnStateFirst`. | ||
impl<T1, T2> super::AggrFunctionStateUpdatePartial<T1> for AggrFnStateFirst<T2> | ||
where | ||
T1: Evaluable, | ||
T2: Evaluable, | ||
VectorValue: VectorValueExt<T2>, | ||
{ | ||
#[inline] | ||
default fn update(&mut self, _ctx: &mut EvalContext, _value: &Option<T1>) -> Result<()> { | ||
panic!("Unmatched parameter type") | ||
} | ||
|
||
#[inline] | ||
default fn update_repeat( | ||
&mut self, | ||
_ctx: &mut EvalContext, | ||
_value: &Option<T1>, | ||
_repeat_times: usize, | ||
) -> Result<()> { | ||
panic!("Unmatched parameter type") | ||
} | ||
|
||
#[inline] | ||
default fn update_vector( | ||
&mut self, | ||
_ctx: &mut EvalContext, | ||
_values: &[Option<T1>], | ||
) -> Result<()> { | ||
panic!("Unmatched parameter type") | ||
} | ||
} | ||
|
||
impl<T> super::AggrFunctionState for AggrFnStateFirst<T> | ||
where | ||
T: Evaluable, | ||
VectorValue: VectorValueExt<T>, | ||
{ | ||
fn push_result(&self, _ctx: &mut EvalContext, target: &mut [VectorValue]) -> Result<()> { | ||
assert_eq!(target.len(), 1); | ||
let res = if let AggrFnStateFirst::Valued(v) = self { | ||
v.clone() | ||
} else { | ||
None | ||
}; | ||
target[0].push(res); | ||
Ok(()) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::super::AggrFunction; | ||
use super::*; | ||
|
||
#[test] | ||
fn test_update() { | ||
let mut ctx = EvalContext::default(); | ||
let function = AggrFnFirst::<Int>::new(); | ||
let mut state = function.create_state(); | ||
|
||
let mut result = [VectorValue::with_capacity(0, EvalType::Int)]; | ||
state.push_result(&mut ctx, &mut result[..]).unwrap(); | ||
assert_eq!(result[0].as_int_slice(), &[None]); | ||
|
||
state.update(&mut ctx, &Some(1)).unwrap(); | ||
state.push_result(&mut ctx, &mut result[..]).unwrap(); | ||
assert_eq!(result[0].as_int_slice(), &[None, Some(1)]); | ||
|
||
state.update(&mut ctx, &Some(2)).unwrap(); | ||
state.push_result(&mut ctx, &mut result[..]).unwrap(); | ||
assert_eq!(result[0].as_int_slice(), &[None, Some(1), Some(1)]); | ||
} | ||
|
||
#[test] | ||
fn test_update_repeat() { | ||
let mut ctx = EvalContext::default(); | ||
let function = AggrFnFirst::<Bytes>::new(); | ||
let mut state = function.create_state(); | ||
|
||
let mut result = [VectorValue::with_capacity(0, EvalType::Bytes)]; | ||
|
||
state.update_repeat(&mut ctx, &Some(vec![1]), 2).unwrap(); | ||
state.push_result(&mut ctx, &mut result[..]).unwrap(); | ||
assert_eq!(result[0].as_bytes_slice(), &[Some(vec![1])]); | ||
|
||
state.update_repeat(&mut ctx, &Some(vec![2]), 3).unwrap(); | ||
state.push_result(&mut ctx, &mut result[..]).unwrap(); | ||
assert_eq!(result[0].as_bytes_slice(), &[Some(vec![1]), Some(vec![1])]); | ||
} | ||
|
||
#[test] | ||
fn test_update_vector() { | ||
let mut ctx = EvalContext::default(); | ||
let function = AggrFnFirst::<Int>::new(); | ||
let mut state = function.create_state(); | ||
let mut result = [VectorValue::with_capacity(0, EvalType::Int)]; | ||
|
||
state.update_vector(&mut ctx, &[Some(0); 0]).unwrap(); | ||
state.push_result(&mut ctx, &mut result[..]).unwrap(); | ||
assert_eq!(result[0].as_int_slice(), &[None]); | ||
|
||
state.update_vector(&mut ctx, &[None, Some(2)]).unwrap(); | ||
state.push_result(&mut ctx, &mut result[..]).unwrap(); | ||
assert_eq!(result[0].as_int_slice(), &[None, None]); | ||
|
||
state.update_vector(&mut ctx, &[Some(1)]).unwrap(); | ||
state.push_result(&mut ctx, &mut result[..]).unwrap(); | ||
assert_eq!(result[0].as_int_slice(), &[None, None, None]); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
mod impl_avg; | ||
mod impl_count; | ||
mod impl_first; | ||
mod parser; | ||
mod summable; | ||
mod util; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel weird to add the comment here. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, seems too trivial... Let me delete it.