
cop: Handle requests with multi handle column correctly #5180

Merged
merged 12 commits into from Aug 7, 2019
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions components/tidb_query/Cargo.toml
Expand Up @@ -33,6 +33,7 @@ time = "0.1"
quick-error = "1.2.2"
bitflags = "1.0.1"
derive_more = "0.11.0"
smallvec = "0.6"
indexmap = { version = "1.0", features = ["serde-1"] }
safemem = { version = "0.3", default-features = false }
flate2 = { version = "1.0", features = ["zlib"], default-features = false }
Expand Down
155 changes: 122 additions & 33 deletions components/tidb_query/src/batch/executors/table_scan_executor.rs
@@ -1,5 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use smallvec::SmallVec;
use std::sync::Arc;

use kvproto::coprocessor::KeyRange;
Expand All @@ -18,6 +19,8 @@ use crate::Result;

pub struct BatchTableScanExecutor<S: Storage>(ScanExecutor<S, TableScanExecutorImpl>);

type HandleIndicesVec = SmallVec<[usize; 2]>;
Contributor:

Where does the number 2 come from? Does TiDB pass at most 2 PK handles now?

Contributor Author:

It means that if there are no more than 2 items in the vec, the array is allocated on the stack; otherwise it goes to the heap, like how Vec does it. The number of PK handles passed from TiDB is expected to be small, according to @breeswish, so I think it's OK to set it to 2.
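The inline-then-spill behavior can be sketched with a hand-rolled two-slot variant (a simplified illustration of the idea only, not the real `smallvec` crate):

```rust
// Simplified sketch of `SmallVec<[usize; 2]>`: the first 2 elements live in an
// inline array (on the stack when the vector itself is on the stack); a third
// push spills everything into a heap-allocated `Vec`.
enum SmallVec2 {
    Inline([usize; 2], usize), // fixed buffer + current length (0, 1, or 2)
    Heap(Vec<usize>),
}

impl SmallVec2 {
    fn new() -> Self {
        SmallVec2::Inline([0; 2], 0)
    }

    fn push(&mut self, x: usize) {
        match self {
            SmallVec2::Inline(buf, len) => {
                if *len < 2 {
                    buf[*len] = x;
                    *len += 1;
                } else {
                    // Inline capacity exceeded: spill to the heap, as SmallVec does.
                    let mut v = buf.to_vec();
                    v.push(x);
                    *self = SmallVec2::Heap(v);
                }
            }
            SmallVec2::Heap(v) => v.push(x),
        }
    }

    fn spilled(&self) -> bool {
        matches!(self, SmallVec2::Heap(_))
    }
}
```

Until the vec spills, pushing costs no allocation at all, which is why it fits the "usually one, rarely more" PK handle case.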


// We assign a dummy type `Box<dyn Storage<Statistics = ()>>` so that we can omit the type
// when calling `check_supported`.
impl BatchTableScanExecutor<Box<dyn Storage<Statistics = ()>>> {
Expand All @@ -38,7 +41,7 @@ impl<S: Storage> BatchTableScanExecutor<S> {
) -> Result<Self> {
let is_column_filled = vec![false; columns_info.len()];
let mut is_key_only = true;
let mut handle_index = None;
let mut handle_indices = HandleIndicesVec::new();
let mut schema = Vec::with_capacity(columns_info.len());
let mut columns_default_value = Vec::with_capacity(columns_info.len());
let mut column_id_index = HashMap::default();
Expand All @@ -51,10 +54,10 @@ impl<S: Storage> BatchTableScanExecutor<S> {
// - Prepare column default value (will be used to fill missing column later).
columns_default_value.push(ci.take_default_val());

// - Store the index of the PK handle.
// - Store the index of the PK handles.
// - Check whether or not we don't need KV values (iff PK handle is given).
if ci.get_pk_handle() {
handle_index = Some(index);
handle_indices.push(index);
} else {
is_key_only = false;
column_id_index.insert(ci.get_column_id(), index);
Expand All @@ -69,7 +72,7 @@ impl<S: Storage> BatchTableScanExecutor<S> {
schema,
columns_default_value,
column_id_index,
handle_index,
handle_indices,
is_column_filled,
};
let wrapper = ScanExecutor::new(ScanExecutorOptions {
Expand Down Expand Up @@ -124,8 +127,8 @@ struct TableScanExecutorImpl {
/// The output position in the schema giving the column id.
column_id_index: HashMap<i64, usize>,

/// The index in output row to put the handle.
handle_index: Option<usize>,
/// The indices in the output row at which to put the PK handle. The indices must be sorted in ascending order.
handle_indices: HandleIndicesVec,

/// A vector of flags indicating whether corresponding column is filled in `next_batch`.
/// It is a struct level field in order to prevent repeated memory allocations since its length
Expand All @@ -149,32 +152,40 @@ impl ScanExecutorImpl for TableScanExecutorImpl {
let columns_len = self.schema.len();
let mut columns = Vec::with_capacity(columns_len);

if let Some(handle_index) = self.handle_index {
// PK is specified in schema. PK column should be decoded and the rest is in raw format
// like this:
// non-pk non-pk non-pk pk non-pk non-pk non-pk
// ^handle_index = 3
// ^columns_len = 7

// Columns before `handle_index` (if any) should be raw.
for _ in 0..handle_index {
// If there are any PK columns, for each of them, fill non-PK columns before it and push the
Member:

How about using the following code instead of L155-191

        let mut last_handle_index = 0usize;
        for id in 0..columns_len {
            let is_handle = self
                .handle_indices
                .get(last_handle_index)
                .map_or(false, |handle_idx| *handle_idx == id);
            if is_handle {
                last_handle_index += 1;
                columns.push(LazyBatchColumn::decoded_with_capacity_and_tp(
                    scan_rows,
                    EvalType::Int,
                ));
            } else {
                columns.push(LazyBatchColumn::raw_with_capacity(scan_rows));
            }
        }

Contributor Author:

This is the same as my first idea. It needs an index into the vector and an `if` for each column. You can see that the old code tries to avoid that, although I don't think it matters much. @breeswish what's your idea?

Member:

The old code is branch-less in each iteration.

Contributor Author:

@breeswish Yes, that's what I considered in my new code. My question is: do you prefer @AndreMouche's simpler, more readable code with branches in each iteration?

Member:

I don't have strong opinions 🤔

Contributor Author:

@AndreMouche Let's keep the current code, since the PR has been approved?

// PK column.
// For example, consider:
// non-pk non-pk non-pk pk non-pk non-pk pk pk non-pk non-pk
// handle_indices: ^3 ^6 ^7
// Each turn of the following loop will push this to `columns`:
// 1st turn: [non-pk, non-pk, non-pk, pk]
// 2nd turn: [non-pk, non-pk, pk]
// 3rd turn: [pk]
let mut last_index = 0usize;
for handle_index in &self.handle_indices {
// `handle_indices` is expected to be sorted.
assert!(*handle_index >= last_index);

// Fill the raw (non-PK) columns before this handle column, if any.
for _ in last_index..*handle_index {
columns.push(LazyBatchColumn::raw_with_capacity(scan_rows));
}
// For PK handle, we construct a decoded `VectorValue` because it is directly

// For PK handles, we construct a decoded `VectorValue` because it is directly
// stored as i64, without a datum flag, at the end of key.
columns.push(LazyBatchColumn::decoded_with_capacity_and_tp(
scan_rows,
EvalType::Int,
));
// Columns after `handle_index` (if any) should also be raw.
for _ in handle_index + 1..columns_len {
columns.push(LazyBatchColumn::raw_with_capacity(scan_rows));
}
} else {
// PK is unspecified in schema. All column should be in raw format.
for _ in 0..columns_len {
columns.push(LazyBatchColumn::raw_with_capacity(scan_rows));
}

last_index = *handle_index + 1;
}

// Then fill remaining columns after the last handle column. If there are no PK columns,
// the previous loop will be skipped and this loop will be run on 0..columns_len.
// For the example above, this loop will push: [non-pk, non-pk]
for _ in last_index..columns_len {
columns.push(LazyBatchColumn::raw_with_capacity(scan_rows));
}

assert_eq!(columns.len(), columns_len);
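The two-phase fill above can be sketched as a standalone function (`column_kinds` is a hypothetical helper written for illustration, not part of the PR):

```rust
// Standalone sketch of the fill logic in the diff above: given sorted PK handle
// indices, mark each output column as "decoded" (a PK handle column, stored as
// a decoded i64 vector) or "raw" (kept in raw datum format).
fn column_kinds(columns_len: usize, handle_indices: &[usize]) -> Vec<&'static str> {
    let mut kinds = Vec::with_capacity(columns_len);
    let mut last_index = 0usize;
    for &handle_index in handle_indices {
        assert!(handle_index >= last_index, "handle_indices must be sorted");
        // Raw columns before this handle column.
        for _ in last_index..handle_index {
            kinds.push("raw");
        }
        // The handle column itself is stored decoded.
        kinds.push("decoded");
        last_index = handle_index + 1;
    }
    // Remaining raw columns after the last handle column. If there are no PK
    // columns, this loop alone covers 0..columns_len.
    for _ in last_index..columns_len {
        kinds.push("raw");
    }
    kinds
}
```

For the 10-column example in the comment (handles at 3, 6, 7), this yields decoded columns exactly at indices 3, 6, and 7.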
Expand All @@ -193,15 +204,17 @@ impl ScanExecutorImpl for TableScanExecutorImpl {
let columns_len = self.schema.len();
let mut decoded_columns = 0;

if let Some(handle_index) = self.handle_index {
if !self.handle_indices.is_empty() {
let handle_id = table::decode_handle(key)?;
// TODO: We should avoid calling `push_int` repeatedly. Instead we should specialize
// a `&mut Vec` first. However it is hard to program due to lifetime restriction.
columns[handle_index]
.mut_decoded()
.push_int(Some(handle_id));
decoded_columns += 1;
self.is_column_filled[handle_index] = true;
for handle_index in &self.handle_indices {
// TODO: We should avoid calling `push_int` repeatedly. Instead we should specialize
// a `&mut Vec` first. However it is hard to program due to lifetime restriction.
columns[*handle_index]
.mut_decoded()
.push_int(Some(handle_id));
decoded_columns += 1;
self.is_column_filled[*handle_index] = true;
}
}
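The handle-filling step of the new `process_kv` path can be sketched in isolation (`fill_handles` is a hypothetical helper using plain `Vec`s in place of `LazyBatchColumn`):

```rust
// Standalone sketch of the new handle-filling loop: one handle is decoded from
// the row key, then the same value is pushed into every PK handle column.
// Returns the number of columns filled, mirroring `decoded_columns`.
fn fill_handles(
    columns: &mut [Vec<Option<i64>>],
    handle_indices: &[usize],
    handle_id: i64,
) -> usize {
    let mut decoded_columns = 0;
    for &handle_index in handle_indices {
        columns[handle_index].push(Some(handle_id));
        decoded_columns += 1;
    }
    decoded_columns
}
```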

if value.is_empty() || (value.len() == 1 && value[0] == datum::NIL_FLAG) {
Expand Down Expand Up @@ -284,6 +297,7 @@ impl ScanExecutorImpl for TableScanExecutorImpl {
mod tests {
use super::*;

use std::iter;
use std::sync::Arc;

use kvproto::coprocessor::KeyRange;
Expand Down Expand Up @@ -994,4 +1008,79 @@ mod tests {
assert_eq!(result.physical_columns.rows_len(), 0);
}
}

fn test_multi_handle_column_impl(columns_is_pk: &[bool]) {
const TABLE_ID: i64 = 42;

// This test builds a table whose PK columns all have column id 1 and whose
// non-PK columns have column id 10 + i, where i is the index of the column in
// `columns_is_pk`. In the scanned row, the PK handle is 1 and each non-PK
// column's value equals its column id.

let mut columns_info = Vec::new();
for (i, is_pk) in columns_is_pk.iter().enumerate() {
let mut ci = ColumnInfo::default();
ci.as_mut_accessor().set_tp(FieldTypeTp::LongLong);
ci.set_pk_handle(*is_pk);
ci.set_column_id(if *is_pk { 1 } else { i as i64 + 10 });
columns_info.push(ci);
}

let mut schema = Vec::new();
schema.resize(columns_is_pk.len(), FieldTypeTp::LongLong.into());

let key = table::encode_row_key(TABLE_ID, 1);
let col_ids = (10..10 + schema.len() as i64).collect::<Vec<_>>();
let row = col_ids.iter().map(|i| Datum::I64(*i)).collect();
let value = table::encode_row(row, &col_ids).unwrap();

let mut key_range = KeyRange::default();
key_range.set_start(table::encode_row_key(TABLE_ID, std::i64::MIN));
key_range.set_end(table::encode_row_key(TABLE_ID, std::i64::MAX));

let store = FixtureStorage::new(iter::once((key, (Ok(value)))).collect());

let mut executor = BatchTableScanExecutor::new(
store.clone(),
Arc::new(EvalConfig::default()),
columns_info,
vec![key_range],
false,
)
.unwrap();

let mut result = executor.next_batch(10);
assert_eq!(result.is_drained.unwrap(), true);
assert_eq!(result.logical_rows.len(), 1);
assert_eq!(result.physical_columns.columns_len(), columns_is_pk.len());
for i in 0..columns_is_pk.len() {
result.physical_columns[i]
.ensure_all_decoded(&Tz::utc(), &schema[i])
.unwrap();
if columns_is_pk[i] {
assert_eq!(
result.physical_columns[i].decoded().as_int_slice(),
&[Some(1)]
);
} else {
assert_eq!(
result.physical_columns[i].decoded().as_int_slice(),
&[Some(i as i64 + 10)]
);
}
}
}

#[test]
fn test_multi_handle_column() {
test_multi_handle_column_impl(&[true]);
test_multi_handle_column_impl(&[false]);
test_multi_handle_column_impl(&[true, false]);
test_multi_handle_column_impl(&[false, true]);
test_multi_handle_column_impl(&[true, true]);
test_multi_handle_column_impl(&[true, false, true]);
test_multi_handle_column_impl(&[
false, false, false, true, false, false, true, true, false, false,
]);
}
}