FEAT(engine,runtime) nullable(typename) support related tensorbase#236
Signed-off-by: pandaplusplus <pandaplusplus@gmail.com>
pandaplusplus committed Aug 7, 2021
1 parent 141e384 commit 817cc17
Showing 5 changed files with 100 additions and 65 deletions.
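TensorBase speaks ClickHouse's native protocol, in which a Nullable(T) column travels as a per-row null map (one byte per row, 1 marking NULL) followed by the flat values. This commit threads each column's is_nullable flag from the catalog metadata into the Arrow schema and emits that null map when encoding result blocks. A minimal standalone sketch of the wire layout (hypothetical helper name, not this crate's API):

    // Sketch of the ClickHouse-style layout for a Nullable(Int32) column:
    // first a null mask (one byte per row, 1 = NULL), then every value slot.
    fn encode_nullable_i32(values: &[Option<i32>], out: &mut Vec<u8>) {
        for v in values {
            out.push(if v.is_none() { 1 } else { 0 });
        }
        for v in values {
            // NULL slots are still materialized, here as a default 0
            out.extend_from_slice(&v.unwrap_or(0).to_le_bytes());
        }
    }

    fn main() {
        let mut out = Vec::new();
        encode_nullable_i32(&[Some(7), None, Some(9)], &mut out);
        assert_eq!(&out[..3], &[0, 1, 0]); // mask, then 3 x 4 value bytes
        assert_eq!(out.len(), 3 + 3 * 4);
    }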
9 changes: 4 additions & 5 deletions crates/engine/src/datafusions.rs
@@ -80,7 +80,7 @@ pub(crate) fn run(
         } else {
             &ptc
         };
-        log::debug!("--- pc: {:?}", pc);
+        log::debug!("pc: {:?}", pc);
         if (&cols).contains(pc) {
             match ms.get_table_info_partition_keys_expr(tid)? {
                 Some(iv) if !tctx.where_str.is_empty() => {
@@ -122,7 +122,7 @@ pub(crate) fn run(
                 fields.push(Field::new(
                     cn,
                     btype_to_arrow_type(ci.data_type)?,
-                    false,
+                    ci.is_nullable,
                 ));
             } else {
                 return Err(EngineError::ColumnInfoNotExist);
@@ -148,14 +148,14 @@ pub(crate) fn run(
                     fields.push(Field::new(
                         cn.as_str(),
                         btype_to_arrow_type(ci.data_type)?,
-                        false,
+                        ci.is_nullable,
                     ));
                 }
             } else {
                 return Err(EngineError::UnsupportedQuery);
             }
         }
-    //log::debug!("[df][Schema] - fields: {:?}", fields);
+
     let schema = Arc::new(Schema::new(fields));
     let copasss = &mut qs.copasss;
     let mut copass = Vec::new();
@@ -175,7 +175,6 @@ pub(crate) fn run(
         let res: Vec<RecordBatch> = Vec::new();
         return Ok(res);
     }
-    // log::info!("query setup runtime(ms): {}", t.elapsed().as_millis());
 
     let df = ctx.sql(raw_query)?;
     let res = tokio::task::block_in_place(|| Handle::current().block_on(df.collect()));
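The datafusions.rs change above forwards the catalog's is_nullable flag instead of a hardcoded false as the third argument of Arrow's Field::new, which is the field's nullability flag. A small self-contained illustration (assuming the arrow crate's long-standing constructor shape):

    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    fn main() {
        // Nullability now follows the column's catalog metadata
        // instead of being declared non-nullable for every field.
        let is_nullable = true; // stands in for ci.is_nullable
        let field = Field::new("a", DataType::Int32, is_nullable);
        let schema = Arc::new(Schema::new(vec![field]));
        assert!(schema.field(0).is_nullable());
    }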
69 changes: 34 additions & 35 deletions crates/engine/src/remote.rs
@@ -1,9 +1,9 @@
 use crate::errs::EngineResult;
 use crate::mysql::{col_to_bql_type, get_val_bytes_from_row};
 use client::prelude::{Pool, ServerBlock};
-use tokio::runtime::Handle;
 use meta::types::BaseChunk;
 use mysql::{prelude::Queryable, Pool as MyPool};
+use tokio::runtime::Handle;
 
 pub fn run(pool: &Pool, sql: &str) -> EngineResult<Vec<ServerBlock>> {
     let res: EngineResult<_> = tokio::task::block_in_place(|| {
@@ -27,43 +27,42 @@ pub fn mysql_run(
     sql: &str,
 ) -> EngineResult<(usize, usize, Vec<(Vec<u8>, BaseChunk)>)> {
     let res: EngineResult<_> = tokio::task::block_in_place(|| {
-        Handle::current().block_on(
-            async move {
-                let mut conn = pool.get_conn()?;
-                let res = conn.query_iter(&sql)?;
-                let ncols = res.columns().as_ref().len();
-                let mut cols: Vec<(Vec<u8>, BaseChunk)> = vec![];
-                let mut nrows = 0;
+        Handle::current().block_on(async move {
+            let mut conn = pool.get_conn()?;
+            let res = conn.query_iter(&sql)?;
+            let ncols = res.columns().as_ref().len();
+            let mut cols: Vec<(Vec<u8>, BaseChunk)> = vec![];
+            let mut nrows = 0;
 
-                for c in res.columns().as_ref() {
-                    let btype = col_to_bql_type(&c)?;
-                    cols.push((
-                        c.name_str().as_bytes().to_vec(),
-                        BaseChunk {
-                            btype,
-                            size: 0,
-                            data: vec![],
-                            null_map: None,
-                            offset_map: None,
-                            lc_dict_data: None,
-                        },
-                    ));
-                }
+            for c in res.columns().as_ref() {
+                let btype = col_to_bql_type(&c)?;
+                cols.push((
+                    c.name_str().as_bytes().to_vec(),
+                    BaseChunk {
+                        btype,
+                        size: 0,
+                        data: vec![],
+                        null_map: None,
+                        offset_map: None,
+                        lc_dict_data: None,
+                    },
+                ));
+            }
 
-                for row in res {
-                    let r = row?;
-                    for c in cols.iter_mut() {
-                        c.1.data.extend(get_val_bytes_from_row(
-                            &r,
-                            &mut c.1.offset_map,
-                            &mut c.1.size,
-                        )?);
-                    }
-                    nrows += 1;
-                }
+            for row in res {
+                let r = row?;
+                for c in cols.iter_mut() {
+                    c.1.data.extend(get_val_bytes_from_row(
+                        &r,
+                        &mut c.1.offset_map,
+                        &mut c.1.size,
+                    )?);
+                }
+                nrows += 1;
+            }
 
-                Ok((ncols, nrows, cols))
-            })
+            Ok((ncols, nrows, cols))
+        })
     });
 
     Ok(res?)
71 changes: 52 additions & 19 deletions crates/runtime/src/ch/blocks.rs
@@ -541,29 +541,61 @@ impl TryFrom<RecordBatch> for Block {
                 &col.data().buffers()[0]
             };
             // log::debug!("cd.get_array_memory_size(): {}", cd.get_array_memory_size());
-            let (len_in_bytes, offsets) = if matches!(btype, BqlType::String) {
-                let arr = col
-                    .as_any()
-                    .downcast_ref::<array::LargeStringArray>()
-                    .unwrap();
-                let ofs = arr
-                    .value_offsets()
-                    .last()
-                    .copied()
-                    .ok_or(BaseRtError::FailToUnwrapOpt)?;
-
-                (
-                    ofs as usize,
-                    Some(arr.value_offsets().iter().map(|o| *o as u32).collect()),
-                )
-            } else {
-                (btype.size_in_usize()? * col.len(), None)
-            };
+            let (len_in_bytes, offsets): (usize, Option<Vec<u32>>) =
+                if matches!(btype, BqlType::String) {
+                    let arr = col
+                        .as_any()
+                        .downcast_ref::<array::LargeStringArray>()
+                        .unwrap();
+                    let ofs = arr
+                        .value_offsets()
+                        .last()
+                        .copied()
+                        .ok_or(BaseRtError::FailToUnwrapOpt)?;
+
+                    (
+                        ofs as usize,
+                        Some(arr.value_offsets().iter().map(|o| *o as u32).collect()),
+                    )
+                } else {
+                    (btype.size_in_usize()? * col.len(), None)
+                };
             let data = unsafe {
                 std::slice::from_raw_parts(buf.as_ptr(), len_in_bytes).to_vec()
             };
             blk.nrows = col.len(); //FIXME all rows are in same size
+
+            let null_map = if !matches!(btype, BqlType::String) {
+                // Todo: need to improve the performance
+                if fields[i].is_nullable() {
+                    Some(
+                        data.chunks(btype.size_in_usize()?)
+                            .map(|c| if c.into_iter().all(|&b| b == 0) { 1 } else { 0 })
+                            .collect::<Vec<_>>(),
+                    )
+                } else {
+                    None
+                }
+            } else {
+                if fields[i].is_nullable() {
+                    let offsets = offsets.as_ref().unwrap();
+                    let null_map = offsets[..(offsets.len() - 1)]
+                        .iter()
+                        .map(|offset| {
+                            let offset = *offset as usize;
+                            if data[offset..offset + 1][0] == 0 {
+                                1
+                            } else {
+                                0
+                            }
+                        })
+                        .collect::<Vec<_>>();
+                    Some(null_map)
+                } else {
+                    None
+                }
+            };
 
             blk.columns.push(Column {
                 name,
                 data: BaseChunk {
@@ -572,7 +604,7 @@ impl TryFrom<RecordBatch> for Block {
                     // data: Vec::from_raw_parts(qcs.data, qclen_bytes, qclen_bytes),
                     // data: Vec::<u8>::with_capacity(qclen_bytes),
                     data,
-                    null_map: None,
+                    null_map,
                     offset_map: offsets,
                     // pub lc_dict_size: usize,
                     lc_dict_data: None,
@@ -605,6 +637,7 @@ impl BytesEncoder for Column {
                 bs.write_varbytes(
                     bytes_cat!(b"Nullable(", bs_nlltyp.as_slice(), b")").as_slice(),
                 );
+                bs.extend_from_slice(&self.data.null_map.as_ref().unwrap());
             }
             bs.extend_from_slice(&self.data.data);
 
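Note that the null_map added above is inferred from the raw column bytes rather than read off Arrow's validity bitmap: a fixed-size slot whose bytes are all zero (or, for strings, a value whose first byte is 0) is marked NULL, so a genuine zero value would also be reported as NULL; the Todo comment flags this as provisional. A condensed sketch of the fixed-size branch, with 1 = NULL as in the diff:

    // One mask byte per value slot, derived purely from the raw bytes:
    // an all-zero slot is treated as NULL (so a real 0 also reads as NULL).
    fn null_map_fixed(data: &[u8], slot_size: usize) -> Vec<u8> {
        data.chunks(slot_size)
            .map(|slot| if slot.iter().all(|&b| b == 0) { 1 } else { 0 })
            .collect()
    }

    fn main() {
        // two i32 slots, 7 and 0: the zero slot is (over-)reported as NULL
        let data = [7i32.to_le_bytes(), 0i32.to_le_bytes()].concat();
        assert_eq!(null_map_fixed(&data, 4), vec![0, 1]);
    }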
10 changes: 8 additions & 2 deletions crates/runtime/src/mgmt.rs
@@ -1,3 +1,4 @@
+use base::bytes_cat;
 use base::{
     codec::encode_ascii_bytes_vec_short,
     datetimes::{parse_to_epoch, TimeZoneId},
@@ -10,13 +11,13 @@ use clap::{App, Arg};
 use client::prelude::Pool;
 use client::prelude::PoolBuilder;
 use dashmap::DashMap;
+use lang::parse::RemoteAddr;
 use lang::parse::{
     parse_command, parse_create_database, parse_create_table, parse_desc_table,
     parse_drop_database, parse_drop_table, parse_insert_into, parse_optimize_table,
     parse_show_create_table, parse_table_place, seek_to_sub_cmd, Pair, Rule,
     TablePlaceKind, TablePlaceKindContext,
 };
-use lang::parse::RemoteAddr;
 use meta::{
     confs::Conf,
     errs::MetaError,
@@ -564,7 +565,12 @@ impl<'a> BaseMgmtSys<'a> {
         let mut name = Vec::with_capacity(len);
         let mut dtype = Vec::with_capacity(len * 3);
         for (name0, _, col_info) in col_infos.into_iter() {
-            let data = col_info.data_type.to_vec()?;
+            let mut data = col_info.data_type.to_vec()?;
+            data = if col_info.is_nullable {
+                bytes_cat!(b"Nullable(", &data, b")")
+            } else {
+                data
+            };
             encode_ascii_bytes_vec_short(name0.as_bytes(), &mut name)?;
             encode_ascii_bytes_vec_short(&data, &mut dtype)?;
         }
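This mgmt.rs change affects how column types are rendered (e.g. in DESC TABLE output): a nullable column's type name is wrapped as Nullable(T) using TensorBase's bytes_cat! byte-concatenation macro. A plain-Rust equivalent of the wrapping, with the macro replaced by manual concatenation:

    // Wrap a raw type name in "Nullable(...)" when the column is nullable;
    // stands in for the bytes_cat! call in the diff.
    fn render_type(data: Vec<u8>, is_nullable: bool) -> Vec<u8> {
        if is_nullable {
            let mut wrapped = b"Nullable(".to_vec();
            wrapped.extend_from_slice(&data);
            wrapped.push(b')');
            wrapped
        } else {
            data
        }
    }

    fn main() {
        let t = render_type(b"UInt64".to_vec(), true);
        assert_eq!(t, b"Nullable(UInt64)".to_vec());
    }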
6 changes: 2 additions & 4 deletions crates/runtime/src/read.rs
@@ -33,15 +33,13 @@ pub fn query(
         query_id.as_str(),
         &mut qs,
     )?;
-    log::debug!("query run time cost {:?}", timer.elapsed());
 
-    // log::debug!("res: {:?}", res);
-    // arrow::util::pretty::print_batches(&res)?;
+    log::debug!("query run time cost {:?}", timer.elapsed());
 
     let mut blks = Vec::with_capacity(res.len());
     for rb in res {
         let blk = Block::try_from(rb)?;
-        log::debug!("blk: {:?}", blk);
+
         blks.push(blk);
     }
 