Skip to content

Commit

Permalink
add array support
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Jul 16, 2019
1 parent 16b8175 commit 32386ec
Show file tree
Hide file tree
Showing 22 changed files with 658 additions and 88 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ exclude = ["test/*"]

[dependencies]
log = "0.4.6"
futures = "0.1.26"
tokio = "0.1.18"
tokio-timer = "0.2.10"
futures = "0.1.28"
tokio = "0.1.22"
tokio-timer = "0.2.11"
hostname = "^0.1"

chrono = "0.4"
Expand All @@ -33,5 +33,5 @@ url="^1.7"
lazy_static = "1.3.0"

[dev-dependencies]
env_logger = "0.6.0"
rand = "0.6.4"
env_logger = "0.6.2"
rand = "0.7.0"
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ clickhouse-rs = "*"
* String, FixedString(N)
* UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
* Nullable(T)
* Array(UInt/Int/String/Date/DateTime)

## DNS

Expand Down
7 changes: 2 additions & 5 deletions clickhouse-rs-cityhash-sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ extern crate cc;

fn main() {
let mut compiler = cc::Build::new();
compiler
.file("src/cc/city.cc")
.cpp(true)
.opt_level(3);
compiler.file("src/cc/city.cc").cpp(true).opt_level(3);

compiler.compile("libchcityhash.a");
}
}
1 change: 0 additions & 1 deletion clickhouse-rs-cityhash-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,3 @@ fn test_city_hash_128() {
let actual = city_hash_128("abc");
assert_eq!(expected, actual)
}

44 changes: 44 additions & 0 deletions examples/array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
extern crate clickhouse_rs;
extern crate futures;

use clickhouse_rs::{types::Block, Pool};
use futures::Future;
use std::env;

fn main() {
env::set_var("RUST_LOG", "clickhouse_rs=debug");
env_logger::init();

let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000".into());
let pool = Pool::new(database_url);

let ddl = "
CREATE TABLE array_table (
nums Array(UInt32),
text Array(String)
) Engine=Memory";

let query = "SELECT nums, text FROM array_table";

let block = Block::new()
.add_column("nums", vec![vec![1_u32, 2, 3], vec![4, 5, 6]])
.add_column("text", vec![vec!["A", "B", "C"], vec!["D", "E"]]);

let done = pool
.get_handle()
.and_then(move |c| c.execute("DROP TABLE IF EXISTS array_table"))
.and_then(move |c| c.execute(ddl))
.and_then(move |c| c.insert("array_table", block))
.and_then(move |c| c.query(query).fetch_all())
.and_then(move |(_, block): (_, Block)| {
for row in block.rows() {
let nums: Vec<u32> = row.get("nums")?;
let text: Vec<&str> = row.get("text")?;
println!("{:?},\t{:?}", nums, text);
}
Ok(())
})
.map_err(|err| eprintln!("database error: {}", err));

tokio::run(done)
}
33 changes: 12 additions & 21 deletions examples/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ extern crate futures;

use std::env;

use futures::Future;
use clickhouse_rs::{types::Block, Pool};
use futures::Future;

fn main() {
let ddl = "
Expand All @@ -16,22 +16,10 @@ fn main() {
) Engine=Memory";

let block = Block::new()
.add_column(
"text",
vec![[0, 159, 146, 150].as_ref(), b"ABCD"],
)
.add_column(
"fx_text",
vec![b"ABCD".as_ref(), &[0, 159, 146, 150]],
)
.add_column(
"opt_text",
vec![Some(vec![0, 159, 146, 150]), None]
)
.add_column(
"fx_opt_text",
vec![None, Some(vec![0, 159, 146, 150])]
);
.add_column("text", vec![[0, 159, 146, 150].as_ref(), b"ABCD"])
.add_column("fx_text", vec![b"ABCD".as_ref(), &[0, 159, 146, 150]])
.add_column("opt_text", vec![Some(vec![0, 159, 146, 150]), None])
.add_column("fx_opt_text", vec![None, Some(vec![0, 159, 146, 150])]);

let database_url =
env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
Expand All @@ -41,12 +29,15 @@ fn main() {
.get_handle()
.and_then(move |c| c.execute(ddl))
.and_then(move |c| c.insert("test_blob", block))
.and_then(move |c| c.query("SELECT text, fx_text, opt_text, fx_opt_text FROM test_blob").fetch_all())
.and_then(move |c| {
c.query("SELECT text, fx_text, opt_text, fx_opt_text FROM test_blob")
.fetch_all()
})
.and_then(move |(_, block)| {
for row in block.rows() {
let text: &[u8] = row.get("text")?;
let fx_text: &[u8] = row.get("fx_text")?;
let opt_text: Option<&[u8]> = row.get("opt_text")?;
let text: &[u8] = row.get("text")?;
let fx_text: &[u8] = row.get("fx_text")?;
let opt_text: Option<&[u8]> = row.get("opt_text")?;
let fx_opt_text: Option<&[u8]> = row.get("fx_opt_text")?;
println!(
"{:?}\t{:?}\t{:?}\t{:?}",
Expand Down
5 changes: 4 additions & 1 deletion examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ fn main() {
let block = Block::new()
.add_column("customer_id", vec![1_u32, 3, 5, 7, 9])
.add_column("amount", vec![2_u32, 4, 6, 8, 10])
.add_column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
.add_column(
"account_name",
vec![Some("foo"), None, None, None, Some("bar")],
);

let database_url =
env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
Expand Down
10 changes: 7 additions & 3 deletions src/binary/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ impl Encoder {
}

pub fn string(&mut self, text: impl AsRef<str>) {
let str = text.as_ref().as_bytes();
self.uvarint(str.len() as u64);
self.write_bytes(str);
let bytes = text.as_ref().as_bytes();
self.byte_string(bytes);
}

pub fn byte_string(&mut self, source: impl AsRef<[u8]>) {
self.uvarint(source.as_ref().len() as u64);
self.write_bytes(source.as_ref());
}

pub fn write<T>(&mut self, value: T)
Expand Down
2 changes: 1 addition & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{borrow::Cow, io, mem, string::FromUtf8Error, str::Utf8Error};
use std::{borrow::Cow, io, mem, str::Utf8Error, string::FromUtf8Error};

use failure::*;
use tokio::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! * String, FixedString(N)
//! * UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
//! * Nullable(T)
//! * Array(UInt/Int/String/Date/DateTime)
//!
//! ### DNS
//!
Expand Down
115 changes: 115 additions & 0 deletions src/types/column/array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use crate::{
binary::{Encoder, ReadEx},
errors::Error,
types::{
column::{list::List, BoxColumnWrapper, ColumnData},
SqlType, Value, ValueRef,
},
};
use chrono_tz::Tz;

pub(crate) struct ArrayColumnData {
pub(crate) inner: Box<dyn ColumnData + Send + Sync>,
pub(crate) offsets: List<u64>,
}

impl ArrayColumnData {
pub(crate) fn load<R: ReadEx>(
reader: &mut R,
type_name: &str,
rows: usize,
tz: Tz,
) -> Result<Self, Error> {
let mut offsets = List::with_capacity(rows);
offsets.resize(rows, 0_u64);
reader.read_bytes(offsets.as_mut())?;

let size = match rows {
0 => 0,
_ => offsets.at(rows - 1) as usize,
};
let inner = ColumnData::load_data::<BoxColumnWrapper, _>(reader, type_name, size, tz)?;

Ok(ArrayColumnData { inner, offsets })
}
}

impl ColumnData for ArrayColumnData {
fn sql_type(&self) -> SqlType {
let inner_type = self.inner.sql_type();
SqlType::Array(inner_type.into())
}

fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
let mut offset = 0_u64;

for i in start..end {
offset = self.offsets.at(i);
encoder.write(offset);
}

self.inner.save(encoder, 0, offset as usize);
}

fn len(&self) -> usize {
self.offsets.len()
}

fn push(&mut self, value: Value) {
if let Value::Array(_, vs) = value {
let offsets_len = self.offsets.len();
let prev = if offsets_len == 0 {
0_usize
} else {
self.offsets.at(offsets_len - 1) as usize
};

self.offsets.push((prev + vs.len()) as u64);
for v in vs {
self.inner.push(v);
}
} else {
panic!("value should be an array")
}
}

fn at(&self, index: usize) -> ValueRef {
let sql_type = self.inner.sql_type();

let start = if index > 0 {
self.offsets.at(index - 1) as usize
} else {
0_usize
};
let end = self.offsets.at(index) as usize;
let mut vs = Vec::with_capacity(end);
for i in start..end {
let v = self.inner.at(i);
vs.push(v);
}
ValueRef::Array(sql_type, vs)
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::types::Block;
use std::io::Cursor;

#[test]
fn test_write_and_read() {
let block = Block::new().add_column(
"vals",
vec![vec![7_u32, 8], vec![9, 1, 2], vec![3, 4, 5, 6]],
);

let mut encoder = Encoder::new();
block.write(&mut encoder, false);

let mut reader = Cursor::new(encoder.get_buffer_ref());
let rblock = Block::load(&mut reader, Tz::Zulu, false).unwrap();

assert_eq!(block, rblock);
}
}
53 changes: 52 additions & 1 deletion src/types/column/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use crate::{
binary::{Encoder, ReadEx},
errors::Error,
types::{
column::{nullable::NullableColumnData, BoxColumnWrapper, ColumnWrapper, Either},
column::{
array::ArrayColumnData, nullable::NullableColumnData, BoxColumnWrapper, ColumnWrapper,
Either,
},
DateConverter, Marshal, SqlType, StatBuffer, Unmarshal, Value, ValueRef,
},
};
Expand Down Expand Up @@ -80,6 +83,54 @@ impl ColumnFrom for Vec<Date<Tz>> {
}
}

impl ColumnFrom for Vec<Vec<Date<Tz>>> {
fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
let fake: Vec<Date<Tz>> = Vec::with_capacity(source.len());
let inner = Vec::column_from::<BoxColumnWrapper>(fake);
let sql_type = inner.sql_type();

let mut data = ArrayColumnData {
inner,
offsets: List::with_capacity(source.len()),
};

for vs in source {
let mut inner = Vec::with_capacity(vs.len());
for v in vs {
let value: Value = Value::Date(v);
inner.push(value);
}
data.push(Value::Array(sql_type, inner));
}

W::wrap(data)
}
}

impl ColumnFrom for Vec<Vec<DateTime<Tz>>> {
fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
let fake: Vec<DateTime<Tz>> = Vec::with_capacity(source.len());
let inner = Vec::column_from::<BoxColumnWrapper>(fake);
let sql_type = inner.sql_type();

let mut data = ArrayColumnData {
inner,
offsets: List::with_capacity(source.len()),
};

for vs in source {
let mut inner = Vec::with_capacity(vs.len());
for v in vs {
let value: Value = Value::DateTime(v);
inner.push(value);
}
data.push(Value::Array(sql_type, inner));
}

W::wrap(data)
}
}

impl ColumnFrom for Vec<Option<DateTime<Tz>>> {
fn column_from<W: ColumnWrapper>(source: Self) -> <W as ColumnWrapper>::Wrapper {
let fake: Vec<DateTime<Tz>> = Vec::with_capacity(source.len());
Expand Down
Loading

0 comments on commit 32386ec

Please sign in to comment.