Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Types for Batch Execution #3660

Merged
merged 32 commits into from Nov 17, 2018
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7a6f726
initial
breezewish Sep 30, 2018
1b6d343
some refinement
breezewish Sep 30, 2018
903b8d9
improve push_datum performance
breezewish Sep 30, 2018
c81cedb
compile pass
breezewish Oct 5, 2018
c1416a3
Fix unit tests
breezewish Oct 5, 2018
8a030a9
Merge remote-tracking branch 'origin/master' into _batch/2_infra_typedef
breezewish Oct 6, 2018
5e9b2e4
Merge branch 'master' into _batch/2_infra_typedef
breezewish Oct 8, 2018
a6e52dd
BatchExecution Primitive Types
breezewish Oct 10, 2018
4f91419
as_mut_accessor()
breezewish Oct 10, 2018
a8666c7
Merge commit '06b3adb7c6b0a4343200083f8887021f7c541936' into _batch/2…
breezewish Oct 10, 2018
cab910f
as_mut_accessor()
breezewish Oct 10, 2018
37184fc
utilize as_mut_accessor()
breezewish Oct 11, 2018
c281d1c
Merge remote-tracking branch 'origin/master' into _batch/2_infra_type…
breezewish Oct 11, 2018
260208b
Merge remote-tracking branch 'origin/master' into _batch/2_infra_typedef
breezewish Oct 12, 2018
962afcb
Merge branch '_batch/2_infra_typedef' into _batch/2_infra_typedef_and…
breezewish Oct 12, 2018
e86309a
Merge upstream
breezewish Oct 17, 2018
9349b19
fix build issues
breezewish Oct 17, 2018
17725e7
Merge branch '_batch/2_infra_typedef' into _batch/2_infra_typedef_and…
breezewish Oct 17, 2018
0e14b11
Merge remote-tracking branch 'origin/master' into _batch/2_infra_typedef
breezewish Nov 6, 2018
433db8d
Merge branch '_batch/2_infra_typedef' into _batch/2_infra_typedef_and…
breezewish Nov 6, 2018
d2a9179
Merge branch 'master' into _batch/2_infra_typedef
breezewish Nov 7, 2018
bab0766
Merge branch 'master' into _batch/2_infra_typedef
solotzg Nov 8, 2018
bfb80d0
Merge branch 'master' into _batch/2_infra_typedef
breezewish Nov 8, 2018
310bb7f
Merge remote-tracking branch 'breeswish/_batch/2_infra_typedef' into …
breezewish Nov 8, 2018
ec7846f
Merge remote-tracking branch 'origin/master' into _batch/2_infra_type…
breezewish Nov 8, 2018
4dabd23
Merge branch 'master' into _batch/2_infra_typedef_and_batch
breezewish Nov 12, 2018
6615999
Implement a Clone that preserves capacity
breezewish Nov 15, 2018
6e7a715
Address comments and add more tests
breezewish Nov 15, 2018
570c051
Merge branch 'master' into _batch/2_infra_typedef_and_batch
breezewish Nov 15, 2018
5c34e7b
Merge branch 'master' into _batch/2_infra_typedef_and_batch
breezewish Nov 16, 2018
1bb93c5
Merge branch 'master' into _batch/2_infra_typedef_and_batch
breezewish Nov 16, 2018
084cf16
Merge branch 'master' into _batch/2_infra_typedef_and_batch
solotzg Nov 17, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 29 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Expand Up @@ -78,9 +78,12 @@ crossbeam-channel = "0.2"
crossbeam = "0.2"
fxhash = "0.2"
derive_more = "0.11.0"
num = "0.2.0"
num = "0.2"
num-traits = "0.2"
hex = "0.3"
rust-crypto = "^0.2"
cop_datatype = { path = "components/cop_datatype" }
smallvec = { version = "0.6", features = ["union"] }

[replace]
"raft:0.3.1" = { git = "https://github.com/pingcap/raft-rs.git" }
Expand Down Expand Up @@ -130,6 +133,7 @@ members = [
"components/test_coprocessor",
"components/test_util",
"components/codec",
"components/cop_datatype",
]

[profile.dev]
Expand Down
2 changes: 2 additions & 0 deletions benches/benches.rs
Expand Up @@ -19,6 +19,7 @@ extern crate crossbeam_channel;
extern crate kvproto;
extern crate log;
extern crate mio;
extern crate num_traits;
extern crate protobuf;
extern crate raft;
extern crate rand;
Expand All @@ -27,6 +28,7 @@ extern crate tempdir;
extern crate test;
extern crate tipb;

extern crate cop_datatype;
extern crate test_storage;
extern crate test_util;
extern crate tikv;
Expand Down
47 changes: 25 additions & 22 deletions benches/coprocessor/codec/chunk/arrow.rs
Expand Up @@ -11,11 +11,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use arrow::array;
use arrow::datatypes::{self, DataType, Field};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;
use tikv::coprocessor::codec::mysql::types;

use cop_datatype::prelude::*;
use cop_datatype::{FieldTypeFlag, FieldTypeTp};
use tikv::coprocessor::codec::Datum;
use tipb::expression::FieldType;

Expand All @@ -24,21 +27,21 @@ pub struct Chunk {
}

impl Chunk {
pub fn get_datum(&self, col_id: usize, row_id: usize, tp: &FieldType) -> Datum {
pub fn get_datum(&self, col_id: usize, row_id: usize, field_type: &FieldType) -> Datum {
if let Some(bitmap) = self.data.column(col_id).validity_bitmap() {
if !bitmap.is_set(row_id) {
return Datum::Null;
}
}

match tp.get_tp() as u8 {
types::TINY
| types::SHORT
| types::INT24
| types::LONG
| types::LONG_LONG
| types::YEAR => {
if types::has_unsigned_flag(tp.get_flag()) {
match field_type.tp() {
FieldTypeTp::Tiny
| FieldTypeTp::Short
| FieldTypeTp::Int24
| FieldTypeTp::Long
| FieldTypeTp::LongLong
| FieldTypeTp::Year => {
if field_type.flag().contains(FieldTypeFlag::UNSIGNED) {
let data = self
.data
.column(col_id)
Expand All @@ -58,7 +61,7 @@ impl Chunk {
Datum::I64(*data.get(row_id))
}
}
types::FLOAT | types::DOUBLE => {
FieldTypeTp::Float | FieldTypeTp::Double => {
let data = self
.data
.column(col_id)
Expand Down Expand Up @@ -86,21 +89,21 @@ impl ChunkBuilder {
pub fn build(self, tps: &[FieldType]) -> Chunk {
let mut fields = Vec::with_capacity(tps.len());
let mut arrays: Vec<Arc<array::Array>> = Vec::with_capacity(tps.len());
for (tp, column) in tps.iter().zip(self.columns.into_iter()) {
let (field, data) = match tp.get_tp() as u8 {
types::TINY
| types::SHORT
| types::INT24
| types::LONG
| types::LONG_LONG
| types::YEAR => {
if types::has_unsigned_flag(tp.get_flag()) {
for (field_type, column) in tps.iter().zip(self.columns.into_iter()) {
let (field, data) = match field_type.tp() {
FieldTypeTp::Tiny
| FieldTypeTp::Short
| FieldTypeTp::Int24
| FieldTypeTp::Long
| FieldTypeTp::LongLong
| FieldTypeTp::Year => {
if field_type.flag().contains(FieldTypeFlag::UNSIGNED) {
column.into_u64_array()
} else {
column.into_i64_array()
}
}
types::FLOAT | types::DOUBLE => column.into_f64_array(),
FieldTypeTp::Float | FieldTypeTp::Double => column.into_f64_array(),
_ => unreachable!(),
};
fields.push(field);
Expand Down
37 changes: 25 additions & 12 deletions benches/coprocessor/codec/chunk/mod.rs
Expand Up @@ -15,28 +15,29 @@ mod arrow;

use test::Bencher;

use cop_datatype::{FieldTypeAccessor, FieldTypeTp};
use tipb::expression::FieldType;

use tikv::coprocessor::codec::chunk::{Chunk, ChunkEncoder};
use tikv::coprocessor::codec::datum::Datum;
use tikv::coprocessor::codec::mysql::*;

fn field_type(tp: u8) -> FieldType {
fn field_type(tp: FieldTypeTp) -> FieldType {
let mut fp = FieldType::new();
fp.set_tp(i32::from(tp));
fp.as_mut_accessor().set_tp(tp);
fp
}

#[bench]
fn bench_encode_chunk(b: &mut Bencher) {
let rows = 1024;
let fields = vec![
field_type(types::LONG_LONG),
field_type(types::LONG_LONG),
field_type(types::VARCHAR),
field_type(types::VARCHAR),
field_type(types::NEW_DECIMAL),
field_type(types::JSON),
field_type(FieldTypeTp::LongLong),
field_type(FieldTypeTp::LongLong),
field_type(FieldTypeTp::VarChar),
field_type(FieldTypeTp::VarChar),
field_type(FieldTypeTp::NewDecimal),
field_type(FieldTypeTp::JSON),
];
let mut chunk = Chunk::new(&fields, rows);
for row_id in 0..rows {
Expand All @@ -61,7 +62,10 @@ fn bench_encode_chunk(b: &mut Bencher) {
#[bench]
fn bench_chunk_build_tidb(b: &mut Bencher) {
let rows = 1024;
let fields = vec![field_type(types::LONG_LONG), field_type(types::LONG_LONG)];
let fields = vec![
field_type(FieldTypeTp::LongLong),
field_type(FieldTypeTp::LongLong),
];

b.iter(|| {
let mut chunk = Chunk::new(&fields, rows);
Expand All @@ -75,7 +79,10 @@ fn bench_chunk_build_tidb(b: &mut Bencher) {
#[bench]
fn bench_chunk_build_offical(b: &mut Bencher) {
let rows = 1024;
let fields = vec![field_type(types::LONG_LONG), field_type(types::LONG_LONG)];
let fields = vec![
field_type(FieldTypeTp::LongLong),
field_type(FieldTypeTp::LongLong),
];

b.iter(|| {
let mut chunk = arrow::ChunkBuilder::new(fields.len(), rows);
Expand All @@ -90,7 +97,10 @@ fn bench_chunk_build_offical(b: &mut Bencher) {
#[bench]
fn bench_chunk_iter_tidb(b: &mut Bencher) {
let rows = 1024;
let fields = vec![field_type(types::LONG_LONG), field_type(types::DOUBLE)];
let fields = vec![
field_type(FieldTypeTp::LongLong),
field_type(FieldTypeTp::Double),
];
let mut chunk = Chunk::new(&fields, rows);
for row_id in 0..rows {
if row_id & 1 == 0 {
Expand Down Expand Up @@ -123,7 +133,10 @@ fn bench_chunk_iter_tidb(b: &mut Bencher) {
#[bench]
fn bench_chunk_iter_offical(b: &mut Bencher) {
let rows = 1024;
let fields = vec![field_type(types::LONG_LONG), field_type(types::DOUBLE)];
let fields = vec![
field_type(FieldTypeTp::LongLong),
field_type(FieldTypeTp::Double),
];
let mut chunk = arrow::ChunkBuilder::new(fields.len(), rows);
for row_id in 0..rows {
if row_id & 1 == 0 {
Expand Down
15 changes: 15 additions & 0 deletions components/cop_datatype/Cargo.toml
@@ -0,0 +1,15 @@
[package]
name = "cop_datatype"
version = "0.0.1"
publish = false
description = "TiKV Coprocessor data types"

[lib]
path = "lib.rs"

[dependencies]
tipb = { git = "https://github.com/pingcap/tipb.git" }
bitflags = "1.0"
enum-primitive-derive = "0.1"
num-traits = "0.2"
quick-error = "1.2"