Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coprocessor/dag: use expression in dag #2261

Merged
merged 31 commits into from
Sep 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
aff0c62
expr/builtin_cast: implement eval for expression
AndreMouche Aug 29, 2017
71035a2
Merge branch 'master' into shirly/dag_expr
AndreMouche Aug 29, 2017
4cffa6d
expr/builtin_cast: remove offset check in build
AndreMouche Aug 29, 2017
8a583bc
expr/builtin_cast: merge master && fix bug
AndreMouche Aug 29, 2017
5b4c651
expr/mod.rs: address comments
AndreMouche Aug 29, 2017
2662df1
expr/mod.rs: use new expression in selection
AndreMouche Aug 29, 2017
0c080df
dag/executor: use expression in aggregation
AndreMouche Aug 30, 2017
4f24fd7
expr/mod.rs: use new expression in topn
AndreMouche Aug 30, 2017
7187e19
merge master && fix conflicts
AndreMouche Aug 31, 2017
877aced
merge master
AndreMouche Sep 7, 2017
38ae0bd
dag/executor: refactor
AndreMouche Sep 7, 2017
93d4005
Merge branch 'shirly/dag_expression' of github.com:pingcap/tikv into …
AndreMouche Sep 7, 2017
719a18e
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 8, 2017
b2a7065
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 8, 2017
64f6b6b
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 12, 2017
60a0694
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 13, 2017
9adaaf7
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 14, 2017
88bd51f
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 15, 2017
3ffee7e
executor/*: address comments
AndreMouche Sep 15, 2017
134f3bd
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 15, 2017
95e043b
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 18, 2017
a734f13
dag/expr: remove unuseful code
AndreMouche Sep 18, 2017
3a1d004
executor/aggregation: address comments
AndreMouche Sep 18, 2017
3e9b604
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 19, 2017
09c0fa2
executor/aggregation: address comments
AndreMouche Sep 19, 2017
2355555
dag/*: address comments
AndreMouche Sep 19, 2017
465ef5b
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 20, 2017
cc2bb05
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 22, 2017
e68aaa9
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 22, 2017
366c9c7
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 22, 2017
444a357
Merge branch 'master' into shirly/dag_expression
AndreMouche Sep 25, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 57 additions & 17 deletions src/coprocessor/dag/executor/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,66 @@ use std::rc::Rc;

use tipb::schema::ColumnInfo;
use tipb::executor::Aggregation;
use tipb::expression::Expr;
use tipb::expression::{Expr, ExprType};
use util::collections::{HashMap, HashMapEntry as Entry};

use coprocessor::codec::table::RowColsDict;
use coprocessor::codec::datum::{self, approximate_size, Datum, DatumEncoder};
use coprocessor::endpoint::SINGLE_GROUP;
use coprocessor::select::aggregate::{self, AggrFunc};
use coprocessor::select::xeval::{EvalContext, Evaluator};
use coprocessor::select::xeval::EvalContext;
use coprocessor::dag::expr::Expression;
use coprocessor::metrics::*;
use coprocessor::Result;

use super::{inflate_with_col_for_dag, Executor, ExprColumnRefVisitor, Row};

struct AggrFuncExpr {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be merged with AggrFunc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

address comments

args: Vec<Expression>,
tp: ExprType,
}

impl AggrFuncExpr {
fn batch_build(ctx: &EvalContext, expr: Vec<Expr>) -> Result<Vec<AggrFuncExpr>> {
let res: Vec<AggrFuncExpr> = try!(
expr.into_iter()
.map(|v| AggrFuncExpr::build(ctx, v))
.collect()
);
Ok(res)
}

/// Converts a single aggregate `Expr` into an `AggrFuncExpr`.
///
/// The expression's children are moved out and built into `Expression`s
/// (these become the aggregate's arguments); the expression's own type is
/// kept in `tp`. Any failure while building the children is returned as a
/// boxed error.
fn build(ctx: &EvalContext, mut expr: Expr) -> Result<AggrFuncExpr> {
    let tp = expr.get_tp();
    let children = expr.take_children().into_vec();
    let args = box_try!(Expression::batch_build(ctx, children));
    Ok(AggrFuncExpr { args: args, tp: tp })
}

fn eval_args(&self, ctx: &EvalContext, row: &[Datum]) -> Result<Vec<Datum>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/eval_args/eval/

let res: Vec<Datum> = box_try!(self.args.iter().map(|v| v.eval(ctx, row)).collect());
Ok(res)
}
}

impl AggrFunc {
    /// Evaluates the arguments of `expr` against `row` and feeds the resulting
    /// values into this aggregate function via `update`.
    ///
    /// Returns the first error raised by either the argument evaluation or the
    /// update itself.
    fn update_with_expr(
        &mut self,
        ctx: &EvalContext,
        expr: &AggrFuncExpr,
        row: &[Datum],
    ) -> Result<()> {
        expr.eval_args(ctx, row).and_then(|arg_values| {
            try!(self.update(ctx, arg_values));
            Ok(())
        })
    }
}

pub struct AggregationExecutor<'a> {
group_by: Vec<Expr>,
aggr_func: Vec<Expr>,
group_by: Vec<Expression>,
aggr_func: Vec<AggrFuncExpr>,
group_keys: Vec<Rc<Vec<u8>>>,
group_key_aggrs: HashMap<Rc<Vec<u8>>, Vec<Box<AggrFunc>>>,
cursor: usize,
Expand Down Expand Up @@ -58,8 +102,8 @@ impl<'a> AggregationExecutor<'a> {
.with_label_values(&["aggregation"])
.inc();
Ok(AggregationExecutor {
group_by: group_by,
aggr_func: aggr_func,
group_by: box_try!(Expression::batch_build(ctx.as_ref(), group_by)),
aggr_func: try!(AggrFuncExpr::batch_build(ctx.as_ref(), aggr_func)),
group_keys: vec![],
group_key_aggrs: map![],
cursor: 0,
Expand All @@ -71,14 +115,14 @@ impl<'a> AggregationExecutor<'a> {
})
}

fn get_group_key(&mut self, eval: &mut Evaluator) -> Result<Vec<u8>> {
fn get_group_key(&self, row: &[Datum]) -> Result<Vec<u8>> {
if self.group_by.is_empty() {
let single_group = Datum::Bytes(SINGLE_GROUP.to_vec());
return Ok(box_try!(datum::encode_value(&[single_group])));
}
let mut vals = Vec::with_capacity(self.group_by.len());
for expr in &self.group_by {
let v = box_try!(eval.eval(&self.ctx, expr));
let v = box_try!(expr.eval(&self.ctx, row));
vals.push(v);
}
let res = box_try!(datum::encode_value(&vals));
Expand All @@ -87,23 +131,20 @@ impl<'a> AggregationExecutor<'a> {

fn aggregate(&mut self) -> Result<()> {
while let Some(row) = try!(self.src.next()) {
let mut eval = Evaluator::default();
try!(inflate_with_col_for_dag(
&mut eval,
let cols = try!(inflate_with_col_for_dag(
&self.ctx,
&row.data,
self.cols.clone(),
&self.related_cols_offset,
row.handle
));
let group_key = Rc::new(try!(self.get_group_key(&mut eval)));
let group_key = Rc::new(try!(self.get_group_key(&cols)));
match self.group_key_aggrs.entry(group_key.clone()) {
Entry::Vacant(e) => {
let mut aggrs = Vec::with_capacity(self.aggr_func.len());
for expr in &self.aggr_func {
let mut aggr = try!(aggregate::build_aggr_func(expr));
let vals = box_try!(eval.batch_eval(&self.ctx, expr.get_children()));
try!(aggr.update(&self.ctx, vals));
let mut aggr = try!(aggregate::build_aggr_func(expr.tp));
try!(aggr.update_with_expr(&self.ctx, expr, &cols));
aggrs.push(aggr);
}
self.group_keys.push(group_key);
Expand All @@ -112,8 +153,7 @@ impl<'a> AggregationExecutor<'a> {
Entry::Occupied(e) => {
let aggrs = e.into_mut();
for (expr, aggr) in self.aggr_func.iter().zip(aggrs) {
let vals = box_try!(eval.batch_eval(&self.ctx, expr.get_children()));
box_try!(aggr.update(&self.ctx, vals));
try!(aggr.update_with_expr(&self.ctx, expr, &cols));
}
}
}
Expand Down
46 changes: 22 additions & 24 deletions src/coprocessor/dag/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use std::rc::Rc;
use util::codec::number::NumberDecoder;
use tipb::expression::{Expr, ExprType};
use tipb::schema::ColumnInfo;
use util::collections::{HashMapEntry as Entry, HashSet};
use util::collections::HashSet;

use coprocessor::codec::mysql;
use coprocessor::codec::datum::Datum;
use coprocessor::codec::table::{RowColsDict, TableDecoder};
use coprocessor::endpoint::get_pk;
use coprocessor::select::xeval::{EvalContext, Evaluator};
use coprocessor::select::xeval::EvalContext;
use coprocessor::{Error, Result};

mod scanner;
Expand Down Expand Up @@ -104,35 +104,33 @@ pub trait Executor {
}

pub fn inflate_with_col_for_dag(
eval: &mut Evaluator,
ctx: &EvalContext,
values: &RowColsDict,
columns: Rc<Vec<ColumnInfo>>,
offsets: &[usize],
h: i64,
) -> Result<()> {
) -> Result<Vec<Datum>> {
let mut res = vec![Datum::Null; columns.len()];
for offset in offsets {
let col = columns.get(*offset).unwrap();
if let Entry::Vacant(e) = eval.row.entry(*offset as i64) {
if col.get_pk_handle() {
let v = get_pk(col, h);
e.insert(v);
} else {
let col_id = col.get_column_id();
let value = match values.get(col_id) {
None if col.has_default_val() => {
// TODO: optimize it to decode default value only once.
box_try!(col.get_default_val().decode_col_value(ctx, col))
}
None if mysql::has_not_null_flag(col.get_flag() as u64) => {
return Err(box_err!("column {} of {} is missing", col_id, h));
}
None => Datum::Null,
Some(mut bs) => box_try!(bs.decode_col_value(ctx, col)),
};
e.insert(value);
}
if col.get_pk_handle() {
let v = get_pk(col, h);
res[*offset] = v;
} else {
let col_id = col.get_column_id();
let value = match values.get(col_id) {
None if col.has_default_val() => {
// TODO: optimize it to decode default value only once.
box_try!(col.get_default_val().decode_col_value(ctx, col))
}
None if mysql::has_not_null_flag(col.get_flag() as u64) => {
return Err(box_err!("column {} of {} is missing", col_id, h));
}
None => Datum::Null,
Some(mut bs) => box_try!(bs.decode_col_value(ctx, col)),
};
res[*offset] = value;
}
}
Ok(())
Ok(res)
}
24 changes: 12 additions & 12 deletions src/coprocessor/dag/executor/selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ use std::rc::Rc;

use tipb::executor::Selection;
use tipb::schema::ColumnInfo;
use tipb::expression::Expr;

use coprocessor::metrics::*;
use coprocessor::select::xeval::{EvalContext, Evaluator};
use coprocessor::select::xeval::EvalContext;
use coprocessor::dag::expr::Expression;
use coprocessor::Result;

use super::{inflate_with_col_for_dag, Executor, ExprColumnRefVisitor, Row};

pub struct SelectionExecutor<'a> {
conditions: Vec<Expr>,
conditions: Vec<Expression>,
cols: Rc<Vec<ColumnInfo>>,
related_cols_offset: Vec<usize>, // offset of related columns
ctx: Rc<EvalContext>,
Expand All @@ -43,7 +43,7 @@ impl<'a> SelectionExecutor<'a> {
try!(visitor.batch_visit(&conditions));
COPR_EXECUTOR_COUNT.with_label_values(&["selection"]).inc();
Ok(SelectionExecutor {
conditions: conditions,
conditions: box_try!(Expression::batch_build(ctx.as_ref(), conditions)),
cols: columns_info,
related_cols_offset: visitor.column_offsets(),
ctx: ctx,
Expand All @@ -56,17 +56,15 @@ impl<'a> SelectionExecutor<'a> {
impl<'a> Executor for SelectionExecutor<'a> {
fn next(&mut self) -> Result<Option<Row>> {
'next: while let Some(row) = try!(self.src.next()) {
let mut evaluator = Evaluator::default();
try!(inflate_with_col_for_dag(
&mut evaluator,
let cols = try!(inflate_with_col_for_dag(
&self.ctx,
&row.data,
self.cols.clone(),
&self.related_cols_offset,
row.handle
));
for expr in &self.conditions {
let val = box_try!(evaluator.eval(&self.ctx, expr));
for filter in &self.conditions {
let val = box_try!(filter.eval(&self.ctx, &cols));
if !box_try!(val.into_bool(&self.ctx)).unwrap_or(false) {
continue 'next;
}
Expand All @@ -84,7 +82,7 @@ mod tests {
use kvproto::kvrpcpb::IsolationLevel;
use protobuf::RepeatedField;
use tipb::executor::TableScan;
use tipb::expression::{Expr, ExprType};
use tipb::expression::{Expr, ExprType, ScalarFuncSig};

use coprocessor::codec::mysql::types;
use coprocessor::codec::datum::Datum;
Expand All @@ -98,7 +96,8 @@ mod tests {

fn new_const_expr() -> Expr {
let mut expr = Expr::new();
expr.set_tp(ExprType::NullEQ);
expr.set_tp(ExprType::ScalarFunc);
expr.set_sig(ScalarFuncSig::NullEQInt);
expr.mut_children().push({
let mut lhs = Expr::new();
lhs.set_tp(ExprType::Null);
Expand All @@ -114,7 +113,8 @@ mod tests {

fn new_col_gt_u64_expr(offset: i64, val: u64) -> Expr {
let mut expr = Expr::new();
expr.set_tp(ExprType::GT);
expr.set_tp(ExprType::ScalarFunc);
expr.set_sig(ScalarFuncSig::GTInt);
expr.mut_children().push({
let mut lhs = Expr::new();
lhs.set_tp(ExprType::ColumnRef);
Expand Down
45 changes: 33 additions & 12 deletions src/coprocessor/dag/executor/topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,42 @@ use tipb::executor::TopN;
use tipb::schema::ColumnInfo;
use tipb::expression::ByItem;

use coprocessor::codec::datum::Datum;
use coprocessor::Result;
use coprocessor::select::xeval::{EvalContext, Evaluator};
use coprocessor::select::xeval::EvalContext;
use coprocessor::dag::expr::Expression;
use coprocessor::select::topn_heap::{SortRow, TopNHeap};
use coprocessor::metrics::*;

use super::{inflate_with_col_for_dag, Executor, ExprColumnRefVisitor, Row};

/// Order-by description used by the top-n executor: the original `ByItem`s
/// together with the `Expression`s built from them.
struct OrderBy {
// The `ByItem`s this was constructed from, after their expressions have been
// moved out with `take_expr`; shared through `Rc` so a handle can be cloned
// for each row added to the heap.
items: Rc<Vec<ByItem>>,
// One built expression per entry in `items`, in the same order.
exprs: Vec<Expression>,
}

impl OrderBy {
    /// Creates an `OrderBy` from the given by-items.
    ///
    /// The expression of each item is moved out and built into an
    /// `Expression`; the first build failure is returned as a boxed error.
    /// The by-items themselves are kept behind an `Rc`.
    fn new(ctx: &EvalContext, mut order_by: Vec<ByItem>) -> Result<OrderBy> {
        let mut exprs: Vec<Expression> = Vec::with_capacity(order_by.len());
        for by_item in &mut order_by {
            let built = box_try!(Expression::build(ctx, by_item.take_expr()));
            exprs.push(built);
        }
        Ok(OrderBy {
            items: Rc::new(order_by),
            exprs: exprs,
        })
    }

    /// Evaluates every order-by expression against `row`, returning the values
    /// in expression order, or the first evaluation error (boxed).
    fn eval(&self, ctx: &EvalContext, row: &[Datum]) -> Result<Vec<Datum>> {
        let mut values: Vec<Datum> = Vec::with_capacity(self.exprs.len());
        for expr in &self.exprs {
            let value = box_try!(expr.eval(ctx, row));
            values.push(value);
        }
        Ok(values)
    }
}

pub struct TopNExecutor<'a> {
order_by: Rc<Vec<ByItem>>,
order_by: OrderBy,
cols: Rc<Vec<ColumnInfo>>,
related_cols_offset: Vec<usize>, // offset of related columns
heap: Option<TopNHeap>,
Expand All @@ -52,7 +79,7 @@ impl<'a> TopNExecutor<'a> {

COPR_EXECUTOR_COUNT.with_label_values(&["topn"]).inc();
Ok(TopNExecutor {
order_by: Rc::new(order_by),
order_by: try!(OrderBy::new(&ctx, order_by)),
heap: Some(try!(TopNHeap::new(meta.get_limit() as usize))),
cols: columns_info,
related_cols_offset: visitor.column_offsets(),
Expand All @@ -64,25 +91,19 @@ impl<'a> TopNExecutor<'a> {

fn fetch_all(&mut self) -> Result<()> {
while let Some(row) = try!(self.src.next()) {
let mut eval = Evaluator::default();
try!(inflate_with_col_for_dag(
&mut eval,
let cols = try!(inflate_with_col_for_dag(
&self.ctx,
&row.data,
self.cols.clone(),
&self.related_cols_offset,
row.handle
));
let mut ob_values = Vec::with_capacity(self.order_by.len());
for by_item in self.order_by.as_ref().iter() {
let v = box_try!(eval.eval(&self.ctx, by_item.get_expr()));
ob_values.push(v);
}
let ob_values = try!(self.order_by.eval(&self.ctx, &cols));
try!(self.heap.as_mut().unwrap().try_add_row(
row.handle,
row.data,
ob_values,
self.order_by.clone(),
self.order_by.items.clone(),
self.ctx.clone()
));
}
Expand Down
Loading