Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coprocessor: Batch Stream Aggregation Executor #4786

Merged
merged 23 commits into from May 29, 2019
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
42a60f7
Initialize stream aggr executor executor
sticnarf May 25, 2019
4cc8d43
Add unchecked RPN function eval
sticnarf May 26, 2019
7f5e827
Merge branch 'unchecked-eval' into stream-agg
sticnarf May 26, 2019
4c9d3b1
Finish process_batch_input part
sticnarf May 26, 2019
2586739
Use aggr framework
sticnarf May 27, 2019
e871872
Finish stream aggr tests
sticnarf May 27, 2019
b30f9d7
Add some more comments
sticnarf May 27, 2019
1214feb
Add checks for no aggregation expression
sticnarf May 27, 2019
fbeb91e
Make clippy happy
sticnarf May 27, 2019
6fdb4ad
Merge branch 'master' into stream-agg
sticnarf May 27, 2019
0874736
Add no agg function tests for stream agg
sticnarf May 27, 2019
10ceccd
address comments
sticnarf May 27, 2019
24c0196
Merge branch 'master' into stream-agg
sticnarf May 27, 2019
75ebc25
Fix panic when there is no aggr function
sticnarf May 27, 2019
70b0d8b
Merge branch 'stream-agg' of https://github.com/sticnarf/tikv into st…
sticnarf May 27, 2019
8b8dc9e
Not include src_is_drained in is_partial_results_ready
sticnarf May 28, 2019
e80022c
Rename iterate_each_group_for_partial_aggregation to iterate_availabl…
sticnarf May 28, 2019
181f35d
Merge branch 'master' into stream-agg
sticnarf May 28, 2019
4050b55
Merge branch 'master' into stream-agg
breezewish May 28, 2019
fb76ff8
Merge branch 'master' into stream-agg
breezewish May 28, 2019
fd97342
Merge branch 'master' into stream-agg
breezewish May 28, 2019
8aa769a
Merge branch 'master' into stream-agg
breezewish May 29, 2019
eb9e483
Merge branch 'master' into stream-agg
breezewish May 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/coprocessor/codec/data_type/mod.rs
Expand Up @@ -14,7 +14,7 @@ pub type Bytes = Vec<u8>;
pub use crate::coprocessor::codec::mysql::{Decimal, Duration, Json, Time as DateTime};

// Dynamic eval types.
pub use self::scalar::ScalarValue;
pub use self::scalar::{ScalarValue, ScalarValueRef};
pub use self::vector::{VectorValue, VectorValueExt};
pub use self::vector_like::{VectorLikeValueRef, VectorLikeValueRefSpecialized};

Expand Down
34 changes: 34 additions & 0 deletions src/coprocessor/codec/data_type/scalar.rs
Expand Up @@ -28,6 +28,40 @@ pub enum ScalarValue {
Json(Option<super::Json>),
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ScalarValueRef<'a> {
sticnarf marked this conversation as resolved.
Show resolved Hide resolved
Int(&'a Option<super::Int>),
Real(&'a Option<super::Real>),
Decimal(&'a Option<super::Decimal>),
Bytes(&'a Option<super::Bytes>),
DateTime(&'a Option<super::DateTime>),
Duration(&'a Option<super::Duration>),
Json(&'a Option<super::Json>),
}

impl<'a> ScalarValueRef<'a> {
#[inline]
#[allow(clippy::clone_on_copy)]
pub fn to_owned(self) -> ScalarValue {
breezewish marked this conversation as resolved.
Show resolved Hide resolved
match_template_evaluable! {
TT, match self {
ScalarValueRef::TT(v) => ScalarValue::TT(v.clone()),
}
}
}
}

impl<'a> PartialEq<ScalarValue> for ScalarValueRef<'a> {
fn eq(&self, other: &ScalarValue) -> bool {
match_template_evaluable! {
TT, match (self, other) {
(ScalarValueRef::TT(v1), ScalarValue::TT(v2)) => v1 == &v2,
_ => false
}
}
}
}

impl ScalarValue {
#[inline]
pub fn eval_type(&self) -> EvalType {
Expand Down
15 changes: 15 additions & 0 deletions src/coprocessor/codec/data_type/vector.rs
Expand Up @@ -6,6 +6,7 @@ use cop_datatype::{EvalType, FieldTypeAccessor, FieldTypeFlag, FieldTypeTp};
use tipb::expression::FieldType;

use super::*;
use crate::coprocessor::codec::data_type::scalar::ScalarValueRef;
use crate::coprocessor::codec::datum;
use crate::coprocessor::codec::mysql::Tz;
use crate::coprocessor::codec::{Error, Result};
Expand Down Expand Up @@ -175,6 +176,20 @@ impl VectorValue {
Ok(())
}

/// Returns a `ScalarValueRef` to the element at the index.
///
/// # Panics
///
/// Panics if index is out of range.
#[inline]
pub fn get_scalar_ref(&self, index: usize) -> ScalarValueRef<'_> {
match_template_evaluable! {
TT, match self {
VectorValue::TT(v) => ScalarValueRef::TT(&v[index]),
}
}
}

/// Pushes a value into the column by decoding the datum and converting to current
/// column's type.
///
Expand Down
11 changes: 10 additions & 1 deletion src/coprocessor/dag/batch/executors/fast_hash_aggr_executor.rs
Expand Up @@ -262,11 +262,14 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for FastHashAggregationImp
}

#[inline]
fn iterate_each_group_for_aggregation(
fn iterate_available_groups(
&mut self,
entities: &mut Entities<Src>,
src_is_drained: bool,
mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
) -> Result<Vec<LazyBatchColumn>> {
assert!(src_is_drained);

let aggr_fns_len = entities.each_aggr_fn.len();
let mut group_by_column = LazyBatchColumn::decoded_with_capacity_and_tp(
self.groups.len(),
Expand All @@ -290,6 +293,12 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for FastHashAggregationImp

Ok(vec![group_by_column])
}

/// Fast hash aggregation can output aggregate results only if the source is drained.
#[inline]
fn is_partial_results_ready(&self) -> bool {
false
}
}

fn calc_groups_each_row<T: Evaluable + Eq + std::hash::Hash>(
Expand Down
2 changes: 2 additions & 0 deletions src/coprocessor/dag/batch/executors/mod.rs
Expand Up @@ -6,6 +6,7 @@ mod limit_executor;
mod selection_executor;
mod simple_aggr_executor;
mod slow_hash_aggr_executor;
mod stream_aggr_executor;
mod table_scan_executor;
mod util;

Expand All @@ -15,4 +16,5 @@ pub use self::limit_executor::BatchLimitExecutor;
pub use self::selection_executor::BatchSelectionExecutor;
pub use self::simple_aggr_executor::BatchSimpleAggregationExecutor;
pub use self::slow_hash_aggr_executor::BatchSlowHashAggregationExecutor;
pub use self::stream_aggr_executor::BatchStreamAggregationExecutor;
pub use self::table_scan_executor::BatchTableScanExecutor;
10 changes: 9 additions & 1 deletion src/coprocessor/dag/batch/executors/simple_aggr_executor.rs
Expand Up @@ -163,14 +163,22 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for SimpleAggregationImpl
}

#[inline]
fn iterate_each_group_for_aggregation(
fn iterate_available_groups(
&mut self,
entities: &mut Entities<Src>,
src_is_drained: bool,
mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
) -> Result<Vec<LazyBatchColumn>> {
assert!(src_is_drained);
iteratee(entities, &self.states)?;
Ok(Vec::new())
}

/// Simple aggregation can output aggregate results only if the source is drained.
#[inline]
fn is_partial_results_ready(&self) -> bool {
false
}
}

#[cfg(test)]
Expand Down
11 changes: 10 additions & 1 deletion src/coprocessor/dag/batch/executors/slow_hash_aggr_executor.rs
Expand Up @@ -255,11 +255,14 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for SlowHashAggregationImp
}

#[inline]
fn iterate_each_group_for_aggregation(
fn iterate_available_groups(
&mut self,
entities: &mut Entities<Src>,
src_is_drained: bool,
mut iteratee: impl FnMut(&mut Entities<Src>, &[Box<dyn AggrFunctionState>]) -> Result<()>,
) -> Result<Vec<LazyBatchColumn>> {
assert!(src_is_drained);

let number_of_groups = self.groups.len();
let group_by_exps_len = self.group_by_exps.len();
let mut group_by_columns: Vec<_> = self
Expand Down Expand Up @@ -287,6 +290,12 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for SlowHashAggregationImp

Ok(group_by_columns)
}

/// Slow hash aggregation can output aggregate results only if the source is drained.
#[inline]
fn is_partial_results_ready(&self) -> bool {
false
}
}

#[cfg(test)]
Expand Down