Skip to content

Commit

Permalink
use vec executor instead of normal one
Browse files Browse the repository at this point in the history
Signed-off-by: Renkai <gaelookair@gmail.com>
  • Loading branch information
Renkai committed Jul 1, 2020
1 parent 860d25b commit ab7e20c
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 7 deletions.
3 changes: 3 additions & 0 deletions components/tidb_query_datatype/src/codec/row/v2/compat_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ pub trait V1CompatibleEncoder: DatumFlagAndPayloadEncoder {
FieldTypeTp::Null => {
self.write_u8(datum::NIL_FLAG)?;
}
FieldTypeTp::Unspecified => {
self.write_bytes(src)?;
}
fp => {
return Err(Error::InvalidDataType(format!(
"Unsupported FieldType {:?}",
Expand Down
47 changes: 47 additions & 0 deletions components/tidb_query_vec_executors/src/index_scan_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,53 @@ impl<S: Storage> BatchIndexScanExecutor<S> {
})?;
Ok(Self(wrapper))
}

/// Constructs a `BatchIndexScanExecutor` for ANALYZE requests.
///
/// Unlike the normal constructor, no `columns_info` is supplied: every column
/// in the schema gets an `Unspecified` field type (see
/// `field_type_with_unspecified_tp`), so index values are forwarded as raw
/// bytes without datum decoding.
pub fn new_for_analyze(
    storage: S,
    config: Arc<EvalConfig>,
    cols_len: usize,
    key_ranges: Vec<KeyRange>,
    is_backward: bool,
    unique: bool,
) -> Result<Self> {
    // Note: `unique = true` doesn't completely mean that it is a unique index
    // scan. Instead it just means that we can use point-get for this index.
    // `unique` will be `false` when:
    // - scanning a non-unique index;
    // - scanning a unique index with like: where unique-index like xxx.
    //
    // ANALYZE never needs the PK handle decoded, so this is hard-wired off.
    let decode_handle = false;
    // All columns share the `Unspecified` type: raw index bytes pass through.
    let schema: Vec<_> = (0..cols_len)
        .map(|_| field_type_with_unspecified_tp())
        .collect();

    // Column ids are synthetic (0..cols_len); they only need to be distinct.
    let columns_id_without_handle: Vec<_> = (0..cols_len).map(|x| x as i64).collect();

    let imp = IndexScanExecutorImpl {
        context: EvalContext::new(config),
        schema,
        columns_id_without_handle,
        decode_handle,
    };
    let wrapper = ScanExecutor::new(ScanExecutorOptions {
        imp,
        storage,
        key_ranges,
        is_backward,
        is_key_only: false,
        accept_point_range: unique,
    })?;
    Ok(Self(wrapper))
}
}

impl<S: Storage> BatchExecutor for BatchIndexScanExecutor<S> {
Expand Down
7 changes: 7 additions & 0 deletions components/tidb_query_vec_executors/src/util/scan_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tidb_query_common::storage::{IntervalRange, Range, Storage};
use tidb_query_common::Result;
use tidb_query_datatype::codec::batch::LazyBatchColumnVec;
use tidb_query_datatype::expr::EvalContext;
use tidb_query_datatype::FieldTypeTp;

/// Common interfaces for table scan and index scan implementations.
pub trait ScanExecutorImpl: Send {
Expand Down Expand Up @@ -137,6 +138,12 @@ pub fn field_type_from_column_info(ci: &ColumnInfo) -> FieldType {
field_type
}

/// Returns a `FieldType` whose tp is `FieldTypeTp::Unspecified`.
///
/// Used by the index scan executor's analyze constructor, where no concrete
/// column type is available and index values are handled as raw bytes.
pub fn field_type_with_unspecified_tp() -> FieldType {
    let mut ft = FieldType::default();
    ft.set_tp(FieldTypeTp::Unspecified as i32);
    ft
}

/// Checks whether the given columns info are supported.
pub fn check_columns_info_supported(columns_info: &[ColumnInfo]) -> Result<()> {
use std::convert::TryFrom;
Expand Down
62 changes: 55 additions & 7 deletions src/coprocessor/statistics/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use protobuf::Message;
use rand::rngs::ThreadRng;
use rand::{thread_rng, Rng};
use tidb_query_datatype::codec::datum;
use tidb_query_datatype::expr::EvalContext;
use tidb_query_datatype::expr::{EvalConfig, EvalContext};
use tidb_query_normal_executors::{Executor, IndexScanExecutor, ScanExecutor, TableScanExecutor};
use tipb::{self, AnalyzeColumnsReq, AnalyzeIndexReq, AnalyzeReq, AnalyzeType, TableScan};

Expand All @@ -18,6 +18,9 @@ use super::histogram::Histogram;
use crate::coprocessor::dag::TiKVStorage;
use crate::coprocessor::*;
use crate::storage::{Snapshot, SnapshotStore, Statistics};
use std::sync::Arc;
use tidb_query_vec_executors::interface::BatchExecutor;
use tidb_query_vec_executors::BatchIndexScanExecutor;

// `AnalyzeContext` is used to handle `AnalyzeReq`
pub struct AnalyzeContext<S: Snapshot> {
Expand Down Expand Up @@ -70,8 +73,10 @@ impl<S: Snapshot> AnalyzeContext<S> {
Ok(res_data)
}

//TODO remove fn before merge
// handle_index is used to handle `AnalyzeIndexReq`,
// it would build a histogram and count-min sketch of index values.
#[allow(unused)]
fn handle_index(
req: AnalyzeIndexReq,
scanner: &mut IndexScanExecutor<TiKVStorage<SnapshotStore<S>>>,
Expand Down Expand Up @@ -99,6 +104,48 @@ impl<S: Snapshot> AnalyzeContext<S> {
let dt = box_try!(res.write_to_bytes());
Ok(dt)
}

// batch_handle_index is used to handle `AnalyzeIndexReq` with the batch
// (vectorized) index scan executor. It builds a histogram and, when
// requested, a count-min sketch of the index values.
fn batch_handle_index(
    req: AnalyzeIndexReq,
    scanner: &mut BatchIndexScanExecutor<TiKVStorage<SnapshotStore<S>>>,
) -> Result<Vec<u8>> {
    // Rows fetched from the executor per `next_batch` call.
    // TODO: use a more proper value.
    const BATCH_SIZE: usize = 100;

    let mut hist = Histogram::new(req.get_bucket_size() as usize);
    // `CmSketch::new` returns an `Option`; `None` presumably means the sketch
    // was not requested (zero depth/width) — verify against CmSketch::new.
    let mut cms = CmSketch::new(
        req.get_cmsketch_depth() as usize,
        req.get_cmsketch_width() as usize,
    );

    let mut is_drained = false;
    while !is_drained {
        let batch = scanner.next_batch(BATCH_SIZE);
        is_drained = batch.is_drained?;

        for logical_row in batch.logical_rows {
            // Concatenate the raw bytes of every column to form the index
            // row for the histogram, feeding each column's bytes into the
            // CM sketch along the way.
            let mut bytes = vec![];
            for col in batch.physical_columns.as_slice() {
                let buffer_vec = col.raw();
                let data = &buffer_vec[logical_row];
                bytes.extend_from_slice(data);
                if let Some(c) = cms.as_mut() {
                    c.insert(data);
                }
            }
            hist.append(&bytes);
        }
    }

    let mut res = tipb::AnalyzeIndexResp::default();
    res.set_hist(hist.into_proto());
    if let Some(c) = cms {
        res.set_cms(c.into_proto());
    }
    let dt = box_try!(res.write_to_bytes());
    Ok(dt)
}
}

#[async_trait]
Expand All @@ -107,17 +154,18 @@ impl<S: Snapshot> RequestHandler for AnalyzeContext<S> {
let ret = match self.req.get_tp() {
AnalyzeType::TypeIndex => {
let req = self.req.take_idx_req();
let mut scanner = ScanExecutor::index_scan_with_cols_len(
EvalContext::default(),
i64::from(req.get_num_columns()),
mem::replace(&mut self.ranges, Vec::new()),
let mut scanner = BatchIndexScanExecutor::new_for_analyze(
self.storage.take().unwrap(),
Arc::new(EvalConfig::default()),
req.get_num_columns() as usize,
mem::replace(&mut self.ranges, Vec::new()),
false,
false,
)?;
let res = AnalyzeContext::handle_index(req, &mut scanner);
let res = AnalyzeContext::batch_handle_index(req, &mut scanner);
scanner.collect_storage_stats(&mut self.storage_stats);
res
}

AnalyzeType::TypeColumn => {
let col_req = self.req.take_col_req();
let storage = self.storage.take().unwrap();
Expand Down

0 comments on commit ab7e20c

Please sign in to comment.