Adds BatchTableScanExecutor #4351

Merged · 20 commits, merged on Mar 21, 2019
Commits (20)
5e42ff6
Introduce context for AsMySQLBool and remove BatchExecutorContext
breezewish Mar 11, 2019
9828486
Extract table scanner
breezewish Mar 11, 2019
8db232b
Merge branch 'master' into ___batch_extract/table_scan
breezewish Mar 11, 2019
19c7c1a
Merge remote-tracking branch 'origin/master' into ___batch_extract/ta…
breezewish Mar 13, 2019
a95363a
Add new test cases from parent PR
breezewish Mar 13, 2019
2e37d05
Fix typo
breezewish Mar 13, 2019
e1b433f
Merge master and upstream change
breezewish Mar 18, 2019
7321cdd
Merge remote-tracking branch 'origin/master' into ___batch_extract/ta…
breezewish Mar 18, 2019
b22f1ee
Merge branch 'master' into ___batch_extract/table_scan
breezewish Mar 19, 2019
29a21f9
Address comments to remove some unnecessary code
breezewish Mar 19, 2019
1b3b854
Merge branch '___batch_extract/table_scan' of github.com:breeswish/ti…
breezewish Mar 19, 2019
d4c456b
Improve performance
breezewish Mar 19, 2019
7b805ab
Add test cases for lock scenarios
breezewish Mar 19, 2019
54c9d5e
Merge branch 'master' into ___batch_extract/table_scan
breezewish Mar 20, 2019
6d8daf9
Place truncate function inside LazyColumnVec
breezewish Mar 20, 2019
c30058a
Merge two cases together
breezewish Mar 20, 2019
b093664
Merge branch 'master' into ___batch_extract/table_scan
breezewish Mar 20, 2019
a491e00
Make truncate column more strict
breezewish Mar 21, 2019
2d0ac06
Add comments for warnings
breezewish Mar 21, 2019
74cfd46
Merge branch 'master' into ___batch_extract/table_scan
breezewish Mar 21, 2019
18 changes: 8 additions & 10 deletions src/coprocessor/codec/batch/lazy_column_vec.rs
@@ -61,7 +61,7 @@ impl LazyBatchColumnVec {
             self.columns[i].append(&mut other[i]);
         }
 
-        self.debug_assert_columns_equal_length();
+        self.assert_columns_equal_length();
     }
 
     /// Ensures that a column at specified `column_index` is decoded and returns a reference
@@ -105,14 +105,12 @@ impl LazyBatchColumnVec {
         self.columns[0].len()
     }
 
-    /// Debug asserts that all columns have equal length.
+    /// Asserts that all columns have equal length.
     #[inline]
-    pub fn debug_assert_columns_equal_length(&self) {
-        if cfg!(debug_assertions) {
-            let len = self.rows_len();
-            for column in &self.columns {
-                debug_assert_eq!(len, column.len());
-            }
+    pub fn assert_columns_equal_length(&self) {
+        let len = self.rows_len();
+        for column in &self.columns {
+            assert_eq!(len, column.len());
         }
     }
 
@@ -145,7 +143,7 @@ impl LazyBatchColumnVec {
             col.retain_by_index(&mut f);
         }
 
-        self.debug_assert_columns_equal_length();
+        self.assert_columns_equal_length();
     }
 
     /// Returns maximum encoded size.
@@ -223,7 +221,7 @@ mod tests {
             lazy_col.push_raw(raw_datum);
         }
 
-        columns.debug_assert_columns_equal_length();
+        columns.assert_columns_equal_length();
     }
 
     /// Pushes a raw row via a datum vector.
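A note on the change above: the old `debug_assert_columns_equal_length` only ran its check when debug assertions were enabled (it was additionally gated behind `cfg!(debug_assertions)`), while the renamed `assert_columns_equal_length` enforces the equal-length invariant in every build, including release. A minimal standalone sketch of that difference in plain Rust (illustrative values, not TiKV code):

fn main() {
    let expected_rows = 3usize;
    let column_lengths = vec![3usize, 3, 3];

    for len in &column_lengths {
        // Compiled out when debug assertions are disabled (the default for `--release`).
        debug_assert_eq!(expected_rows, *len);

        // Always checked: panics on a mismatch even in release builds.
        assert_eq!(expected_rows, *len);
    }
}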
8 changes: 8 additions & 0 deletions src/coprocessor/dag/batch_executor/mod.rs
@@ -13,3 +13,11 @@
 
 pub mod interface;
 pub mod statistics;
+
+mod ranges_iter;
+mod scan_executor;
+mod table_scan_executor;
+
+pub mod executors {
+    pub use super::table_scan_executor::BatchTableScanExecutor;
+}
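The new `executors` module acts as a small facade: the implementation modules stay private and only `BatchTableScanExecutor` is re-exported for the rest of the crate. A standalone sketch of this re-export pattern (illustrative names, not the actual TiKV module tree):

// Implementation details live in a private module.
mod table_scan_executor {
    pub struct BatchTableScanExecutor;
}

// Only the intended public surface is re-exported.
pub mod executors {
    pub use super::table_scan_executor::BatchTableScanExecutor;
}

fn main() {
    // Callers reach the executor only through the facade path.
    let _executor = executors::BatchTableScanExecutor;
}

Keeping `table_scan_executor` private leaves room to reorganize the implementation later without touching call sites.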
234 changes: 234 additions & 0 deletions src/coprocessor/dag/batch_executor/ranges_iter.rs
@@ -0,0 +1,234 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Needed because we have not introduced IndexScan so far and not all features of this module are
// used.
#![allow(unused)]

use kvproto::coprocessor::KeyRange;

#[derive(PartialEq, Clone, Debug)]
pub enum IterStatus {
    /// All ranges are consumed.
    Drained,

    /// Last range is drained or this iteration is a fresh start, so that the caller should scan
    /// on a new range.
    NewNonPointRange(KeyRange),

    /// Similar to `NewNonPointRange`, but the new range is a point range, whose key will be
    /// placed in `start_key`. The content of other fields should be discarded.
    NewPointRange(KeyRange),

    /// Last non-point range is not drained and the caller should continue scanning without
    /// changing the scan range.
    Continue,
}

/// A trait that determines how to deal with possible point ranges.
pub trait PointRangePolicy: Send + Sync + 'static {
    fn is_point_range(&self, range: &KeyRange) -> bool;
}

/// A policy that respects point ranges.
///
/// Notice that this exists because currently we use the `prefix_next()` result as the `end_key`
/// to mark a range as a point range. Once we have a clearer notation, e.g. a flag, we won't need
/// these policies any more.
pub struct PointRangeEnable;

/// A policy that conditionally respects point ranges.
///
/// TODO: Maybe better to be `PointRangeDisable`.
pub struct PointRangeConditional(bool);

impl PointRangeConditional {
    pub fn new(enable: bool) -> Self {
        PointRangeConditional(enable)
    }
}

impl PointRangePolicy for PointRangeConditional {
    #[inline]
    fn is_point_range(&self, range: &KeyRange) -> bool {
        crate::coprocessor::util::is_point(range) && self.0
    }
}

impl PointRangePolicy for PointRangeEnable {
    #[inline]
    fn is_point_range(&self, range: &KeyRange) -> bool {
        crate::coprocessor::util::is_point(range)
    }
}

/// An iterator-like structure that produces user key ranges.
///
/// For each `next()`, it produces one of the following:
/// - a new non-point range
/// - a new point range
/// - a flag indicating continuing the last non-point range
/// - a flag indicating that all ranges are consumed
///
/// If a new non-point or point range is returned, the caller can then scan an unknown amount of
/// key(s) within this new range. The caller must inform the structure, by calling
/// `notify_drained()` after the current non-point range is drained, so that it will emit a new
/// range next time. A point range is regarded as drained automatically after calling `next()`.
/// Multiple `notify_drained()` calls without `next()` have no effect.
pub struct RangesIterator<T: PointRangePolicy> {
    /// Whether or not we are processing a valid range. If we are not processing a range, or
    /// there is no range any more, this field is `false`.
    in_range: bool,

    iter: std::vec::IntoIter<KeyRange>,

    policy: T,
}

impl<T: PointRangePolicy> RangesIterator<T> {
    #[inline]
    pub fn new(user_key_ranges: Vec<KeyRange>, policy: T) -> Self {
        Self {
            in_range: false,
            iter: user_key_ranges.into_iter(),
            policy,
        }
    }

    /// Continues iterating.
    #[inline]
    pub fn next(&mut self) -> IterStatus {
        if self.in_range {
            return IterStatus::Continue;
        }
        match self.iter.next() {
            None => IterStatus::Drained,
            Some(range) => {
                if self.policy.is_point_range(&range) {
                    // No need to set `in_range = true` because a point range always drains.
                    IterStatus::NewPointRange(range)
                } else {
                    self.in_range = true;
                    IterStatus::NewNonPointRange(range)
                }
            }
        }
    }

    /// Notifies that the current range is drained.
    #[inline]
    pub fn notify_drained(&mut self) {
        self.in_range = false;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic;

    static RANGE_INDEX: atomic::AtomicU64 = atomic::AtomicU64::new(1);

    fn new_point() -> KeyRange {
        use byteorder::{BigEndian, WriteBytesExt};

        let v = RANGE_INDEX.fetch_add(1, atomic::Ordering::SeqCst);
        let mut r = KeyRange::new();
        r.mut_start().write_u64::<BigEndian>(v).unwrap();
        // Enough to pass `util::is_point`.
        r.mut_end().write_u64::<BigEndian>(v + 1).unwrap();
        r
    }

    fn new_range() -> KeyRange {
        use byteorder::{BigEndian, WriteBytesExt};

        let v = RANGE_INDEX.fetch_add(2, atomic::Ordering::SeqCst);
        let mut r = KeyRange::new();
        r.mut_start().write_u64::<BigEndian>(v).unwrap();
        // Enough to not pass `util::is_point`.
        r.mut_end().write_u64::<BigEndian>(v + 2).unwrap();
        r
    }

    #[test]
    fn test_basic() {
        // Empty
        let mut c = RangesIterator::new(vec![], PointRangeEnable);
        assert_eq!(c.next(), IterStatus::Drained);
        assert_eq!(c.next(), IterStatus::Drained);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);
        assert_eq!(c.next(), IterStatus::Drained);

        // Pure non-point range
        let ranges = vec![new_range(), new_range(), new_range()];
        let mut c = RangesIterator::new(ranges.clone(), PointRangeEnable);
        assert_eq!(c.next(), IterStatus::NewNonPointRange(ranges[0].clone()));
        assert_eq!(c.next(), IterStatus::Continue);
        assert_eq!(c.next(), IterStatus::Continue);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::NewNonPointRange(ranges[1].clone()));
        assert_eq!(c.next(), IterStatus::Continue);
        assert_eq!(c.next(), IterStatus::Continue);
        c.notify_drained();
        c.notify_drained(); // multiple consumes will not take effect
        assert_eq!(c.next(), IterStatus::NewNonPointRange(ranges[2].clone()));
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);

        // Pure point range
        let ranges = vec![new_point(), new_point(), new_point()];
        let mut c = RangesIterator::new(ranges.clone(), PointRangeEnable);
        assert_eq!(c.next(), IterStatus::NewPointRange(ranges[0].clone()));
        assert_eq!(c.next(), IterStatus::NewPointRange(ranges[1].clone()));
        c.notify_drained(); // no effect
        assert_eq!(c.next(), IterStatus::NewPointRange(ranges[2].clone()));
        assert_eq!(c.next(), IterStatus::Drained);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);

        // Mixed
        let ranges = vec![new_point(), new_range(), new_point()];
        let mut c = RangesIterator::new(ranges.clone(), PointRangeEnable);
        assert_eq!(c.next(), IterStatus::NewPointRange(ranges[0].clone()));
        assert_eq!(c.next(), IterStatus::NewNonPointRange(ranges[1].clone()));
        assert_eq!(c.next(), IterStatus::Continue);
        assert_eq!(c.next(), IterStatus::Continue);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::NewPointRange(ranges[2].clone()));
        assert_eq!(c.next(), IterStatus::Drained);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);

        // Mixed
        let ranges = vec![new_range(), new_point(), new_range(), new_range()];
        let mut c = RangesIterator::new(ranges.clone(), PointRangeEnable);
        assert_eq!(c.next(), IterStatus::NewNonPointRange(ranges[0].clone()));
        assert_eq!(c.next(), IterStatus::Continue);
        assert_eq!(c.next(), IterStatus::Continue);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::NewPointRange(ranges[1].clone()));
        assert_eq!(c.next(), IterStatus::NewNonPointRange(ranges[2].clone()));
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::NewNonPointRange(ranges[3].clone()));
        assert_eq!(c.next(), IterStatus::Continue);
        assert_eq!(c.next(), IterStatus::Continue);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);
        c.notify_drained();
        assert_eq!(c.next(), IterStatus::Drained);
    }
}
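To make the `next()` / `notify_drained()` contract documented on `RangesIterator` concrete, below is a rough caller-side sketch (not part of this PR). It assumes it lives as a sibling module inside `batch_executor`, so the `ranges_iter` items and kvproto's `KeyRange` are in scope; `scan_next_batch` is a hypothetical stand-in for the storage scan that the new scan executor performs.

use kvproto::coprocessor::KeyRange;

use super::ranges_iter::{IterStatus, PointRangeEnable, RangesIterator};

/// Hypothetical stand-in for the real storage scan; returns `false` once the
/// given range has no more keys.
fn scan_next_batch(_range: &KeyRange) -> bool {
    false
}

fn drive_scan(ranges: Vec<KeyRange>) {
    let mut ranges_iter = RangesIterator::new(ranges, PointRangeEnable);
    let mut current_range: Option<KeyRange> = None;
    loop {
        match ranges_iter.next() {
            IterStatus::Drained => break,
            IterStatus::NewPointRange(r) => {
                // A point range is drained automatically; scan it once and move on.
                scan_next_batch(&r);
            }
            IterStatus::NewNonPointRange(r) => {
                // Start scanning a fresh non-point range.
                current_range = Some(r);
            }
            IterStatus::Continue => {
                // Keep scanning the non-point range produced earlier.
            }
        }
        if let Some(r) = &current_range {
            if !scan_next_batch(r) {
                // The current range has no more keys; ask for a new range next time.
                ranges_iter.notify_drained();
                current_range = None;
            }
        }
    }
}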