Batch Top N Executor #4825

Merged · 7 commits · Jun 5, 2019
Changes from 1 commit
9 changes: 9 additions & 0 deletions src/coprocessor/codec/batch/lazy_column.rs
@@ -49,6 +49,15 @@ impl LazyBatchColumn {
LazyBatchColumn::Decoded(VectorValue::with_capacity(capacity, eval_tp))
}

/// Creates a new empty `LazyBatchColumn` with the same schema.
#[inline]
pub fn clone_empty(&self, capacity: usize) -> Self {
match self {
LazyBatchColumn::Raw(_) => Self::raw_with_capacity(capacity),
LazyBatchColumn::Decoded(v) => LazyBatchColumn::Decoded(v.clone_empty(capacity)),
}
}

#[inline]
pub fn is_raw(&self) -> bool {
match self {
12 changes: 12 additions & 0 deletions src/coprocessor/codec/batch/lazy_column_vec.rs
@@ -46,6 +46,18 @@ impl LazyBatchColumnVec {
}
}

/// Creates a new empty `LazyBatchColumnVec` with the same number of columns and schema.
#[inline]
pub fn clone_empty(&self, capacity: usize) -> Self {
Self {
columns: self
.columns
.iter()
.map(|c| c.clone_empty(capacity))
.collect(),
}
}

/// Creates a new `LazyBatchColumnVec`, which contains `columns_count` number of raw columns.
#[cfg(test)]
pub fn with_raw_columns(columns_count: usize) -> Self {
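As a usage sketch (not part of the PR, and using a simplified stand-in for the real `VectorValue` enum), a Top-N style consumer can clone an empty column with the same eval type and copy only the selected rows into it, which is presumably why `clone_empty` is introduced alongside the new executor:

    // Simplified stand-in for the real `VectorValue`; only two eval types shown.
    #[allow(dead_code)]
    #[derive(Debug, PartialEq)]
    enum VectorValue {
        Int(Vec<Option<i64>>),
        Real(Vec<Option<f64>>),
    }

    impl VectorValue {
        // Same idea as the `clone_empty` added in this PR: keep the variant
        // (i.e. the schema), start with an empty payload of the given capacity.
        fn clone_empty(&self, capacity: usize) -> Self {
            match self {
                VectorValue::Int(_) => VectorValue::Int(Vec::with_capacity(capacity)),
                VectorValue::Real(_) => VectorValue::Real(Vec::with_capacity(capacity)),
            }
        }
    }

    fn main() {
        let input = VectorValue::Int(vec![Some(3), None, Some(1), Some(2)]);
        // The output column has the same eval type but no rows yet.
        let mut output = input.clone_empty(2);
        if let (VectorValue::Int(src), VectorValue::Int(dst)) = (&input, &mut output) {
            // Keep the two smallest values; `None` sorts lowest, as NULL does in MySQL.
            let mut sorted = src.clone();
            sorted.sort();
            dst.extend(sorted.into_iter().take(2));
        }
        assert_eq!(output, VectorValue::Int(vec![None, Some(1)]));
    }
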
115 changes: 82 additions & 33 deletions src/coprocessor/codec/data_type/scalar.rs
@@ -1,5 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::cmp::Ordering;

use cop_datatype::EvalType;

use super::*;
@@ -28,54 +30,29 @@ pub enum ScalarValue {
Json(Option<super::Json>),
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ScalarValueRef<'a> {
Int(&'a Option<super::Int>),
Real(&'a Option<super::Real>),
Decimal(&'a Option<super::Decimal>),
Bytes(&'a Option<super::Bytes>),
DateTime(&'a Option<super::DateTime>),
Duration(&'a Option<super::Duration>),
Json(&'a Option<super::Json>),
}

impl<'a> ScalarValueRef<'a> {
impl ScalarValue {
#[inline]
#[allow(clippy::clone_on_copy)]
pub fn to_owned(self) -> ScalarValue {
pub fn eval_type(&self) -> EvalType {
match_template_evaluable! {
TT, match self {
ScalarValueRef::TT(v) => ScalarValue::TT(v.clone()),
ScalarValue::TT(_) => EvalType::TT,
}
}
}
}

impl<'a> PartialEq<ScalarValue> for ScalarValueRef<'a> {
fn eq(&self, other: &ScalarValue) -> bool {
match_template_evaluable! {
TT, match (self, other) {
(ScalarValueRef::TT(v1), ScalarValue::TT(v2)) => v1 == &v2,
_ => false
}
}
#[inline]
pub fn as_vector_like(&self) -> VectorLikeValueRef<'_> {
VectorLikeValueRef::Scalar(self)
}
}

impl ScalarValue {
#[inline]
pub fn eval_type(&self) -> EvalType {
pub fn as_scalar_value_ref(&self) -> ScalarValueRef<'_> {
match_template_evaluable! {
TT, match self {
ScalarValue::TT(_) => EvalType::TT,
ScalarValue::TT(v) => ScalarValueRef::TT(v),
}
}
}

#[inline]
pub fn as_vector_like(&self) -> VectorLikeValueRef<'_> {
VectorLikeValueRef::Scalar(self)
}
}

impl AsMySQLBool for ScalarValue {
@@ -163,3 +140,75 @@ impl From<f64> for ScalarValue {
ScalarValue::Real(Real::new(s).ok())
}
}

/// A scalar value reference container. Can be created from `ScalarValue` or `VectorValue`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ScalarValueRef<'a> {
Int(&'a Option<super::Int>),
Real(&'a Option<super::Real>),
Decimal(&'a Option<super::Decimal>),
Bytes(&'a Option<super::Bytes>),
DateTime(&'a Option<super::DateTime>),
Duration(&'a Option<super::Duration>),
Json(&'a Option<super::Json>),
}

impl<'a> ScalarValueRef<'a> {
#[inline]
#[allow(clippy::clone_on_copy)]
pub fn to_owned(self) -> ScalarValue {
match_template_evaluable! {
TT, match self {
ScalarValueRef::TT(v) => ScalarValue::TT(v.clone()),
}
}
}

#[inline]
pub fn eval_type(&self) -> EvalType {
match_template_evaluable! {
TT, match self {
ScalarValueRef::TT(_) => EvalType::TT,
}
}
}
}

impl<'a> Ord for ScalarValueRef<'a> {
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other)
.expect("Cannot compare two ScalarValueRef in different type")
}
}

impl<'a> PartialOrd for ScalarValueRef<'a> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match_template_evaluable! {
TT, match (self, other) {
// `v1` and `v2` are `Option<T>`. MySQL treats NULL as lower than any
// non-NULL value, which matches `Option`'s derived ordering
// (`None < Some(_)`), so comparing the options directly is fine.
(ScalarValueRef::TT(v1), ScalarValueRef::TT(v2)) => Some(v1.cmp(v2)),
_ => None,
}
}
}
}
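
A quick standalone check of the comment above (not TiKV code): Rust's derived ordering for `Option` already places `None` below every `Some`, matching MySQL's rule that NULL sorts lower than any non-NULL value.

    use std::cmp::Ordering;

    fn main() {
        // `None` compares as less than any `Some`, even of a negative value.
        assert_eq!(None::<i64>.cmp(&Some(-1)), Ordering::Less);
        assert_eq!(Some(0i64).cmp(&None), Ordering::Greater);
        // Non-NULL values compare by their inner value as usual.
        assert_eq!(Some(1i64).cmp(&Some(2)), Ordering::Less);
        assert_eq!(None::<i64>.cmp(&None), Ordering::Equal);
    }
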

impl<'a> Eq for ScalarValueRef<'a> {}
Member commented:
Why not put the Eq in derive directly?

Member (Author) replied:
good catch


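For reference, the change the reviewer suggests is roughly the following (shown on a stripped-down enum, not the real `ScalarValueRef`): derive `Eq` together with `PartialEq` instead of writing the empty marker impl by hand. Whether this compiles for the real enum depends on every referenced type implementing `Eq`, which the "good catch" reply suggests is the case.

    // Hand-written marker impl, as in this commit:
    //     #[derive(Clone, Copy, Debug, PartialEq)]
    //     pub enum ScalarValueRef<'a> { /* ... */ }
    //     impl<'a> Eq for ScalarValueRef<'a> {}
    //
    // Equivalent derived form:
    #[allow(dead_code)]
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum Ref<'a> {
        Int(&'a Option<i64>),
        Bytes(&'a Option<Vec<u8>>),
    }

    fn main() {
        let v = Some(1i64);
        // Total equality is available without any extra impl block.
        assert_eq!(Ref::Int(&v), Ref::Int(&v));
    }
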
impl<'a> PartialEq<ScalarValue> for ScalarValueRef<'a> {
fn eq(&self, other: &ScalarValue) -> bool {
match_template_evaluable! {
TT, match (self, other) {
(ScalarValueRef::TT(v1), ScalarValue::TT(v2)) => v1 == &v2,
_ => false
}
}
}
}

impl<'a> PartialEq<ScalarValueRef<'a>> for ScalarValue {
fn eq(&self, other: &ScalarValueRef<'_>) -> bool {
other == self
}
}
14 changes: 12 additions & 2 deletions src/coprocessor/codec/data_type/vector.rs
@@ -3,14 +3,14 @@
use std::convert::{TryFrom, TryInto};

use cop_datatype::{EvalType, FieldTypeAccessor, FieldTypeFlag, FieldTypeTp};
use tikv_util::codec::{bytes, number};
use tipb::expression::FieldType;

use super::*;
use crate::coprocessor::codec::data_type::scalar::ScalarValueRef;
use crate::coprocessor::codec::datum;
use crate::coprocessor::codec::mysql::Tz;
use crate::coprocessor::codec::{Error, Result};
use tikv_util::codec::{bytes, number};

/// A vector value container, a.k.a. column, for all concrete eval types.
///
@@ -52,6 +52,16 @@ impl VectorValue {
}
}

/// Creates a new empty `VectorValue` with the same eval type.
#[inline]
pub fn clone_empty(&self, capacity: usize) -> Self {
match_template_evaluable! {
TT, match self {
VectorValue::TT(_) => VectorValue::TT(Vec::with_capacity(capacity)),
}
}
}

/// Returns the `EvalType` used to construct current column.
#[inline]
pub fn eval_type(&self) -> EvalType {
@@ -175,7 +185,7 @@ impl VectorValue {
Ok(())
}

/// Returns a `ScalarValueRef` to the element at the index.
/// Gets a reference to the element at the corresponding index.
///
/// # Panics
///
2 changes: 2 additions & 0 deletions src/coprocessor/dag/batch/executors/mod.rs
@@ -8,6 +8,7 @@ mod simple_aggr_executor;
mod slow_hash_aggr_executor;
mod stream_aggr_executor;
mod table_scan_executor;
mod top_n_executor;
mod util;

pub use self::fast_hash_aggr_executor::BatchFastHashAggregationExecutor;
@@ -18,3 +19,4 @@ pub use self::simple_aggr_executor::BatchSimpleAggregationExecutor;
pub use self::slow_hash_aggr_executor::BatchSlowHashAggregationExecutor;
pub use self::stream_aggr_executor::BatchStreamAggregationExecutor;
pub use self::table_scan_executor::BatchTableScanExecutor;
pub use self::top_n_executor::BatchTopNExecutor;
8 changes: 2 additions & 6 deletions src/coprocessor/dag/batch/executors/stream_aggr_executor.rs
@@ -66,9 +66,7 @@ impl BatchStreamAggregationExecutor<Box<dyn BatchExecutor>> {
assert!(!group_by_definitions.is_empty());
for def in group_by_definitions {
RpnExpressionBuilder::check_expr_tree_supported(def)?;
if RpnExpressionBuilder::is_expr_eval_to_scalar(def)? {
return Err(box_err!("Group by expression cannot be a scalar"));
}
// Works for both vector and scalar results, so unlike the other aggregation executors no scalar check is needed here.
}

let aggr_definitions = descriptor.get_agg_func();
@@ -191,9 +189,7 @@ impl<Src: BatchExecutor> AggregationExecutorImpl<Src> for BatchStreamAggregation
let mut group_start_row = 0;
for row_index in 0..rows_len {
for group_by_result in &group_by_results {
// Unwrap is fine because we have verified the group by expression before.
let group_column = group_by_result.vector_value().unwrap();
group_key_ref.push(group_column.get_scalar_ref(row_index));
group_key_ref.push(group_by_result.get_scalar_ref(row_index));
}
match self.keys.rchunks_exact(group_by_len).next() {
Some(current_key) if &group_key_ref[..] == current_key => {
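
The simplification in the loop above relies on the group-by evaluation result being able to hand out a per-row scalar reference whether the expression evaluated to a vector or to a single scalar. A rough sketch of that kind of dispatch, using simplified stand-in types rather than the real RPN result API:

    // Simplified stand-ins; the real types carry many more eval types.
    #[derive(Clone, Copy, Debug, PartialEq)]
    enum ScalarRef<'a> {
        Int(&'a Option<i64>),
    }

    enum EvalResult {
        // A constant expression evaluates to one scalar for the whole batch...
        Scalar(Option<i64>),
        // ...while a column reference evaluates to one value per row.
        Vector(Vec<Option<i64>>),
    }

    impl EvalResult {
        // One accessor for both shapes: a scalar repeats for every row,
        // a vector is indexed by the row number.
        fn get_scalar_ref(&self, row: usize) -> ScalarRef<'_> {
            match self {
                EvalResult::Scalar(v) => ScalarRef::Int(v),
                EvalResult::Vector(vs) => ScalarRef::Int(&vs[row]),
            }
        }
    }

    fn main() {
        let constant = EvalResult::Scalar(Some(7));
        let column = EvalResult::Vector(vec![Some(1), None, Some(3)]);
        assert_eq!(constant.get_scalar_ref(2), ScalarRef::Int(&Some(7)));
        assert_eq!(column.get_scalar_ref(1), ScalarRef::Int(&None));
    }
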