Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Coprocessor: v2 row format for decode #5725

Merged
merged 17 commits into from Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions components/tidb_query/src/codec/mod.rs
Expand Up @@ -22,6 +22,7 @@ pub mod error;
pub mod mysql;
mod overflow;
pub mod raw_datum;
pub mod row;
pub mod table;

pub use self::datum::Datum;
Expand Down
3 changes: 3 additions & 0 deletions components/tidb_query/src/codec/row.rs
@@ -0,0 +1,3 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

mod v2;
19 changes: 19 additions & 0 deletions components/tidb_query/src/codec/row/v2.rs
@@ -0,0 +1,19 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use bitflags::bitflags;

// Prior to v2, the first byte is not a version code but a datum type.
// From v2 on, the first byte holds the version code; its values start at 128
// so a v2 row can be distinguished from the older format by the first byte
// alone (per the original compatibility note — datum type bytes presumably
// stay below 128).
pub const CODEC_VERSION: u8 = 128;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding a MAGIC would be better, since v2 conflicts to version: u8 = 128 semantically. By calling it a magic number it makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean MAGIC_CODE_VERSION?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes maybe.


bitflags! {
    // Row-level flags stored in the byte that follows the version byte.
    #[derive(Default)]
    struct Flags: u8 {
        // "Big" layout: column ids are written as 4 bytes (instead of 1)
        // and value offsets as 4 bytes (instead of 2). Set when any column
        // id exceeds 255 or the value payload exceeds 65535 bytes.
        const BIG = 1;
    }
}

mod row_slice;

#[cfg(test)]
mod encoder;
273 changes: 273 additions & 0 deletions components/tidb_query/src/codec/row/v2/encoder.rs
@@ -0,0 +1,273 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

//! This `encoder` module is only used for test, so the implementation is very straightforward.
//!
//! According to https://github.com/pingcap/tidb/blob/master/docs/design/2018-07-19-row-format.md
//! The row format is:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we format this row with markdown?

//! | version | flag | number_of_non_null_values | number_of_null_values | non_null_value_ids
//! | null_value_ids | value_offsets | values
//! short spec of each field
//! version: 1 byte
//! flag: 1 byte; it is 1 ("big") when any column id is greater than 255 or the total size of the values is greater than 65535, otherwise 0
//! number of non-null values: 2 bytes
//! number of null values: 2 bytes
//! column ids: ids of non-null values + ids of null values, when flag == 1 (big), id is 4 bytes, otherwise 1 byte
//! non-null values offset: when big, offset is 4 bytes, otherwise 2 bytes

use crate::codec::{
data_type::ScalarValue,
mysql::{decimal::DecimalEncoder, json::JsonEncoder},
Error, Result,
};

use tidb_query_datatype::{FieldTypeAccessor, FieldTypeFlag};
use tipb::FieldType;

use codec::prelude::*;
use std::{i16, i32, i8, u16, u32, u8};

// Integer bounds widened to i64/u64, used by `encode_i64`/`encode_u64`
// to pick the narrowest little-endian encoding that can hold a value.
const MAX_I8: i64 = i8::MAX as i64;
const MIN_I8: i64 = i8::MIN as i64;
const MAX_I16: i64 = i16::MAX as i64;
const MIN_I16: i64 = i16::MIN as i64;
const MAX_I32: i64 = i32::MAX as i64;
const MIN_I32: i64 = i32::MIN as i64;

const MAX_U8: u64 = u8::MAX as u64;
const MAX_U16: u64 = u16::MAX as u64;
const MAX_U32: u64 = u32::MAX as u64;

/// A column to be encoded into the v2 row format: a column id paired with
/// its value and field type (the type is only consulted to tell signed
/// from unsigned integers — see `DatumEncoder::write_value`).
pub struct Column {
    // Column id; ids above 255 force the "big" row layout.
    id: i64,
    // The datum; `ScalarValue::*(None)` means NULL.
    value: ScalarValue,
    // Field type; only the UNSIGNED flag is read in this module.
    ft: FieldType,
}

impl Column {
pub fn new(id: i64, value: impl Into<ScalarValue>) -> Self {
Column {
id,
ft: FieldType::default(),
value: value.into(),
}
}
pub fn new_unsigned(id: i64, value: impl Into<ScalarValue>) -> Self {
let mut ft = FieldType::default();
ft.as_mut_accessor().set_flag(FieldTypeFlag::UNSIGNED);
Column {
id,
ft,
value: value.into(),
}
}
}

pub trait RowEncoder: NumberEncoder {
    /// Encodes `columns` into the v2 row format and appends the bytes to
    /// `self`.
    ///
    /// Layout: version byte, flag byte, u16 non-null count, u16 null count,
    /// sorted non-null column ids, sorted null column ids, per-value end
    /// offsets, and finally the concatenated value payload.
    fn write_row(&mut self, columns: Vec<Column>) -> Result<()> {
        // The "big" layout (4-byte ids and offsets) is required as soon as
        // any column id exceeds 255; the payload size is checked below.
        let mut is_big = columns.iter().any(|c| c.id > 255);

        // Split NULL columns (only their ids are stored) from non-null ones.
        let (mut non_null_cols, null_cols): (Vec<_>, Vec<_>) =
            columns.into_iter().partition(|c| !c.value.is_none());
        let mut null_ids: Vec<i64> = null_cols.into_iter().map(|c| c.id).collect();
        non_null_cols.sort_by_key(|c| c.id);
        null_ids.sort();

        // Serialize the values first so the end offsets (and whether the
        // payload overflows u16) are known before the header is written.
        let mut non_null_ids = Vec::with_capacity(non_null_cols.len());
        let mut values = vec![];
        let mut offsets = vec![];
        for col in non_null_cols {
            non_null_ids.push(col.id);
            values.write_value(col)?;
            offsets.push(values.len());
        }
        if values.len() > (u16::MAX as usize) {
            is_big = true;
        }

        // Header.
        self.write_u8(super::CODEC_VERSION)?;
        self.write_flag(is_big)?;
        self.write_u16_le(non_null_ids.len() as u16)?;
        self.write_u16_le(null_ids.len() as u16)?;

        // Ids (non-null first, then null), offsets, then the payload.
        for id in non_null_ids.into_iter().chain(null_ids) {
            self.write_id(is_big, id)?;
        }
        let mut offset_bytes = vec![];
        for offset in offsets {
            offset_bytes.write_offset(is_big, offset)?;
        }
        self.write_bytes(&offset_bytes)?;
        self.write_bytes(&values)?;
        Ok(())
    }

    /// Writes the flag byte; only the BIG bit is ever set here.
    #[inline]
    fn write_flag(&mut self, is_big: bool) -> codec::Result<()> {
        let mut flag = super::Flags::default();
        if is_big {
            flag |= super::Flags::BIG;
        }
        self.write_u8(flag.bits)
    }

    /// Writes a column id: 4 bytes little-endian in the big layout,
    /// a single byte otherwise.
    #[inline]
    fn write_id(&mut self, is_big: bool, id: i64) -> codec::Result<()> {
        if is_big {
            self.write_u32_le(id as u32)
        } else {
            self.write_u8(id as u8)
        }
    }

    /// Writes a value end offset: 4 bytes little-endian in the big layout,
    /// 2 bytes otherwise.
    #[inline]
    fn write_offset(&mut self, is_big: bool, offset: usize) -> codec::Result<()> {
        if is_big {
            self.write_u32_le(offset as u32)
        } else {
            self.write_u16_le(offset as u16)
        }
    }
}

impl<T: BufferWriter> RowEncoder for T {}

trait DatumEncoder: NumberEncoder + DecimalEncoder + JsonEncoder {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to implement it for the ScalarValue instead (since we are going to remove Datum very soon)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If using ScalarValue, how should I detect the Int is unsigned or not, by passing a ColumnInfo?
I saw somewhere was using ExprType of Expr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! You can pass in a FieldType to know it's signed / unsigned, as well as it's f64 or f32, etc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to ScalarValue, PTAL, thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change the DatumEncoder to ScalarValueEncoder?

#[inline]
fn write_value(&mut self, col: Column) -> Result<()> {
match col.value {
ScalarValue::Int(Some(v)) if col.ft.is_unsigned() => {
self.encode_u64(v as u64).map_err(Error::from)
}
ScalarValue::Int(Some(v)) => self.encode_i64(v).map_err(Error::from),
ScalarValue::Decimal(Some(v)) => {
let (prec, frac) = v.prec_and_frac();
self.write_decimal(&v, prec, frac)?;
Ok(())
}
ScalarValue::Real(Some(v)) => self.encode_u64(v.to_bits()).map_err(Error::from),
ScalarValue::Bytes(Some(v)) => self.write_bytes(&v).map_err(Error::from),
ScalarValue::DateTime(Some(v)) => {
self.encode_u64(v.to_packed_u64()).map_err(Error::from)
}
ScalarValue::Duration(Some(v)) => self.encode_i64(v.to_nanos()).map_err(Error::from),
ScalarValue::Json(Some(v)) => self.write_json(&v),
_ => unreachable!(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe deserve to handle the NULL case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems the NULL has been filtered, see L90.

}
}

#[allow(clippy::match_overlapping_arm)]
#[inline]
fn encode_i64(&mut self, v: i64) -> codec::Result<()> {
match v {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could use a generic function call is_bounded_by to extract this kind of code. It is worth to have a try.

MIN_I8..=MAX_I8 => self.write_u8(v as i8 as u8),
MIN_I16..=MAX_I16 => self.write_i16_le(v as i16),
MIN_I32..=MAX_I32 => self.write_i32_le(v as i32),
_ => self.write_i64_le(v),
}
}

#[allow(clippy::match_overlapping_arm)]
#[inline]
fn encode_u64(&mut self, v: u64) -> codec::Result<()> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

match v {
0..=MAX_U8 => self.write_u8(v as u8),
0..=MAX_U16 => self.write_u16_le(v as u16),
0..=MAX_U32 => self.write_u32_le(v as u32),
_ => self.write_u64_le(v),
}
}
}
impl<T: BufferWriter> DatumEncoder for T {}

#[cfg(test)]
mod tests {
    use super::{Column, RowEncoder};
    use crate::codec::{
        data_type::ScalarValue,
        mysql::{duration::NANOS_PER_SEC, Decimal, Duration, Json, Time},
    };
    use std::str::FromStr;

    // Expected vectors are hand-computed byte layouts; the field order is
    // version, flag, non-null count, null count, ids, offsets, values
    // (see the module-level doc comment).

    #[test]
    fn test_encode_unsigned() {
        // An UNSIGNED column holding u64::MAX (passed through an i64 bit
        // pattern) must be written as eight 0xFF value bytes; the signed
        // column holding -1 compresses down to the single byte 0xFF.
        let cols = vec![
            Column::new_unsigned(1, std::u64::MAX as i64),
            Column::new(2, -1),
        ];
        let exp: Vec<u8> = vec![
            128, 0, 2, 0, 0, 0, 1, 2, 8, 0, 9, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255,
        ];
        let mut buf = vec![];
        buf.write_row(cols).unwrap();

        assert_eq!(buf, exp);
    }

    #[test]
    fn test_encode() {
        // Mixed datum types in the small layout: all ids <= 255 and the
        // payload < 64 KiB, so ids take 1 byte and offsets 2. Column 33 is
        // NULL, so only its id appears (after the sorted non-null ids).
        let cols = vec![
            Column::new(1, 1000),
            Column::new(12, 2),
            Column::new(33, ScalarValue::Int(None)),
            Column::new_unsigned(3, 3),
            Column::new(8, 32767),
            Column::new(7, b"abc".to_vec()),
            Column::new(9, 1.8),
            Column::new(6, -1.8),
            Column::new(
                13,
                Time::parse_utc_datetime("2018-01-19 03:14:07", 0).unwrap(),
            ),
            Column::new(14, Decimal::from(1i64)),
            Column::new(15, Json::from_str(r#"{"key":"value"}"#).unwrap()),
            Column::new(16, Duration::from_nanos(NANOS_PER_SEC, 0).unwrap()),
        ];

        let exp = vec![
            128, 0, 11, 0, 1, 0, 1, 3, 6, 7, 8, 9, 12, 13, 14, 15, 16, 33, 2, 0, 3, 0, 11, 0, 14,
            0, 16, 0, 24, 0, 25, 0, 33, 0, 36, 0, 65, 0, 69, 0, 232, 3, 3, 205, 204, 204, 204, 204,
            204, 252, 191, 97, 98, 99, 255, 127, 205, 204, 204, 204, 204, 204, 252, 63, 2, 0, 0, 0,
            135, 51, 230, 158, 25, 1, 0, 129, 1, 1, 0, 0, 0, 28, 0, 0, 0, 19, 0, 0, 0, 3, 0, 12,
            22, 0, 0, 0, 107, 101, 121, 5, 118, 97, 108, 117, 101, 0, 202, 154, 59,
        ];
        let mut buf = vec![];
        buf.write_row(cols).unwrap();

        assert_eq!(buf, exp);
    }

    #[test]
    fn test_encode_big() {
        // Column id 335 (> 255) forces the big layout: the flag byte is 1
        // and both ids and offsets widen to 4 bytes each.
        let cols = vec![
            Column::new(1, 1000),
            Column::new(12, 2),
            Column::new(335, ScalarValue::Int(None)),
            Column::new(3, 3),
            Column::new(8, 32767),
        ];
        let exp = vec![
            128, 1, 4, 0, 1, 0, 1, 0, 0, 0, 3, 0, 0, 0, 8, 0, 0, 0, 12, 0, 0, 0, 79, 1, 0, 0, 2, 0,
            0, 0, 3, 0, 0, 0, 5, 0, 0, 0, 6, 0, 0, 0, 232, 3, 3, 255, 127, 2,
        ];
        let mut buf = vec![];
        buf.write_row(cols).unwrap();

        assert_eq!(exp, buf);
    }
}