Commit

Implements batch executor handler (#4242)
Signed-off-by: Breezewish <breezewish@pingcap.com>
breezewish committed Mar 1, 2019
1 parent 371b664 commit c772ec2
Showing 6 changed files with 187 additions and 8 deletions.
176 changes: 176 additions & 0 deletions src/coprocessor/dag/batch_handler.rs
@@ -0,0 +1,176 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use protobuf::{Message, RepeatedField};

use kvproto::coprocessor::Response;
use tipb::select::{Chunk, SelectResponse};

use super::batch_executor::interface::{
    BatchExecuteStatistics, BatchExecutor, BatchExecutorContext,
};
use super::executor::ExecutorMetrics;
use crate::coprocessor::dag::expr::EvalWarnings;
use crate::coprocessor::*;

// TODO: The value is chosen according to some very subjective experience and has not been tuned
// carefully. We need benchmarks to find the best value. We may also consider accepting this
// value from the TiDB side.
const BATCH_INITIAL_SIZE: usize = 32;

// TODO: This value is chosen based on MonetDB/X100's research without our own benchmarks.
const BATCH_MAX_SIZE: usize = 1024;

// TODO: There may be a better strategy. Needs benchmarking and tuning.
const BATCH_GROW_FACTOR: usize = 2;

/// Must be built from DAGRequestHandler.
pub struct BatchDAGHandler {
    /// The deadline of this handler. At each checkpoint (e.g. each iteration) we need to check
    /// whether the deadline has been exceeded and stop processing if so.
    // TODO: Deprecate it using a better deadline mechanism.
    deadline: Deadline,

    out_most_executor: Box<dyn BatchExecutor>,

    /// The offsets of the columns that need to be output. For example, TiDB may only need a
    /// subset of the columns in the result, so that unrelated columns don't need to be encoded
    /// and returned.
    output_offsets: Vec<u32>,

    executor_context: BatchExecutorContext,

    /// Accumulated statistics.
    // TODO: Currently we return statistics only once, so these statistics are accumulated only
    // once. However, in the future when we introduce a re-entrant DAG processor, these statistics
    // may be accumulated and returned several times during the lifetime of the request. At that
    // time we may remove this field.
    statistics: BatchExecuteStatistics,

    /// Traditional metric interface.
    // TODO: Deprecate it in Coprocessor DAG v2.
    metrics: ExecutorMetrics,
}

impl BatchDAGHandler {
    pub fn new(
        deadline: Deadline,
        out_most_executor: Box<dyn BatchExecutor>,
        output_offsets: Vec<u32>,
        executor_context: BatchExecutorContext,
        ranges_len: usize,
    ) -> Self {
        Self {
            deadline,
            out_most_executor,
            output_offsets,
            executor_context,
            statistics: BatchExecuteStatistics::new(ranges_len),
            metrics: ExecutorMetrics::default(),
        }
    }
}

impl RequestHandler for BatchDAGHandler {
    fn handle_request(&mut self) -> Result<Response> {
        let mut chunks = vec![];
        let mut batch_size = BATCH_INITIAL_SIZE;
        let mut warnings = EvalWarnings::new(self.executor_context.config.max_warning_cnt);

        loop {
            self.deadline.check_if_exceeded()?;

            let mut result = self.out_most_executor.next_batch(batch_size);

            let is_drained;

            // Check for errors first, because an error means we should respond with the error
            // directly.
            match result.is_drained {
                Err(Error::Eval(err)) => {
                    let mut resp = Response::new();
                    let mut sel_resp = SelectResponse::new();
                    sel_resp.set_error(err);
                    let data = box_try!(sel_resp.write_to_bytes());
                    resp.set_data(data);
                    return Ok(resp);
                }
                Err(e) => return Err(e),
                Ok(f) => is_drained = f,
            }

            warnings.merge(&mut result.warnings);

            // Notice that rows_len == 0 doesn't mean that the executor is drained.
            if result.data.rows_len() > 0 {
                let mut chunk = Chunk::new();
                {
                    let data = chunk.mut_rows_data();
                    data.reserve(result.data.maximum_encoded_size(&self.output_offsets)?);
                    // TODO: We should encode using the out-most executor's own schema. Let's
                    // change it when we have the aggregation executor.
                    result.data.encode(
                        &self.output_offsets,
                        &self.executor_context.columns_info,
                        data,
                    )?;
                }
                chunks.push(chunk);
            }

            if is_drained {
                self.out_most_executor
                    .collect_statistics(&mut self.statistics);
                self.metrics.cf_stats.add(&self.statistics.cf_stats);

                let mut resp = Response::new();
                let mut sel_resp = SelectResponse::new();
                sel_resp.set_chunks(RepeatedField::from_vec(chunks));
                // TODO: output_counts should not be i64. Let's fix it in Coprocessor DAG V2.
                sel_resp.set_output_counts(
                    self.statistics
                        .scanned_rows_per_range
                        .iter()
                        .map(|v| *v as i64)
                        .collect(),
                );

                sel_resp.set_warnings(RepeatedField::from_vec(warnings.warnings));
                sel_resp.set_warning_count(warnings.warning_cnt as i64);

                let data = box_try!(sel_resp.write_to_bytes());
                resp.set_data(data);

                // Not really useful here, because we only collect it once. But when we change
                // this in the future, hopefully we will not forget it.
                self.statistics.clear();

                return Ok(resp);
            }

            // Grow the batch size.
            if batch_size < BATCH_MAX_SIZE {
                batch_size *= BATCH_GROW_FACTOR;
                if batch_size > BATCH_MAX_SIZE {
                    batch_size = BATCH_MAX_SIZE;
                }
            }
        }
    }

    fn collect_metrics_into(&mut self, target_metrics: &mut ExecutorMetrics) {
        // FIXME: This interface will be broken in streaming mode.
        target_metrics.merge(&mut self.metrics);

        // Notice: Exec count is collected while building the batch handler.
    }
}
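
For reference, here is a minimal standalone sketch (not part of the committed file; written for this note only) of the batch-size growth strategy used in handle_request above: the handler starts with BATCH_INITIAL_SIZE, doubles the size via BATCH_GROW_FACTOR on each iteration, and saturates at BATCH_MAX_SIZE, presumably so that small requests stay cheap while large scans amortize per-batch overhead.

// Standalone sketch: batch sizes requested by successive next_batch() calls.
const BATCH_INITIAL_SIZE: usize = 32;
const BATCH_MAX_SIZE: usize = 1024;
const BATCH_GROW_FACTOR: usize = 2;

fn main() {
    let mut batch_size = BATCH_INITIAL_SIZE;
    let mut sizes = Vec::new();
    // Ten iterations are enough to show that the size saturates at BATCH_MAX_SIZE.
    for _ in 0..10 {
        sizes.push(batch_size);
        if batch_size < BATCH_MAX_SIZE {
            batch_size *= BATCH_GROW_FACTOR;
            if batch_size > BATCH_MAX_SIZE {
                batch_size = BATCH_MAX_SIZE;
            }
        }
    }
    // Prints: [32, 64, 128, 256, 512, 1024, 1024, 1024, 1024, 1024]
    println!("{:?}", sizes);
}
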
2 changes: 1 addition & 1 deletion src/coprocessor/dag/executor/aggregation.rs
@@ -143,7 +143,7 @@ impl AggExecutor

     fn take_eval_warnings(&mut self) -> Option<EvalWarnings> {
         if let Some(mut warnings) = self.src.take_eval_warnings() {
-            warnings.merge(self.ctx.take_warnings());
+            warnings.merge(&mut self.ctx.take_warnings());
             Some(warnings)
         } else {
             Some(self.ctx.take_warnings())
2 changes: 1 addition & 1 deletion src/coprocessor/dag/executor/selection.rs
@@ -79,7 +79,7 @@ impl Executor for SelectionExecutor {

     fn take_eval_warnings(&mut self) -> Option<EvalWarnings> {
         if let Some(mut warnings) = self.src.take_eval_warnings() {
-            warnings.merge(self.ctx.take_warnings());
+            warnings.merge(&mut self.ctx.take_warnings());
             Some(warnings)
         } else {
             Some(self.ctx.take_warnings())
4 changes: 2 additions & 2 deletions src/coprocessor/dag/executor/topn.rs
@@ -146,8 +146,8 @@ impl Executor for TopNExecutor {

     fn take_eval_warnings(&mut self) -> Option<EvalWarnings> {
         if let Some(mut warnings) = self.src.take_eval_warnings() {
-            if let Some(topn_warnings) = self.eval_warnings.take() {
-                warnings.merge(topn_warnings);
+            if let Some(mut topn_warnings) = self.eval_warnings.take() {
+                warnings.merge(&mut topn_warnings);
             }
             Some(warnings)
         } else {
10 changes: 6 additions & 4 deletions src/coprocessor/dag/expr/ctx.rs
@@ -50,7 +50,7 @@ pub const MODE_ERROR_FOR_DIVISION_BY_ZERO: u64 = 27;

 const DEFAULT_MAX_WARNING_CNT: usize = 64;

-#[derive(Debug)]
+#[derive(Clone, Copy, Debug)]
 pub struct EvalConfig {
     /// timezone to use when parse/calculate time.
     pub tz: Tz,
@@ -62,6 +62,8 @@ pub struct EvalConfig {
     pub in_select_stmt: bool,
     pub pad_char_to_full_length: bool,
     pub divided_by_zero_as_warning: bool,
+    // TODO: max_warning_cnt is not really an EvalConfig option but an ExecutionConfig option,
+    // because warnings are an executor concern rather than an evaluation concern.
     pub max_warning_cnt: usize,
     pub sql_mode: u64,
     /// if the session is in strict mode.
@@ -207,7 +209,7 @@ impl EvalConfig {
 }

 // Warning details caused in eval computation.
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct EvalWarnings {
     // max number of warnings to return.
     max_warning_cnt: usize,
@@ -218,7 +220,7 @@ pub struct EvalWarnings {
 }

 impl EvalWarnings {
-    fn new(max_warning_cnt: usize) -> EvalWarnings {
+    pub fn new(max_warning_cnt: usize) -> EvalWarnings {
         EvalWarnings {
             max_warning_cnt,
             warning_cnt: 0,
@@ -233,7 +235,7 @@ impl EvalWarnings {
         }
     }

-    pub fn merge(&mut self, mut other: EvalWarnings) {
+    pub fn merge(&mut self, other: &mut EvalWarnings) {
         self.warning_cnt += other.warning_cnt;
         if self.warnings.len() >= self.max_warning_cnt {
             return;
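
To make the EvalWarnings::merge change above easier to follow, here is a simplified standalone sketch (my own mock type, not the committed code) of the new borrow-based signature. Taking &mut EvalWarnings instead of an owned value lets callers such as warnings.merge(&mut self.ctx.take_warnings()) or the batch handler's warnings.merge(&mut result.warnings) merge warnings held in a field they cannot move out of; merge is assumed to drain the other side's warnings into the accumulator, capped at max_warning_cnt.

// Simplified mock, not the actual TiKV types.
#[derive(Debug, Default)]
struct EvalWarnings {
    max_warning_cnt: usize,
    warning_cnt: usize,
    warnings: Vec<String>,
}

impl EvalWarnings {
    fn new(max_warning_cnt: usize) -> EvalWarnings {
        EvalWarnings {
            max_warning_cnt,
            ..Default::default()
        }
    }

    fn append_warning(&mut self, msg: &str) {
        self.warning_cnt += 1;
        if self.warnings.len() < self.max_warning_cnt {
            self.warnings.push(msg.to_owned());
        }
    }

    // Mirrors the signature introduced by this commit: borrow `other` mutably
    // instead of consuming it by value.
    fn merge(&mut self, other: &mut EvalWarnings) {
        self.warning_cnt += other.warning_cnt;
        if self.warnings.len() >= self.max_warning_cnt {
            return;
        }
        other.warnings.truncate(self.max_warning_cnt - self.warnings.len());
        self.warnings.append(&mut other.warnings);
    }
}

fn main() {
    let mut accumulated = EvalWarnings::new(64);
    let mut per_batch = EvalWarnings::new(64);
    per_batch.append_warning("division by zero");

    // `per_batch` stays usable after the call; only its warnings are drained.
    accumulated.merge(&mut per_batch);
    assert_eq!(accumulated.warning_cnt, 1);
    assert!(per_batch.warnings.is_empty());
    println!("{:?}", accumulated);
}
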
1 change: 1 addition & 0 deletions src/coprocessor/dag/mod.rs
@@ -34,6 +34,7 @@
 //! Obviously, this kind of executor must not be the first executor in the pipeline.

 pub mod batch_executor;
+pub mod batch_handler;
 mod builder;
 pub mod executor;
 pub mod expr;
