Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support exec summary for limit executor #4599

Merged
merged 23 commits into from May 6, 2019
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ab5d89f
Rename and move ExecSummary to support non-batch executors
breezewish Apr 29, 2019
1256fe7
Simplify exec summary to improve performance
breezewish Apr 29, 2019
b688c4e
Collect execution summary in normal table scan and index scan
breezewish Apr 29, 2019
1617154
Merge remote-tracking branch 'origin/master' into exec_summary_normal/2
breezewish Apr 29, 2019
a11da17
Merge branch 'exec_summary_normal/2' into exec_summary_normal/3
breezewish Apr 29, 2019
38a65a5
Support exec summary in limit
breezewish Apr 29, 2019
e7f34d8
Rename according to comments
breezewish Apr 30, 2019
adb2079
Merge branch 'exec_summary_normal/2' into exec_summary_normal/3
breezewish Apr 30, 2019
6777b33
Merge branch 'exec_summary_normal/3' into exec_summary_normal/4
breezewish Apr 30, 2019
5b7022d
Merge branch 'master' into exec_summary_normal/3
breezewish Apr 30, 2019
d034ea8
Merge branch 'master' into exec_summary_normal/3
breezewish Apr 30, 2019
9135017
Merge branch 'master' into exec_summary_normal/3
breezewish May 5, 2019
d5adda1
Add execution summary framework for non-batch executors
breezewish May 5, 2019
9e49f43
Address comments:
breezewish May 5, 2019
8f103b6
Merge branch 'exec_summary_normal/2.5' into exec_summary_normal/3
breezewish May 5, 2019
221113f
Merge remote-tracking branch 'origin/master' into exec_summary_normal/3
breezewish May 5, 2019
4182662
Merge branch 'exec_summary_normal/3' into exec_summary_normal/4
breezewish May 5, 2019
2e95f0f
Merge branch 'master' into exec_summary_normal/4
May 5, 2019
423b234
Merge branch 'master' into exec_summary_normal/4
breezewish May 5, 2019
9531d6c
Merge branch 'master' into exec_summary_normal/4
breezewish May 5, 2019
198233d
Remove empty line
breezewish May 5, 2019
74db5ea
Merge branch 'master' into exec_summary_normal/4
breezewish May 6, 2019
9bba4c1
Merge branch 'master' into exec_summary_normal/4
breezewish May 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/coprocessor/dag/builder.rs
Expand Up @@ -160,8 +160,13 @@ impl DAGBuilder {
let first = exec_descriptors
.next()
.ok_or_else(|| Error::Other(box_err!("has no executor")))?;

let mut src = Self::build_normal_first_executor::<_, C>(first, store, ranges, collect)?;
let mut summary_slot_index = 0;

for mut exec in exec_descriptors {
summary_slot_index += 1;

let curr: Box<dyn Executor + Send> = match exec.get_tp() {
ExecType::TypeTableScan | ExecType::TypeIndexScan => {
return Err(box_err!("got too much *scan exec, should be only one"));
Expand All @@ -184,7 +189,11 @@ impl DAGBuilder {
ExecType::TypeTopN => {
Box::new(TopNExecutor::new(exec.take_topN(), Arc::clone(&ctx), src)?)
}
ExecType::TypeLimit => Box::new(LimitExecutor::new(exec.take_limit(), src)),
ExecType::TypeLimit => Box::new(LimitExecutor::new(
C::new(summary_slot_index),
exec.take_limit(),
src,
)),
};
src = curr;
}
Expand Down
22 changes: 15 additions & 7 deletions src/coprocessor/dag/executor/limit.rs
Expand Up @@ -3,22 +3,24 @@
use tipb::executor::Limit;

use super::ExecutorMetrics;
use crate::coprocessor::dag::exec_summary::ExecSummary;
use crate::coprocessor::dag::exec_summary::{ExecSummary, ExecSummaryCollector};
use crate::coprocessor::dag::executor::{Executor, Row};
use crate::coprocessor::dag::expr::EvalWarnings;
use crate::coprocessor::Result;

/// Retrieves rows from the source executor and only produces part of the rows.
pub struct LimitExecutor<'a> {
pub struct LimitExecutor<C: ExecSummaryCollector> {
summary_collector: C,
limit: u64,
cursor: u64,
src: Box<dyn Executor + Send + 'a>,
src: Box<dyn Executor + Send>,
first_collect: bool,
}

impl<'a> LimitExecutor<'a> {
pub fn new(limit: Limit, src: Box<dyn Executor + Send + 'a>) -> LimitExecutor<'_> {
impl<C: ExecSummaryCollector> LimitExecutor<C> {
pub fn new(summary_collector: C, limit: Limit, src: Box<dyn Executor + Send>) -> Self {
LimitExecutor {
summary_collector,
limit: limit.get_limit(),
cursor: 0,
src,
Expand All @@ -27,15 +29,19 @@ impl<'a> LimitExecutor<'a> {
}
}

impl<'a> Executor for LimitExecutor<'a> {
impl<C: ExecSummaryCollector> Executor for LimitExecutor<C> {
fn next(&mut self) -> Result<Option<Row>> {
let timer = self.summary_collector.on_start_iterate();
if self.cursor >= self.limit {
self.summary_collector.on_finish_iterate(timer, 0);
return Ok(None);
}
if let Some(row) = self.src.next()? {
self.cursor += 1;
self.summary_collector.on_finish_iterate(timer, 1);
Ok(Some(row))
} else {
self.summary_collector.on_finish_iterate(timer, 0);
Ok(None)
}
}
Expand All @@ -62,6 +68,7 @@ impl<'a> Executor for LimitExecutor<'a> {

fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) {
self.src.collect_execution_summaries(target);
self.summary_collector.collect_into(target);
}
}

Expand All @@ -72,6 +79,7 @@ mod tests {

use super::super::tests::*;
use super::*;
use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled;

#[test]
fn test_limit_executor() {
Expand Down Expand Up @@ -101,7 +109,7 @@ mod tests {
let limit = 5;
limit_meta.set_limit(limit);
// init topn executor
let mut limit_ect = LimitExecutor::new(limit_meta, ts_ect);
let mut limit_ect = LimitExecutor::new(ExecSummaryCollectorDisabled, limit_meta, ts_ect);
let mut limit_rows = Vec::with_capacity(limit as usize);
while let Some(row) = limit_ect.next().unwrap() {
limit_rows.push(row.take_origin());
Expand Down