Skip to content

Commit

Permalink
use fat bytes array
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Sep 6, 2016
1 parent 9e60b3d commit 0af4287
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 184 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ git = "https://github.com/pingcap/kvproto"

[dependencies.tipb]
git = "https://github.com/pingcap/tipb.git"
branch = "coocood/update-select-response"

[dependencies.mio]
git = "https://github.com/carllerche/mio.git"
Expand Down
134 changes: 65 additions & 69 deletions src/server/coprocessor/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::boxed::FnBox;
use std::rc::Rc;
use std::fmt::{self, Display, Formatter, Debug};

use tipb::select::{self, SelectRequest, SelectResponse, Row};
use tipb::select::{self, SelectRequest, SelectResponse, Chunk, RowMeta};
use tipb::schema::ColumnInfo;
use tipb::expression::{Expr, ExprType};
use protobuf::{Message as PbMsg, RepeatedField};
Expand All @@ -45,6 +45,7 @@ use super::metrics::*;

pub const REQ_TYPE_SELECT: i64 = 101;
pub const REQ_TYPE_INDEX: i64 = 102;
pub const BATCH_ROW_COUNT: usize = 64;

const DEFAULT_ERROR_CODE: i32 = 1;

Expand Down Expand Up @@ -260,7 +261,7 @@ impl TiDbEndPoint {
let mut resp = Response::new();
let mut sel_resp = SelectResponse::new();
match res {
Ok(rows) => sel_resp.set_rows(RepeatedField::from_vec(rows)),
Ok(()) => sel_resp.set_chunks(RepeatedField::from_vec(ctx.core.chunks)),
Err(e) => {
if let Error::Other(_) = e {
// should we handle locked here too?
Expand Down Expand Up @@ -355,6 +356,15 @@ fn inflate_with_col<'a, T>(eval: &mut Evaluator,
Ok(())
}

#[inline]
fn get_chunk(chunks: &mut Vec<Chunk>) -> &mut Chunk {
if chunks.last().map_or(true, |chunk| chunk.get_rows_meta().len() >= BATCH_ROW_COUNT) {
let chunk = Chunk::new();
chunks.push(chunk);
}
chunks.last_mut().unwrap()
}

pub struct SelectContextCore {
sel: SelectRequest,
eval: Evaluator,
Expand All @@ -364,6 +374,7 @@ pub struct SelectContextCore {
aggr_cols: Vec<ColumnInfo>,
gks: Vec<Rc<Vec<u8>>>,
gk_aggrs: HashMap<Rc<Vec<u8>>, Vec<Box<AggrFunc>>>,
chunks: Vec<Chunk>,
}

impl SelectContextCore {
Expand Down Expand Up @@ -419,27 +430,25 @@ impl SelectContextCore {
cond_cols: cond_cols,
gks: vec![],
gk_aggrs: map![],
chunks: vec![],
})
}

fn handle_row(&mut self,
h: i64,
row_data: HashMap<i64, &[u8]>,
dest: &mut Vec<Row>)
-> Result<()> {
fn handle_row(&mut self, h: i64, row_data: HashMap<i64, &[u8]>) -> Result<usize> {
// clear all dirty values.
self.eval.row.clear();

if try!(self.should_skip(h, &row_data)) {
return Ok(());
return Ok(0);
}

if self.aggr {
try!(self.aggregate(h, &row_data));
Ok(0)
} else {
dest.push(try!(self.get_row(h, row_data)))
try!(self.get_row(h, row_data));
Ok(1)
}
Ok(())
}

fn should_skip(&mut self, h: i64, values: &HashMap<i64, &[u8]>) -> Result<bool> {
Expand All @@ -452,30 +461,33 @@ impl SelectContextCore {
Ok(b.map_or(true, |v| !v))
}

fn get_row(&mut self, h: i64, values: HashMap<i64, &[u8]>) -> Result<Row> {
let mut row = Row::new();
let handle = box_try!(datum::encode_value(&[Datum::I64(h)]));
row.set_handle(handle);
fn get_row(&mut self, h: i64, values: HashMap<i64, &[u8]>) -> Result<()> {
let chunk = get_chunk(&mut self.chunks);
let mut meta = RowMeta::new();
meta.set_handle(h);
let cols = if self.sel.has_table_info() {
self.sel.get_table_info().get_columns()
} else {
self.sel.get_index_info().get_columns()
};
let last_len = chunk.get_rows_data().len();
for col in cols {
let col_id = col.get_column_id();
if let Some(v) = values.get(&col_id) {
row.mut_data().extend_from_slice(v);
chunk.mut_rows_data().extend_from_slice(v);
continue;
}
if col.get_pk_handle() {
box_try!(datum::encode_to(row.mut_data(), &[get_pk(col, h)], false));
box_try!(datum::encode_to(chunk.mut_rows_data(), &[get_pk(col, h)], false));
} else if mysql::has_not_null_flag(col.get_flag() as u64) {
return Err(box_err!("column {} of {} is missing", col_id, h));
} else {
box_try!(datum::encode_to(row.mut_data(), &[Datum::Null], false));
box_try!(datum::encode_to(chunk.mut_rows_data(), &[Datum::Null], false));
}
}
Ok(row)
meta.set_length((chunk.get_rows_data().len() - last_len) as i64);
chunk.mut_rows_meta().push(meta);
Ok(())
}

fn get_group_key(&mut self) -> Result<Vec<u8>> {
Expand Down Expand Up @@ -526,24 +538,28 @@ impl SelectContextCore {
/// Aggs: count(c1), sum(c2), avg(c3)
/// Rows: groupKey1, count1, value2, count3, value3
/// groupKey2, count1, value2, count3, value3
fn aggr_rows(&mut self) -> Result<Vec<Row>> {
let mut rows = Vec::with_capacity(self.gk_aggrs.len());
fn aggr_rows(&mut self) -> Result<()> {
self.chunks = Vec::with_capacity((self.gk_aggrs.len() + BATCH_ROW_COUNT - 1) /
BATCH_ROW_COUNT);
// Each aggregate partial result will be converted to two datum.
let mut row_data = Vec::with_capacity(1 + 2 * self.sel.get_aggregates().len());
for gk in self.gks.drain(..) {
let aggrs = self.gk_aggrs.remove(&gk).unwrap();

let mut row = Row::new();
let chunk = get_chunk(&mut self.chunks);
// The first column is group key.
row_data.push(Datum::Bytes(Rc::try_unwrap(gk).unwrap()));
for mut aggr in aggrs {
try!(aggr.calc(&mut row_data));
}
row.set_data(box_try!(datum::encode_value(&row_data)));
rows.push(row);
let last_len = chunk.get_rows_data().len();
box_try!(datum::encode_to(chunk.mut_rows_data(), &row_data, false));
let mut meta = RowMeta::new();
meta.set_length((chunk.get_rows_data().len() - last_len) as i64);
chunk.mut_rows_meta().push(meta);
row_data.clear();
}
Ok(rows)
Ok(())
}
}

Expand Down Expand Up @@ -583,58 +599,47 @@ impl<'a> SelectContext<'a> {
})
}

fn get_rows_from_sel(&mut self,
ranges: Vec<KeyRange>,
limit: usize,
desc: bool)
-> Result<Vec<Row>> {
let mut rows = vec![];
fn get_rows_from_sel(&mut self, ranges: Vec<KeyRange>, limit: usize, desc: bool) -> Result<()> {
let mut collected = 0;
for ran in ranges {
if rows.len() >= limit {
if collected >= limit {
break;
}
let timer = Instant::now();
let ran_rows = try!(self.get_rows_from_range(ran, limit - rows.len(), desc));
debug!("fetch {} rows takes {} ms",
ran_rows.len(),
let handle_cnt = try!(self.get_rows_from_range(ran, limit, desc));
debug!("fetch {} chunks takes {} ms",
handle_cnt,
duration_to_ms(timer.elapsed()));
rows.extend(ran_rows);
collected += handle_cnt;
}
if self.core.aggr {
self.core.aggr_rows()
} else {
Ok(rows)
Ok(())
}
}

fn get_rows_from_range(&mut self,
range: KeyRange,
limit: usize,
desc: bool)
-> Result<Vec<Row>> {
let mut rows = vec![];
if limit == 0 {
return Ok(rows);
}
fn get_rows_from_range(&mut self, range: KeyRange, limit: usize, desc: bool) -> Result<usize> {
let mut row_count = 0;
if is_point(&range) {
let value = match try!(self.snap.get(&Key::from_raw(range.get_start()))) {
None => return Ok(rows),
None => return Ok(0),
Some(v) => v,
};
let values = {
let ids = self.core.cols.as_ref().left().unwrap();
box_try!(table::cut_row(&value, ids))
};
let h = box_try!(table::decode_handle(range.get_start()));
try!(self.core.handle_row(h, values, &mut rows));
row_count += try!(self.core.handle_row(h, values));
} else {
let mut seek_key = if desc {
range.get_end().to_vec()
} else {
range.get_start().to_vec()
};
let mut scanner = try!(self.snap.scanner());
while limit > rows.len() {
while limit > row_count {
let kv = if desc {
try!(scanner.reverse_seek(Key::from_raw(&seek_key)))
} else {
Expand All @@ -656,50 +661,41 @@ impl<'a> SelectContext<'a> {
let ids = self.core.cols.as_ref().left().unwrap();
box_try!(table::cut_row(&value, ids))
};
try!(self.core.handle_row(h, row_data, &mut rows));
row_count += try!(self.core.handle_row(h, row_data));
seek_key = if desc {
box_try!(table::truncate_as_row_key(&key)).to_vec()
} else {
prefix_next(&key)
};
}
}
if self.core.aggr { Ok(vec![]) } else { Ok(rows) }
Ok(row_count)
}

fn get_rows_from_idx(&mut self,
ranges: Vec<KeyRange>,
limit: usize,
desc: bool)
-> Result<Vec<Row>> {
let mut rows = vec![];
fn get_rows_from_idx(&mut self, ranges: Vec<KeyRange>, limit: usize, desc: bool) -> Result<()> {
let mut collected = 0;
for r in ranges {
if rows.len() >= limit {
if collected >= limit {
break;
}
let part = try!(self.get_idx_row_from_range(r, limit, desc));
rows.extend(part);
collected += try!(self.get_idx_row_from_range(r, limit, desc));
}
if self.core.aggr {
self.core.aggr_rows()
} else {
Ok(rows)
Ok(())
}
}

fn get_idx_row_from_range(&mut self,
r: KeyRange,
limit: usize,
desc: bool)
-> Result<Vec<Row>> {
let mut rows = vec![];
fn get_idx_row_from_range(&mut self, r: KeyRange, limit: usize, desc: bool) -> Result<usize> {
let mut row_cnt = 0;
let mut seek_key = if desc {
r.get_end().to_vec()
} else {
r.get_start().to_vec()
};
let mut scanner = try!(self.snap.scanner());
while rows.len() < limit {
while row_cnt < limit {
let nk = if desc {
try!(scanner.reverse_seek(Key::from_raw(&seek_key)))
} else {
Expand All @@ -726,10 +722,10 @@ impl<'a> SelectContext<'a> {
} else {
box_try!(handle.decode_datum()).i64()
};
try!(self.core.handle_row(handle, values, &mut rows));
row_cnt += try!(self.core.handle_row(handle, values));
}
seek_key = if desc { key } else { prefix_next(&key) };
}
if self.core.aggr { Ok(vec![]) } else { Ok(rows) }
Ok(row_cnt)
}
}
Loading

0 comments on commit 0af4287

Please sign in to comment.