-
Notifications
You must be signed in to change notification settings - Fork 115
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
22 changed files
with
655 additions
and
85 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,4 +29,3 @@ fn test_city_hash_128() { | |
let actual = city_hash_128("abc"); | ||
assert_eq!(expected, actual) | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
extern crate clickhouse_rs; | ||
extern crate futures; | ||
|
||
use clickhouse_rs::{types::Block, Pool}; | ||
use futures::Future; | ||
use std::env; | ||
|
||
fn main() { | ||
env::set_var("RUST_LOG", "clickhouse_rs=debug"); | ||
env_logger::init(); | ||
|
||
let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000".into()); | ||
let pool = Pool::new(database_url); | ||
|
||
let ddl = " | ||
CREATE TABLE array_table ( | ||
nums Array(UInt32), | ||
text Array(String) | ||
) Engine=Memory"; | ||
|
||
let query = "SELECT nums, text FROM array_table"; | ||
|
||
let block = Block::new() | ||
.add_column("nums", vec![vec![1_u32, 2, 3], vec![4, 5, 6]]) | ||
.add_column("text", vec![vec!["A", "B", "C"], vec!["D", "E"]]); | ||
|
||
let done = pool | ||
.get_handle() | ||
.and_then(move |c| c.execute("DROP TABLE IF EXISTS array_table")) | ||
.and_then(move |c| c.execute(ddl)) | ||
.and_then(move |c| c.insert("array_table", block)) | ||
.and_then(move |c| c.query(query).fetch_all()) | ||
.and_then(move |(_, block): (_, Block)| { | ||
for row in block.rows() { | ||
let nums: Vec<u32> = row.get("nums")?; | ||
let text: Vec<&str> = row.get("text")?; | ||
println!("{:?},\t{:?}", nums, text); | ||
} | ||
Ok(()) | ||
}) | ||
.map_err(|err| eprintln!("database error: {}", err)); | ||
|
||
tokio::run(done) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
use crate::{ | ||
binary::{Encoder, ReadEx}, | ||
errors::Error, | ||
types::{ | ||
column::{list::List, BoxColumnWrapper, ColumnData}, | ||
SqlType, Value, ValueRef, | ||
}, | ||
}; | ||
use chrono_tz::Tz; | ||
|
||
pub(crate) struct ArrayColumnData { | ||
pub(crate) inner: Box<dyn ColumnData + Send + Sync>, | ||
pub(crate) offsets: List<u64>, | ||
} | ||
|
||
impl ArrayColumnData { | ||
pub(crate) fn load<R: ReadEx>( | ||
reader: &mut R, | ||
type_name: &str, | ||
rows: usize, | ||
tz: Tz, | ||
) -> Result<Self, Error> { | ||
let mut offsets = List::with_capacity(rows); | ||
offsets.resize(rows, 0_u64); | ||
reader.read_bytes(offsets.as_mut())?; | ||
|
||
let size = match rows { | ||
0 => 0, | ||
_ => offsets.at(rows - 1) as usize, | ||
}; | ||
let inner = ColumnData::load_data::<BoxColumnWrapper, _>(reader, type_name, size, tz)?; | ||
|
||
Ok(ArrayColumnData { inner, offsets }) | ||
} | ||
} | ||
|
||
impl ColumnData for ArrayColumnData { | ||
fn sql_type(&self) -> SqlType { | ||
let inner_type = self.inner.sql_type(); | ||
SqlType::Array(inner_type.into()) | ||
} | ||
|
||
fn save(&self, encoder: &mut Encoder, start: usize, end: usize) { | ||
let mut offset = 0_u64; | ||
|
||
for i in start..end { | ||
offset = self.offsets.at(i); | ||
encoder.write(offset); | ||
} | ||
|
||
self.inner.save(encoder, 0, offset as usize); | ||
} | ||
|
||
fn len(&self) -> usize { | ||
self.offsets.len() | ||
} | ||
|
||
fn push(&mut self, value: Value) { | ||
if let Value::Array(_, vs) = value { | ||
let offsets_len = self.offsets.len(); | ||
let prev = if offsets_len == 0 { | ||
0_usize | ||
} else { | ||
self.offsets.at(offsets_len - 1) as usize | ||
}; | ||
|
||
self.offsets.push((prev + vs.len()) as u64); | ||
for v in vs { | ||
self.inner.push(v); | ||
} | ||
} else { | ||
panic!("value should be an array") | ||
} | ||
} | ||
|
||
fn at(&self, index: usize) -> ValueRef { | ||
let sql_type = self.inner.sql_type(); | ||
|
||
let start = if index > 0 { | ||
self.offsets.at(index - 1) as usize | ||
} else { | ||
0_usize | ||
}; | ||
let end = self.offsets.at(index) as usize; | ||
let mut vs = Vec::with_capacity(end); | ||
for i in start..end { | ||
let v = self.inner.at(i); | ||
vs.push(v); | ||
} | ||
ValueRef::Array(sql_type, vs) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use super::*; | ||
use crate::types::Block; | ||
use std::io::Cursor; | ||
|
||
#[test] | ||
fn test_write_and_read() { | ||
let block = Block::new().add_column( | ||
"vals", | ||
vec![vec![7_u32, 8], vec![9, 1, 2], vec![3, 4, 5, 6]], | ||
); | ||
|
||
let mut encoder = Encoder::new(); | ||
block.write(&mut encoder, false); | ||
|
||
let mut reader = Cursor::new(encoder.get_buffer_ref()); | ||
let rblock = Block::load(&mut reader, Tz::Zulu, false).unwrap(); | ||
|
||
assert_eq!(block, rblock); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.