diff --git a/crates/engine/src/datafusions.rs b/crates/engine/src/datafusions.rs index 7a0341a1..9409ff0b 100644 --- a/crates/engine/src/datafusions.rs +++ b/crates/engine/src/datafusions.rs @@ -80,7 +80,7 @@ pub(crate) fn run( } else { &ptc }; - log::debug!("--- pc: {:?}", pc); + log::debug!("pc: {:?}", pc); if (&cols).contains(pc) { match ms.get_table_info_partition_keys_expr(tid)? { Some(iv) if !tctx.where_str.is_empty() => { @@ -122,7 +122,7 @@ pub(crate) fn run( fields.push(Field::new( cn, btype_to_arrow_type(ci.data_type)?, - false, + ci.is_nullable, )); } else { return Err(EngineError::ColumnInfoNotExist); @@ -148,14 +148,14 @@ pub(crate) fn run( fields.push(Field::new( cn.as_str(), btype_to_arrow_type(ci.data_type)?, - false, + ci.is_nullable, )); } } else { return Err(EngineError::UnsupportedQuery); } } - //log::debug!("[df][Schema] - fields: {:?}", fields); + let schema = Arc::new(Schema::new(fields)); let copasss = &mut qs.copasss; let mut copass = Vec::new(); @@ -175,7 +175,6 @@ pub(crate) fn run( let res: Vec = Vec::new(); return Ok(res); } - // log::info!("query setup runtime(ms): {}", t.elapsed().as_millis()); let df = ctx.sql(raw_query)?; let res = tokio::task::block_in_place(|| Handle::current().block_on(df.collect())); diff --git a/crates/engine/src/remote.rs b/crates/engine/src/remote.rs index 22548e54..f998483d 100644 --- a/crates/engine/src/remote.rs +++ b/crates/engine/src/remote.rs @@ -1,9 +1,9 @@ use crate::errs::EngineResult; use crate::mysql::{col_to_bql_type, get_val_bytes_from_row}; use client::prelude::{Pool, ServerBlock}; -use tokio::runtime::Handle; use meta::types::BaseChunk; use mysql::{prelude::Queryable, Pool as MyPool}; +use tokio::runtime::Handle; pub fn run(pool: &Pool, sql: &str) -> EngineResult> { let res: EngineResult<_> = tokio::task::block_in_place(|| { @@ -27,43 +27,42 @@ pub fn mysql_run( sql: &str, ) -> EngineResult<(usize, usize, Vec<(Vec, BaseChunk)>)> { let res: EngineResult<_> = tokio::task::block_in_place(|| { - Handle::current().block_on( - async move { - let mut conn = pool.get_conn()?; - let res = conn.query_iter(&sql)?; - let ncols = res.columns().as_ref().len(); - let mut cols: Vec<(Vec, BaseChunk)> = vec![]; - let mut nrows = 0; + Handle::current().block_on(async move { + let mut conn = pool.get_conn()?; + let res = conn.query_iter(&sql)?; + let ncols = res.columns().as_ref().len(); + let mut cols: Vec<(Vec, BaseChunk)> = vec![]; + let mut nrows = 0; - for c in res.columns().as_ref() { - let btype = col_to_bql_type(&c)?; - cols.push(( - c.name_str().as_bytes().to_vec(), - BaseChunk { - btype, - size: 0, - data: vec![], - null_map: None, - offset_map: None, - lc_dict_data: None, - }, - )); - } + for c in res.columns().as_ref() { + let btype = col_to_bql_type(&c)?; + cols.push(( + c.name_str().as_bytes().to_vec(), + BaseChunk { + btype, + size: 0, + data: vec![], + null_map: None, + offset_map: None, + lc_dict_data: None, + }, + )); + } - for row in res { - let r = row?; - for c in cols.iter_mut() { - c.1.data.extend(get_val_bytes_from_row( - &r, - &mut c.1.offset_map, - &mut c.1.size, - )?); - } - nrows += 1; - } + for row in res { + let r = row?; + for c in cols.iter_mut() { + c.1.data.extend(get_val_bytes_from_row( + &r, + &mut c.1.offset_map, + &mut c.1.size, + )?); + } + nrows += 1; + } - Ok((ncols, nrows, cols)) - }) + Ok((ncols, nrows, cols)) + }) }); Ok(res?) diff --git a/crates/runtime/src/ch/blocks.rs b/crates/runtime/src/ch/blocks.rs index c5643d9c..0cef6ddd 100644 --- a/crates/runtime/src/ch/blocks.rs +++ b/crates/runtime/src/ch/blocks.rs @@ -541,29 +541,61 @@ impl TryFrom for Block { &col.data().buffers()[0] }; // log::debug!("cd.get_array_memory_size(): {}", cd.get_array_memory_size()); - let (len_in_bytes, offsets) = if matches!(btype, BqlType::String) { - let arr = col - .as_any() - .downcast_ref::() - .unwrap(); - let ofs = arr - .value_offsets() - .last() - .copied() - .ok_or(BaseRtError::FailToUnwrapOpt)?; - - ( - ofs as usize, - Some(arr.value_offsets().iter().map(|o| *o as u32).collect()), - ) - } else { - (btype.size_in_usize()? * col.len(), None) - }; + let (len_in_bytes, offsets): (usize, Option>) = + if matches!(btype, BqlType::String) { + let arr = col + .as_any() + .downcast_ref::() + .unwrap(); + let ofs = arr + .value_offsets() + .last() + .copied() + .ok_or(BaseRtError::FailToUnwrapOpt)?; + + ( + ofs as usize, + Some(arr.value_offsets().iter().map(|o| *o as u32).collect()), + ) + } else { + (btype.size_in_usize()? * col.len(), None) + }; let data = unsafe { std::slice::from_raw_parts(buf.as_ptr(), len_in_bytes).to_vec() }; blk.nrows = col.len(); //FIXME all rows are in same size + let null_map = if !matches!(btype, BqlType::String) { + // Todo: need to improve the performance + if fields[i].is_nullable() { + Some( + data.chunks(btype.size_in_usize()?) + .map(|c| if c.into_iter().all(|&b| b == 0) { 1 } else { 0 }) + .collect::>(), + ) + } else { + None + } + } else { + if fields[i].is_nullable() { + let offsets = offsets.as_ref().unwrap(); + let null_map = offsets[..(offsets.len() - 1)] + .iter() + .map(|offset| { + let offset = *offset as usize; + if data[offset..offset + 1][0] == 0 { + 1 + } else { + 0 + } + }) + .collect::>(); + Some(null_map) + } else { + None + } + }; + blk.columns.push(Column { name, data: BaseChunk { @@ -572,7 +604,7 @@ impl TryFrom for Block { // data: Vec::from_raw_parts(qcs.data, qclen_bytes, qclen_bytes), // data: Vec::::with_capacity(qclen_bytes), data, - null_map: None, + null_map, offset_map: offsets, // pub lc_dict_size: usize, lc_dict_data: None, @@ -605,6 +637,7 @@ impl BytesEncoder for Column { bs.write_varbytes( bytes_cat!(b"Nullable(", bs_nlltyp.as_slice(), b")").as_slice(), ); + bs.extend_from_slice(&self.data.null_map.as_ref().unwrap()); } bs.extend_from_slice(&self.data.data); diff --git a/crates/runtime/src/mgmt.rs b/crates/runtime/src/mgmt.rs index 412e856e..ccf0598f 100644 --- a/crates/runtime/src/mgmt.rs +++ b/crates/runtime/src/mgmt.rs @@ -1,3 +1,4 @@ +use base::bytes_cat; use base::{ codec::encode_ascii_bytes_vec_short, datetimes::{parse_to_epoch, TimeZoneId}, @@ -10,13 +11,13 @@ use clap::{App, Arg}; use client::prelude::Pool; use client::prelude::PoolBuilder; use dashmap::DashMap; +use lang::parse::RemoteAddr; use lang::parse::{ parse_command, parse_create_database, parse_create_table, parse_desc_table, parse_drop_database, parse_drop_table, parse_insert_into, parse_optimize_table, parse_show_create_table, parse_table_place, seek_to_sub_cmd, Pair, Rule, TablePlaceKind, TablePlaceKindContext, }; -use lang::parse::RemoteAddr; use meta::{ confs::Conf, errs::MetaError, @@ -564,7 +565,12 @@ impl<'a> BaseMgmtSys<'a> { let mut name = Vec::with_capacity(len); let mut dtype = Vec::with_capacity(len * 3); for (name0, _, col_info) in col_infos.into_iter() { - let data = col_info.data_type.to_vec()?; + let mut data = col_info.data_type.to_vec()?; + data = if col_info.is_nullable { + bytes_cat!(b"Nullable(", &data, b")") + } else { + data + }; encode_ascii_bytes_vec_short(name0.as_bytes(), &mut name)?; encode_ascii_bytes_vec_short(&data, &mut dtype)?; } diff --git a/crates/runtime/src/read.rs b/crates/runtime/src/read.rs index d2c1f191..f78abca6 100644 --- a/crates/runtime/src/read.rs +++ b/crates/runtime/src/read.rs @@ -33,15 +33,13 @@ pub fn query( query_id.as_str(), &mut qs, )?; - log::debug!("query run time cost {:?}", timer.elapsed()); - // log::debug!("res: {:?}", res); - // arrow::util::pretty::print_batches(&res)?; + log::debug!("query run time cost {:?}", timer.elapsed()); let mut blks = Vec::with_capacity(res.len()); for rb in res { let blk = Block::try_from(rb)?; - log::debug!("blk: {:?}", blk); + blks.push(blk); }