Skip to content

Commit

Permalink
copr: support FirstRow aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Jun 13, 2016
1 parent a9cef95 commit c18b890
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
27 changes: 24 additions & 3 deletions src/server/coprocessor/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,25 @@ use super::Result;
pub fn build_aggr_func(expr: &Expr) -> Result<Box<AggrFunc>> {
match expr.get_tp() {
ExprType::Count => Ok(box 0),
ExprType::First => Ok(box None),
et => Err(box_err!("unsupport AggrExprType: {:?}", et)),
}
}

/// `AggrFunc` is used to execute aggregate operations.
pub trait AggrFunc {
/// `update` is used for update aggregate context.
fn update(&mut self, args: &[Datum]) -> Result<()>;
fn update(&mut self, args: Vec<Datum>) -> Result<()>;
/// `calc` calculates the aggregated result and push it to collector.
fn calc(&mut self, collector: &mut Vec<Datum>) -> Result<()>;
}

type Count = u64;

impl AggrFunc for Count {
fn update(&mut self, args: &[Datum]) -> Result<()> {
fn update(&mut self, args: Vec<Datum>) -> Result<()> {
for arg in args {
if *arg == Datum::Null {
if arg == Datum::Null {
return Ok(());
}
}
Expand All @@ -38,3 +39,23 @@ impl AggrFunc for Count {
Ok(())
}
}

type First = Option<Datum>;

impl AggrFunc for First {
fn update(&mut self, mut args: Vec<Datum>) -> Result<()> {
if self.is_some() {
return Ok(());
}
if args.len() != 1 {
return Err(box_err!("Wrong number of args for AggFuncFirstRow: {}", args.len()));
}
*self = args.pop();
Ok(())
}

fn calc(&mut self, collector: &mut Vec<Datum>) -> Result<()> {
collector.push(self.take().unwrap());
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/server/coprocessor/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,15 +432,15 @@ impl SelectContextCore {
for (expr, func) in aggr_exprs.iter().zip(funcs) {
// TODO: cache args
let args = box_try!(self.eval.batch_eval(expr.get_children()));
try!(func.update(&args));
try!(func.update(args));
}
}
Entry::Vacant(e) => {
let mut aggrs = Vec::with_capacity(aggr_exprs.len());
for expr in aggr_exprs {
let mut aggr = try!(aggregate::build_aggr_func(expr));
let args = box_try!(self.eval.batch_eval(expr.get_children()));
try!(aggr.update(&args));
try!(aggr.update(args));
aggrs.push(aggr);
}
self.gks.push(gk);
Expand Down
30 changes: 30 additions & 0 deletions tests/coprocessor/test_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,36 @@ fn test_aggr_count() {
end_point.stop().unwrap();
}

#[test]
fn test_aggr_first() {
let count = 10;
let (mut store, mut end_point, ti) = initial_data(count);

let mut col = Expr::new();
col.set_tp(ExprType::ColumnRef);
col.mut_val().encode_i64(1).unwrap();
let mut expr = Expr::new();
expr.set_tp(ExprType::First);
expr.mut_children().push(col);

let req = prepare_sel(&mut store, &ti, None, None, vec![expr], vec![3]);
let resp = handle_select(&end_point, req);
assert_eq!(resp.get_rows().len(), 6);
for (i, row) in resp.get_rows().iter().enumerate() {
let idx = if i == 0 {
1
} else {
i * 2
};
let gk = datum::encode_value(&[Datum::Bytes(format!("varchar:{}", i).into_bytes())]);
let expected_datum = vec![Datum::Bytes(gk.unwrap()), Datum::I64(idx as i64)];
let expected_encoded = datum::encode_value(&expected_datum).unwrap();
assert_eq!(row.get_data(), &*expected_encoded);
}

end_point.stop().unwrap();
}

#[test]
fn test_limit() {
let count = 10;
Expand Down

0 comments on commit c18b890

Please sign in to comment.