Skip to content

Commit

Permalink
coprocessor/dag:fix bug caused by use offset in expr (#2064)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche committed Jul 28, 2017
1 parent 789278f commit 24edb72
Show file tree
Hide file tree
Showing 7 changed files with 845 additions and 372 deletions.
14 changes: 7 additions & 7 deletions src/coprocessor/dag.rs
Expand Up @@ -33,7 +33,7 @@ use super::executor::limit::LimitExecutor;

pub struct DAGContext<'s> {
pub deadline: Instant,
pub columns: Vec<ColumnInfo>,
pub columns: Rc<Vec<ColumnInfo>>,
pub has_aggr: bool,
pub chunks: Vec<Chunk>,
req: DAGRequest,
Expand All @@ -52,7 +52,7 @@ impl<'s> DAGContext<'s> {
DAGContext {
req: req,
deadline: deadline,
columns: vec![],
columns: Rc::new(vec![]),
ranges: ranges,
snap: snap,
has_aggr: false,
Expand All @@ -68,10 +68,10 @@ impl<'s> DAGContext<'s> {
// check whether first exec is *scan and get the column info
match first.get_tp() {
ExecType::TypeTableScan => {
self.columns = first.get_tbl_scan().get_columns().to_vec();
self.columns = Rc::new(first.get_tbl_scan().get_columns().to_vec());
}
ExecType::TypeIndexScan => {
self.columns = first.get_idx_scan().get_columns().to_vec();
self.columns = Rc::new(first.get_idx_scan().get_columns().to_vec());
}
_ => {
return Err(box_err!("first exec type should be *Scan, but get {:?}",
Expand Down Expand Up @@ -123,19 +123,19 @@ impl<'s> DAGContext<'s> {
ExecType::TypeSelection => {
Box::new(try!(SelectionExecutor::new(exec.take_selection(),
self.eval_ctx.clone(),
&self.columns,
self.columns.clone(),
src)))
}
ExecType::TypeAggregation => {
Box::new(try!(AggregationExecutor::new(exec.take_aggregation(),
self.eval_ctx.clone(),
&self.columns,
self.columns.clone(),
src)))
}
ExecType::TypeTopN => {
Box::new(try!(TopNExecutor::new(exec.take_topN(),
self.eval_ctx.clone(),
&self.columns,
self.columns.clone(),
src)))
}
ExecType::TypeLimit => Box::new(LimitExecutor::new(exec.take_limit(), src)),
Expand Down
14 changes: 7 additions & 7 deletions src/coprocessor/endpoint.rs
Expand Up @@ -521,7 +521,7 @@ pub fn is_point(range: &KeyRange) -> bool {
}

#[inline]
fn get_pk(col: &ColumnInfo, h: i64) -> Datum {
pub fn get_pk(col: &ColumnInfo, h: i64) -> Datum {
if mysql::has_unsigned_flag(col.get_flag() as u64) {
// PK column is unsigned
Datum::U64(h as u64)
Expand All @@ -531,12 +531,12 @@ fn get_pk(col: &ColumnInfo, h: i64) -> Datum {
}

#[inline]
pub fn inflate_with_col<'a, T>(eval: &mut Evaluator,
ctx: &EvalContext,
values: &RowColsDict,
cols: T,
h: i64)
-> Result<()>
fn inflate_with_col<'a, T>(eval: &mut Evaluator,
ctx: &EvalContext,
values: &RowColsDict,
cols: T,
h: i64)
-> Result<()>
where T: IntoIterator<Item = &'a ColumnInfo>
{
for col in cols {
Expand Down
32 changes: 17 additions & 15 deletions src/coprocessor/executor/aggregation.rs
Expand Up @@ -21,11 +21,11 @@ use util::collections::{HashMap, HashMapEntry as Entry};
use super::super::codec::table::RowColsDict;
use super::super::codec::datum::{self, Datum, DatumEncoder, approximate_size};
use super::super::xeval::{Evaluator, EvalContext};
use super::super::endpoint::{inflate_with_col, SINGLE_GROUP};
use super::super::endpoint::SINGLE_GROUP;
use super::super::aggregate::{self, AggrFunc};
use super::super::metrics::*;
use super::super::Result;
use super::{Executor, Row, ExprColumnRefVisitor};
use super::{Executor, Row, ExprColumnRefVisitor, inflate_with_col_for_dag};

pub struct AggregationExecutor<'a> {
group_by: Vec<Expr>,
Expand All @@ -35,27 +35,23 @@ pub struct AggregationExecutor<'a> {
cursor: usize,
executed: bool,
ctx: Rc<EvalContext>,
cols: Vec<ColumnInfo>,
cols: Rc<Vec<ColumnInfo>>,
related_cols_offset: Vec<usize>, // offset of related columns
src: Box<Executor + 'a>,
}

impl<'a> AggregationExecutor<'a> {
pub fn new(mut meta: Aggregation,
ctx: Rc<EvalContext>,
columns: &[ColumnInfo],
columns: Rc<Vec<ColumnInfo>>,
src: Box<Executor + 'a>)
-> Result<AggregationExecutor<'a>> {
// collect all cols used in aggregation
let mut visitor = ExprColumnRefVisitor::new();
let mut visitor = ExprColumnRefVisitor::new(columns.len());
let group_by = meta.take_group_by().into_vec();
try!(visitor.batch_visit(&group_by));
let aggr_func = meta.take_agg_func().into_vec();
try!(visitor.batch_visit(&aggr_func));
// filter from all cols
let cols = columns.iter()
.filter(|col| visitor.col_ids.contains(&col.get_column_id()))
.cloned()
.collect();
COPR_EXECUTOR_COUNT.with_label_values(&["aggregation"]).inc();
Ok(AggregationExecutor {
group_by: group_by,
Expand All @@ -65,7 +61,8 @@ impl<'a> AggregationExecutor<'a> {
cursor: 0,
executed: false,
ctx: ctx,
cols: cols,
cols: columns,
related_cols_offset: visitor.column_offsets(),
src: src,
})
}
Expand All @@ -87,7 +84,12 @@ impl<'a> AggregationExecutor<'a> {
fn aggregate(&mut self) -> Result<()> {
while let Some(row) = try!(self.src.next()) {
let mut eval = Evaluator::default();
try!(inflate_with_col(&mut eval, &self.ctx, &row.data, &self.cols, row.handle));
try!(inflate_with_col_for_dag(&mut eval,
&self.ctx,
&row.data,
self.cols.clone(),
&self.related_cols_offset,
row.handle));
let group_key = Rc::new(try!(self.get_group_key(&mut eval)));
match self.group_key_aggrs.entry(group_key.clone()) {
Entry::Vacant(e) => {
Expand Down Expand Up @@ -226,16 +228,16 @@ mod test {

// init aggregation meta
let mut aggregation = Aggregation::default();
let group_by_cols = vec![2, 3];
let group_by_cols = vec![1, 2];
let group_by = build_group_by(&group_by_cols);
aggregation.set_group_by(RepeatedField::from_vec(group_by));
let aggr_funcs = vec![(ExprType::Avg, 1), (ExprType::Count, 3)];
let aggr_funcs = vec![(ExprType::Avg, 0), (ExprType::Count, 2)];
let aggr_funcs = build_aggr_func(&aggr_funcs);
aggregation.set_agg_func(RepeatedField::from_vec(aggr_funcs));
// init Aggregation Executor
let mut aggr_ect = AggregationExecutor::new(aggregation,
Rc::new(EvalContext::default()),
&cis,
Rc::new(cis),
Box::new(ts_ect))
.unwrap();
let expect_row_cnt = 4;
Expand Down
70 changes: 62 additions & 8 deletions src/coprocessor/executor/mod.rs
Expand Up @@ -11,11 +11,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::rc::Rc;

use util::codec::number::NumberDecoder;
use tipb::expression::{Expr, ExprType};
use super::codec::table::RowColsDict;
use tipb::schema::ColumnInfo;
use util::collections::{HashMapEntry as Entry, HashSet};

use super::codec::mysql;
use super::codec::datum::Datum;
use super::codec::table::{RowColsDict, TableDecoder};
use super::endpoint::get_pk;
use super::xeval::{Evaluator, EvalContext};
use super::Result;
use super::Error;

mod scanner;
pub mod table_scan;
Expand All @@ -25,20 +34,28 @@ pub mod topn;
pub mod limit;
pub mod aggregation;

#[allow(dead_code)]
pub struct ExprColumnRefVisitor {
pub col_ids: HashSet<i64>,
cols_offset: HashSet<usize>,
cols_len: usize,
}

#[allow(dead_code)]
impl ExprColumnRefVisitor {
pub fn new() -> ExprColumnRefVisitor {
ExprColumnRefVisitor { col_ids: HashSet::new() }
pub fn new(cols_len: usize) -> ExprColumnRefVisitor {
ExprColumnRefVisitor {
cols_offset: HashSet::default(),
cols_len: cols_len,
}
}

pub fn visit(&mut self, expr: &Expr) -> Result<()> {
if expr.get_tp() == ExprType::ColumnRef {
self.col_ids.insert(box_try!(expr.get_val().decode_i64()));
let offset = box_try!(expr.get_val().decode_i64()) as usize;
if offset >= self.cols_len {
return Err(Error::Other(box_err!("offset {} overflow, should be less than {}",
offset,
self.cols_len)));
}
self.cols_offset.insert(offset);
} else {
for sub_expr in expr.get_children() {
try!(self.visit(sub_expr));
Expand All @@ -53,6 +70,10 @@ impl ExprColumnRefVisitor {
}
Ok(())
}

/// Consumes the visitor and returns the collected column offsets as a `Vec`.
///
/// The offsets come out of a hash set, so each appears once and the order
/// of the returned vector is not meaningful to callers.
pub fn column_offsets(self) -> Vec<usize> {
    let mut offsets = Vec::with_capacity(self.cols_offset.len());
    offsets.extend(self.cols_offset);
    offsets
}
}

#[derive(Debug)]
Expand All @@ -73,3 +94,36 @@ impl Row {
pub trait Executor {
fn next(&mut self) -> Result<Option<Row>>;
}

/// Fills `eval.row` with the datums of the columns referenced by `offsets`.
///
/// Every entry of `offsets` is an index into `columns`. The resulting datum is
/// stored in `eval.row` keyed by that offset (not by the column id). Offsets
/// that already have an entry in `eval.row` are left untouched.
///
/// For each referenced column the value is resolved as follows:
/// * a column marked as pk handle takes its value from the row handle `h`;
/// * otherwise the value is looked up in `values` by column id and decoded;
/// * if it is absent, the column's default value is decoded when one exists;
/// * if it is absent, has no default and carries the NOT NULL flag, an error
///   is returned;
/// * otherwise the value is `Datum::Null`.
///
/// # Errors
///
/// Returns an error when an offset does not index into `columns`, when a value
/// (or default value) fails to decode, or when a NOT NULL column is missing.
pub fn inflate_with_col_for_dag(eval: &mut Evaluator,
                                ctx: &EvalContext,
                                values: &RowColsDict,
                                columns: Rc<Vec<ColumnInfo>>,
                                offsets: &[usize],
                                h: i64)
                                -> Result<()> {
    for offset in offsets {
        // Report a bad offset as an error instead of panicking: this function
        // is public, so the offsets may not have been validated by the caller.
        let col = match columns.get(*offset) {
            Some(col) => col,
            None => {
                return Err(box_err!("offset {} overflow, should be less than {}",
                                    *offset,
                                    columns.len()));
            }
        };
        // Only compute a value when this offset has not been filled in yet.
        if let Entry::Vacant(e) = eval.row.entry(*offset as i64) {
            if col.get_pk_handle() {
                // The pk handle column is not looked up in `values`; its value
                // is derived from the row handle.
                let v = get_pk(col, h);
                e.insert(v);
            } else {
                let col_id = col.get_column_id();
                let value = match values.get(col_id) {
                    None if col.has_default_val() => {
                        // TODO: optimize it to decode default value only once.
                        box_try!(col.get_default_val().decode_col_value(ctx, col))
                    }
                    None if mysql::has_not_null_flag(col.get_flag() as u64) => {
                        return Err(box_err!("column {} of {} is missing", col_id, h));
                    }
                    None => Datum::Null,
                    Some(mut bs) => box_try!(bs.decode_col_value(ctx, col)),
                };
                e.insert(value);
            }
        }
    }
    Ok(())
}
43 changes: 19 additions & 24 deletions src/coprocessor/executor/selection.rs
Expand Up @@ -18,37 +18,31 @@ use tipb::schema::ColumnInfo;
use tipb::expression::Expr;
use super::super::xeval::{Evaluator, EvalContext};
use super::super::Result;
use super::{Row, Executor, ExprColumnRefVisitor};
use super::super::endpoint::inflate_with_col;
use super::{Row, Executor, ExprColumnRefVisitor, inflate_with_col_for_dag};
use super::super::metrics::*;

pub struct SelectionExecutor<'a> {
conditions: Vec<Expr>,
columns: Vec<ColumnInfo>,
cols: Rc<Vec<ColumnInfo>>,
related_cols_offset: Vec<usize>, // offset of related columns
ctx: Rc<EvalContext>,
src: Box<Executor + 'a>,
}

impl<'a> SelectionExecutor<'a> {
pub fn new(mut meta: Selection,
ctx: Rc<EvalContext>,
columns_info: &[ColumnInfo],
columns_info: Rc<Vec<ColumnInfo>>,
src: Box<Executor + 'a>)
-> Result<SelectionExecutor<'a>> {
let conditions = meta.take_conditions().into_vec();
let mut visitor = ExprColumnRefVisitor::new();
for cond in &conditions {
try!(visitor.visit(cond));
}

let columns = columns_info.iter()
.filter(|col| visitor.col_ids.get(&col.get_column_id()).is_some())
.cloned()
.collect::<Vec<ColumnInfo>>();
let mut visitor = ExprColumnRefVisitor::new(columns_info.len());
try!(visitor.batch_visit(&conditions));
COPR_EXECUTOR_COUNT.with_label_values(&["selection"]).inc();
Ok(SelectionExecutor {
conditions: conditions,
columns: columns,
cols: columns_info,
related_cols_offset: visitor.column_offsets(),
ctx: ctx,
src: src,
})
Expand All @@ -60,11 +54,12 @@ impl<'a> Executor for SelectionExecutor<'a> {
fn next(&mut self) -> Result<Option<Row>> {
'next: while let Some(row) = try!(self.src.next()) {
let mut evaluator = Evaluator::default();
try!(inflate_with_col(&mut evaluator,
&self.ctx,
&row.data,
&self.columns,
row.handle));
try!(inflate_with_col_for_dag(&mut evaluator,
&self.ctx,
&row.data,
self.cols.clone(),
&self.related_cols_offset,
row.handle));
for expr in &self.conditions {
let val = box_try!(evaluator.eval(&self.ctx, expr));
if !box_try!(val.into_bool(&self.ctx)).unwrap_or(false) {
Expand Down Expand Up @@ -111,13 +106,13 @@ mod tests {
expr
}

fn new_col_gt_u64_expr(col_id: i64, val: u64) -> Expr {
fn new_col_gt_u64_expr(offset: i64, val: u64) -> Expr {
let mut expr = Expr::new();
expr.set_tp(ExprType::GT);
expr.mut_children().push({
let mut lhs = Expr::new();
lhs.set_tp(ExprType::ColumnRef);
lhs.mut_val().encode_i64(col_id).unwrap();
lhs.mut_val().encode_i64(offset).unwrap();
lhs
});
expr.mut_children().push({
Expand Down Expand Up @@ -169,7 +164,7 @@ mod tests {

let mut selection_executor = SelectionExecutor::new(selection,
Rc::new(EvalContext::default()),
&cis,
Rc::new(cis),
Box::new(inner_table_scan))
.unwrap();

Expand Down Expand Up @@ -219,12 +214,12 @@ mod tests {

// selection executor
let mut selection = Selection::new();
let expr = new_col_gt_u64_expr(3, 5);
let expr = new_col_gt_u64_expr(2, 5);
selection.mut_conditions().push(expr);

let mut selection_executor = SelectionExecutor::new(selection,
Rc::new(EvalContext::default()),
&cis,
Rc::new(cis),
Box::new(inner_table_scan))
.unwrap();

Expand Down

0 comments on commit 24edb72

Please sign in to comment.