coprocessor: Batch Stream Aggregation Executor #4786

Merged · 23 commits · May 29, 2019 · Changes from all commits

Commits
42a60f7  Initialize stream aggr executor executor (sticnarf, May 25, 2019)
4cc8d43  Add unchecked RPN function eval (sticnarf, May 26, 2019)
7f5e827  Merge branch 'unchecked-eval' into stream-agg (sticnarf, May 26, 2019)
4c9d3b1  Finish process_batch_input part (sticnarf, May 26, 2019)
2586739  Use aggr framework (sticnarf, May 27, 2019)
e871872  Finish stream aggr tests (sticnarf, May 27, 2019)
b30f9d7  Add some more comments (sticnarf, May 27, 2019)
1214feb  Add checks for no aggregation expression (sticnarf, May 27, 2019)
fbeb91e  Make clippy happy (sticnarf, May 27, 2019)
6fdb4ad  Merge branch 'master' into stream-agg (sticnarf, May 27, 2019)
0874736  Add no agg function tests for stream agg (sticnarf, May 27, 2019)
10ceccd  address comments (sticnarf, May 27, 2019)
24c0196  Merge branch 'master' into stream-agg (sticnarf, May 27, 2019)
75ebc25  Fix panic when there is no aggr function (sticnarf, May 27, 2019)
70b0d8b  Merge branch 'stream-agg' of https://github.com/sticnarf/tikv into st… (sticnarf, May 27, 2019)
8b8dc9e  Not include src_is_drained in is_partial_results_ready (sticnarf, May 28, 2019)
e80022c  Rename iterate_each_group_for_partial_aggregation to iterate_availabl… (sticnarf, May 28, 2019)
181f35d  Merge branch 'master' into stream-agg (sticnarf, May 28, 2019)
4050b55  Merge branch 'master' into stream-agg (breeswish, May 28, 2019)
fb76ff8  Merge branch 'master' into stream-agg (breeswish, May 28, 2019)
fd97342  Merge branch 'master' into stream-agg (breeswish, May 28, 2019)
8aa769a  Merge branch 'master' into stream-agg (breeswish, May 29, 2019)
eb9e483  Merge branch 'master' into stream-agg (breeswish, May 29, 2019)
@@ -14,7 +14,7 @@ pub type Bytes = Vec<u8>;
pub use crate::coprocessor::codec::mysql::{Decimal, Duration, Json, Time as DateTime};

// Dynamic eval types.
-pub use self::scalar::ScalarValue;
+pub use self::scalar::{ScalarValue, ScalarValueRef};
pub use self::vector::{VectorValue, VectorValueExt};
pub use self::vector_like::{VectorLikeValueRef, VectorLikeValueRefSpecialized};

@@ -28,6 +28,40 @@ pub enum ScalarValue {
Json(Option<super::Json>),
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ScalarValueRef<'a> {
[Review comment, resolved] breeswish (Member, May 27, 2019): derive Copy, Clone
Int(&'a Option<super::Int>),
Real(&'a Option<super::Real>),
Decimal(&'a Option<super::Decimal>),
Bytes(&'a Option<super::Bytes>),
DateTime(&'a Option<super::DateTime>),
Duration(&'a Option<super::Duration>),
Json(&'a Option<super::Json>),
}

impl<'a> ScalarValueRef<'a> {
#[inline]
#[allow(clippy::clone_on_copy)]
pub fn to_owned(self) -> ScalarValue {
[Review thread, resolved]
lonng (Member, May 28, 2019): The semantics here are more like the Into trait 🤔 Would implementing Into be more appropriate?
breeswish: (reply minimized)
lonng (Member, May 28, 2019): OK, sorry about this.
match_template_evaluable! {
TT, match self {
ScalarValueRef::TT(v) => ScalarValue::TT(v.clone()),
}
}
}
}
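
The review thread above suggests expressing this conversion through the standard Into trait. As a hedged illustration only (not something this PR adds), the same behavior could be exposed via a From impl, which provides Into for free; the PR keeps the explicit to_owned method instead:

// Illustrative sketch, not part of this diff: a `From` impl would give
// `Into<ScalarValue>` automatically while reusing `to_owned`.
impl<'a> From<ScalarValueRef<'a>> for ScalarValue {
    #[inline]
    fn from(value: ScalarValueRef<'a>) -> ScalarValue {
        value.to_owned()
    }
}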

impl<'a> PartialEq<ScalarValue> for ScalarValueRef<'a> {
fn eq(&self, other: &ScalarValue) -> bool {
match_template_evaluable! {
TT, match (self, other) {
(ScalarValueRef::TT(v1), ScalarValue::TT(v2)) => v1 == &v2,
_ => false
}
}
}
}
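
A minimal usage sketch of this comparison (the wrapper function and the concrete values are for illustration only, assuming Int is the i64 alias these types use): matching variants compare the borrowed Option against the owned one, and mismatched variants are simply unequal.

fn scalar_ref_eq_demo() {
    let v: Option<i64> = Some(7);
    let r = ScalarValueRef::Int(&v);
    assert!(r == ScalarValue::Int(Some(7)));    // same variant, equal payloads
    assert!(!(r == ScalarValue::Int(Some(8)))); // same variant, different payloads
    assert!(!(r == ScalarValue::Real(None)));   // different variants never compare equal
}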

impl ScalarValue {
#[inline]
pub fn eval_type(&self) -> EvalType {
@@ -6,6 +6,7 @@ use cop_datatype::{EvalType, FieldTypeAccessor, FieldTypeFlag, FieldTypeTp};
use tipb::expression::FieldType;

use super::*;
use crate::coprocessor::codec::data_type::scalar::ScalarValueRef;
use crate::coprocessor::codec::datum;
use crate::coprocessor::codec::mysql::Tz;
use crate::coprocessor::codec::{Error, Result};
@@ -175,6 +176,20 @@ impl VectorValue {
Ok(())
}

/// Returns a `ScalarValueRef` to the element at the index.
///
/// # Panics
///
/// Panics if index is out of range.
#[inline]
pub fn get_scalar_ref(&self, index: usize) -> ScalarValueRef<'_> {
match_template_evaluable! {
TT, match self {
VectorValue::TT(v) => ScalarValueRef::TT(&v[index]),
}
}
}
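
A short usage sketch follows (constructing a VectorValue directly through its Int variant here is for illustration only): an executor can borrow a single element of a column without cloning it, compare it against an owned ScalarValue, and only materialize an owned value when it actually needs one.

fn get_scalar_ref_demo() {
    let col = VectorValue::Int(vec![Some(1), None, Some(3)]);
    let r = col.get_scalar_ref(1);           // borrows the element, no clone
    assert!(r == ScalarValue::Int(None));    // compare against an owned value
    let owned = r.to_owned();                // clone only when ownership is needed
    assert!(owned == ScalarValue::Int(None));
    // col.get_scalar_ref(3) would panic: the index is out of range.
}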

/// Pushes a value into the column by decoding the datum and converting to current
/// column's type.
///
@@ -262,11 +262,14 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for FastHashAggregationImpl
}

#[inline]
-fn iterate_each_group_for_aggregation(
+fn iterate_available_groups(
&mut self,
entities: &mut Entities<Src>,
src_is_drained: bool,
mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
) -> Result<Vec<LazyBatchColumn>> {
assert!(src_is_drained);

let aggr_fns_len = entities.each_aggr_fn.len();
let mut group_by_column = LazyBatchColumn::decoded_with_capacity_and_tp(
self.groups.len(),
@@ -290,6 +293,12 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for FastHashAggregationImpl

Ok(vec![group_by_column])
}

/// Fast hash aggregation can output aggregate results only if the source is drained.
#[inline]
fn is_partial_results_ready(&self) -> bool {
false
}
}
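
By contrast, the new stream aggregation executor can report partial results before its source is drained, because its input arrives ordered by the group-by columns: every buffered group except the last one is already complete. The snippet below is a hedged illustration of that idea only, using a hypothetical field name; it is not the actual BatchStreamAggregationExecutor implementation.

// Illustration only; `buffered_group_keys` is a hypothetical field.
struct StreamAggSketch {
    buffered_group_keys: Vec<Vec<u8>>, // encoded group keys, in input order
}

impl StreamAggSketch {
    fn is_partial_results_ready(&self) -> bool {
        // All groups except the most recent one are complete, so results are
        // ready as soon as more than one distinct group key has been seen.
        self.buffered_group_keys.len() > 1
    }
}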

fn calc_groups_each_row<T: Evaluable + Eq + std::hash::Hash>(
@@ -6,6 +6,7 @@ mod limit_executor;
mod selection_executor;
mod simple_aggr_executor;
mod slow_hash_aggr_executor;
mod stream_aggr_executor;
mod table_scan_executor;
mod util;

@@ -15,4 +16,5 @@ pub use self::limit_executor::BatchLimitExecutor;
pub use self::selection_executor::BatchSelectionExecutor;
pub use self::simple_aggr_executor::BatchSimpleAggregationExecutor;
pub use self::slow_hash_aggr_executor::BatchSlowHashAggregationExecutor;
pub use self::stream_aggr_executor::BatchStreamAggregationExecutor;
pub use self::table_scan_executor::BatchTableScanExecutor;
@@ -163,14 +163,22 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for SimpleAggregationImpl
}

#[inline]
-fn iterate_each_group_for_aggregation(
+fn iterate_available_groups(
&mut self,
entities: &mut Entities<Src>,
src_is_drained: bool,
mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
) -> Result<Vec<LazyBatchColumn>> {
assert!(src_is_drained);
iteratee(entities, &self.states)?;
Ok(Vec::new())
}

/// Simple aggregation can output aggregate results only if the source is drained.
#[inline]
fn is_partial_results_ready(&self) -> bool {
false
}
}

#[cfg(test)]
@@ -255,11 +255,14 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for SlowHashAggregationImpl
}

#[inline]
-fn iterate_each_group_for_aggregation(
+fn iterate_available_groups(
&mut self,
entities: &mut Entities<Src>,
src_is_drained: bool,
mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
) -> Result<Vec<LazyBatchColumn>> {
assert!(src_is_drained);

let number_of_groups = self.groups.len();
let group_by_exps_len = self.group_by_exps.len();
let mut group_by_columns: Vec<_> = self
@@ -287,6 +290,12 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for SlowHashAggregationImpl

Ok(group_by_columns)
}

/// Slow hash aggregation can output aggregate results only if the source is drained.
#[inline]
fn is_partial_results_ready(&self) -> bool {
false
}
}

#[cfg(test)]
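
Taken together, the hunks above adapt the simple, fast-hash, and slow-hash aggregation impls to the contract that the shared aggregation framework now expects, which is what lets BatchStreamAggregationExecutor plug into the same machinery. The recap below paraphrases that contract from the signatures visible in this diff; it is not a verbatim copy of the trait in the shared util module, and it assumes the surrounding TiKV types (BatchExecutor, Entities, AggrFunctionState, LazyBatchColumn, Result) are in scope.

// Paraphrased contract, reconstructed only from the signatures in this diff.
trait AggregationExecutorImplContract<Src: BatchExecutor> {
    /// Called when output should be produced. `src_is_drained` tells the impl
    /// whether more input may still arrive; the hash and simple impls assert
    /// it is true because they emit only after the source is drained.
    fn iterate_available_groups(
        &mut self,
        entities: &mut Entities<Src>,
        src_is_drained: bool,
        iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
    ) -> Result<Vec<LazyBatchColumn>>;

    /// Whether some groups can already be emitted before the source is
    /// drained. The three impls above return false; stream aggregation can
    /// return true because its input is ordered by the group-by columns.
    fn is_partial_results_ready(&self) -> bool;
}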