From c772ec291d0c6dc58361cc57f04170c98857285f Mon Sep 17 00:00:00 2001 From: Wenxuan Shi Date: Fri, 1 Mar 2019 18:06:30 +0800 Subject: [PATCH] Implements batch executor handler (#4242) Signed-off-by: Breezewish --- src/coprocessor/dag/batch_handler.rs | 176 ++++++++++++++++++++ src/coprocessor/dag/executor/aggregation.rs | 2 +- src/coprocessor/dag/executor/selection.rs | 2 +- src/coprocessor/dag/executor/topn.rs | 4 +- src/coprocessor/dag/expr/ctx.rs | 10 +- src/coprocessor/dag/mod.rs | 1 + 6 files changed, 187 insertions(+), 8 deletions(-) create mode 100644 src/coprocessor/dag/batch_handler.rs diff --git a/src/coprocessor/dag/batch_handler.rs b/src/coprocessor/dag/batch_handler.rs new file mode 100644 index 00000000000..cf6a54c2927 --- /dev/null +++ b/src/coprocessor/dag/batch_handler.rs @@ -0,0 +1,176 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use protobuf::{Message, RepeatedField}; + +use kvproto::coprocessor::Response; +use tipb::select::{Chunk, SelectResponse}; + +use super::batch_executor::interface::{ + BatchExecuteStatistics, BatchExecutor, BatchExecutorContext, +}; +use super::executor::ExecutorMetrics; +use crate::coprocessor::dag::expr::EvalWarnings; +use crate::coprocessor::*; + +// TODO: The value is chosen according to some very subjective experience, which is not tuned +// carefully. We need to benchmark to find a best value. Also we may consider accepting this value +// from TiDB side. +const BATCH_INITIAL_SIZE: usize = 32; + +// TODO: This value is chosen based on MonetDB/X100's research without our own benchmarks. +const BATCH_MAX_SIZE: usize = 1024; + +// TODO: Maybe there can be some better strategy. Needs benchmarks and tunes. +const BATCH_GROW_FACTOR: usize = 2; + +/// Must be built from DAGRequestHandler. +pub struct BatchDAGHandler { + /// The deadline of this handler. For each check point (e.g. each iteration) we need to check + /// whether or not the deadline is exceeded and break the process if so. + // TODO: Deprecate it using a better deadline mechanism. + deadline: Deadline, + + out_most_executor: Box, + + /// The offset of the columns need to be outputted. For example, TiDB may only needs a subset + /// of the columns in the result so that unrelated columns don't need to be encoded and + /// returned back. + output_offsets: Vec, + + executor_context: BatchExecutorContext, + + /// Accumulated statistics. + // TODO: Currently we return statistics only once, so these statistics are accumulated only + // once. However in future when we introduce reenterable DAG processor, these statistics may + // be accumulated and returned several times during the life time of the request. At that time + // we may remove this field. + statistics: BatchExecuteStatistics, + + /// Traditional metric interface. + // TODO: Deprecate it in Coprocessor DAG v2. + metrics: ExecutorMetrics, +} + +impl BatchDAGHandler { + pub fn new( + deadline: Deadline, + out_most_executor: Box, + output_offsets: Vec, + executor_context: BatchExecutorContext, + ranges_len: usize, + ) -> Self { + Self { + deadline, + out_most_executor, + output_offsets, + executor_context, + statistics: BatchExecuteStatistics::new(ranges_len), + metrics: ExecutorMetrics::default(), + } + } +} + +impl RequestHandler for BatchDAGHandler { + fn handle_request(&mut self) -> Result { + let mut chunks = vec![]; + let mut batch_size = BATCH_INITIAL_SIZE; + let mut warnings = EvalWarnings::new(self.executor_context.config.max_warning_cnt); + + loop { + self.deadline.check_if_exceeded()?; + + let mut result = self.out_most_executor.next_batch(batch_size); + + let is_drained; + + // Check error first, because it means that we should directly respond error. + match result.is_drained { + Err(Error::Eval(err)) => { + let mut resp = Response::new(); + let mut sel_resp = SelectResponse::new(); + sel_resp.set_error(err); + let data = box_try!(sel_resp.write_to_bytes()); + resp.set_data(data); + return Ok(resp); + } + Err(e) => return Err(e), + Ok(f) => is_drained = f, + } + + warnings.merge(&mut result.warnings); + + // Notice that rows_len == 0 doesn't mean that it is drained. + if result.data.rows_len() > 0 { + let mut chunk = Chunk::new(); + { + let data = chunk.mut_rows_data(); + data.reserve(result.data.maximum_encoded_size(&self.output_offsets)?); + // TODO: We should encode using the out-most executor's own schema. Let's + // change it when we have aggregation executor. + result.data.encode( + &self.output_offsets, + &self.executor_context.columns_info, + data, + )?; + } + chunks.push(chunk); + } + + if is_drained { + self.out_most_executor + .collect_statistics(&mut self.statistics); + self.metrics.cf_stats.add(&self.statistics.cf_stats); + + let mut resp = Response::new(); + let mut sel_resp = SelectResponse::new(); + sel_resp.set_chunks(RepeatedField::from_vec(chunks)); + // TODO: output_counts should not be i64. Let's fix it in Coprocessor DAG V2. + sel_resp.set_output_counts( + self.statistics + .scanned_rows_per_range + .iter() + .map(|v| *v as i64) + .collect(), + ); + + sel_resp.set_warnings(RepeatedField::from_vec(warnings.warnings)); + sel_resp.set_warning_count(warnings.warning_cnt as i64); + + let data = box_try!(sel_resp.write_to_bytes()); + resp.set_data(data); + + // Not really useful here, because we only collect it once. But when we change it + // in future, hope we will not forget it. + self.statistics.clear(); + + return Ok(resp); + } + + // Grow batch size + if batch_size < BATCH_MAX_SIZE { + batch_size *= BATCH_GROW_FACTOR; + if batch_size > BATCH_MAX_SIZE { + batch_size = BATCH_MAX_SIZE + } + } + } + } + + fn collect_metrics_into(&mut self, target_metrics: &mut ExecutorMetrics) { + // FIXME: This interface will be broken in streaming mode. + target_metrics.merge(&mut self.metrics); + + // Notice: Exec count is collected during building the batch handler. + } +} diff --git a/src/coprocessor/dag/executor/aggregation.rs b/src/coprocessor/dag/executor/aggregation.rs index ae4ef839e8a..ec75e480061 100644 --- a/src/coprocessor/dag/executor/aggregation.rs +++ b/src/coprocessor/dag/executor/aggregation.rs @@ -143,7 +143,7 @@ impl AggExecutor { fn take_eval_warnings(&mut self) -> Option { if let Some(mut warnings) = self.src.take_eval_warnings() { - warnings.merge(self.ctx.take_warnings()); + warnings.merge(&mut self.ctx.take_warnings()); Some(warnings) } else { Some(self.ctx.take_warnings()) diff --git a/src/coprocessor/dag/executor/selection.rs b/src/coprocessor/dag/executor/selection.rs index a1c1ed2477b..69c184d3d33 100644 --- a/src/coprocessor/dag/executor/selection.rs +++ b/src/coprocessor/dag/executor/selection.rs @@ -79,7 +79,7 @@ impl Executor for SelectionExecutor { fn take_eval_warnings(&mut self) -> Option { if let Some(mut warnings) = self.src.take_eval_warnings() { - warnings.merge(self.ctx.take_warnings()); + warnings.merge(&mut self.ctx.take_warnings()); Some(warnings) } else { Some(self.ctx.take_warnings()) diff --git a/src/coprocessor/dag/executor/topn.rs b/src/coprocessor/dag/executor/topn.rs index 6910e51e222..9ae0f0c7a12 100644 --- a/src/coprocessor/dag/executor/topn.rs +++ b/src/coprocessor/dag/executor/topn.rs @@ -146,8 +146,8 @@ impl Executor for TopNExecutor { fn take_eval_warnings(&mut self) -> Option { if let Some(mut warnings) = self.src.take_eval_warnings() { - if let Some(topn_warnings) = self.eval_warnings.take() { - warnings.merge(topn_warnings); + if let Some(mut topn_warnings) = self.eval_warnings.take() { + warnings.merge(&mut topn_warnings); } Some(warnings) } else { diff --git a/src/coprocessor/dag/expr/ctx.rs b/src/coprocessor/dag/expr/ctx.rs index 1af949be04e..de63e0554a1 100644 --- a/src/coprocessor/dag/expr/ctx.rs +++ b/src/coprocessor/dag/expr/ctx.rs @@ -50,7 +50,7 @@ pub const MODE_ERROR_FOR_DIVISION_BY_ZERO: u64 = 27; const DEFAULT_MAX_WARNING_CNT: usize = 64; -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] pub struct EvalConfig { /// timezone to use when parse/calculate time. pub tz: Tz, @@ -62,6 +62,8 @@ pub struct EvalConfig { pub in_select_stmt: bool, pub pad_char_to_full_length: bool, pub divided_by_zero_as_warning: bool, + // TODO: max warning count is not really a EvalConfig. Instead it is a ExecutionConfig, because + // warning is a executor stuff instead of a evaluation stuff. pub max_warning_cnt: usize, pub sql_mode: u64, /// if the session is in strict mode. @@ -207,7 +209,7 @@ impl EvalConfig { } // Warning details caused in eval computation. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct EvalWarnings { // max number of warnings to return. max_warning_cnt: usize, @@ -218,7 +220,7 @@ pub struct EvalWarnings { } impl EvalWarnings { - fn new(max_warning_cnt: usize) -> EvalWarnings { + pub fn new(max_warning_cnt: usize) -> EvalWarnings { EvalWarnings { max_warning_cnt, warning_cnt: 0, @@ -233,7 +235,7 @@ impl EvalWarnings { } } - pub fn merge(&mut self, mut other: EvalWarnings) { + pub fn merge(&mut self, other: &mut EvalWarnings) { self.warning_cnt += other.warning_cnt; if self.warnings.len() >= self.max_warning_cnt { return; diff --git a/src/coprocessor/dag/mod.rs b/src/coprocessor/dag/mod.rs index 6e3566c3cf9..224f7335751 100644 --- a/src/coprocessor/dag/mod.rs +++ b/src/coprocessor/dag/mod.rs @@ -34,6 +34,7 @@ //! Obviously, this kind of executor must not be the first executor in the pipeline. pub mod batch_executor; +pub mod batch_handler; mod builder; pub mod executor; pub mod expr;