From ab5d89faa1ce635770b87520bf18d6984a995237 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 29 Apr 2019 15:01:30 +0800 Subject: [PATCH 1/8] Rename and move ExecSummary to support non-batch executors Signed-off-by: Breezewish --- src/coprocessor/dag/aggr_fn/impl_avg.rs | 2 +- .../batch/executors/index_scan_executor.rs | 4 +- src/coprocessor/dag/batch/executors/limit.rs | 8 +- .../dag/batch/executors/selection_executor.rs | 6 +- .../batch/executors/table_scan_executor.rs | 4 +- .../dag/batch/executors/util/scan_executor.rs | 4 +- src/coprocessor/dag/batch/interface.rs | 3 +- src/coprocessor/dag/batch/statistics.rs | 109 +----------------- src/coprocessor/dag/builder.rs | 2 +- src/coprocessor/dag/exec_summary.rs | 108 +++++++++++++++++ src/coprocessor/dag/mod.rs | 1 + 11 files changed, 127 insertions(+), 124 deletions(-) create mode 100644 src/coprocessor/dag/exec_summary.rs diff --git a/src/coprocessor/dag/aggr_fn/impl_avg.rs b/src/coprocessor/dag/aggr_fn/impl_avg.rs index 7a28bbfd4a4..c3f051930b3 100644 --- a/src/coprocessor/dag/aggr_fn/impl_avg.rs +++ b/src/coprocessor/dag/aggr_fn/impl_avg.rs @@ -164,7 +164,7 @@ where if self.count == 0 { target[1].push(None); } else { - target[1].push(Some(self.sum.clone())) + target[1].push(Some(self.sum.clone())); } Ok(()) } diff --git a/src/coprocessor/dag/batch/executors/index_scan_executor.rs b/src/coprocessor/dag/batch/executors/index_scan_executor.rs index b892cc9ee59..745407129f7 100644 --- a/src/coprocessor/dag/batch/executors/index_scan_executor.rs +++ b/src/coprocessor/dag/batch/executors/index_scan_executor.rs @@ -27,7 +27,7 @@ pub struct BatchIndexScanExecutor( impl BatchIndexScanExecutor< - crate::coprocessor::dag::batch::statistics::ExecSummaryCollectorDisabled, + crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled, FixtureStore, > { @@ -252,7 +252,7 @@ mod tests { use crate::coprocessor::codec::mysql::Tz; use crate::coprocessor::codec::{datum, table, Datum}; - use crate::coprocessor::dag::batch::statistics::*; + use crate::coprocessor::dag::exec_summary::*; use crate::coprocessor::dag::expr::EvalConfig; use crate::coprocessor::util::convert_to_prefix_next; use crate::storage::{FixtureStore, Key}; diff --git a/src/coprocessor/dag/batch/executors/limit.rs b/src/coprocessor/dag/batch/executors/limit.rs index 84287abfa6d..4ffc8b58c8d 100644 --- a/src/coprocessor/dag/batch/executors/limit.rs +++ b/src/coprocessor/dag/batch/executors/limit.rs @@ -34,7 +34,7 @@ impl BatchExecutor for BatchLimitEx #[inline] fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult { - let timer = self.summary_collector.on_start_batch(); + let timer = self.summary_collector.on_start_iterate(); let mut result = self.src.next_batch(scan_rows); if result.data.rows_len() < self.remaining_rows { @@ -46,7 +46,7 @@ impl BatchExecutor for BatchLimitEx } self.summary_collector - .on_finish_batch(timer, result.data.rows_len()); + .on_finish_iterate(timer, result.data.rows_len()); result } @@ -64,9 +64,7 @@ mod tests { use super::*; use crate::coprocessor::codec::batch::{LazyBatchColumn, LazyBatchColumnVec}; use crate::coprocessor::codec::data_type::VectorValue; - use crate::coprocessor::dag::batch::statistics::{ - ExecSummaryCollectorDisabled, ExecSummaryCollectorEnabled, - }; + use crate::coprocessor::dag::exec_summary::*; use crate::coprocessor::dag::expr::EvalConfig; use cop_datatype::{EvalType, FieldTypeAccessor, FieldTypeTp}; use tipb::expression::FieldType; diff --git a/src/coprocessor/dag/batch/executors/selection_executor.rs b/src/coprocessor/dag/batch/executors/selection_executor.rs index 60905b920f6..3f7dc09b5a6 100644 --- a/src/coprocessor/dag/batch/executors/selection_executor.rs +++ b/src/coprocessor/dag/batch/executors/selection_executor.rs @@ -7,7 +7,7 @@ use tipb::expression::Expr; use tipb::expression::FieldType; use super::super::interface::*; -use crate::coprocessor::dag::batch::statistics::ExecSummaryCollectorDisabled; +use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled; use crate::coprocessor::dag::expr::{EvalConfig, EvalContext}; use crate::coprocessor::dag::rpn_expr::{RpnExpression, RpnExpressionBuilder}; use crate::coprocessor::{Error, Result}; @@ -126,10 +126,10 @@ impl BatchExecutor for BatchSelecti #[inline] fn next_batch(&mut self, scan_rows: usize) -> BatchExecuteResult { - let timer = self.summary_collector.on_start_batch(); + let timer = self.summary_collector.on_start_iterate(); let result = self.handle_next_batch(scan_rows); self.summary_collector - .on_finish_batch(timer, result.data.rows_len()); + .on_finish_iterate(timer, result.data.rows_len()); result } diff --git a/src/coprocessor/dag/batch/executors/table_scan_executor.rs b/src/coprocessor/dag/batch/executors/table_scan_executor.rs index 70079bedf8f..efd3dc36271 100644 --- a/src/coprocessor/dag/batch/executors/table_scan_executor.rs +++ b/src/coprocessor/dag/batch/executors/table_scan_executor.rs @@ -28,7 +28,7 @@ pub struct BatchTableScanExecutor( impl BatchTableScanExecutor< - crate::coprocessor::dag::batch::statistics::ExecSummaryCollectorDisabled, + crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled, FixtureStore, > { @@ -321,7 +321,7 @@ mod tests { use crate::coprocessor::codec::mysql::Tz; use crate::coprocessor::codec::{datum, table, Datum}; use crate::coprocessor::dag::batch::interface::BatchExecutor; - use crate::coprocessor::dag::batch::statistics::*; + use crate::coprocessor::dag::exec_summary::*; use crate::coprocessor::dag::expr::EvalConfig; use crate::coprocessor::util::convert_to_prefix_next; use crate::storage::{FixtureStore, Key}; diff --git a/src/coprocessor/dag/batch/executors/util/scan_executor.rs b/src/coprocessor/dag/batch/executors/util/scan_executor.rs index 61b2da748b3..5925a1e82a3 100644 --- a/src/coprocessor/dag/batch/executors/util/scan_executor.rs +++ b/src/coprocessor/dag/batch/executors/util/scan_executor.rs @@ -219,7 +219,7 @@ impl 0); - let timer = self.summary_collector.on_start_batch(); + let timer = self.summary_collector.on_start_iterate(); let mut data = self.imp.build_column_vec(scan_rows); let is_drained = self.fill_column_vec(scan_rows, &mut data); @@ -238,7 +238,7 @@ impl Self - where - Self: Sized; - - /// Returns an instance that will record elapsed duration and increase - /// the iterations counter. The instance should be later passed back to - /// `on_finish_batch` when processing of `next_batch` is completed. - fn on_start_batch(&mut self) -> Self::DurationRecorder; - - // Increases the process time and produced rows counter. - // It should be called when `next_batch` is completed. - fn on_finish_batch(&mut self, dr: Self::DurationRecorder, rows: usize); - - /// Takes and appends current execution summary into `target`. - fn collect_into(&mut self, target: &mut [Option]); -} - -/// A normal `ExecSummaryCollector` that simply collects execution summaries. -/// It acts like `collect = true`. -pub struct ExecSummaryCollectorEnabled { - output_index: usize, - counts: ExecSummary, -} - -impl ExecSummaryCollector for ExecSummaryCollectorEnabled { - type DurationRecorder = tikv_util::time::Instant; - - #[inline] - fn new(output_index: usize) -> ExecSummaryCollectorEnabled { - ExecSummaryCollectorEnabled { - output_index, - counts: Default::default(), - } - } - - #[inline] - fn on_start_batch(&mut self) -> Self::DurationRecorder { - self.counts.num_iterations += 1; - tikv_util::time::Instant::now_coarse() - } - - #[inline] - fn on_finish_batch(&mut self, dr: Self::DurationRecorder, rows: usize) { - self.counts.num_produced_rows += rows; - let elapsed_time = tikv_util::time::duration_to_ms(dr.elapsed()) as usize; - self.counts.time_processed_ms += elapsed_time; - } - - #[inline] - fn collect_into(&mut self, target: &mut [Option]) { - let current_summary = std::mem::replace(&mut self.counts, ExecSummary::default()); - if let Some(t) = &mut target[self.output_index] { - current_summary.merge_into(t); - } else { - target[self.output_index] = Some(current_summary); - } - } -} - -/// A `ExecSummaryCollector` that does not collect anything. Acts like `collect = false`. -pub struct ExecSummaryCollectorDisabled; - -impl ExecSummaryCollector for ExecSummaryCollectorDisabled { - type DurationRecorder = (); - - #[inline] - fn new(_output_index: usize) -> ExecSummaryCollectorDisabled { - ExecSummaryCollectorDisabled - } - - #[inline] - fn on_start_batch(&mut self) -> Self::DurationRecorder {} - - #[inline] - fn on_finish_batch(&mut self, _dr: Self::DurationRecorder, _rows: usize) {} - - #[inline] - fn collect_into(&mut self, _target: &mut [Option]) {} -} diff --git a/src/coprocessor/dag/builder.rs b/src/coprocessor/dag/builder.rs index 5c749595290..1aa825bdcf8 100644 --- a/src/coprocessor/dag/builder.rs +++ b/src/coprocessor/dag/builder.rs @@ -14,7 +14,7 @@ use super::executor::{ Executor, HashAggExecutor, LimitExecutor, ScanExecutor, SelectionExecutor, StreamAggExecutor, TopNExecutor, }; -use crate::coprocessor::dag::batch::statistics::*; +use crate::coprocessor::dag::exec_summary::*; use crate::coprocessor::dag::expr::{EvalConfig, Flag, SqlMode}; use crate::coprocessor::metrics::*; use crate::coprocessor::*; diff --git a/src/coprocessor/dag/exec_summary.rs b/src/coprocessor/dag/exec_summary.rs new file mode 100644 index 00000000000..8405365533a --- /dev/null +++ b/src/coprocessor/dag/exec_summary.rs @@ -0,0 +1,108 @@ +// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. + +/// Execution summaries to support `EXPLAIN ANALYZE` statements. +#[derive(Default)] +pub struct ExecSummary { + /// Total time cost in this executor. + pub time_processed_ms: usize, + + /// How many rows this executor produced totally. + pub num_produced_rows: usize, + + /// How many times executor's `next_batch()` is called. + pub num_iterations: usize, +} + +impl ExecSummary { + #[inline] + pub fn merge_into(self, target: &mut Self) { + target.time_processed_ms += self.time_processed_ms; + target.num_produced_rows += self.num_produced_rows; + target.num_iterations += self.num_iterations; + } +} + +/// A trait for all execution summary collectors. +pub trait ExecSummaryCollector: Send { + type DurationRecorder; + + /// Creates a new instance with specified output slot index. + fn new(output_index: usize) -> Self + where + Self: Sized; + + /// Returns an instance that will record elapsed duration and increase + /// the iterations counter. The instance should be later passed back to + /// `on_finish_iterate` when processing of `next_batch` is completed. + fn on_start_iterate(&mut self) -> Self::DurationRecorder; + + // Increases the process time and produced rows counter. + // It should be called when `next_batch` is completed. + fn on_finish_iterate(&mut self, dr: Self::DurationRecorder, rows: usize); + + /// Takes and appends current execution summary into `target`. + fn collect_into(&mut self, target: &mut [Option]); +} + +/// A normal `ExecSummaryCollector` that simply collects execution summaries. +/// It acts like `collect = true`. +pub struct ExecSummaryCollectorEnabled { + output_index: usize, + counts: ExecSummary, +} + +impl ExecSummaryCollector for ExecSummaryCollectorEnabled { + type DurationRecorder = tikv_util::time::Instant; + + #[inline] + fn new(output_index: usize) -> ExecSummaryCollectorEnabled { + ExecSummaryCollectorEnabled { + output_index, + counts: Default::default(), + } + } + + #[inline] + fn on_start_iterate(&mut self) -> Self::DurationRecorder { + self.counts.num_iterations += 1; + tikv_util::time::Instant::now_coarse() + } + + #[inline] + fn on_finish_iterate(&mut self, dr: Self::DurationRecorder, rows: usize) { + self.counts.num_produced_rows += rows; + let elapsed_time = tikv_util::time::duration_to_ms(dr.elapsed()) as usize; + self.counts.time_processed_ms += elapsed_time; + } + + #[inline] + fn collect_into(&mut self, target: &mut [Option]) { + let current_summary = std::mem::replace(&mut self.counts, ExecSummary::default()); + if let Some(t) = &mut target[self.output_index] { + current_summary.merge_into(t); + } else { + target[self.output_index] = Some(current_summary); + } + } +} + +/// A `ExecSummaryCollector` that does not collect anything. Acts like `collect = false`. +pub struct ExecSummaryCollectorDisabled; + +impl ExecSummaryCollector for ExecSummaryCollectorDisabled { + type DurationRecorder = (); + + #[inline] + fn new(_output_index: usize) -> ExecSummaryCollectorDisabled { + ExecSummaryCollectorDisabled + } + + #[inline] + fn on_start_iterate(&mut self) -> Self::DurationRecorder {} + + #[inline] + fn on_finish_iterate(&mut self, _dr: Self::DurationRecorder, _rows: usize) {} + + #[inline] + fn collect_into(&mut self, _target: &mut [Option]) {} +} diff --git a/src/coprocessor/dag/mod.rs b/src/coprocessor/dag/mod.rs index 6aaafdca6df..712d03c9822 100644 --- a/src/coprocessor/dag/mod.rs +++ b/src/coprocessor/dag/mod.rs @@ -26,6 +26,7 @@ pub mod aggr_fn; pub mod batch; pub mod batch_handler; pub mod builder; +pub mod exec_summary; pub mod executor; pub mod expr; pub mod handler; From 1256fe704d8773e21f4c87d6e1bec0b061561263 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 29 Apr 2019 17:54:06 +0800 Subject: [PATCH 2/8] Simplify exec summary to improve performance Signed-off-by: Breezewish --- src/coprocessor/dag/batch/executors/limit.rs | 4 +-- .../batch/executors/table_scan_executor.rs | 14 ++++---- src/coprocessor/dag/batch/statistics.rs | 17 ++++++---- src/coprocessor/dag/batch_handler.rs | 31 ++++++++++-------- src/coprocessor/dag/builder.rs | 14 ++++++-- src/coprocessor/dag/exec_summary.rs | 32 ++++++------------- 6 files changed, 57 insertions(+), 55 deletions(-) diff --git a/src/coprocessor/dag/batch/executors/limit.rs b/src/coprocessor/dag/batch/executors/limit.rs index 4ffc8b58c8d..fef8307fba7 100644 --- a/src/coprocessor/dag/batch/executors/limit.rs +++ b/src/coprocessor/dag/batch/executors/limit.rs @@ -195,7 +195,7 @@ mod tests { // Collected statistics remain unchanged until `next_batch` generated delta statistics. for _ in 0..2 { executor.collect_statistics(&mut s); - let exec_summary = s.summary_per_executor[1].as_ref().unwrap(); + let exec_summary = s.summary_per_executor[1]; assert_eq!(3, exec_summary.num_produced_rows); assert_eq!(2, exec_summary.num_iterations); } @@ -203,7 +203,7 @@ mod tests { // we get 1 row since the limit is 4 executor.next_batch(10); executor.collect_statistics(&mut s); - let exec_summary = s.summary_per_executor[1].as_ref().unwrap(); + let exec_summary = s.summary_per_executor[1]; assert_eq!(4, exec_summary.num_produced_rows); assert_eq!(3, exec_summary.num_iterations); } diff --git a/src/coprocessor/dag/batch/executors/table_scan_executor.rs b/src/coprocessor/dag/batch/executors/table_scan_executor.rs index efd3dc36271..ef6d44d92f7 100644 --- a/src/coprocessor/dag/batch/executors/table_scan_executor.rs +++ b/src/coprocessor/dag/batch/executors/table_scan_executor.rs @@ -673,9 +673,9 @@ mod tests { executor.collect_statistics(&mut s); assert_eq!(s.scanned_rows_per_range[0], 3); - // 0 is none because our output index is 1 - assert!(s.summary_per_executor[0].is_none()); - let exec_summary = s.summary_per_executor[1].as_ref().unwrap(); + // 0 remains Default because our output index is 1 + assert_eq!(s.summary_per_executor[0], ExecSummary::default()); + let exec_summary = s.summary_per_executor[1]; assert_eq!(3, exec_summary.num_produced_rows); assert_eq!(2, exec_summary.num_iterations); @@ -683,8 +683,8 @@ mod tests { // Collected statistics remain unchanged because of no newly generated delta statistics. assert_eq!(s.scanned_rows_per_range[0], 3); - assert!(s.summary_per_executor[0].is_none()); - let exec_summary = s.summary_per_executor[1].as_ref().unwrap(); + assert_eq!(s.summary_per_executor[0], ExecSummary::default()); + let exec_summary = s.summary_per_executor[1]; assert_eq!(3, exec_summary.num_produced_rows); assert_eq!(2, exec_summary.num_iterations); @@ -694,8 +694,8 @@ mod tests { executor.collect_statistics(&mut s); assert_eq!(s.scanned_rows_per_range[0], 2); - assert!(s.summary_per_executor[0].is_none()); - let exec_summary = s.summary_per_executor[1].as_ref().unwrap(); + assert_eq!(s.summary_per_executor[0], ExecSummary::default()); + let exec_summary = s.summary_per_executor[1]; assert_eq!(2, exec_summary.num_produced_rows); assert_eq!(1, exec_summary.num_iterations); } diff --git a/src/coprocessor/dag/batch/statistics.rs b/src/coprocessor/dag/batch/statistics.rs index 291afcdf78b..19d5b3b29c6 100644 --- a/src/coprocessor/dag/batch/statistics.rs +++ b/src/coprocessor/dag/batch/statistics.rs @@ -7,8 +7,9 @@ use crate::coprocessor::dag::exec_summary::ExecSummary; /// /// Each batch executor aggregates and updates corresponding slots in this structure. pub struct BatchExecuteStatistics { - /// The execution summary of each executor. - pub summary_per_executor: Vec>, + /// The execution summary of each executor. If execution summary is not needed, it will + /// be zero sized. + pub summary_per_executor: Vec, /// For each range given in the request, how many rows are scanned. pub scanned_rows_per_range: Vec, @@ -18,20 +19,22 @@ pub struct BatchExecuteStatistics { } impl BatchExecuteStatistics { + /// Creates a new statistics instance. + /// + /// If execution summary does not need to be collected, it is safe to pass 0 to the `executors` + /// argument, which will avoid one allocation. pub fn new(executors: usize, ranges: usize) -> Self { - let mut summary_per_executor = Vec::new(); - summary_per_executor.resize_with(executors, || None); - Self { - summary_per_executor, + summary_per_executor: vec![ExecSummary::default(); executors], scanned_rows_per_range: vec![0; ranges], cf_stats: crate::storage::Statistics::default(), } } + /// Clears the statistics instance. pub fn clear(&mut self) { for item in self.summary_per_executor.iter_mut() { - *item = None; + *item = ExecSummary::default(); } for item in self.scanned_rows_per_range.iter_mut() { *item = 0; diff --git a/src/coprocessor/dag/batch_handler.rs b/src/coprocessor/dag/batch_handler.rs index e24aedd6bc1..04eb2a3385d 100644 --- a/src/coprocessor/dag/batch_handler.rs +++ b/src/coprocessor/dag/batch_handler.rs @@ -50,6 +50,9 @@ pub struct BatchDAGHandler { /// Traditional metric interface. // TODO: Deprecate it in Coprocessor DAG v2. metrics: ExecutorMetrics, + + /// Whether or not execution summary need to be collected. + collect_exec_summary: bool, } impl BatchDAGHandler { @@ -58,16 +61,17 @@ impl BatchDAGHandler { out_most_executor: Box, output_offsets: Vec, config: Arc, - ranges_len: usize, - executors_len: usize, + statistics: BatchExecuteStatistics, + collect_exec_summary: bool, ) -> Self { Self { deadline, out_most_executor, output_offsets, config, - statistics: BatchExecuteStatistics::new(executors_len, ranges_len), + statistics, metrics: ExecutorMetrics::default(), + collect_exec_summary, } } } @@ -141,23 +145,22 @@ impl RequestHandler for BatchDAGHandler { .map(|v| *v as i64) .collect(), ); - sel_resp.set_execution_summaries(RepeatedField::from_vec( - self.statistics + + if self.collect_exec_summary { + let summaries = self + .statistics .summary_per_executor .iter() .map(|summary| { let mut ret = ExecutorExecutionSummary::new(); - if let Some(summary) = summary { - ret.set_num_iterations(summary.num_iterations as u64); - ret.set_num_produced_rows(summary.num_produced_rows as u64); - ret.set_time_processed_ns( - summary.time_processed_ms as u64 * 1_000_000, - ); - } + ret.set_num_iterations(summary.num_iterations as u64); + ret.set_num_produced_rows(summary.num_produced_rows as u64); + ret.set_time_processed_ns(summary.time_processed_ns as u64); ret }) - .collect(), - )); + .collect(); + sel_resp.set_execution_summaries(RepeatedField::from_vec(summaries)); + } sel_resp.set_warnings(warnings.warnings.into()); sel_resp.set_warning_count(warnings.warning_cnt as i64); diff --git a/src/coprocessor/dag/builder.rs b/src/coprocessor/dag/builder.rs index 1aa825bdcf8..b6039b7b1c2 100644 --- a/src/coprocessor/dag/builder.rs +++ b/src/coprocessor/dag/builder.rs @@ -261,9 +261,10 @@ impl DAGBuilder { ) -> Result { let ranges_len = ranges.len(); let executors_len = req.get_executors().len(); + let collect_exec_summary = req.get_collect_execution_summaries(); let config = Arc::new(config); - let out_most_executor = if req.get_collect_execution_summaries() { + let out_most_executor = if collect_exec_summary { super::builder::DAGBuilder::build_batch::<_, ExecSummaryCollectorEnabled>( req.take_executors().into_vec(), store, @@ -297,8 +298,15 @@ impl DAGBuilder { out_most_executor, output_offsets, config, - ranges_len, - executors_len, + BatchExecuteStatistics::new( + if collect_exec_summary { + executors_len + } else { + 0 // Avoid allocation for executor summaries when it is not needed + }, + ranges_len, + ), + collect_exec_summary, )) } diff --git a/src/coprocessor/dag/exec_summary.rs b/src/coprocessor/dag/exec_summary.rs index 8405365533a..0128533ae55 100644 --- a/src/coprocessor/dag/exec_summary.rs +++ b/src/coprocessor/dag/exec_summary.rs @@ -1,10 +1,11 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -/// Execution summaries to support `EXPLAIN ANALYZE` statements. -#[derive(Default)] +/// Execution summaries to support `EXPLAIN ANALYZE` statements. We don't use +/// `ExecutorExecutionSummary` directly since it is less efficient. +#[derive(Debug, Default, Copy, Clone, Add, AddAssign, PartialEq, Eq)] pub struct ExecSummary { /// Total time cost in this executor. - pub time_processed_ms: usize, + pub time_processed_ns: usize, /// How many rows this executor produced totally. pub num_produced_rows: usize, @@ -13,15 +14,6 @@ pub struct ExecSummary { pub num_iterations: usize, } -impl ExecSummary { - #[inline] - pub fn merge_into(self, target: &mut Self) { - target.time_processed_ms += self.time_processed_ms; - target.num_produced_rows += self.num_produced_rows; - target.num_iterations += self.num_iterations; - } -} - /// A trait for all execution summary collectors. pub trait ExecSummaryCollector: Send { type DurationRecorder; @@ -41,7 +33,7 @@ pub trait ExecSummaryCollector: Send { fn on_finish_iterate(&mut self, dr: Self::DurationRecorder, rows: usize); /// Takes and appends current execution summary into `target`. - fn collect_into(&mut self, target: &mut [Option]); + fn collect_into(&mut self, target: &mut [ExecSummary]); } /// A normal `ExecSummaryCollector` that simply collects execution summaries. @@ -71,18 +63,14 @@ impl ExecSummaryCollector for ExecSummaryCollectorEnabled { #[inline] fn on_finish_iterate(&mut self, dr: Self::DurationRecorder, rows: usize) { self.counts.num_produced_rows += rows; - let elapsed_time = tikv_util::time::duration_to_ms(dr.elapsed()) as usize; - self.counts.time_processed_ms += elapsed_time; + let elapsed_time = tikv_util::time::duration_to_nanos(dr.elapsed()) as usize; + self.counts.time_processed_ns += elapsed_time; } #[inline] - fn collect_into(&mut self, target: &mut [Option]) { + fn collect_into(&mut self, target: &mut [ExecSummary]) { let current_summary = std::mem::replace(&mut self.counts, ExecSummary::default()); - if let Some(t) = &mut target[self.output_index] { - current_summary.merge_into(t); - } else { - target[self.output_index] = Some(current_summary); - } + target[self.output_index] += current_summary; } } @@ -104,5 +92,5 @@ impl ExecSummaryCollector for ExecSummaryCollectorDisabled { fn on_finish_iterate(&mut self, _dr: Self::DurationRecorder, _rows: usize) {} #[inline] - fn collect_into(&mut self, _target: &mut [Option]) {} + fn collect_into(&mut self, _target: &mut [ExecSummary]) {} } From b688c4ed94f43f25adbfe823515717bf6d5cfd52 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 29 Apr 2019 17:55:06 +0800 Subject: [PATCH 3/8] Collect execution summary in normal table scan and index scan Signed-off-by: Breezewish --- benches/coprocessor_executors/mod.rs | 5 ++ src/coprocessor/dag/builder.rs | 38 ++++++++--- src/coprocessor/dag/executor/aggregation.rs | 61 +++++++++++++---- src/coprocessor/dag/executor/index_scan.rs | 76 ++++++++++++++++----- src/coprocessor/dag/executor/limit.rs | 6 ++ src/coprocessor/dag/executor/mod.rs | 14 +++- src/coprocessor/dag/executor/scan.rs | 34 +++++++-- src/coprocessor/dag/executor/selection.rs | 5 ++ src/coprocessor/dag/executor/table_scan.rs | 41 ++++++++--- src/coprocessor/dag/executor/topn.rs | 10 ++- src/coprocessor/dag/handler.rs | 32 ++++++++- src/coprocessor/statistics/analyze.rs | 9 ++- 12 files changed, 262 insertions(+), 69 deletions(-) diff --git a/benches/coprocessor_executors/mod.rs b/benches/coprocessor_executors/mod.rs index d7615432484..71d6efb5222 100644 --- a/benches/coprocessor_executors/mod.rs +++ b/benches/coprocessor_executors/mod.rs @@ -10,6 +10,7 @@ use tipb::executor::{IndexScan, TableScan}; use test_coprocessor::*; use tikv::coprocessor::codec::Datum; +use tikv::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled; use tikv::coprocessor::dag::executor::Executor; use tikv::storage::RocksEngine; @@ -24,6 +25,7 @@ fn bench_table_scan_next( b.iter_with_setup( || { let mut executor = TableScanExecutor::table_scan( + ExecSummaryCollectorDisabled, meta.clone(), ranges.to_vec(), store.to_fixture_store(), @@ -414,6 +416,7 @@ fn bench_table_scan_multi_point_range(c: &mut Criterion) { ranges.push(table.get_record_range_one(i)); } let mut executor = TableScanExecutor::table_scan( + ExecSummaryCollectorDisabled, meta.clone(), ranges, store.to_fixture_store(), @@ -470,6 +473,7 @@ fn bench_table_scan_multi_rows(c: &mut Criterion) { b.iter_with_setup( || { let mut executor = TableScanExecutor::table_scan( + ExecSummaryCollectorDisabled, meta.clone(), vec![table.get_record_range_all()], store.to_fixture_store(), @@ -503,6 +507,7 @@ fn bench_index_scan_next( b.iter_with_setup( || { let mut executor = IndexScanExecutor::index_scan( + ExecSummaryCollectorDisabled, meta.clone(), ranges.to_vec(), store.to_fixture_store(), diff --git a/src/coprocessor/dag/builder.rs b/src/coprocessor/dag/builder.rs index b6039b7b1c2..10d62c579be 100644 --- a/src/coprocessor/dag/builder.rs +++ b/src/coprocessor/dag/builder.rs @@ -149,7 +149,7 @@ impl DAGBuilder { /// Builds a normal executor pipeline. /// /// Normal executors iterate rows one by one. - pub fn build_normal( + pub fn build_normal( exec_descriptors: Vec, store: S, ranges: Vec, @@ -160,7 +160,7 @@ impl DAGBuilder { let first = exec_descriptors .next() .ok_or_else(|| Error::Other(box_err!("has no executor")))?; - let mut src = Self::build_normal_first_executor(first, store, ranges, collect)?; + let mut src = Self::build_normal_first_executor::<_, C>(first, store, ranges, collect)?; for mut exec in exec_descriptors { let curr: Box = match exec.get_tp() { ExecType::TypeTableScan | ExecType::TypeIndexScan => { @@ -195,7 +195,7 @@ impl DAGBuilder { /// other executors and never receive rows from other executors. /// /// The inner-most executor must be a table scan executor or an index scan executor. - fn build_normal_first_executor( + fn build_normal_first_executor( mut first: executor::Executor, store: S, ranges: Vec, @@ -204,6 +204,7 @@ impl DAGBuilder { match first.get_tp() { ExecType::TypeTableScan => { let ex = Box::new(ScanExecutor::table_scan( + C::new(0), first.take_tbl_scan(), ranges, store, @@ -214,6 +215,7 @@ impl DAGBuilder { ExecType::TypeIndexScan => { let unique = first.get_idx_scan().get_unique(); let ex = Box::new(ScanExecutor::index_scan( + C::new(0), first.take_idx_scan(), ranges, store, @@ -237,18 +239,32 @@ impl DAGBuilder { deadline: Deadline, batch_row_limit: usize, ) -> Result { - let executor = Self::build_normal( - req.take_executors().into_vec(), - store, - ranges, - Arc::new(eval_cfg), - req.get_collect_range_counts(), - )?; + let executors_len = req.get_executors().len(); + + let executor = if req.get_collect_execution_summaries() { + Self::build_normal::<_, ExecSummaryCollectorEnabled>( + req.take_executors().into_vec(), + store, + ranges, + Arc::new(eval_cfg), + req.get_collect_range_counts(), + )? + } else { + Self::build_normal::<_, ExecSummaryCollectorDisabled>( + req.take_executors().into_vec(), + store, + ranges, + Arc::new(eval_cfg), + req.get_collect_range_counts(), + )? + }; Ok(super::DAGRequestHandler::new( deadline, executor, req.take_output_offsets(), batch_row_limit, + executors_len, + req.get_collect_execution_summaries(), )) } @@ -341,7 +357,7 @@ impl DAGBuilder { let build_batch_result = super::builder::DAGBuilder::check_build_batch(req.get_executors()); if let Err(e) = build_batch_result { - debug!("Coprocessor request cannot be batched"; "reason" => %e); + info!("Coprocessor request cannot be batched"; "reason" => %e); } else { is_batch = true; } diff --git a/src/coprocessor/dag/executor/aggregation.rs b/src/coprocessor/dag/executor/aggregation.rs index a79ec58397b..ac37271b3b4 100644 --- a/src/coprocessor/dag/executor/aggregation.rs +++ b/src/coprocessor/dag/executor/aggregation.rs @@ -10,6 +10,7 @@ use tipb::expression::{Expr, ExprType}; use tikv_util::collections::{OrderMap, OrderMapEntry}; use crate::coprocessor::codec::datum::{self, Datum}; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::expr::{EvalConfig, EvalContext, EvalWarnings, Expression}; use crate::coprocessor::*; @@ -139,6 +140,10 @@ impl AggExecutor { } } + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.src.collect_execution_summaries(target); + } + fn get_len_of_columns(&self) -> usize { self.src.get_len_of_columns() } @@ -248,6 +253,10 @@ impl Executor for HashAggExecutor { fn get_len_of_columns(&self) -> usize { self.inner.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.inner.collect_execution_summaries(target); + } } impl Executor for StreamAggExecutor { @@ -296,6 +305,10 @@ impl Executor for StreamAggExecutor { fn get_len_of_columns(&self) -> usize { self.inner.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.inner.collect_execution_summaries(target); + } } // StreamAggExecutor deals with the aggregation functions. @@ -393,17 +406,17 @@ mod tests { use tipb::expression::{Expr, ExprType}; use tipb::schema::ColumnInfo; - use crate::coprocessor::codec::datum::{self, Datum}; - use crate::coprocessor::codec::mysql::decimal::Decimal; - use crate::coprocessor::codec::table; - use crate::storage::SnapshotStore; - use tikv_util::collections::HashMap; - use super::super::index_scan::tests::IndexTestWrapper; use super::super::index_scan::IndexScanExecutor; use super::super::tests::*; use super::*; + use crate::coprocessor::codec::datum::{self, Datum}; + use crate::coprocessor::codec::mysql::decimal::Decimal; + use crate::coprocessor::codec::table; + use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled; use crate::coprocessor::dag::scanner::tests::Data; + use crate::storage::SnapshotStore; + use tikv_util::collections::HashMap; fn build_group_by(col_ids: &[i64]) -> Vec { let mut group_by = Vec::with_capacity(col_ids.len()); @@ -494,9 +507,15 @@ mod tests { let mut wrapper = IndexTestWrapper::new(unique, idx_data); let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let is_executor = - IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, true) - .unwrap(); + let is_executor = IndexScanExecutor::index_scan( + ExecSummaryCollectorDisabled, + wrapper.scan, + wrapper.ranges, + store, + unique, + true, + ) + .unwrap(); // init the stream aggregation executor let mut agg_ect = StreamAggExecutor::new( Arc::new(EvalConfig::default()), @@ -526,9 +545,15 @@ mod tests { let mut wrapper = IndexTestWrapper::new(unique, idx_data); let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let is_executor = - IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, true) - .unwrap(); + let is_executor = IndexScanExecutor::index_scan( + ExecSummaryCollectorDisabled, + wrapper.scan, + wrapper.ranges, + store, + unique, + true, + ) + .unwrap(); // init the stream aggregation executor let mut agg_ect = StreamAggExecutor::new( Arc::new(EvalConfig::default()), @@ -576,9 +601,15 @@ mod tests { let mut wrapper = IndexTestWrapper::new(unique, idx_data); let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let is_executor = - IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, true) - .unwrap(); + let is_executor = IndexScanExecutor::index_scan( + ExecSummaryCollectorDisabled, + wrapper.scan, + wrapper.ranges, + store, + unique, + true, + ) + .unwrap(); // init the stream aggregation executor let mut agg_ect = StreamAggExecutor::new( Arc::new(EvalConfig::default()), diff --git a/src/coprocessor/dag/executor/index_scan.rs b/src/coprocessor/dag/executor/index_scan.rs index c31baa9d296..7190ed507ca 100644 --- a/src/coprocessor/dag/executor/index_scan.rs +++ b/src/coprocessor/dag/executor/index_scan.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use super::{scan::InnerExecutor, Row, ScanExecutor}; use crate::coprocessor::codec::table; +use crate::coprocessor::dag::exec_summary::ExecSummaryCollector; use crate::coprocessor::{util, Result}; use crate::storage::Store; use kvproto::coprocessor::KeyRange; @@ -81,8 +82,9 @@ impl InnerExecutor for IndexInnerExecutor { } } -impl ScanExecutor { +impl ScanExecutor { pub fn index_scan( + summary_collector: C, mut meta: IndexScan, key_ranges: Vec, store: S, @@ -91,10 +93,19 @@ impl ScanExecutor { ) -> Result { let columns = meta.get_columns().to_vec(); let inner = IndexInnerExecutor::new(&mut meta, unique); - Self::new(inner, meta.get_desc(), columns, key_ranges, store, collect) + Self::new( + summary_collector, + inner, + meta.get_desc(), + columns, + key_ranges, + store, + collect, + ) } pub fn index_scan_with_cols_len( + summary_collector: C, cols: i64, key_ranges: Vec, store: S, @@ -105,11 +116,19 @@ impl ScanExecutor { pk_col: None, unique: false, }; - Self::new(inner, false, vec![], key_ranges, store, false) + Self::new( + summary_collector, + inner, + false, + vec![], + key_ranges, + store, + false, + ) } } -pub type IndexScanExecutor = ScanExecutor; +pub type IndexScanExecutor = ScanExecutor; #[cfg(test)] pub mod tests { @@ -127,6 +146,7 @@ pub mod tests { use super::super::tests::*; use super::*; + use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled; use crate::coprocessor::dag::executor::Executor; use crate::coprocessor::dag::scanner::tests::Data; @@ -292,9 +312,15 @@ pub mod tests { let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let mut scanner = - IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, false, false) - .unwrap(); + let mut scanner = IndexScanExecutor::index_scan( + ExecSummaryCollectorDisabled, + wrapper.scan, + wrapper.ranges, + store, + false, + false, + ) + .unwrap(); for handle in 0..KEY_NUMBER / 2 { let row = scanner.next().unwrap().unwrap().take_origin(); @@ -347,9 +373,15 @@ pub mod tests { let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let mut scanner = - IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, true) - .unwrap(); + let mut scanner = IndexScanExecutor::index_scan( + ExecSummaryCollectorDisabled, + wrapper.scan, + wrapper.ranges, + store, + unique, + true, + ) + .unwrap(); for handle in 0..KEY_NUMBER { let row = scanner.next().unwrap().unwrap().take_origin(); assert_eq!(row.handle, handle as i64); @@ -400,9 +432,15 @@ pub mod tests { let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let mut scanner = - IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, false) - .unwrap(); + let mut scanner = IndexScanExecutor::index_scan( + ExecSummaryCollectorDisabled, + wrapper.scan, + wrapper.ranges, + store, + unique, + false, + ) + .unwrap(); for tid in 0..KEY_NUMBER { let handle = KEY_NUMBER - tid - 1; @@ -425,9 +463,15 @@ pub mod tests { let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let mut scanner = - IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, false, false) - .unwrap(); + let mut scanner = IndexScanExecutor::index_scan( + ExecSummaryCollectorDisabled, + wrapper.scan, + wrapper.ranges, + store, + false, + false, + ) + .unwrap(); for handle in 0..KEY_NUMBER { let row = scanner.next().unwrap().unwrap().take_origin(); diff --git a/src/coprocessor/dag/executor/limit.rs b/src/coprocessor/dag/executor/limit.rs index 611ca622b34..5d31d09a8ef 100644 --- a/src/coprocessor/dag/executor/limit.rs +++ b/src/coprocessor/dag/executor/limit.rs @@ -3,6 +3,8 @@ use tipb::executor::Limit; use super::ExecutorMetrics; + +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::executor::{Executor, Row}; use crate::coprocessor::dag::expr::EvalWarnings; use crate::coprocessor::Result; @@ -58,6 +60,10 @@ impl<'a> Executor for LimitExecutor<'a> { fn get_len_of_columns(&self) -> usize { self.src.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.src.collect_execution_summaries(target); + } } #[cfg(test)] diff --git a/src/coprocessor/dag/executor/mod.rs b/src/coprocessor/dag/executor/mod.rs index 5bd29c905e7..1dd82b68dca 100644 --- a/src/coprocessor/dag/executor/mod.rs +++ b/src/coprocessor/dag/executor/mod.rs @@ -13,6 +13,7 @@ use tikv_util::collections::HashSet; use crate::coprocessor::codec::datum::{self, Datum, DatumEncoder}; use crate::coprocessor::codec::table::{self, RowColsDict}; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::expr::{EvalContext, EvalWarnings}; use crate::coprocessor::util; use crate::coprocessor::*; @@ -238,6 +239,7 @@ pub trait Executor { fn next(&mut self) -> Result>; fn collect_output_counts(&mut self, counts: &mut Vec); fn collect_metrics_into(&mut self, metrics: &mut ExecutorMetrics); + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]); fn get_len_of_columns(&self) -> usize; /// Only executors with eval computation need to implement `take_eval_warnings` @@ -261,6 +263,7 @@ pub trait Executor { pub mod tests { use super::{Executor, TableScanExecutor}; use crate::coprocessor::codec::{table, Datum}; + use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled; use crate::storage::kv::{Engine, Modify, RocksEngine, RocksSnapshot, TestEngineBuilder}; use crate::storage::mvcc::MvccTxn; use crate::storage::SnapshotStore; @@ -407,6 +410,15 @@ pub mod tests { let (snapshot, start_ts) = test_store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - Box::new(TableScanExecutor::table_scan(table_scan, key_ranges, store, true).unwrap()) + Box::new( + TableScanExecutor::table_scan( + ExecSummaryCollectorDisabled, + table_scan, + key_ranges, + store, + true, + ) + .unwrap(), + ) } } diff --git a/src/coprocessor/dag/executor/scan.rs b/src/coprocessor/dag/executor/scan.rs index 64d4dde1ff4..1ca27888512 100644 --- a/src/coprocessor/dag/executor/scan.rs +++ b/src/coprocessor/dag/executor/scan.rs @@ -2,12 +2,14 @@ use std::{iter::Peekable, mem, sync::Arc, vec::IntoIter}; +use kvproto::coprocessor::KeyRange; +use tipb::schema::ColumnInfo; + use super::{Executor, ExecutorMetrics, Row}; use crate::coprocessor::codec::table; +use crate::coprocessor::dag::exec_summary::{ExecSummary, ExecSummaryCollector}; use crate::coprocessor::{Error, Result}; use crate::storage::{Key, Store}; -use kvproto::coprocessor::KeyRange; -use tipb::schema::ColumnInfo; // an InnerExecutor is used in ScanExecutor, // hold the different logics between table scan and index scan @@ -29,7 +31,8 @@ pub trait InnerExecutor { } // Executor for table scan and index scan -pub struct ScanExecutor { +pub struct ScanExecutor { + summary_collector: C, store: S, desc: bool, key_ranges: Peekable>, @@ -43,8 +46,9 @@ pub struct ScanExecutor { first_collect: bool, } -impl ScanExecutor { +impl ScanExecutor { pub fn new( + summary_collector: C, inner: T, desc: bool, columns: Vec, @@ -59,6 +63,7 @@ impl ScanExecutor { let counts = if collect { Some(Vec::default()) } else { None }; Ok(Self { + summary_collector, inner, store, desc, @@ -114,10 +119,8 @@ impl ScanExecutor { ) .map_err(Error::from) } -} -impl Executor for ScanExecutor { - fn next(&mut self) -> Result> { + fn next_impl(&mut self) -> Result> { loop { if let Some(row) = self.get_row_from_range_scanner()? { self.inc_last_count(); @@ -147,6 +150,19 @@ impl Executor for ScanExecutor { return Ok(None); } } +} + +impl Executor for ScanExecutor { + fn next(&mut self) -> Result> { + let timer = self.summary_collector.on_start_iterate(); + let ret = self.next_impl(); + if let Ok(Some(_)) = ret { + self.summary_collector.on_finish_iterate(timer, 1); + } else { + self.summary_collector.on_finish_iterate(timer, 0); + } + ret + } fn collect_output_counts(&mut self, counts: &mut Vec) { if let Some(cur_counts) = self.counts.as_mut() { @@ -170,6 +186,10 @@ impl Executor for ScanExecutor { } } + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.summary_collector.collect_into(target); + } + fn get_len_of_columns(&self) -> usize { self.columns.len() } diff --git a/src/coprocessor/dag/executor/selection.rs b/src/coprocessor/dag/executor/selection.rs index bffd04d808a..a18bad5e11a 100644 --- a/src/coprocessor/dag/executor/selection.rs +++ b/src/coprocessor/dag/executor/selection.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use tipb::executor::Selection; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::expr::{EvalConfig, EvalContext, EvalWarnings, Expression}; use crate::coprocessor::Result; @@ -78,6 +79,10 @@ impl Executor for SelectionExecutor { fn get_len_of_columns(&self) -> usize { self.src.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.src.collect_execution_summaries(target); + } } #[cfg(test)] diff --git a/src/coprocessor/dag/executor/table_scan.rs b/src/coprocessor/dag/executor/table_scan.rs index 071a274afb4..dd13d1133ab 100644 --- a/src/coprocessor/dag/executor/table_scan.rs +++ b/src/coprocessor/dag/executor/table_scan.rs @@ -5,6 +5,7 @@ use tikv_util::collections::HashSet; use super::{scan::InnerExecutor, Row, ScanExecutor}; use crate::coprocessor::codec::table; +use crate::coprocessor::dag::exec_summary::ExecSummaryCollector; use crate::coprocessor::{util, Result}; use crate::storage::Store; use kvproto::coprocessor::KeyRange; @@ -55,8 +56,9 @@ impl InnerExecutor for TableInnerExecutor { } } -impl ScanExecutor { +impl ScanExecutor { pub fn table_scan( + summary_collector: C, mut meta: TableScan, key_ranges: Vec, store: S, @@ -64,6 +66,7 @@ impl ScanExecutor { ) -> Result { let inner = TableInnerExecutor::new(&meta); Self::new( + summary_collector, inner, meta.get_desc(), meta.take_columns().to_vec(), @@ -74,7 +77,7 @@ impl ScanExecutor { } } -pub type TableScanExecutor = ScanExecutor; +pub type TableScanExecutor = ScanExecutor; #[cfg(test)] mod tests { @@ -91,6 +94,7 @@ mod tests { tests::{get_range, TestStore}, Executor, }; + use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled; const TABLE_ID: i64 = 1; const KEY_NUMBER: usize = 10; @@ -143,9 +147,14 @@ mod tests { let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let mut table_scanner = - super::TableScanExecutor::table_scan(wrapper.table_scan, wrapper.ranges, store, true) - .unwrap(); + let mut table_scanner = super::TableScanExecutor::table_scan( + ExecSummaryCollectorDisabled, + wrapper.table_scan, + wrapper.ranges, + store, + true, + ) + .unwrap(); let row = table_scanner.next().unwrap().unwrap().take_origin(); assert_eq!(row.handle, handle as i64); @@ -180,9 +189,14 @@ mod tests { let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let mut table_scanner = - super::TableScanExecutor::table_scan(wrapper.table_scan, wrapper.ranges, store, true) - .unwrap(); + let mut table_scanner = super::TableScanExecutor::table_scan( + ExecSummaryCollectorDisabled, + wrapper.table_scan, + wrapper.ranges, + store, + true, + ) + .unwrap(); for handle in 0..KEY_NUMBER { let row = table_scanner.next().unwrap().unwrap().take_origin(); @@ -216,9 +230,14 @@ mod tests { let (snapshot, start_ts) = wrapper.store.get_snapshot(); let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true); - let mut table_scanner = - super::TableScanExecutor::table_scan(wrapper.table_scan, wrapper.ranges, store, true) - .unwrap(); + let mut table_scanner = super::TableScanExecutor::table_scan( + ExecSummaryCollectorDisabled, + wrapper.table_scan, + wrapper.ranges, + store, + true, + ) + .unwrap(); for tid in 0..KEY_NUMBER { let handle = KEY_NUMBER - tid - 1; diff --git a/src/coprocessor/dag/executor/topn.rs b/src/coprocessor/dag/executor/topn.rs index 4a248ef027d..abf47ef6d7b 100644 --- a/src/coprocessor/dag/executor/topn.rs +++ b/src/coprocessor/dag/executor/topn.rs @@ -8,13 +8,13 @@ use std::vec::IntoIter; use tipb::executor::TopN; use tipb::expression::ByItem; +use super::topn_heap::TopNHeap; +use super::{Executor, ExecutorMetrics, ExprColumnRefVisitor, Row}; use crate::coprocessor::codec::datum::Datum; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::expr::{EvalConfig, EvalContext, EvalWarnings, Expression}; use crate::coprocessor::Result; -use super::topn_heap::TopNHeap; -use super::{Executor, ExecutorMetrics, ExprColumnRefVisitor, Row}; - struct OrderBy { items: Arc>, exprs: Vec, @@ -147,6 +147,10 @@ impl Executor for TopNExecutor { fn get_len_of_columns(&self) -> usize { self.src.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.src.collect_execution_summaries(target); + } } #[cfg(test)] diff --git a/src/coprocessor/dag/handler.rs b/src/coprocessor/dag/handler.rs index 7f14ed8f718..8a39fdcf756 100644 --- a/src/coprocessor/dag/handler.rs +++ b/src/coprocessor/dag/handler.rs @@ -2,18 +2,23 @@ use kvproto::coprocessor::{KeyRange, Response}; use protobuf::{Message, RepeatedField}; +use tipb::executor::ExecutorExecutionSummary; use tipb::select::{Chunk, SelectResponse, StreamResponse}; -use crate::coprocessor::*; - use super::executor::{Executor, ExecutorMetrics}; +use crate::coprocessor::dag::exec_summary::ExecSummary; +use crate::coprocessor::*; + /// Handles Coprocessor DAG requests. pub struct DAGRequestHandler { deadline: Deadline, executor: Box, output_offsets: Vec, batch_row_limit: usize, + /// To construct ExecutionSummary target. + number_of_executors: usize, + collect_execution_summary: bool, } impl DAGRequestHandler { @@ -22,12 +27,16 @@ impl DAGRequestHandler { executor: Box, output_offsets: Vec, batch_row_limit: usize, + number_of_executors: usize, + collect_execution_summary: bool, ) -> Self { Self { deadline, executor, output_offsets, batch_row_limit, + number_of_executors, + collect_execution_summary, } } @@ -79,6 +88,25 @@ impl RequestHandler for DAGRequestHandler { } self.executor .collect_output_counts(sel_resp.mut_output_counts()); + + if self.collect_execution_summary { + let mut summary_per_executor = + vec![ExecSummary::default(); self.number_of_executors]; + self.executor + .collect_execution_summaries(&mut summary_per_executor); + let summaries = summary_per_executor + .iter() + .map(|summary| { + let mut ret = ExecutorExecutionSummary::new(); + ret.set_num_iterations(summary.num_iterations as u64); + ret.set_num_produced_rows(summary.num_produced_rows as u64); + ret.set_time_processed_ns(summary.time_processed_ns as u64); + ret + }) + .collect(); + sel_resp.set_execution_summaries(RepeatedField::from_vec(summaries)); + } + let data = box_try!(sel_resp.write_to_bytes()); resp.set_data(data); return Ok(resp); diff --git a/src/coprocessor/statistics/analyze.rs b/src/coprocessor/statistics/analyze.rs index 290a1cf4f85..b480f369e38 100644 --- a/src/coprocessor/statistics/analyze.rs +++ b/src/coprocessor/statistics/analyze.rs @@ -19,6 +19,7 @@ use crate::coprocessor::*; use super::cmsketch::CMSketch; use super::fmsketch::FMSketch; use super::histogram::Histogram; +use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled; // `AnalyzeContext` is used to handle `AnalyzeReq` pub struct AnalyzeContext { @@ -72,7 +73,7 @@ impl AnalyzeContext { // it would build a histogram and count-min sketch of index values. fn handle_index( req: AnalyzeIndexReq, - scanner: &mut IndexScanExecutor>, + scanner: &mut IndexScanExecutor>, ) -> Result> { let mut hist = Histogram::new(req.get_bucket_size() as usize); let mut cms = CMSketch::new( @@ -105,6 +106,7 @@ impl RequestHandler for AnalyzeContext { AnalyzeType::TypeIndex => { let req = self.req.take_idx_req(); let mut scanner = ScanExecutor::index_scan_with_cols_len( + ExecSummaryCollectorDisabled, i64::from(req.get_num_columns()), mem::replace(&mut self.ranges, Vec::new()), self.snap.take().unwrap(), @@ -145,7 +147,7 @@ impl RequestHandler for AnalyzeContext { } struct SampleBuilder { - data: TableScanExecutor>, + data: TableScanExecutor>, // the number of columns need to be sampled. It equals to cols.len() // if cols[0] is not pk handle, or it should be cols.len() - 1. col_len: usize, @@ -177,7 +179,8 @@ impl SampleBuilder { let mut meta = TableScan::new(); meta.set_columns(cols_info); - let table_scanner = ScanExecutor::table_scan(meta, ranges, snap, false)?; + let table_scanner = + ScanExecutor::table_scan(ExecSummaryCollectorDisabled, meta, ranges, snap, false)?; Ok(Self { data: table_scanner, col_len, From 38a65a5e8bfc14d550225bda43a88779e0f5ae9c Mon Sep 17 00:00:00 2001 From: Breezewish Date: Mon, 29 Apr 2019 18:17:09 +0800 Subject: [PATCH 4/8] Support exec summary in limit Signed-off-by: Breezewish --- src/coprocessor/dag/builder.rs | 11 ++++++++++- src/coprocessor/dag/executor/limit.rs | 22 +++++++++++++++------- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/coprocessor/dag/builder.rs b/src/coprocessor/dag/builder.rs index 10d62c579be..26d740a53f9 100644 --- a/src/coprocessor/dag/builder.rs +++ b/src/coprocessor/dag/builder.rs @@ -160,8 +160,13 @@ impl DAGBuilder { let first = exec_descriptors .next() .ok_or_else(|| Error::Other(box_err!("has no executor")))?; + let mut src = Self::build_normal_first_executor::<_, C>(first, store, ranges, collect)?; + let mut summary_slot_index = 0; + for mut exec in exec_descriptors { + summary_slot_index += 1; + let curr: Box = match exec.get_tp() { ExecType::TypeTableScan | ExecType::TypeIndexScan => { return Err(box_err!("got too much *scan exec, should be only one")); @@ -184,7 +189,11 @@ impl DAGBuilder { ExecType::TypeTopN => { Box::new(TopNExecutor::new(exec.take_topN(), Arc::clone(&ctx), src)?) } - ExecType::TypeLimit => Box::new(LimitExecutor::new(exec.take_limit(), src)), + ExecType::TypeLimit => Box::new(LimitExecutor::new( + C::new(summary_slot_index), + exec.take_limit(), + src, + )), }; src = curr; } diff --git a/src/coprocessor/dag/executor/limit.rs b/src/coprocessor/dag/executor/limit.rs index 5d31d09a8ef..bf2c42b7248 100644 --- a/src/coprocessor/dag/executor/limit.rs +++ b/src/coprocessor/dag/executor/limit.rs @@ -4,22 +4,24 @@ use tipb::executor::Limit; use super::ExecutorMetrics; -use crate::coprocessor::dag::exec_summary::ExecSummary; +use crate::coprocessor::dag::exec_summary::{ExecSummary, ExecSummaryCollector}; use crate::coprocessor::dag::executor::{Executor, Row}; use crate::coprocessor::dag::expr::EvalWarnings; use crate::coprocessor::Result; /// Retrieves rows from the source executor and only produces part of the rows. -pub struct LimitExecutor<'a> { +pub struct LimitExecutor { + summary_collector: C, limit: u64, cursor: u64, - src: Box, + src: Box, first_collect: bool, } -impl<'a> LimitExecutor<'a> { - pub fn new(limit: Limit, src: Box) -> LimitExecutor<'_> { +impl LimitExecutor { + pub fn new(summary_collector: C, limit: Limit, src: Box) -> Self { LimitExecutor { + summary_collector, limit: limit.get_limit(), cursor: 0, src, @@ -28,15 +30,19 @@ impl<'a> LimitExecutor<'a> { } } -impl<'a> Executor for LimitExecutor<'a> { +impl Executor for LimitExecutor { fn next(&mut self) -> Result> { + let timer = self.summary_collector.on_start_iterate(); if self.cursor >= self.limit { + self.summary_collector.on_finish_iterate(timer, 0); return Ok(None); } if let Some(row) = self.src.next()? { self.cursor += 1; + self.summary_collector.on_finish_iterate(timer, 1); Ok(Some(row)) } else { + self.summary_collector.on_finish_iterate(timer, 0); Ok(None) } } @@ -63,6 +69,7 @@ impl<'a> Executor for LimitExecutor<'a> { fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { self.src.collect_execution_summaries(target); + self.summary_collector.collect_into(target); } } @@ -73,6 +80,7 @@ mod tests { use super::super::tests::*; use super::*; + use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled; #[test] fn test_limit_executor() { @@ -102,7 +110,7 @@ mod tests { let limit = 5; limit_meta.set_limit(limit); // init topn executor - let mut limit_ect = LimitExecutor::new(limit_meta, ts_ect); + let mut limit_ect = LimitExecutor::new(ExecSummaryCollectorDisabled, limit_meta, ts_ect); let mut limit_rows = Vec::with_capacity(limit as usize); while let Some(row) = limit_ect.next().unwrap() { limit_rows.push(row.take_origin()); From e7f34d845b7581cdeb2699accc369e0afc1553f7 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Tue, 30 Apr 2019 10:41:27 +0800 Subject: [PATCH 5/8] Rename according to comments Signed-off-by: Breezewish --- src/coprocessor/dag/batch/statistics.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/coprocessor/dag/batch/statistics.rs b/src/coprocessor/dag/batch/statistics.rs index 19d5b3b29c6..f6373a150d8 100644 --- a/src/coprocessor/dag/batch/statistics.rs +++ b/src/coprocessor/dag/batch/statistics.rs @@ -23,10 +23,10 @@ impl BatchExecuteStatistics { /// /// If execution summary does not need to be collected, it is safe to pass 0 to the `executors` /// argument, which will avoid one allocation. - pub fn new(executors: usize, ranges: usize) -> Self { + pub fn new(executors_len: usize, ranges_len: usize) -> Self { Self { - summary_per_executor: vec![ExecSummary::default(); executors], - scanned_rows_per_range: vec![0; ranges], + summary_per_executor: vec![ExecSummary::default(); executors_len], + scanned_rows_per_range: vec![0; ranges_len], cf_stats: crate::storage::Statistics::default(), } } From d5adda1a53663d4bf7c24ceb76747d3fc25c4a9a Mon Sep 17 00:00:00 2001 From: Breezewish Date: Sun, 5 May 2019 12:50:30 +0800 Subject: [PATCH 6/8] Add execution summary framework for non-batch executors Signed-off-by: Breezewish --- src/coprocessor/dag/builder.rs | 36 ++++++++++++++------- src/coprocessor/dag/executor/aggregation.rs | 13 ++++++++ src/coprocessor/dag/executor/limit.rs | 6 ++++ src/coprocessor/dag/executor/mod.rs | 2 ++ src/coprocessor/dag/executor/scan.rs | 10 ++++-- src/coprocessor/dag/executor/selection.rs | 5 +++ src/coprocessor/dag/executor/topn.rs | 10 ++++-- src/coprocessor/dag/handler.rs | 32 ++++++++++++++++-- 8 files changed, 96 insertions(+), 18 deletions(-) diff --git a/src/coprocessor/dag/builder.rs b/src/coprocessor/dag/builder.rs index b6039b7b1c2..75ad5db3772 100644 --- a/src/coprocessor/dag/builder.rs +++ b/src/coprocessor/dag/builder.rs @@ -149,7 +149,7 @@ impl DAGBuilder { /// Builds a normal executor pipeline. /// /// Normal executors iterate rows one by one. - pub fn build_normal( + pub fn build_normal( exec_descriptors: Vec, store: S, ranges: Vec, @@ -160,7 +160,7 @@ impl DAGBuilder { let first = exec_descriptors .next() .ok_or_else(|| Error::Other(box_err!("has no executor")))?; - let mut src = Self::build_normal_first_executor(first, store, ranges, collect)?; + let mut src = Self::build_normal_first_executor::<_, C>(first, store, ranges, collect)?; for mut exec in exec_descriptors { let curr: Box = match exec.get_tp() { ExecType::TypeTableScan | ExecType::TypeIndexScan => { @@ -195,7 +195,7 @@ impl DAGBuilder { /// other executors and never receive rows from other executors. /// /// The inner-most executor must be a table scan executor or an index scan executor. - fn build_normal_first_executor( + fn build_normal_first_executor( mut first: executor::Executor, store: S, ranges: Vec, @@ -237,18 +237,32 @@ impl DAGBuilder { deadline: Deadline, batch_row_limit: usize, ) -> Result { - let executor = Self::build_normal( - req.take_executors().into_vec(), - store, - ranges, - Arc::new(eval_cfg), - req.get_collect_range_counts(), - )?; + let executors_len = req.get_executors().len(); + + let executor = if req.get_collect_execution_summaries() { + Self::build_normal::<_, ExecSummaryCollectorEnabled>( + req.take_executors().into_vec(), + store, + ranges, + Arc::new(eval_cfg), + req.get_collect_range_counts(), + )? + } else { + Self::build_normal::<_, ExecSummaryCollectorDisabled>( + req.take_executors().into_vec(), + store, + ranges, + Arc::new(eval_cfg), + req.get_collect_range_counts(), + )? + }; Ok(super::DAGRequestHandler::new( deadline, executor, req.take_output_offsets(), batch_row_limit, + executors_len, + req.get_collect_execution_summaries(), )) } @@ -341,7 +355,7 @@ impl DAGBuilder { let build_batch_result = super::builder::DAGBuilder::check_build_batch(req.get_executors()); if let Err(e) = build_batch_result { - debug!("Coprocessor request cannot be batched"; "reason" => %e); + info!("Coprocessor request cannot be batched"; "reason" => %e); } else { is_batch = true; } diff --git a/src/coprocessor/dag/executor/aggregation.rs b/src/coprocessor/dag/executor/aggregation.rs index a79ec58397b..aee4cdafb09 100644 --- a/src/coprocessor/dag/executor/aggregation.rs +++ b/src/coprocessor/dag/executor/aggregation.rs @@ -10,6 +10,7 @@ use tipb::expression::{Expr, ExprType}; use tikv_util::collections::{OrderMap, OrderMapEntry}; use crate::coprocessor::codec::datum::{self, Datum}; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::expr::{EvalConfig, EvalContext, EvalWarnings, Expression}; use crate::coprocessor::*; @@ -139,6 +140,10 @@ impl AggExecutor { } } + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.src.collect_execution_summaries(target); + } + fn get_len_of_columns(&self) -> usize { self.src.get_len_of_columns() } @@ -248,6 +253,10 @@ impl Executor for HashAggExecutor { fn get_len_of_columns(&self) -> usize { self.inner.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.inner.collect_execution_summaries(target); + } } impl Executor for StreamAggExecutor { @@ -296,6 +305,10 @@ impl Executor for StreamAggExecutor { fn get_len_of_columns(&self) -> usize { self.inner.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.inner.collect_execution_summaries(target); + } } // StreamAggExecutor deals with the aggregation functions. diff --git a/src/coprocessor/dag/executor/limit.rs b/src/coprocessor/dag/executor/limit.rs index 611ca622b34..5d31d09a8ef 100644 --- a/src/coprocessor/dag/executor/limit.rs +++ b/src/coprocessor/dag/executor/limit.rs @@ -3,6 +3,8 @@ use tipb::executor::Limit; use super::ExecutorMetrics; + +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::executor::{Executor, Row}; use crate::coprocessor::dag::expr::EvalWarnings; use crate::coprocessor::Result; @@ -58,6 +60,10 @@ impl<'a> Executor for LimitExecutor<'a> { fn get_len_of_columns(&self) -> usize { self.src.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.src.collect_execution_summaries(target); + } } #[cfg(test)] diff --git a/src/coprocessor/dag/executor/mod.rs b/src/coprocessor/dag/executor/mod.rs index 5bd29c905e7..118d7631da4 100644 --- a/src/coprocessor/dag/executor/mod.rs +++ b/src/coprocessor/dag/executor/mod.rs @@ -13,6 +13,7 @@ use tikv_util::collections::HashSet; use crate::coprocessor::codec::datum::{self, Datum, DatumEncoder}; use crate::coprocessor::codec::table::{self, RowColsDict}; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::expr::{EvalContext, EvalWarnings}; use crate::coprocessor::util; use crate::coprocessor::*; @@ -238,6 +239,7 @@ pub trait Executor { fn next(&mut self) -> Result>; fn collect_output_counts(&mut self, counts: &mut Vec); fn collect_metrics_into(&mut self, metrics: &mut ExecutorMetrics); + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]); fn get_len_of_columns(&self) -> usize; /// Only executors with eval computation need to implement `take_eval_warnings` diff --git a/src/coprocessor/dag/executor/scan.rs b/src/coprocessor/dag/executor/scan.rs index 64d4dde1ff4..c7d097f3106 100644 --- a/src/coprocessor/dag/executor/scan.rs +++ b/src/coprocessor/dag/executor/scan.rs @@ -2,12 +2,14 @@ use std::{iter::Peekable, mem, sync::Arc, vec::IntoIter}; +use kvproto::coprocessor::KeyRange; +use tipb::schema::ColumnInfo; + use super::{Executor, ExecutorMetrics, Row}; use crate::coprocessor::codec::table; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::{Error, Result}; use crate::storage::{Key, Store}; -use kvproto::coprocessor::KeyRange; -use tipb::schema::ColumnInfo; // an InnerExecutor is used in ScanExecutor, // hold the different logics between table scan and index scan @@ -170,6 +172,10 @@ impl Executor for ScanExecutor { } } + fn collect_execution_summaries(&mut self, _target: &mut [ExecSummary]) { + // Do nothing for now. + } + fn get_len_of_columns(&self) -> usize { self.columns.len() } diff --git a/src/coprocessor/dag/executor/selection.rs b/src/coprocessor/dag/executor/selection.rs index bffd04d808a..a18bad5e11a 100644 --- a/src/coprocessor/dag/executor/selection.rs +++ b/src/coprocessor/dag/executor/selection.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use tipb::executor::Selection; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::expr::{EvalConfig, EvalContext, EvalWarnings, Expression}; use crate::coprocessor::Result; @@ -78,6 +79,10 @@ impl Executor for SelectionExecutor { fn get_len_of_columns(&self) -> usize { self.src.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.src.collect_execution_summaries(target); + } } #[cfg(test)] diff --git a/src/coprocessor/dag/executor/topn.rs b/src/coprocessor/dag/executor/topn.rs index 4a248ef027d..abf47ef6d7b 100644 --- a/src/coprocessor/dag/executor/topn.rs +++ b/src/coprocessor/dag/executor/topn.rs @@ -8,13 +8,13 @@ use std::vec::IntoIter; use tipb::executor::TopN; use tipb::expression::ByItem; +use super::topn_heap::TopNHeap; +use super::{Executor, ExecutorMetrics, ExprColumnRefVisitor, Row}; use crate::coprocessor::codec::datum::Datum; +use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::expr::{EvalConfig, EvalContext, EvalWarnings, Expression}; use crate::coprocessor::Result; -use super::topn_heap::TopNHeap; -use super::{Executor, ExecutorMetrics, ExprColumnRefVisitor, Row}; - struct OrderBy { items: Arc>, exprs: Vec, @@ -147,6 +147,10 @@ impl Executor for TopNExecutor { fn get_len_of_columns(&self) -> usize { self.src.get_len_of_columns() } + + fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) { + self.src.collect_execution_summaries(target); + } } #[cfg(test)] diff --git a/src/coprocessor/dag/handler.rs b/src/coprocessor/dag/handler.rs index 7f14ed8f718..8a39fdcf756 100644 --- a/src/coprocessor/dag/handler.rs +++ b/src/coprocessor/dag/handler.rs @@ -2,18 +2,23 @@ use kvproto::coprocessor::{KeyRange, Response}; use protobuf::{Message, RepeatedField}; +use tipb::executor::ExecutorExecutionSummary; use tipb::select::{Chunk, SelectResponse, StreamResponse}; -use crate::coprocessor::*; - use super::executor::{Executor, ExecutorMetrics}; +use crate::coprocessor::dag::exec_summary::ExecSummary; +use crate::coprocessor::*; + /// Handles Coprocessor DAG requests. pub struct DAGRequestHandler { deadline: Deadline, executor: Box, output_offsets: Vec, batch_row_limit: usize, + /// To construct ExecutionSummary target. + number_of_executors: usize, + collect_execution_summary: bool, } impl DAGRequestHandler { @@ -22,12 +27,16 @@ impl DAGRequestHandler { executor: Box, output_offsets: Vec, batch_row_limit: usize, + number_of_executors: usize, + collect_execution_summary: bool, ) -> Self { Self { deadline, executor, output_offsets, batch_row_limit, + number_of_executors, + collect_execution_summary, } } @@ -79,6 +88,25 @@ impl RequestHandler for DAGRequestHandler { } self.executor .collect_output_counts(sel_resp.mut_output_counts()); + + if self.collect_execution_summary { + let mut summary_per_executor = + vec![ExecSummary::default(); self.number_of_executors]; + self.executor + .collect_execution_summaries(&mut summary_per_executor); + let summaries = summary_per_executor + .iter() + .map(|summary| { + let mut ret = ExecutorExecutionSummary::new(); + ret.set_num_iterations(summary.num_iterations as u64); + ret.set_num_produced_rows(summary.num_produced_rows as u64); + ret.set_time_processed_ns(summary.time_processed_ns as u64); + ret + }) + .collect(); + sel_resp.set_execution_summaries(RepeatedField::from_vec(summaries)); + } + let data = box_try!(sel_resp.write_to_bytes()); resp.set_data(data); return Ok(resp); From 9e49f43ec37ea509c9e5212d816029a4b4dd2789 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Sun, 5 May 2019 14:44:55 +0800 Subject: [PATCH 7/8] Address comments: - Rename to collect_exec_summary - Remove unnecessary empty line Signed-off-by: Breezewish --- src/coprocessor/dag/executor/limit.rs | 1 - src/coprocessor/dag/handler.rs | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/coprocessor/dag/executor/limit.rs b/src/coprocessor/dag/executor/limit.rs index 5d31d09a8ef..ffe80eb5dba 100644 --- a/src/coprocessor/dag/executor/limit.rs +++ b/src/coprocessor/dag/executor/limit.rs @@ -3,7 +3,6 @@ use tipb::executor::Limit; use super::ExecutorMetrics; - use crate::coprocessor::dag::exec_summary::ExecSummary; use crate::coprocessor::dag::executor::{Executor, Row}; use crate::coprocessor::dag::expr::EvalWarnings; diff --git a/src/coprocessor/dag/handler.rs b/src/coprocessor/dag/handler.rs index 8a39fdcf756..aa88be450db 100644 --- a/src/coprocessor/dag/handler.rs +++ b/src/coprocessor/dag/handler.rs @@ -18,7 +18,7 @@ pub struct DAGRequestHandler { batch_row_limit: usize, /// To construct ExecutionSummary target. number_of_executors: usize, - collect_execution_summary: bool, + collect_exec_summary: bool, } impl DAGRequestHandler { @@ -28,7 +28,7 @@ impl DAGRequestHandler { output_offsets: Vec, batch_row_limit: usize, number_of_executors: usize, - collect_execution_summary: bool, + collect_exec_summary: bool, ) -> Self { Self { deadline, @@ -36,7 +36,7 @@ impl DAGRequestHandler { output_offsets, batch_row_limit, number_of_executors, - collect_execution_summary, + collect_exec_summary, } } @@ -89,7 +89,7 @@ impl RequestHandler for DAGRequestHandler { self.executor .collect_output_counts(sel_resp.mut_output_counts()); - if self.collect_execution_summary { + if self.collect_exec_summary { let mut summary_per_executor = vec![ExecSummary::default(); self.number_of_executors]; self.executor From 198233d5294a7cfd5483ded60d9e1f2234ab4c20 Mon Sep 17 00:00:00 2001 From: Breezewish Date: Sun, 5 May 2019 18:14:26 +0800 Subject: [PATCH 8/8] Remove empty line Signed-off-by: Breezewish --- src/coprocessor/dag/executor/limit.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/coprocessor/dag/executor/limit.rs b/src/coprocessor/dag/executor/limit.rs index bf2c42b7248..2e073b63ef3 100644 --- a/src/coprocessor/dag/executor/limit.rs +++ b/src/coprocessor/dag/executor/limit.rs @@ -3,7 +3,6 @@ use tipb::executor::Limit; use super::ExecutorMetrics; - use crate::coprocessor::dag::exec_summary::{ExecSummary, ExecSummaryCollector}; use crate::coprocessor::dag::executor::{Executor, Row}; use crate::coprocessor::dag::expr::EvalWarnings;