coprocessor: Batch Stream Aggregation Executor #4786

Merged
Merged 23 commits on May 29, 2019
Changes from 1 commit
Commits (23)
42a60f7
Initialize stream aggr executor executor
sticnarf May 25, 2019
4cc8d43
Add unchecked RPN function eval
sticnarf May 26, 2019
7f5e827
Merge branch 'unchecked-eval' into stream-agg
sticnarf May 26, 2019
4c9d3b1
Finish process_batch_input part
sticnarf May 26, 2019
2586739
Use aggr framework
sticnarf May 27, 2019
e871872
Finish stream aggr tests
sticnarf May 27, 2019
b30f9d7
Add some more comments
sticnarf May 27, 2019
1214feb
Add checks for no aggregation expression
sticnarf May 27, 2019
fbeb91e
Make clippy happy
sticnarf May 27, 2019
6fdb4ad
Merge branch 'master' into stream-agg
sticnarf May 27, 2019
0874736
Add no agg function tests for stream agg
sticnarf May 27, 2019
10ceccd
address comments
sticnarf May 27, 2019
24c0196
Merge branch 'master' into stream-agg
sticnarf May 27, 2019
75ebc25
Fix panic when there is no aggr function
sticnarf May 27, 2019
70b0d8b
Merge branch 'stream-agg' of https://github.com/sticnarf/tikv into st…
sticnarf May 27, 2019
8b8dc9e
Not include src_is_drained in is_partial_results_ready
sticnarf May 28, 2019
e80022c
Rename iterate_each_group_for_partial_aggregation to iterate_availabl…
sticnarf May 28, 2019
181f35d
Merge branch 'master' into stream-agg
sticnarf May 28, 2019
4050b55
Merge branch 'master' into stream-agg
breeswish May 28, 2019
fb76ff8
Merge branch 'master' into stream-agg
breeswish May 28, 2019
fd97342
Merge branch 'master' into stream-agg
breeswish May 28, 2019
8aa769a
Merge branch 'master' into stream-agg
breeswish May 29, 2019
eb9e483
Merge branch 'master' into stream-agg
breeswish May 29, 2019
address comments

Signed-off-by: Yilin Chen <sticnarf@gmail.com>
sticnarf committed May 27, 2019
commit 10ceccd4a354118f6394e0b633d7cf22d223ddaa
@@ -14,7 +14,7 @@ pub type Bytes = Vec<u8>;
pub use crate::coprocessor::codec::mysql::{Decimal, Duration, Json, Time as DateTime};

// Dynamic eval types.
-pub use self::scalar::ScalarValue;
+pub use self::scalar::{ScalarValue, ScalarValueRef};
pub use self::vector::{VectorValue, VectorValueExt};
pub use self::vector_like::{VectorLikeValueRef, VectorLikeValueRefSpecialized};

@@ -28,6 +28,7 @@ pub enum ScalarValue {
Json(Option<super::Json>),
}

+#[derive(Clone, Copy, Debug, PartialEq)]
 pub enum ScalarValueRef<'a> {
breeswish (Member), May 27, 2019 (conversation resolved):
derive Copy, Clone

Int(&'a Option<super::Int>),
Real(&'a Option<super::Real>),
@@ -38,6 +39,18 @@ pub enum ScalarValueRef<'a> {
Json(&'a Option<super::Json>),
}

+impl<'a> ScalarValueRef<'a> {
+#[inline]
+#[allow(clippy::clone_on_copy)]
+pub fn to_owned(self) -> ScalarValue {
lonng (Member), May 28, 2019 (conversation resolved):
The semantics are more like the `Into` trait 🤔 Would implementing `Into` be more appropriate?

breeswish (Member): (comment minimized)

lonng (Member), May 28, 2019:
OK, sorry about this.

+match_template_evaluable! {
+TT, match self {
+ScalarValueRef::TT(v) => ScalarValue::TT(v.clone()),
+}
+}
+}
+}
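For readers unfamiliar with the pattern, here is a minimal stand-in for the `ScalarValueRef::to_owned` idiom above: a `Copy`-able enum of references converted back to an owned value. The two variants and the `Scalar`/`ScalarRef` names are hypothetical simplifications; the real code covers all eval types via the `match_template_evaluable!` macro.

```rust
// Simplified stand-in for ScalarValue (owned) with two variants only.
#[derive(Clone, Debug, PartialEq)]
enum Scalar {
    Int(Option<i64>),
    Bytes(Option<Vec<u8>>),
}

// The borrowed counterpart can derive Copy: it only holds references.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ScalarRef<'a> {
    Int(&'a Option<i64>),
    Bytes(&'a Option<Vec<u8>>),
}

impl<'a> ScalarRef<'a> {
    // Clones the referenced data into an owned Scalar, mirroring
    // ScalarValueRef::to_owned in the diff above.
    fn to_owned(self) -> Scalar {
        match self {
            ScalarRef::Int(v) => Scalar::Int(v.clone()),
            ScalarRef::Bytes(v) => Scalar::Bytes(v.clone()),
        }
    }
}

fn main() {
    let owned = Scalar::Int(Some(42));
    let r = match &owned {
        Scalar::Int(v) => ScalarRef::Int(v),
        Scalar::Bytes(v) => ScalarRef::Bytes(v),
    };
    // Because ScalarRef is Copy, `r` can be used repeatedly;
    // to_owned round-trips back to the owned value.
    assert_eq!(r.to_owned(), owned);
}
```

This also shows why a method was preferred over `impl From<ScalarValueRef> for ScalarValue` in the thread above: `to_owned` is the conventional name for borrowing-to-owned conversions.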

impl<'a> PartialEq<ScalarValue> for ScalarValueRef<'a> {
fn eq(&self, other: &ScalarValue) -> bool {
match_template_evaluable! {
@@ -150,15 +163,3 @@ impl From<f64> for ScalarValue {
ScalarValue::Real(Real::new(s).ok())
}
}

-impl<'a> From<ScalarValueRef<'a>> for ScalarValue {
-#[inline]
-#[allow(clippy::clone_on_copy)]
-fn from(s: ScalarValueRef<'a>) -> ScalarValue {
-match_template_evaluable! {
-TT, match s {
-ScalarValueRef::TT(v) => ScalarValue::TT(v.clone()),
-}
-}
-}
-}
@@ -176,8 +176,13 @@ impl VectorValue {
Ok(())
}

+/// Returns a `ScalarValueRef` to the element at the index.
+///
+/// # Panics
+///
+/// Panics if index is out of range.
 #[inline]
-pub fn get_unchecked(&self, index: usize) -> ScalarValueRef<'_> {
+pub fn get_scalar_ref(&self, index: usize) -> ScalarValueRef<'_> {
match_template_evaluable! {
TT, match self {
VectorValue::TT(v) => ScalarValueRef::TT(&v[index]),
@@ -262,12 +262,14 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for FastHashAggregationImp
}

#[inline]
-fn iterate_each_group_for_aggregation(
+fn iterate_each_group_for_partial_aggregation(
 &mut self,
 entities: &mut Entities<Src>,
-_src_is_drained: bool,
+src_is_drained: bool,
 mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
 ) -> Result<Vec<LazyBatchColumn>> {
+assert!(src_is_drained);

let aggr_fns_len = entities.each_aggr_fn.len();
let mut group_by_column = LazyBatchColumn::decoded_with_capacity_and_tp(
self.groups.len(),
@@ -163,12 +163,13 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for SimpleAggregationImpl
}

#[inline]
-fn iterate_each_group_for_aggregation(
+fn iterate_each_group_for_partial_aggregation(
 &mut self,
 entities: &mut Entities<Src>,
-_src_is_drained: bool,
+src_is_drained: bool,
 mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
 ) -> Result<Vec<LazyBatchColumn>> {
+assert!(src_is_drained);
iteratee(entities, &self.states)?;
Ok(Vec::new())
}
@@ -255,12 +255,14 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for SlowHashAggregationImp
}

#[inline]
-fn iterate_each_group_for_aggregation(
+fn iterate_each_group_for_partial_aggregation(
 &mut self,
 entities: &mut Entities<Src>,
-_src_is_drained: bool,
+src_is_drained: bool,
 mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
 ) -> Result<Vec<LazyBatchColumn>> {
+assert!(src_is_drained);

let number_of_groups = self.groups.len();
let group_by_exps_len = self.group_by_exps.len();
let mut group_by_columns: Vec<_> = self
@@ -9,6 +9,7 @@ use tipb::expression::{Expr, FieldType};

use crate::coprocessor::codec::batch::{LazyBatchColumn, LazyBatchColumnVec};
use crate::coprocessor::codec::data_type::*;
+use crate::coprocessor::codec::mysql::time::Tz;
use crate::coprocessor::dag::aggr_fn::*;
use crate::coprocessor::dag::batch::executors::util::aggr_executor::*;
use crate::coprocessor::dag::batch::interface::*;
@@ -80,13 +81,13 @@ impl BatchStreamAggregationExecutor<Box<dyn BatchExecutor>> {

pub struct BatchStreamAggregationImpl {
group_by_exps: Vec<RpnExpression>,
-// used in the `iterate_each_group_for_aggregation` method
+/// used in the `iterate_each_group_for_aggregation` method
 group_by_exps_types: Vec<EvalType>,
-// Stores all group keys. The last `group_by_exps.len()` elements are the keys of
-// the current group
+/// Stores all group keys for the current result partial.
+/// The last `group_by_exps.len()` elements are the keys of the last group.
 keys: Vec<ScalarValue>,
-// Stores all group states. The last `each_aggr_fn.len()` elements are the states of
-// the current group
+/// Stores all group states for the current result partial.
+/// The last `each_aggr_fn.len()` elements are the states of the last group.
 states: Vec<Box<dyn AggrFunctionState>>,
states: Vec<Box<dyn AggrFunctionState>>,
}
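The flat key layout described by the doc comments above (all group keys in one `Vec`, with the last `group_by_exps.len()` entries belonging to the current group) can be sketched with plain integers standing in for `ScalarValue`. The `current_group_keys` helper is illustrative, not the real API:

```rust
// Illustrative sketch: group keys stored flat, `group_by_len` entries per group.
fn current_group_keys(keys: &[i64], group_by_len: usize) -> Option<&[i64]> {
    // rchunks_exact walks fixed-size chunks backwards from the end of the
    // slice, so `.next()` yields the keys of the most recent group, if any.
    keys.rchunks_exact(group_by_len).next()
}

fn main() {
    // Two groups with two group-by columns each: (1, 10) then (2, 20).
    let keys = vec![1, 10, 2, 20];
    assert_eq!(current_group_keys(&keys, 2), Some(&[2, 20][..]));
    // No keys stored yet: no current group.
    assert_eq!(current_group_keys(&[], 2), None);
}
```

This is the same `rchunks_exact(group_by_len).next()` access that the diff below uses to compare incoming rows against the current group.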

@@ -175,23 +176,28 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for BatchStreamAggregation

// Decode columns with mutable input first, so subsequent access to input can be immutable
// (and the borrow checker will be happy)
-ensure_columns_decoded(context, &self.group_by_exps, src_schema, &mut input)?;
-ensure_columns_decoded(context, &entities.each_aggr_exprs, src_schema, &mut input)?;
+ensure_columns_decoded(&context.cfg.tz, &self.group_by_exps, src_schema, &mut input)?;
+ensure_columns_decoded(
+&context.cfg.tz,
+&entities.each_aggr_exprs,
+src_schema,
+&mut input,
+)?;
let group_by_results = eval_exprs(context, &self.group_by_exps, src_schema, &input)?;
let aggr_expr_results = eval_exprs(context, &entities.each_aggr_exprs, src_schema, &input)?;

// Stores input references, clone them when needed
-let mut group_key = Vec::with_capacity(group_by_len);
+let mut group_key_ref = Vec::with_capacity(group_by_len);
let mut group_start_row = 0;
for row_index in 0..rows_len {
for group_by_result in &group_by_results {
// Unwrap is fine because we have verified the group by expression before.
let group_column = group_by_result.vector_value().unwrap();
-group_key.push(group_column.get_unchecked(row_index));
+group_key_ref.push(group_column.get_scalar_ref(row_index));
}
match self.keys.rchunks_exact(group_by_len).next() {
-Some(current_key) if &group_key[..] == current_key => {
-group_key.clear();
+Some(current_key) if &group_key_ref[..] == current_key => {
+group_key_ref.clear();
}
_ => {
// Update the complete group
@@ -208,7 +214,8 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for BatchStreamAggregation

// create a new group
group_start_row = row_index;
-self.keys.extend(group_key.drain(..).map(Into::into));
+self.keys
+.extend(group_key_ref.drain(..).map(ScalarValueRef::to_owned));
for aggr_fn in &entities.each_aggr_fn {
self.states.push(aggr_fn.create_state());
}
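The boundary-detection loop above reduces to a simple idea: because the source is ordered, a new group starts exactly where a row's key stops matching the last stored group key. A sketch over plain integer keys (the `group_starts` helper is hypothetical, not part of the PR):

```rust
// Stream-aggregation group detection over ordered input (sketch).
// Returns the start row index of each group.
fn group_starts(sorted_keys: &[i64]) -> Vec<usize> {
    let mut starts = Vec::new();
    let mut keys: Vec<i64> = Vec::new(); // flat store of group keys seen so far
    for (row, &k) in sorted_keys.iter().enumerate() {
        match keys.last() {
            // Same key as the current group: the row joins it, nothing to do.
            Some(&current) if current == k => {}
            // Key changed (or first row): a new group begins here.
            _ => {
                keys.push(k);
                starts.push(row);
            }
        }
    }
    starts
}

fn main() {
    assert_eq!(group_starts(&[1, 1, 2, 2, 2, 3]), vec![0, 2, 5]);
    assert_eq!(group_starts(&[]), Vec::<usize>::new());
}
```

This is also why unordered input is undefined behavior for stream aggregation (noted further down in the builder diff): with unsorted keys, the same logical group would be opened multiple times.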
@@ -238,7 +245,7 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for BatchStreamAggregation
}

#[inline]
-fn iterate_each_group_for_aggregation(
+fn iterate_each_group_for_partial_aggregation(
&mut self,
entities: &mut Entities<Src>,
src_is_drained: bool,
@@ -259,7 +266,7 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for BatchStreamAggregation
.collect();
let aggr_fns_len = entities.each_aggr_fn.len();

-// key and state ranges of complete groups
+// key and state ranges of all available groups
let keys_range = ..number_of_groups * group_by_exps_len;
let states_range = ..number_of_groups * aggr_fns_len;

@@ -285,19 +292,19 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for BatchStreamAggregation
Ok(group_by_columns)
}

-fn can_aggregate(&self, is_drained: bool) -> bool {
-is_drained || self.keys.len() > self.group_by_exps.len()
+fn is_partial_results_ready(&self, is_drained: bool) -> bool {
+is_drained || AggregationExecutorImpl::<Src>::groups_len(self) >= 2
}
}

fn ensure_columns_decoded(
-context: &mut EvalContext,
+tz: &Tz,
exprs: &[RpnExpression],
schema: &[FieldType],
input: &mut LazyBatchColumnVec,
) -> Result<()> {
for expr in exprs {
-expr.ensure_columns_decoded(context, schema, input)?;
+expr.ensure_columns_decoded(tz, schema, input)?;
}
Ok(())
}
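The signature change above (taking `&Tz` rather than `&mut EvalContext`) is a parameter-narrowing refactor: the helper only reads the time zone, so borrowing the whole context mutably was stronger than necessary. A minimal sketch with stand-in types (`Tz`, `EvalConfig`, `EvalContext`, and `decode_all` here only mirror the names in the diff; they are not the real definitions):

```rust
// Stand-in types mirroring the shape of the diff, not TiKV's actual API.
struct Tz(&'static str);
struct EvalConfig { tz: Tz }
struct EvalContext { cfg: EvalConfig }

// Before the refactor the helper took `&mut EvalContext`; now it takes only
// the piece it reads, so the caller keeps the context free for other borrows.
fn decode_all(tz: &Tz, values: &[i64]) -> String {
    format!("decoded {} values in tz {}", values.len(), tz.0)
}

fn main() {
    let ctx = EvalContext { cfg: EvalConfig { tz: Tz("UTC") } };
    let out = decode_all(&ctx.cfg.tz, &[1, 2, 3]);
    assert_eq!(out, "decoded 3 values in tz UTC");
}
```

In the actual diff this matters because decoding happens while `context` is still needed mutably elsewhere in the same function, which the borrow checker would otherwise reject.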
@@ -70,7 +70,10 @@ pub trait AggregationExecutorImpl<Src: BatchExecutor>: Send {
///
/// Implementors may return the content of each group as extra columns in the return value
/// if there are group by columns.
-fn iterate_each_group_for_aggregation(
+///
+/// Implementors should not iterate the same group multiple times for the same partial
+/// input data.
+fn iterate_each_group_for_partial_aggregation(
&mut self,
entities: &mut Entities<Src>,
src_is_drained: bool,

breeswish (Member), May 27, 2019:
is_last_partial

sticnarf (Author, Contributor), May 27, 2019:
I think src_is_drained may be better. "Last partial" doesn't make much sense for hash aggregation.

breeswish (Member), May 27, 2019:
Partial means a batch of input (maybe there is a better name, or we should explain it somewhere?), and we can treat hash agg as an executor that always generates output for the last partial.

@@ -79,9 +82,9 @@ pub trait AggregationExecutorImpl<Src: BatchExecutor>: Send {

/// Returns whether we can aggregate at the moment.
///
-/// The default value is `is_drained`. Only StreamAgg can aggregate when not drained.
-fn can_aggregate(&self, is_drained: bool) -> bool {
-is_drained
+/// The default value is `src_is_drained`. Only StreamAgg can aggregate when not drained.
+fn is_partial_results_ready(&self, src_is_drained: bool) -> bool {

breeswish (Member), May 27, 2019:
For stream agg, hash agg, and simple agg (and maybe other kinds of aggregation in the future), when src_is_drained, partial results are always ready. Thus maybe we can simply extract the src_is_drained parameter from this function signature. Also, after the extraction it looks like it would be better to leave this function without a default impl, since this executor should have no knowledge about the implementor. We know that only one executor (stream agg) supports partial results, but this executor should not keep that knowledge.

+src_is_drained
}
}
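The default-method design debated above can be sketched in isolation: the base trait answers "only when the source is drained", and only the stream implementation overrides it to flush completed groups early. `AggImpl`, `HashAgg`, and `StreamAgg` below are simplified stand-ins, not the executors in this PR:

```rust
// Sketch of the readiness hook: hash/simple aggregation use the default,
// stream aggregation overrides it to emit completed groups early.
trait AggImpl {
    fn groups_len(&self) -> usize;

    // Default: partial results are only ready once the source is exhausted.
    fn is_partial_results_ready(&self, src_is_drained: bool) -> bool {
        src_is_drained
    }
}

struct HashAgg { groups: usize }
impl AggImpl for HashAgg {
    fn groups_len(&self) -> usize { self.groups }
}

struct StreamAgg { groups: usize }
impl AggImpl for StreamAgg {
    fn groups_len(&self) -> usize { self.groups }

    // With >= 2 groups buffered over ordered input, every group except the
    // last one is complete and can be emitted before the source is drained.
    fn is_partial_results_ready(&self, src_is_drained: bool) -> bool {
        src_is_drained || self.groups_len() >= 2
    }
}

fn main() {
    assert!(!HashAgg { groups: 5 }.is_partial_results_ready(false));
    assert!(HashAgg { groups: 5 }.is_partial_results_ready(true));
    assert!(StreamAgg { groups: 2 }.is_partial_results_ready(false));
    assert!(!StreamAgg { groups: 1 }.is_partial_results_ready(false));
}
```

breeswish's suggestion in the thread above amounts to dropping the default impl entirely so the caller makes no assumption about which implementor supports early output.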

@@ -204,18 +207,18 @@ impl<Src: BatchExecutor, I: AggregationExecutorImpl<Src>> AggregationExecutor<Sr
.process_batch_input(&mut self.entities, src_result.data)?;
}

// Aggregate results if source executor is drained, otherwise just return nothing.
-if self.imp.can_aggregate(src_is_drained) {
-Ok((Some(self.aggregate(src_is_drained)?), src_is_drained))
+if self.imp.is_partial_results_ready(src_is_drained) {
+Ok((
+Some(self.aggregate_partial_results(src_is_drained)?),
+src_is_drained,
+))
} else {
Ok((None, src_is_drained))
}
}

-/// Generates aggregation results.
-///
-/// This function is ensured to be called at most once.
-fn aggregate(&mut self, src_is_drained: bool) -> Result<LazyBatchColumnVec> {
+/// Generates aggregation results of complete groups.
+fn aggregate_partial_results(&mut self, src_is_drained: bool) -> Result<LazyBatchColumnVec> {
let groups_len = self.imp.groups_len();
let mut all_result_columns: Vec<_> = self
.entities
@@ -225,7 +228,7 @@ impl<Src: BatchExecutor, I: AggregationExecutorImpl<Src>> AggregationExecutor<Sr
.collect();

// Aggregate results for each group
-let group_by_columns = self.imp.iterate_each_group_for_aggregation(
+let group_by_columns = self.imp.iterate_each_group_for_partial_aggregation(
&mut self.entities,
src_is_drained,
|entities, states| {
@@ -285,18 +288,9 @@ impl<Src: BatchExecutor, I: AggregationExecutorImpl<Src>> BatchExecutor
is_drained: Err(e),
}
}
-Ok((None, src_is_drained)) => {
-self.is_ended = src_is_drained;
-BatchExecuteResult {
-data: LazyBatchColumnVec::empty(),
-warnings: self.entities.context.take_warnings(),
-is_drained: Ok(src_is_drained),
-}
-}
-Ok((Some(data), src_is_drained)) => {
-// When there is no error and there are some aggregation results,
-// we return them as data.
+Ok((data, src_is_drained)) => {
 self.is_ended = src_is_drained;
+let data = data.unwrap_or_else(LazyBatchColumnVec::empty);
BatchExecuteResult {
data,
warnings: self.entities.context.take_warnings(),
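The collapse of the `Ok((None, ...))` and `Ok((Some(data), ...))` arms above rests on a standard `Option` idiom: default the `None` case with `unwrap_or_else` and keep a single code path. A minimal sketch with a stand-in `Columns` type (not TiKV's `LazyBatchColumnVec`):

```rust
// Stand-in for a batch of result columns.
#[derive(Debug, PartialEq)]
struct Columns(Vec<i64>);

impl Columns {
    fn empty() -> Columns { Columns(Vec::new()) }
}

// Instead of matching Some/None with two near-identical arms, default the
// None case lazily and handle both outcomes in one place.
fn normalize(data: Option<Columns>) -> Columns {
    data.unwrap_or_else(Columns::empty)
}

fn main() {
    assert_eq!(normalize(None), Columns::empty());
    assert_eq!(normalize(Some(Columns(vec![1, 2]))), Columns(vec![1, 2]));
}
```

`unwrap_or_else` takes the constructor as a closure, so the empty batch is only allocated when it is actually needed.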
@@ -77,7 +77,8 @@ impl DAGBuilder {
}
}
ExecType::TypeStreamAgg => {
-// FIXME: we'd better check whether the source executor is in order
+// Note: We won't check whether the source of stream aggregation is in order.
+// It is undefined behavior if the source is unordered.
let descriptor = ed.get_aggregation();
BatchStreamAggregationExecutor::check_supported(&descriptor).map_err(|e| {
Error::Other(box_err!(
@@ -8,6 +8,7 @@ use super::RpnFnCallPayload;
use crate::coprocessor::codec::batch::LazyBatchColumnVec;
use crate::coprocessor::codec::data_type::VectorLikeValueRef;
use crate::coprocessor::codec::data_type::{ScalarValue, VectorValue};
+use crate::coprocessor::codec::mysql::time::Tz;
use crate::coprocessor::dag::expr::EvalContext;
use crate::coprocessor::Result;

@@ -132,15 +133,15 @@ impl RpnExpression {
// We iterate two times. The first time we decode all referred columns. The second time
// we evaluate. This is to make Rust's borrow checker happy because there will be
// mutable reference during the first iteration and we can't keep these references.
-self.ensure_columns_decoded(context, schema, columns)?;
+self.ensure_columns_decoded(&context.cfg.tz, schema, columns)?;
self.eval_unchecked(context, rows, schema, columns)
}

/// Evaluates the expression into a boolean vector.
///
/// # Panics
///
-/// Panics if referenced columns are not decoded.
+/// Panics if the expression is not valid.
///
/// Panics if the boolean vector output buffer is not large enough to contain all values.
pub fn eval_as_mysql_bools(
@@ -170,20 +171,35 @@ impl RpnExpression {
Ok(())
}

+/// Decodes all referred columns which are not decoded. Then we ensure
+/// all referred columns are decoded.
pub fn ensure_columns_decoded<'a>(

breeswish (Member), May 27, 2019:
Missing doc comments

&'a self,
-context: &mut EvalContext,
+tz: &Tz,
schema: &'a [FieldType],
columns: &'a mut LazyBatchColumnVec,
) -> Result<()> {
for node in self.as_ref() {
if let RpnExpressionNode::ColumnRef { ref offset, .. } = node {
-columns.ensure_column_decoded(*offset, &context.cfg.tz, &schema[*offset])?;
+columns.ensure_column_decoded(*offset, tz, &schema[*offset])?;
}
}
Ok(())
}

/// Evaluates the expression into a vector.
///
/// It differs from `eval` in that `eval_unchecked` needn't receive a mutable reference
/// to `LazyBatchColumnVec`. However, since `eval_unchecked` doesn't decode columns,
/// it will panic if referred columns are not decoded.
///
/// # Panics
///
/// Panics if the expression is not valid.
///
/// Panics if referred columns are not decoded.
///
/// Panics when referenced column does not have equal length as specified in `rows`.
pub fn eval_unchecked<'a>(
breeswish (Member), May 27, 2019 (conversation resolved):
shall we add an unsafe here to better catch mistakes? @kennytm

sticnarf (Author, Contributor), May 27, 2019:
This function is safe. It will just panic when we need a decoded column but it is not decoded.
The unchecked name may be misleading. I need some suggestions as well...

breeswish (Member), May 27, 2019:
Actually I like this name pretty much 🤣 Its only problem is that it is not that "unsafe".

breeswish (Member), May 27, 2019 (conversation resolved):
Please add doc comments to describe its behaviour.

lonng (Member), May 28, 2019:
Maybe eval_decoded is more appropriate here. The unchecked suffix is usually used for unsafe code.

sticnarf (Author, Contributor), May 28, 2019:
No strong opinion... What do you think? @breeswish

&'a self,
context: &mut EvalContext,