Skip to content

Commit

Permalink
Support exec summary for normal table / index scan executor. (tikv#4598)
Browse files Browse the repository at this point in the history
Signed-off-by: Breezewish <breezewish@pingcap.com>
  • Loading branch information
breezewish authored and jswh committed May 27, 2019
1 parent 8c5bf74 commit 9a5b08e
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 55 deletions.
5 changes: 5 additions & 0 deletions benches/coprocessor_executors/mod.rs
Expand Up @@ -10,6 +10,7 @@ use tipb::executor::{IndexScan, TableScan};

use test_coprocessor::*;
use tikv::coprocessor::codec::Datum;
use tikv::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled;
use tikv::coprocessor::dag::executor::Executor;
use tikv::storage::RocksEngine;

Expand All @@ -24,6 +25,7 @@ fn bench_table_scan_next(
b.iter_with_setup(
|| {
let mut executor = TableScanExecutor::table_scan(
ExecSummaryCollectorDisabled,
meta.clone(),
ranges.to_vec(),
store.to_fixture_store(),
Expand Down Expand Up @@ -414,6 +416,7 @@ fn bench_table_scan_multi_point_range(c: &mut Criterion) {
ranges.push(table.get_record_range_one(i));
}
let mut executor = TableScanExecutor::table_scan(
ExecSummaryCollectorDisabled,
meta.clone(),
ranges,
store.to_fixture_store(),
Expand Down Expand Up @@ -470,6 +473,7 @@ fn bench_table_scan_multi_rows(c: &mut Criterion) {
b.iter_with_setup(
|| {
let mut executor = TableScanExecutor::table_scan(
ExecSummaryCollectorDisabled,
meta.clone(),
vec![table.get_record_range_all()],
store.to_fixture_store(),
Expand Down Expand Up @@ -503,6 +507,7 @@ fn bench_index_scan_next(
b.iter_with_setup(
|| {
let mut executor = IndexScanExecutor::index_scan(
ExecSummaryCollectorDisabled,
meta.clone(),
ranges.to_vec(),
store.to_fixture_store(),
Expand Down
4 changes: 3 additions & 1 deletion src/coprocessor/dag/builder.rs
Expand Up @@ -195,7 +195,7 @@ impl DAGBuilder {
/// other executors and never receive rows from other executors.
///
/// The inner-most executor must be a table scan executor or an index scan executor.
fn build_normal_first_executor<S: Store + 'static, _C: ExecSummaryCollector + 'static>(
fn build_normal_first_executor<S: Store + 'static, C: ExecSummaryCollector + 'static>(
mut first: executor::Executor,
store: S,
ranges: Vec<KeyRange>,
Expand All @@ -204,6 +204,7 @@ impl DAGBuilder {
match first.get_tp() {
ExecType::TypeTableScan => {
let ex = Box::new(ScanExecutor::table_scan(
C::new(0),
first.take_tbl_scan(),
ranges,
store,
Expand All @@ -214,6 +215,7 @@ impl DAGBuilder {
ExecType::TypeIndexScan => {
let unique = first.get_idx_scan().get_unique();
let ex = Box::new(ScanExecutor::index_scan(
C::new(0),
first.take_idx_scan(),
ranges,
store,
Expand Down
48 changes: 33 additions & 15 deletions src/coprocessor/dag/executor/aggregation.rs
Expand Up @@ -406,17 +406,17 @@ mod tests {
use tipb::expression::{Expr, ExprType};
use tipb::schema::ColumnInfo;

use crate::coprocessor::codec::datum::{self, Datum};
use crate::coprocessor::codec::mysql::decimal::Decimal;
use crate::coprocessor::codec::table;
use crate::storage::SnapshotStore;
use tikv_util::collections::HashMap;

use super::super::index_scan::tests::IndexTestWrapper;
use super::super::index_scan::IndexScanExecutor;
use super::super::tests::*;
use super::*;
use crate::coprocessor::codec::datum::{self, Datum};
use crate::coprocessor::codec::mysql::decimal::Decimal;
use crate::coprocessor::codec::table;
use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled;
use crate::coprocessor::dag::scanner::tests::Data;
use crate::storage::SnapshotStore;
use tikv_util::collections::HashMap;

fn build_group_by(col_ids: &[i64]) -> Vec<Expr> {
let mut group_by = Vec::with_capacity(col_ids.len());
Expand Down Expand Up @@ -507,9 +507,15 @@ mod tests {
let mut wrapper = IndexTestWrapper::new(unique, idx_data);
let (snapshot, start_ts) = wrapper.store.get_snapshot();
let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true);
let is_executor =
IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, true)
.unwrap();
let is_executor = IndexScanExecutor::index_scan(
ExecSummaryCollectorDisabled,
wrapper.scan,
wrapper.ranges,
store,
unique,
true,
)
.unwrap();
// init the stream aggregation executor
let mut agg_ect = StreamAggExecutor::new(
Arc::new(EvalConfig::default()),
Expand Down Expand Up @@ -539,9 +545,15 @@ mod tests {
let mut wrapper = IndexTestWrapper::new(unique, idx_data);
let (snapshot, start_ts) = wrapper.store.get_snapshot();
let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true);
let is_executor =
IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, true)
.unwrap();
let is_executor = IndexScanExecutor::index_scan(
ExecSummaryCollectorDisabled,
wrapper.scan,
wrapper.ranges,
store,
unique,
true,
)
.unwrap();
// init the stream aggregation executor
let mut agg_ect = StreamAggExecutor::new(
Arc::new(EvalConfig::default()),
Expand Down Expand Up @@ -589,9 +601,15 @@ mod tests {
let mut wrapper = IndexTestWrapper::new(unique, idx_data);
let (snapshot, start_ts) = wrapper.store.get_snapshot();
let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true);
let is_executor =
IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, true)
.unwrap();
let is_executor = IndexScanExecutor::index_scan(
ExecSummaryCollectorDisabled,
wrapper.scan,
wrapper.ranges,
store,
unique,
true,
)
.unwrap();
// init the stream aggregation executor
let mut agg_ect = StreamAggExecutor::new(
Arc::new(EvalConfig::default()),
Expand Down
76 changes: 60 additions & 16 deletions src/coprocessor/dag/executor/index_scan.rs
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;

use super::{scan::InnerExecutor, Row, ScanExecutor};
use crate::coprocessor::codec::table;
use crate::coprocessor::dag::exec_summary::ExecSummaryCollector;
use crate::coprocessor::{util, Result};
use crate::storage::Store;
use kvproto::coprocessor::KeyRange;
Expand Down Expand Up @@ -81,8 +82,9 @@ impl InnerExecutor for IndexInnerExecutor {
}
}

impl<S: Store> ScanExecutor<S, IndexInnerExecutor> {
impl<C: ExecSummaryCollector, S: Store> ScanExecutor<C, S, IndexInnerExecutor> {
pub fn index_scan(
summary_collector: C,
mut meta: IndexScan,
key_ranges: Vec<KeyRange>,
store: S,
Expand All @@ -91,10 +93,19 @@ impl<S: Store> ScanExecutor<S, IndexInnerExecutor> {
) -> Result<Self> {
let columns = meta.get_columns().to_vec();
let inner = IndexInnerExecutor::new(&mut meta, unique);
Self::new(inner, meta.get_desc(), columns, key_ranges, store, collect)
Self::new(
summary_collector,
inner,
meta.get_desc(),
columns,
key_ranges,
store,
collect,
)
}

pub fn index_scan_with_cols_len(
summary_collector: C,
cols: i64,
key_ranges: Vec<KeyRange>,
store: S,
Expand All @@ -105,11 +116,19 @@ impl<S: Store> ScanExecutor<S, IndexInnerExecutor> {
pk_col: None,
unique: false,
};
Self::new(inner, false, vec![], key_ranges, store, false)
Self::new(
summary_collector,
inner,
false,
vec![],
key_ranges,
store,
false,
)
}
}

pub type IndexScanExecutor<S> = ScanExecutor<S, IndexInnerExecutor>;
pub type IndexScanExecutor<C, S> = ScanExecutor<C, S, IndexInnerExecutor>;

#[cfg(test)]
pub mod tests {
Expand All @@ -127,6 +146,7 @@ pub mod tests {

use super::super::tests::*;
use super::*;
use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled;
use crate::coprocessor::dag::executor::Executor;
use crate::coprocessor::dag::scanner::tests::Data;

Expand Down Expand Up @@ -292,9 +312,15 @@ pub mod tests {
let (snapshot, start_ts) = wrapper.store.get_snapshot();
let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true);

let mut scanner =
IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, false, false)
.unwrap();
let mut scanner = IndexScanExecutor::index_scan(
ExecSummaryCollectorDisabled,
wrapper.scan,
wrapper.ranges,
store,
false,
false,
)
.unwrap();

for handle in 0..KEY_NUMBER / 2 {
let row = scanner.next().unwrap().unwrap().take_origin();
Expand Down Expand Up @@ -347,9 +373,15 @@ pub mod tests {

let (snapshot, start_ts) = wrapper.store.get_snapshot();
let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true);
let mut scanner =
IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, true)
.unwrap();
let mut scanner = IndexScanExecutor::index_scan(
ExecSummaryCollectorDisabled,
wrapper.scan,
wrapper.ranges,
store,
unique,
true,
)
.unwrap();
for handle in 0..KEY_NUMBER {
let row = scanner.next().unwrap().unwrap().take_origin();
assert_eq!(row.handle, handle as i64);
Expand Down Expand Up @@ -400,9 +432,15 @@ pub mod tests {
let (snapshot, start_ts) = wrapper.store.get_snapshot();
let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true);

let mut scanner =
IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, unique, false)
.unwrap();
let mut scanner = IndexScanExecutor::index_scan(
ExecSummaryCollectorDisabled,
wrapper.scan,
wrapper.ranges,
store,
unique,
false,
)
.unwrap();

for tid in 0..KEY_NUMBER {
let handle = KEY_NUMBER - tid - 1;
Expand All @@ -425,9 +463,15 @@ pub mod tests {
let (snapshot, start_ts) = wrapper.store.get_snapshot();
let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true);

let mut scanner =
IndexScanExecutor::index_scan(wrapper.scan, wrapper.ranges, store, false, false)
.unwrap();
let mut scanner = IndexScanExecutor::index_scan(
ExecSummaryCollectorDisabled,
wrapper.scan,
wrapper.ranges,
store,
false,
false,
)
.unwrap();

for handle in 0..KEY_NUMBER {
let row = scanner.next().unwrap().unwrap().take_origin();
Expand Down
12 changes: 11 additions & 1 deletion src/coprocessor/dag/executor/mod.rs
Expand Up @@ -263,6 +263,7 @@ pub trait Executor {
pub mod tests {
use super::{Executor, TableScanExecutor};
use crate::coprocessor::codec::{table, Datum};
use crate::coprocessor::dag::exec_summary::ExecSummaryCollectorDisabled;
use crate::storage::kv::{Engine, Modify, RocksEngine, RocksSnapshot, TestEngineBuilder};
use crate::storage::mvcc::MvccTxn;
use crate::storage::SnapshotStore;
Expand Down Expand Up @@ -409,6 +410,15 @@ pub mod tests {

let (snapshot, start_ts) = test_store.get_snapshot();
let store = SnapshotStore::new(snapshot, start_ts, IsolationLevel::SI, true);
Box::new(TableScanExecutor::table_scan(table_scan, key_ranges, store, true).unwrap())
Box::new(
TableScanExecutor::table_scan(
ExecSummaryCollectorDisabled,
table_scan,
key_ranges,
store,
true,
)
.unwrap(),
)
}
}
30 changes: 22 additions & 8 deletions src/coprocessor/dag/executor/scan.rs
Expand Up @@ -7,7 +7,7 @@ use tipb::schema::ColumnInfo;

use super::{Executor, ExecutorMetrics, Row};
use crate::coprocessor::codec::table;
use crate::coprocessor::dag::exec_summary::ExecSummary;
use crate::coprocessor::dag::exec_summary::{ExecSummary, ExecSummaryCollector};
use crate::coprocessor::{Error, Result};
use crate::storage::{Key, Store};

Expand All @@ -31,7 +31,8 @@ pub trait InnerExecutor {
}

// Executor for table scan and index scan
pub struct ScanExecutor<S: Store, T: InnerExecutor> {
pub struct ScanExecutor<C: ExecSummaryCollector, S: Store, T: InnerExecutor> {
summary_collector: C,
store: S,
desc: bool,
key_ranges: Peekable<IntoIter<KeyRange>>,
Expand All @@ -45,8 +46,9 @@ pub struct ScanExecutor<S: Store, T: InnerExecutor> {
first_collect: bool,
}

impl<S: Store, T: InnerExecutor> ScanExecutor<S, T> {
impl<C: ExecSummaryCollector, S: Store, T: InnerExecutor> ScanExecutor<C, S, T> {
pub fn new(
summary_collector: C,
inner: T,
desc: bool,
columns: Vec<ColumnInfo>,
Expand All @@ -61,6 +63,7 @@ impl<S: Store, T: InnerExecutor> ScanExecutor<S, T> {
let counts = if collect { Some(Vec::default()) } else { None };

Ok(Self {
summary_collector,
inner,
store,
desc,
Expand Down Expand Up @@ -116,10 +119,8 @@ impl<S: Store, T: InnerExecutor> ScanExecutor<S, T> {
)
.map_err(Error::from)
}
}

impl<S: Store, T: InnerExecutor> Executor for ScanExecutor<S, T> {
fn next(&mut self) -> Result<Option<Row>> {
fn next_impl(&mut self) -> Result<Option<Row>> {
loop {
if let Some(row) = self.get_row_from_range_scanner()? {
self.inc_last_count();
Expand Down Expand Up @@ -149,6 +150,19 @@ impl<S: Store, T: InnerExecutor> Executor for ScanExecutor<S, T> {
return Ok(None);
}
}
}

impl<C: ExecSummaryCollector, S: Store, T: InnerExecutor> Executor for ScanExecutor<C, S, T> {
fn next(&mut self) -> Result<Option<Row>> {
let timer = self.summary_collector.on_start_iterate();
let ret = self.next_impl();
if let Ok(Some(_)) = ret {
self.summary_collector.on_finish_iterate(timer, 1);
} else {
self.summary_collector.on_finish_iterate(timer, 0);
}
ret
}

fn collect_output_counts(&mut self, counts: &mut Vec<i64>) {
if let Some(cur_counts) = self.counts.as_mut() {
Expand All @@ -172,8 +186,8 @@ impl<S: Store, T: InnerExecutor> Executor for ScanExecutor<S, T> {
}
}

fn collect_execution_summaries(&mut self, _target: &mut [ExecSummary]) {
// Do nothing for now.
fn collect_execution_summaries(&mut self, target: &mut [ExecSummary]) {
self.summary_collector.collect_into(target);
}

fn get_len_of_columns(&self) -> usize {
Expand Down

0 comments on commit 9a5b08e

Please sign in to comment.