Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support exec summary for limit executor #4599

Merged
merged 23 commits into from May 6, 2019
Merged
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ab5d89f
Rename and move ExecSummary to support non-batch executors
breeswish Apr 29, 2019
1256fe7
Simplify exec summary to improve performance
breeswish Apr 29, 2019
b688c4e
Collect execution summary in normal table scan and index scan
breeswish Apr 29, 2019
1617154
Merge remote-tracking branch 'origin/master' into exec_summary_normal/2
breeswish Apr 29, 2019
a11da17
Merge branch 'exec_summary_normal/2' into exec_summary_normal/3
breeswish Apr 29, 2019
38a65a5
Support exec summary in limit
breeswish Apr 29, 2019
e7f34d8
Rename according to comments
breeswish Apr 30, 2019
adb2079
Merge branch 'exec_summary_normal/2' into exec_summary_normal/3
breeswish Apr 30, 2019
6777b33
Merge branch 'exec_summary_normal/3' into exec_summary_normal/4
breeswish Apr 30, 2019
5b7022d
Merge branch 'master' into exec_summary_normal/3
breeswish Apr 30, 2019
d034ea8
Merge branch 'master' into exec_summary_normal/3
breeswish Apr 30, 2019
9135017
Merge branch 'master' into exec_summary_normal/3
breeswish May 5, 2019
d5adda1
Add execution summary framework for non-batch executors
breeswish May 5, 2019
9e49f43
Address comments:
breeswish May 5, 2019
8f103b6
Merge branch 'exec_summary_normal/2.5' into exec_summary_normal/3
breeswish May 5, 2019
221113f
Merge remote-tracking branch 'origin/master' into exec_summary_normal/3
breeswish May 5, 2019
4182662
Merge branch 'exec_summary_normal/3' into exec_summary_normal/4
breeswish May 5, 2019
2e95f0f
Merge branch 'master' into exec_summary_normal/4
zz-jason May 5, 2019
423b234
Merge branch 'master' into exec_summary_normal/4
breeswish May 5, 2019
9531d6c
Merge branch 'master' into exec_summary_normal/4
breeswish May 5, 2019
198233d
Remove empty line
breeswish May 5, 2019
74db5ea
Merge branch 'master' into exec_summary_normal/4
breeswish May 6, 2019
9bba4c1
Merge branch 'master' into exec_summary_normal/4
breeswish May 6, 2019
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Simplify exec summary to improve performance

Signed-off-by: Breezewish <breezewish@pingcap.com>
  • Loading branch information...
breeswish committed Apr 29, 2019
commit 1256fe704d8773e21f4c87d6e1bec0b061561263
@@ -195,15 +195,15 @@ mod tests {
// Collected statistics remain unchanged until `next_batch` generated delta statistics.
for _ in 0..2 {
executor.collect_statistics(&mut s);
let exec_summary = s.summary_per_executor[1].as_ref().unwrap();
let exec_summary = s.summary_per_executor[1];
assert_eq!(3, exec_summary.num_produced_rows);
assert_eq!(2, exec_summary.num_iterations);
}

// we get 1 row since the limit is 4
executor.next_batch(10);
executor.collect_statistics(&mut s);
let exec_summary = s.summary_per_executor[1].as_ref().unwrap();
let exec_summary = s.summary_per_executor[1];
assert_eq!(4, exec_summary.num_produced_rows);
assert_eq!(3, exec_summary.num_iterations);
}
@@ -673,18 +673,18 @@ mod tests {
executor.collect_statistics(&mut s);

assert_eq!(s.scanned_rows_per_range[0], 3);
// 0 is none because our output index is 1
assert!(s.summary_per_executor[0].is_none());
let exec_summary = s.summary_per_executor[1].as_ref().unwrap();
// 0 remains Default because our output index is 1
assert_eq!(s.summary_per_executor[0], ExecSummary::default());
let exec_summary = s.summary_per_executor[1];
assert_eq!(3, exec_summary.num_produced_rows);
assert_eq!(2, exec_summary.num_iterations);

executor.collect_statistics(&mut s);

// Collected statistics remain unchanged because of no newly generated delta statistics.
assert_eq!(s.scanned_rows_per_range[0], 3);
assert!(s.summary_per_executor[0].is_none());
let exec_summary = s.summary_per_executor[1].as_ref().unwrap();
assert_eq!(s.summary_per_executor[0], ExecSummary::default());
let exec_summary = s.summary_per_executor[1];
assert_eq!(3, exec_summary.num_produced_rows);
assert_eq!(2, exec_summary.num_iterations);

@@ -694,8 +694,8 @@ mod tests {
executor.collect_statistics(&mut s);

assert_eq!(s.scanned_rows_per_range[0], 2);
assert!(s.summary_per_executor[0].is_none());
let exec_summary = s.summary_per_executor[1].as_ref().unwrap();
assert_eq!(s.summary_per_executor[0], ExecSummary::default());
let exec_summary = s.summary_per_executor[1];
assert_eq!(2, exec_summary.num_produced_rows);
assert_eq!(1, exec_summary.num_iterations);
}
@@ -7,8 +7,9 @@ use crate::coprocessor::dag::exec_summary::ExecSummary;
///
/// Each batch executor aggregates and updates corresponding slots in this structure.
pub struct BatchExecuteStatistics {
/// The execution summary of each executor.
pub summary_per_executor: Vec<Option<ExecSummary>>,
/// The execution summary of each executor. If execution summary is not needed, it will
/// be zero sized.
pub summary_per_executor: Vec<ExecSummary>,

/// For each range given in the request, how many rows are scanned.
pub scanned_rows_per_range: Vec<usize>,
@@ -18,20 +19,22 @@ pub struct BatchExecuteStatistics {
}

impl BatchExecuteStatistics {
/// Creates a new statistics instance.
///
/// If execution summary does not need to be collected, it is safe to pass 0 to the `executors`
/// argument, which will avoid one allocation.
pub fn new(executors: usize, ranges: usize) -> Self {
let mut summary_per_executor = Vec::new();
summary_per_executor.resize_with(executors, || None);

Self {
summary_per_executor,
summary_per_executor: vec![ExecSummary::default(); executors],
scanned_rows_per_range: vec![0; ranges],
cf_stats: crate::storage::Statistics::default(),
}
}

/// Clears the statistics instance.
pub fn clear(&mut self) {
for item in self.summary_per_executor.iter_mut() {
*item = None;
*item = ExecSummary::default();
}
for item in self.scanned_rows_per_range.iter_mut() {
*item = 0;
@@ -50,6 +50,9 @@ pub struct BatchDAGHandler {
/// Traditional metric interface.
// TODO: Deprecate it in Coprocessor DAG v2.
metrics: ExecutorMetrics,

/// Whether or not execution summary need to be collected.
collect_exec_summary: bool,
}

impl BatchDAGHandler {
@@ -58,16 +61,17 @@ impl BatchDAGHandler {
out_most_executor: Box<dyn BatchExecutor>,
output_offsets: Vec<u32>,
config: Arc<EvalConfig>,
ranges_len: usize,
executors_len: usize,
statistics: BatchExecuteStatistics,
collect_exec_summary: bool,
) -> Self {
Self {
deadline,
out_most_executor,
output_offsets,
config,
statistics: BatchExecuteStatistics::new(executors_len, ranges_len),
statistics,
metrics: ExecutorMetrics::default(),
collect_exec_summary,
}
}
}
@@ -141,23 +145,22 @@ impl RequestHandler for BatchDAGHandler {
.map(|v| *v as i64)
.collect(),
);
sel_resp.set_execution_summaries(RepeatedField::from_vec(
self.statistics

if self.collect_exec_summary {
let summaries = self
.statistics
.summary_per_executor
.iter()
.map(|summary| {
let mut ret = ExecutorExecutionSummary::new();
if let Some(summary) = summary {
ret.set_num_iterations(summary.num_iterations as u64);
ret.set_num_produced_rows(summary.num_produced_rows as u64);
ret.set_time_processed_ns(
summary.time_processed_ms as u64 * 1_000_000,
);
}
ret.set_num_iterations(summary.num_iterations as u64);
ret.set_num_produced_rows(summary.num_produced_rows as u64);
ret.set_time_processed_ns(summary.time_processed_ns as u64);
ret
})
.collect(),
));
.collect();
sel_resp.set_execution_summaries(RepeatedField::from_vec(summaries));
}

sel_resp.set_warnings(warnings.warnings.into());
sel_resp.set_warning_count(warnings.warning_cnt as i64);
@@ -261,9 +261,10 @@ impl DAGBuilder {
) -> Result<super::batch_handler::BatchDAGHandler> {
let ranges_len = ranges.len();
let executors_len = req.get_executors().len();
let collect_exec_summary = req.get_collect_execution_summaries();

let config = Arc::new(config);
let out_most_executor = if req.get_collect_execution_summaries() {
let out_most_executor = if collect_exec_summary {
super::builder::DAGBuilder::build_batch::<_, ExecSummaryCollectorEnabled>(
req.take_executors().into_vec(),
store,
@@ -297,8 +298,15 @@ impl DAGBuilder {
out_most_executor,
output_offsets,
config,
ranges_len,
executors_len,
BatchExecuteStatistics::new(
if collect_exec_summary {
executors_len
} else {
0 // Avoid allocation for executor summaries when it is not needed
},
ranges_len,
),
collect_exec_summary,
))
}

@@ -1,10 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

/// Execution summaries to support `EXPLAIN ANALYZE` statements.
#[derive(Default)]
/// Execution summaries to support `EXPLAIN ANALYZE` statements. We don't use
/// `ExecutorExecutionSummary` directly since it is less efficient.
#[derive(Debug, Default, Copy, Clone, Add, AddAssign, PartialEq, Eq)]
pub struct ExecSummary {
/// Total time cost in this executor.
pub time_processed_ms: usize,
pub time_processed_ns: usize,

/// How many rows this executor produced totally.
pub num_produced_rows: usize,
@@ -13,15 +14,6 @@ pub struct ExecSummary {
pub num_iterations: usize,
}

impl ExecSummary {
#[inline]
pub fn merge_into(self, target: &mut Self) {
target.time_processed_ms += self.time_processed_ms;
target.num_produced_rows += self.num_produced_rows;
target.num_iterations += self.num_iterations;
}
}

/// A trait for all execution summary collectors.
pub trait ExecSummaryCollector: Send {
type DurationRecorder;
@@ -41,7 +33,7 @@ pub trait ExecSummaryCollector: Send {
fn on_finish_iterate(&mut self, dr: Self::DurationRecorder, rows: usize);

/// Takes and appends current execution summary into `target`.
fn collect_into(&mut self, target: &mut [Option<ExecSummary>]);
fn collect_into(&mut self, target: &mut [ExecSummary]);
}

/// A normal `ExecSummaryCollector` that simply collects execution summaries.
@@ -71,18 +63,14 @@ impl ExecSummaryCollector for ExecSummaryCollectorEnabled {
#[inline]
fn on_finish_iterate(&mut self, dr: Self::DurationRecorder, rows: usize) {
self.counts.num_produced_rows += rows;
let elapsed_time = tikv_util::time::duration_to_ms(dr.elapsed()) as usize;
self.counts.time_processed_ms += elapsed_time;
let elapsed_time = tikv_util::time::duration_to_nanos(dr.elapsed()) as usize;
self.counts.time_processed_ns += elapsed_time;
}

#[inline]
fn collect_into(&mut self, target: &mut [Option<ExecSummary>]) {
fn collect_into(&mut self, target: &mut [ExecSummary]) {
let current_summary = std::mem::replace(&mut self.counts, ExecSummary::default());
if let Some(t) = &mut target[self.output_index] {
current_summary.merge_into(t);
} else {
target[self.output_index] = Some(current_summary);
}
target[self.output_index] += current_summary;
}
}

@@ -104,5 +92,5 @@ impl ExecSummaryCollector for ExecSummaryCollectorDisabled {
fn on_finish_iterate(&mut self, _dr: Self::DurationRecorder, _rows: usize) {}

#[inline]
fn collect_into(&mut self, _target: &mut [Option<ExecSummary>]) {}
fn collect_into(&mut self, _target: &mut [ExecSummary]) {}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.