From fc2d20569e002e8416dfef14eae2dd3ed53c0854 Mon Sep 17 00:00:00 2001 From: mikhail Date: Wed, 8 May 2019 15:55:37 +0300 Subject: [PATCH] Blob support --- Cargo.toml | 6 +- README.md | 4 +- examples/binary.rs | 61 +++++++++++++++++++ src/binary/read_ex.rs | 3 - src/errors.rs | 3 +- src/types/block/mod.rs | 63 +++---------------- src/types/column/fixed_string.rs | 16 ++--- src/types/column/mod.rs | 44 ++++++++++++++ src/types/column/string.rs | 33 +++++++++- src/types/column/string_pool.rs | 11 ++-- src/types/from_sql.rs | 12 ++++ src/types/value.rs | 100 ++++++++++++++++++++----------- src/types/value_ref.rs | 54 ++++++++++++++--- tests/clickhouse.rs | 71 ++++++++++++++++++++-- 14 files changed, 353 insertions(+), 128 deletions(-) create mode 100644 examples/binary.rs diff --git a/Cargo.toml b/Cargo.toml index 9f56e684..3aa7366b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "clickhouse-rs" -version = "0.1.11" +version = "0.1.12" authors = ["Mikhail Sukharev "] license = "MIT" homepage = "https://github.com/suharev7/clickhouse-rs" @@ -15,8 +15,8 @@ exclude = ["test/*"] [dependencies] log = "0.4.6" -futures = "0.1.25" -tokio = "0.1.13" +futures = "0.1.26" +tokio = "0.1.18" tokio-timer = "0.2.10" hostname = "^0.1" diff --git a/README.md b/README.md index 4e590ea1..5003fd10 100644 --- a/README.md +++ b/README.md @@ -71,8 +71,8 @@ pub fn main() { ) Engine=Memory"; let block = Block::new() - .add_column("customer_id", vec![1_u32, 3, 5, 7, 9]) - .add_column("amount", vec![2_u32, 4, 6, 8, 10]) + .add_column("customer_id", vec![1_u32, 3, 5, 7, 9]) + .add_column("amount", vec![2_u32, 4, 6, 8, 10]) .add_column("account_name", vec![Some("foo"), None, None, None, Some("bar")]); let pool = Pool::new(database_url); diff --git a/examples/binary.rs b/examples/binary.rs new file mode 100644 index 00000000..96823718 --- /dev/null +++ b/examples/binary.rs @@ -0,0 +1,61 @@ +extern crate clickhouse_rs; +extern crate futures; + +use std::env; + +use futures::Future; +use clickhouse_rs::{types::Block, Pool}; + +fn main() { + let ddl = " + CREATE TABLE IF NOT EXISTS test_blob ( + text String, + fx_text FixedString(4), + opt_text Nullable(String), + fx_opt_text Nullable(FixedString(4)) + ) Engine=Memory"; + + let block = Block::new() + .add_column( + "text", + vec![[0, 159, 146, 150].as_ref(), b"ABCD"], + ) + .add_column( + "fx_text", + vec![b"ABCD".as_ref(), &[0, 159, 146, 150]], + ) + .add_column( + "opt_text", + vec![Some(vec![0, 159, 146, 150]), None] + ) + .add_column( + "fx_opt_text", + vec![None, Some(vec![0, 159, 146, 150])] + ); + + let database_url = + env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into()); + let pool = Pool::new(database_url); + + let done = pool + .get_handle() + .and_then(move |c| c.execute(ddl)) + .and_then(move |c| c.insert("test_blob", block)) + .and_then(move |c| c.query("SELECT text, fx_text, opt_text, fx_opt_text FROM test_blob").fetch_all()) + .and_then(move |(_, block)| { + for row in block.rows() { + let text: &[u8] = row.get("text")?; + let fx_text: &[u8] = row.get("fx_text")?; + let opt_text: Option<&[u8]> = row.get("opt_text")?; + let fx_opt_text: Option<&[u8]> = row.get("fx_opt_text")?; + println!( + "{:?}\t{:?}\t{:?}\t{:?}", + text, fx_text, opt_text, fx_opt_text + ); + } + Ok(()) + }) + .map_err(|err| eprintln!("database error: {}", err)); + + tokio::run(done) +} diff --git a/src/binary/read_ex.rs b/src/binary/read_ex.rs index 59f03f1e..e85da11d 100644 --- a/src/binary/read_ex.rs +++ b/src/binary/read_ex.rs @@ -5,8 +5,6 @@ use crate::{ types::{column::StringPool, ClickhouseResult, StatBuffer, Unmarshal}, }; -use std::str; - pub(crate) trait ReadEx { fn read_bytes(&mut self, rv: &mut [u8]) -> ClickhouseResult<()>; fn read_scalar(&mut self) -> ClickhouseResult @@ -81,7 +79,6 @@ where let str_len = self.read_uvarint()? as usize; let buffer = pool.allocate(str_len); self.read_bytes(buffer)?; - str::from_utf8(buffer)?; Ok(()) } } diff --git a/src/errors.rs b/src/errors.rs index 621853df..843dd1c7 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, io, mem, string::FromUtf8Error}; +use std::{borrow::Cow, io, mem, string::FromUtf8Error, str::Utf8Error}; use failure::*; use tokio::prelude::*; @@ -7,7 +7,6 @@ use tokio_timer::Error as TimerError; use url::ParseError; use crate::types::Packet; -use std::str::Utf8Error; /// This type enumerates library errors. #[derive(Debug, Fail)] diff --git a/src/types/block/mod.rs b/src/types/block/mod.rs index bca8d32f..a0b5575f 100644 --- a/src/types/block/mod.rs +++ b/src/types/block/mod.rs @@ -1,7 +1,6 @@ use std::{ cmp, fmt, io::{Cursor, Read}, - sync::Arc, }; use byteorder::{LittleEndian, WriteBytesExt}; @@ -13,13 +12,8 @@ use crate::{ binary::{protocol, Encoder, ReadEx}, errors::{Error, FromSqlError}, types::{ - ClickhouseResult, - FromSql, - SqlType, - column::{ - {self, Column, ColumnFrom, ArcColumnWrapper}, - fixed_string::{FixedStringAdapter, NullableFixedStringAdapter} - } + column::{self, ArcColumnWrapper, Column, ColumnFrom}, + ClickhouseResult, FromSql, }, }; @@ -200,8 +194,8 @@ impl Block { impl Block { pub(crate) fn cast_to(self, header: &Block) -> Result { - let mut columns = self.columns; let info = self.info; + let mut columns = self.columns; columns.reverse(); if header.column_count() != columns.len() { @@ -209,54 +203,11 @@ impl Block { } let mut new_columns = Vec::with_capacity(columns.len()); - for column in header.columns().iter() { - let old_column = columns.pop().unwrap(); + for column in header.columns() { let dst_type = column.sql_type(); - let src_type = old_column.sql_type(); - - if dst_type == src_type { - new_columns.push(old_column); - continue; - } - - if let SqlType::FixedString(str_len) = dst_type { - if src_type == SqlType::String { - let name = old_column.name().to_owned(); - let adapter = FixedStringAdapter { - column: old_column, - str_len, - }; - new_columns.push(Column { - name, - data: Arc::new(adapter), - }); - continue; - } - } - - if let SqlType::Nullable(left_type) = dst_type { - if let SqlType::FixedString(str_len) = left_type { - if let SqlType::Nullable(right_type) = src_type { - if *right_type == SqlType::String { - let name = old_column.name().to_owned(); - let adapter = NullableFixedStringAdapter { - column: old_column, - str_len: *str_len, - }; - new_columns.push(Column { - name, - data: Arc::new(adapter), - }); - continue; - } - } - } - } - - return Err(Error::FromSql(FromSqlError::InvalidType { - src: src_type.to_string(), - dst: dst_type.to_string(), - })); + let old_column = columns.pop().unwrap(); + let new_column = old_column.cast_to(dst_type)?; + new_columns.push(new_column); } Ok(Block { diff --git a/src/types/column/fixed_string.rs b/src/types/column/fixed_string.rs index 226e1d1e..94f1f681 100644 --- a/src/types/column/fixed_string.rs +++ b/src/types/column/fixed_string.rs @@ -1,9 +1,9 @@ -use std::{cmp, str}; +use std::cmp; use crate::{ binary::{Encoder, ReadEx}, errors::Error, - types::{Column, SqlType, Value, ValueRef, from_sql::*}, + types::{from_sql::*, Column, SqlType, Value, ValueRef}, }; use super::column_data::ColumnData; @@ -74,7 +74,7 @@ impl ColumnData for FixedStringColumnData { fn at(&self, index: usize) -> ValueRef { let shift = index * self.str_len; let str_ref = &self.buffer[shift..shift + self.str_len]; - ValueRef::String(unsafe { str::from_utf8_unchecked(str_ref) }) + ValueRef::String(str_ref) } } @@ -86,9 +86,9 @@ impl ColumnData for FixedStringAdapter { fn save(&self, encoder: &mut Encoder, start: usize, end: usize) { let mut buffer = Vec::with_capacity(self.str_len); for index in start..end { - let string_ref = self.column.at(index).as_str().unwrap(); + let string_ref = self.column.at(index).as_bytes().unwrap(); buffer.resize(0, 0); - buffer.extend(string_ref.as_bytes()); + buffer.extend(string_ref); buffer.resize(self.str_len, 0); encoder.write_bytes(&buffer[..]); } @@ -115,9 +115,9 @@ impl ColumnData for NullableFixedStringAdapter { fn save(&self, encoder: &mut Encoder, start: usize, end: usize) { let size = end - start; let mut nulls = vec![0; size]; - let mut values: Vec> = vec![None; size]; + let mut values: Vec> = vec![None; size]; - for (i, index) in (start .. end).enumerate() { + for (i, index) in (start..end).enumerate() { values[i] = Option::from_sql(self.at(index)).unwrap(); if values[i].is_none() { nulls[i] = 1; @@ -130,7 +130,7 @@ impl ColumnData for NullableFixedStringAdapter { for value in values { buffer.resize(0, 0); if let Some(string_ref) = value { - buffer.extend(string_ref.as_bytes()); + buffer.extend(string_ref); } buffer.resize(self.str_len, 0); encoder.write_bytes(buffer.as_ref()); diff --git a/src/types/column/mod.rs b/src/types/column/mod.rs index 9ffdc78a..36460a85 100644 --- a/src/types/column/mod.rs +++ b/src/types/column/mod.rs @@ -14,6 +14,10 @@ pub use self::{ }; pub(crate) use self::string_pool::StringPool; +use crate::{ + errors::{Error, FromSqlError}, + types::column::fixed_string::{FixedStringAdapter, NullableFixedStringAdapter}, +}; mod chunk; mod column_data; @@ -132,6 +136,46 @@ impl Column { data: Arc::new(data), } } + + pub fn cast_to(self, dst_type: SqlType) -> ClickhouseResult { + let src_type = self.sql_type(); + + if dst_type == src_type { + return Ok(self); + } + + match (dst_type, src_type) { + (SqlType::FixedString(str_len), SqlType::String) => { + let name = self.name().to_owned(); + let adapter = FixedStringAdapter { + column: self, + str_len, + }; + Ok(Column { + name, + data: Arc::new(adapter), + }) + } + ( + SqlType::Nullable(SqlType::FixedString(str_len)), + SqlType::Nullable(SqlType::String), + ) => { + let name = self.name().to_owned(); + let adapter = NullableFixedStringAdapter { + column: self, + str_len: *str_len, + }; + Ok(Column { + name, + data: Arc::new(adapter), + }) + } + _ => Err(Error::FromSql(FromSqlError::InvalidType { + src: src_type.to_string(), + dst: dst_type.to_string(), + })), + } + } } pub fn new_column(name: &str, data: Arc<(ColumnData + Sync + Send + 'static)>) -> Column { diff --git a/src/types/column/string.rs b/src/types/column/string.rs index 650eaec9..e2ed8ab4 100644 --- a/src/types/column/string.rs +++ b/src/types/column/string.rs @@ -46,6 +46,18 @@ impl<'a> ColumnFrom for Vec<&'a str> { } } +impl ColumnFrom for Vec> { + fn column_from(data: Self) -> W::Wrapper { + W::wrap(StringColumnData { pool: data.into() }) + } +} + +impl<'a> ColumnFrom for Vec<&'a [u8]> { + fn column_from(data: Self) -> W::Wrapper { + W::wrap(StringColumnData { pool: data.into() }) + } +} + impl ColumnFrom for Vec> { fn column_from(source: Self) -> W::Wrapper { let inner = Box::new(StringColumnData::with_capacity(source.len())); @@ -63,6 +75,23 @@ impl ColumnFrom for Vec> { } } +impl ColumnFrom for Vec>> { + fn column_from(source: Self) -> W::Wrapper { + let inner = Box::new(StringColumnData::with_capacity(source.len())); + + let mut data = NullableColumnData { + inner, + nulls: Vec::with_capacity(source.len()), + }; + + for value in source { + data.push(value.into()); + } + + W::wrap(data) + } +} + impl ColumnFrom for Vec> { fn column_from(source: Self) -> W::Wrapper { let inner = Box::new(StringColumnData::with_capacity(source.len())); @@ -97,9 +126,9 @@ impl ColumnData for StringColumnData { } fn push(&mut self, value: Value) { - let s: String = value.into(); + let s: Vec = value.into(); let mut b = self.pool.allocate(s.len()); - b.write_all(s.as_bytes()).unwrap(); + b.write_all(s.as_ref()).unwrap(); } fn at(&self, index: usize) -> ValueRef { diff --git a/src/types/column/string_pool.rs b/src/types/column/string_pool.rs index 2c7a6742..951e7b6b 100644 --- a/src/types/column/string_pool.rs +++ b/src/types/column/string_pool.rs @@ -36,12 +36,15 @@ impl<'a> Iterator for StringIter<'a> { } } -impl From> for StringPool { - fn from(source: Vec) -> Self { +impl From> for StringPool +where + T: AsRef<[u8]>, +{ + fn from(source: Vec) -> Self { let mut pool = StringPool::with_capacity(source.len()); for s in source.iter() { - let mut b = pool.allocate(s.len()); - b.write_all(s.as_bytes()).unwrap(); + let mut b = pool.allocate(s.as_ref().len()); + b.write_all(s.as_ref()).unwrap(); } pool } diff --git a/src/types/from_sql.rs b/src/types/from_sql.rs index 0518d6da..1a451973 100644 --- a/src/types/from_sql.rs +++ b/src/types/from_sql.rs @@ -36,12 +36,24 @@ impl<'a> FromSql<'a> for &'a str { } } +impl<'a> FromSql<'a> for &'a [u8] { + fn from_sql(value: ValueRef<'a>) -> FromSqlResult<&'a [u8]> { + value.as_bytes() + } +} + impl<'a> FromSql<'a> for String { fn from_sql(value: ValueRef<'a>) -> FromSqlResult { value.as_str().map(str::to_string) } } +impl<'a> FromSql<'a> for Vec { + fn from_sql(value: ValueRef<'a>) -> FromSqlResult { + value.as_bytes().map(|bs| bs.to_vec()) + } +} + impl<'a, T> FromSql<'a> for Option where T: FromSql<'a>, diff --git a/src/types/value.rs b/src/types/value.rs index c443ace6..f2d4aa36 100644 --- a/src/types/value.rs +++ b/src/types/value.rs @@ -1,4 +1,4 @@ -use std::{convert, fmt}; +use std::{convert, fmt, mem, str}; use chrono::prelude::*; use chrono_tz::Tz; @@ -19,7 +19,7 @@ pub enum Value { Int16(i16), Int32(i32), Int64(i64), - String(String), + String(Vec), Float32(f32), Float64(f64), Date(Date), @@ -38,11 +38,8 @@ impl Value { SqlType::Int16 => Value::Int16(0), SqlType::Int32 => Value::Int32(0), SqlType::Int64 => Value::Int64(0), - SqlType::String => Value::String(String::default()), - SqlType::FixedString(str_len) => { - let bytes = vec![0_u8; str_len]; - Value::String(unsafe { String::from_utf8_unchecked(bytes) }) - } + SqlType::String => Value::String(Vec::default()), + SqlType::FixedString(str_len) => Value::String(vec![0_u8; str_len]), SqlType::Float32 => Value::Float32(0.0), SqlType::Float64 => Value::Float64(0.0), SqlType::Date => 0_u16.to_date(Tz::Zulu).into(), @@ -63,7 +60,10 @@ impl fmt::Display for Value { Value::Int16(ref v) => fmt::Display::fmt(v, f), Value::Int32(ref v) => fmt::Display::fmt(v, f), Value::Int64(ref v) => fmt::Display::fmt(v, f), - Value::String(ref v) => fmt::Display::fmt(v, f), + Value::String(ref v) => match str::from_utf8(v) { + Ok(s) => fmt::Display::fmt(s, f), + Err(_) => write!(f, "{:?}", v), + }, Value::Float32(ref v) => fmt::Display::fmt(v, f), Value::Float64(ref v) => fmt::Display::fmt(v, f), Value::DateTime(ref time) if f.alternate() => write!(f, "{}", time.to_rfc2822()), @@ -122,11 +122,11 @@ where } macro_rules! value_from { - ( $( $t:ident : $k:ident ),* ) => { + ( $( $t:ty : $k:ident ),* ) => { $( impl convert::From<$t> for Value { fn from(v: $t) -> Value { - Value::$k(v) + Value::$k(v.into()) } } )* @@ -148,29 +148,45 @@ value_from! { f32: Float32, f64: Float64, - String: String + &[u8]: String, + String: String, + Vec: String } impl<'a> convert::From<&'a str> for Value { fn from(v: &'a str) -> Self { - Value::String(v.to_string()) + Value::String(v.as_bytes().into()) } } impl convert::From for String { + fn from(mut v: Value) -> Self { + if let Value::String(ref mut x) = &mut v { + let mut tmp = Vec::new(); + mem::swap(x, &mut tmp); + if let Ok(result) = String::from_utf8(tmp) { + return result; + } + } + let from = SqlType::from(v); + panic!("Can't convert Value::{} into String.", from); + } +} + +impl convert::From for Vec { fn from(v: Value) -> Self { match v { - Value::String(x) => x, + Value::String(bs) => bs, _ => { let from = SqlType::from(v); - panic!("Can't convert Value::{} into String.", from) + panic!("Can't convert Value::{} into Vec.", from) } } } } macro_rules! from_value { - ( $( $t:ident : $k:ident ),* ) => { + ( $( $t:ty : $k:ident ),* ) => { $( impl convert::From for $t { fn from(v: Value) -> $t { @@ -204,8 +220,10 @@ from_value! { mod test { use std::fmt; - use rand::distributions::{Distribution, Standard}; - use rand::random; + use rand::{ + random, + distributions::{Distribution, Standard} + }; use super::*; @@ -285,45 +303,57 @@ mod test { #[test] fn test_string_from() { - let v = Value::String("df47a455-bb3c-4bd6-b2f2-a24be3db36ab".to_string()); + let v = Value::String(b"df47a455-bb3c-4bd6-b2f2-a24be3db36ab".to_vec()); let u = String::from(v); - assert_eq!(u, "df47a455-bb3c-4bd6-b2f2-a24be3db36ab".to_string()); + assert_eq!("df47a455-bb3c-4bd6-b2f2-a24be3db36ab".to_string(), u); } #[test] fn test_into_string() { - let v = Value::String("d2384838-dfe8-43ea-b1f7-63fb27b91088".to_string()); + let v = Value::String(b"d2384838-dfe8-43ea-b1f7-63fb27b91088".to_vec()); let u: String = v.into(); - assert_eq!(u, "d2384838-dfe8-43ea-b1f7-63fb27b91088".to_string()); + assert_eq!("d2384838-dfe8-43ea-b1f7-63fb27b91088".to_string(), u); + } + + #[test] + fn test_into_vec() { + let v = Value::String(vec![1, 2, 3]); + let u: Vec = v.into(); + assert_eq!(vec![1, 2, 3], u); } #[test] fn test_display() { - assert_eq!(format!("{}", Value::UInt8(42)), "42".to_string()); - assert_eq!(format!("{}", Value::UInt16(42)), "42".to_string()); - assert_eq!(format!("{}", Value::UInt32(42)), "42".to_string()); - assert_eq!(format!("{}", Value::UInt64(42)), "42".to_string()); + assert_eq!("42".to_string(), format!("{}", Value::UInt8(42))); + assert_eq!("42".to_string(), format!("{}", Value::UInt16(42))); + assert_eq!("42".to_string(), format!("{}", Value::UInt32(42))); + assert_eq!("42".to_string(), format!("{}", Value::UInt64(42))); - assert_eq!(format!("{}", Value::Int8(42)), "42".to_string()); - assert_eq!(format!("{}", Value::Int16(42)), "42".to_string()); - assert_eq!(format!("{}", Value::Int32(42)), "42".to_string()); - assert_eq!(format!("{}", Value::Int64(42)), "42".to_string()); + assert_eq!("42".to_string(), format!("{}", Value::Int8(42))); + assert_eq!("42".to_string(), format!("{}", Value::Int16(42))); + assert_eq!("42".to_string(), format!("{}", Value::Int32(42))); + assert_eq!("42".to_string(), format!("{}", Value::Int64(42))); + + assert_eq!( + "text".to_string(), + format!("{}", Value::String(b"text".to_vec())) + ); assert_eq!( - format!("{}", Value::String("text".to_string())), - "text".to_string() + "\u{1}\u{2}\u{3}".to_string(), + format!("{}", Value::String(vec![1, 2, 3])) ); assert_eq!( - format!("{}", Value::Nullable(Either::Left(SqlType::UInt8))), - "NULL".to_string() + "NULL".to_string(), + format!("{}", Value::Nullable(Either::Left(SqlType::UInt8))) ); assert_eq!( + "42".to_string(), format!( "{}", Value::Nullable(Either::Right(Box::new(Value::UInt8(42)))) - ), - "42".to_string() + ) ); } diff --git a/src/types/value_ref.rs b/src/types/value_ref.rs index 30950757..8628b005 100644 --- a/src/types/value_ref.rs +++ b/src/types/value_ref.rs @@ -1,4 +1,4 @@ -use std::{convert, fmt}; +use std::{convert, fmt, str}; use chrono::prelude::*; use chrono_tz::Tz; @@ -18,7 +18,7 @@ pub enum ValueRef<'a> { Int16(i16), Int32(i32), Int64(i64), - String(&'a str), + String(&'a [u8]), Float32(f32), Float64(f64), Date(Date), @@ -37,7 +37,10 @@ impl<'a> fmt::Display for ValueRef<'a> { ValueRef::Int16(v) => fmt::Display::fmt(v, f), ValueRef::Int32(v) => fmt::Display::fmt(v, f), ValueRef::Int64(v) => fmt::Display::fmt(v, f), - ValueRef::String(v) => fmt::Display::fmt(v, f), + ValueRef::String(v) => match str::from_utf8(v) { + Ok(s) => fmt::Display::fmt(s, f), + Err(_) => write!(f, "{:?}", *v), + }, ValueRef::Float32(v) => fmt::Display::fmt(v, f), ValueRef::Float64(v) => fmt::Display::fmt(v, f), ValueRef::Date(v) if f.alternate() => fmt::Display::fmt(v, f), @@ -78,13 +81,24 @@ impl<'a> convert::From> for SqlType { impl<'a> ValueRef<'a> { pub fn as_str(&self) -> ClickhouseResult<&'a str> { + if let ValueRef::String(t) = self { + return Ok(str::from_utf8(t)?); + } + let from = SqlType::from(self.clone()).to_string(); + Err(Error::FromSql(FromSqlError::InvalidType { + src: from, + dst: "&str".into(), + })) + } + + pub fn as_bytes(&self) -> ClickhouseResult<&'a [u8]> { if let ValueRef::String(t) = self { return Ok(t); } let from = SqlType::from(self.clone()).to_string(); Err(Error::FromSql(FromSqlError::InvalidType { src: from, - dst: "String".into(), + dst: "&[u8]".into(), })) } } @@ -100,7 +114,7 @@ impl<'a> From> for Value { ValueRef::Int16(v) => Value::Int16(v), ValueRef::Int32(v) => Value::Int32(v), ValueRef::Int64(v) => Value::Int64(v), - ValueRef::String(v) => Value::String(v.to_string()), + ValueRef::String(v) => Value::String(v.into()), ValueRef::Float32(v) => Value::Float32(v), ValueRef::Float64(v) => Value::Float64(v), ValueRef::Date(v) => Value::Date(v), @@ -118,12 +132,18 @@ impl<'a> From> for Value { impl<'a> From<&'a str> for ValueRef<'a> { fn from(s: &str) -> ValueRef { - ValueRef::String(s) + ValueRef::String(s.as_bytes()) + } +} + +impl<'a> From<&'a [u8]> for ValueRef<'a> { + fn from(bs: &[u8]) -> ValueRef { + ValueRef::String(bs) } } macro_rules! from_number { - ( $($t:ident: $k:ident),* ) => { + ( $($t:ty: $k:ident),* ) => { $( impl<'a> From<$t> for ValueRef<'a> { fn from(v: $t) -> ValueRef<'static> { @@ -177,7 +197,7 @@ impl<'a> From<&'a Value> for ValueRef<'a> { } macro_rules! value_from { - ( $( $t:ident: $k:ident ),* ) => { + ( $( $t:ty: $k:ident ),* ) => { $( impl<'a> From> for $t { fn from(value: ValueRef<'a>) -> Self { @@ -207,3 +227,21 @@ value_from! { f32: Float32, f64: Float64 } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_display() { + assert_eq!( + "[0, 159, 146, 150]".to_string(), + format!("{}", ValueRef::String(&[0, 159, 146, 150])) + ); + + assert_eq!( + "text".to_string(), + format!("{}", ValueRef::String(b"text")) + ); + } +} diff --git a/tests/clickhouse.rs b/tests/clickhouse.rs index d6f83173..2fc7b92d 100644 --- a/tests/clickhouse.rs +++ b/tests/clickhouse.rs @@ -40,7 +40,7 @@ fn test_ping() { } #[test] -fn fn_connection_by_wrong_address() { +fn test_connection_by_wrong_address() { let pool = Pool::new("tcp://badaddr:9000"); let done = pool.get_handle().and_then(ClientHandle::ping).map(|_| ()); @@ -516,7 +516,7 @@ fn test_generic_column() { #[test] fn test_fixed_string() { let ddl = " - CREATE TABLE clickhouse_test_fixed ( + CREATE TABLE clickhouse_test_fixed_string ( text FixedString(4), opt_text Nullable(FixedString(4)) ) Engine=Memory"; @@ -525,7 +525,7 @@ fn test_fixed_string() { SELECT text, opt_text - FROM clickhouse_test_fixed"; + FROM clickhouse_test_fixed_string"; let block = Block::new() .add_column("opt_text", vec![Some("text")]) @@ -534,9 +534,9 @@ fn test_fixed_string() { let pool = Pool::new(database_url()); let done = pool .get_handle() - .and_then(|c| c.execute("DROP TABLE IF EXISTS clickhouse_test_fixed")) + .and_then(|c| c.execute("DROP TABLE IF EXISTS clickhouse_test_fixed_string")) .and_then(move |c| c.execute(ddl)) - .and_then(move |c| c.insert("clickhouse_test_fixed", block)) + .and_then(move |c| c.insert("clickhouse_test_fixed_string", block)) .and_then(move |c| c.query(query).fetch_all()) .and_then(move |(_, block)| { let text: &str = block.get(0, "text")?; @@ -548,5 +548,66 @@ fn test_fixed_string() { Ok(()) }); + run(done).unwrap(); +} + +#[test] +fn test_binary_string() { + let ddl = " + CREATE TABLE IF NOT EXISTS clickhouse_binary_string ( + text String, + fx_text FixedString(4), + opt_text Nullable(String), + fx_opt_text Nullable(FixedString(4)) + ) Engine=Memory"; + + let query = " + SELECT + text, + fx_text, + opt_text, + fx_opt_text + FROM clickhouse_binary_string"; + + let block = Block::new() + .add_column( + "text", + vec![vec![0, 159, 146, 150]], + ) + .add_column( + "fx_text", + vec![vec![0, 159, 146, 150]], + ) + .add_column( + "opt_text", + vec![Some(vec![0, 159, 146, 150])] + ) + .add_column( + "fx_opt_text", + vec![Some(vec![0, 159, 146, 150])] + ); + + let pool = Pool::new(database_url()); + let done = pool + .get_handle() + .and_then(|c| c.execute("DROP TABLE IF EXISTS clickhouse_binary_string")) + .and_then(move |c| c.execute(ddl)) + .and_then(move |c| c.insert("clickhouse_binary_string", block)) + .and_then(move |c| c.query(query).fetch_all()) + .and_then(move |(_, block)| { + let text: &[u8] = block.get(0, "text")?; + let fx_text: &[u8] = block.get(0, "fx_text")?; + let opt_text: Option<&[u8]> = block.get(0, "opt_text")?; + let fx_opt_text: Option<&[u8]> = block.get(0, "fx_opt_text")?; + + assert_eq!(1, block.row_count()); + assert_eq!([0, 159, 146, 150].as_ref(), text); + assert_eq!([0, 159, 146, 150].as_ref(), fx_text); + assert_eq!(Some([0, 159, 146, 150].as_ref()), opt_text); + assert_eq!(Some([0, 159, 146, 150].as_ref()), fx_opt_text); + + Ok(()) + }); + run(done).unwrap(); } \ No newline at end of file