Skip to content

Commit

Permalink
Merge fc2d205 into 6b09480
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed May 10, 2019
2 parents 6b09480 + fc2d205 commit d4a92c9
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 128 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "clickhouse-rs"
version = "0.1.11"
version = "0.1.12"
authors = ["Mikhail Sukharev <suharev7@gmail.com>"]
license = "MIT"
homepage = "https://github.com/suharev7/clickhouse-rs"
Expand All @@ -15,8 +15,8 @@ exclude = ["test/*"]

[dependencies]
log = "0.4.6"
futures = "0.1.25"
tokio = "0.1.13"
futures = "0.1.26"
tokio = "0.1.18"
tokio-timer = "0.2.10"
hostname = "^0.1"

Expand Down
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -71,8 +71,8 @@ pub fn main() {
) Engine=Memory";

let block = Block::new()
.add_column("customer_id", vec![1_u32, 3, 5, 7, 9])
.add_column("amount", vec![2_u32, 4, 6, 8, 10])
.add_column("customer_id", vec![1_u32, 3, 5, 7, 9])
.add_column("amount", vec![2_u32, 4, 6, 8, 10])
.add_column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);

let pool = Pool::new(database_url);
Expand Down
61 changes: 61 additions & 0 deletions examples/binary.rs
@@ -0,0 +1,61 @@
extern crate clickhouse_rs;
extern crate futures;

use std::env;

use futures::Future;
use clickhouse_rs::{types::Block, Pool};

fn main() {
let ddl = "
CREATE TABLE IF NOT EXISTS test_blob (
text String,
fx_text FixedString(4),
opt_text Nullable(String),
fx_opt_text Nullable(FixedString(4))
) Engine=Memory";

let block = Block::new()
.add_column(
"text",
vec![[0, 159, 146, 150].as_ref(), b"ABCD"],
)
.add_column(
"fx_text",
vec![b"ABCD".as_ref(), &[0, 159, 146, 150]],
)
.add_column(
"opt_text",
vec![Some(vec![0, 159, 146, 150]), None]
)
.add_column(
"fx_opt_text",
vec![None, Some(vec![0, 159, 146, 150])]
);

let database_url =
env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
let pool = Pool::new(database_url);

let done = pool
.get_handle()
.and_then(move |c| c.execute(ddl))
.and_then(move |c| c.insert("test_blob", block))
.and_then(move |c| c.query("SELECT text, fx_text, opt_text, fx_opt_text FROM test_blob").fetch_all())
.and_then(move |(_, block)| {
for row in block.rows() {
let text: &[u8] = row.get("text")?;
let fx_text: &[u8] = row.get("fx_text")?;
let opt_text: Option<&[u8]> = row.get("opt_text")?;
let fx_opt_text: Option<&[u8]> = row.get("fx_opt_text")?;
println!(
"{:?}\t{:?}\t{:?}\t{:?}",
text, fx_text, opt_text, fx_opt_text
);
}
Ok(())
})
.map_err(|err| eprintln!("database error: {}", err));

tokio::run(done)
}
3 changes: 0 additions & 3 deletions src/binary/read_ex.rs
Expand Up @@ -5,8 +5,6 @@ use crate::{
types::{column::StringPool, ClickhouseResult, StatBuffer, Unmarshal},
};

use std::str;

pub(crate) trait ReadEx {
fn read_bytes(&mut self, rv: &mut [u8]) -> ClickhouseResult<()>;
fn read_scalar<V>(&mut self) -> ClickhouseResult<V>
Expand Down Expand Up @@ -81,7 +79,6 @@ where
let str_len = self.read_uvarint()? as usize;
let buffer = pool.allocate(str_len);
self.read_bytes(buffer)?;
str::from_utf8(buffer)?;
Ok(())
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/errors.rs
@@ -1,4 +1,4 @@
use std::{borrow::Cow, io, mem, string::FromUtf8Error};
use std::{borrow::Cow, io, mem, string::FromUtf8Error, str::Utf8Error};

use failure::*;
use tokio::prelude::*;
Expand All @@ -7,7 +7,6 @@ use tokio_timer::Error as TimerError;
use url::ParseError;

use crate::types::Packet;
use std::str::Utf8Error;

/// This type enumerates library errors.
#[derive(Debug, Fail)]
Expand Down
63 changes: 7 additions & 56 deletions src/types/block/mod.rs
@@ -1,7 +1,6 @@
use std::{
cmp, fmt,
io::{Cursor, Read},
sync::Arc,
};

use byteorder::{LittleEndian, WriteBytesExt};
Expand All @@ -13,13 +12,8 @@ use crate::{
binary::{protocol, Encoder, ReadEx},
errors::{Error, FromSqlError},
types::{
ClickhouseResult,
FromSql,
SqlType,
column::{
{self, Column, ColumnFrom, ArcColumnWrapper},
fixed_string::{FixedStringAdapter, NullableFixedStringAdapter}
}
column::{self, ArcColumnWrapper, Column, ColumnFrom},
ClickhouseResult, FromSql,
},
};

Expand Down Expand Up @@ -200,63 +194,20 @@ impl Block {

impl Block {
pub(crate) fn cast_to(self, header: &Block) -> Result<Self, Error> {
let mut columns = self.columns;
let info = self.info;
let mut columns = self.columns;
columns.reverse();

if header.column_count() != columns.len() {
return Err(Error::FromSql(FromSqlError::OutOfRange));
}

let mut new_columns = Vec::with_capacity(columns.len());
for column in header.columns().iter() {
let old_column = columns.pop().unwrap();
for column in header.columns() {
let dst_type = column.sql_type();
let src_type = old_column.sql_type();

if dst_type == src_type {
new_columns.push(old_column);
continue;
}

if let SqlType::FixedString(str_len) = dst_type {
if src_type == SqlType::String {
let name = old_column.name().to_owned();
let adapter = FixedStringAdapter {
column: old_column,
str_len,
};
new_columns.push(Column {
name,
data: Arc::new(adapter),
});
continue;
}
}

if let SqlType::Nullable(left_type) = dst_type {
if let SqlType::FixedString(str_len) = left_type {
if let SqlType::Nullable(right_type) = src_type {
if *right_type == SqlType::String {
let name = old_column.name().to_owned();
let adapter = NullableFixedStringAdapter {
column: old_column,
str_len: *str_len,
};
new_columns.push(Column {
name,
data: Arc::new(adapter),
});
continue;
}
}
}
}

return Err(Error::FromSql(FromSqlError::InvalidType {
src: src_type.to_string(),
dst: dst_type.to_string(),
}));
let old_column = columns.pop().unwrap();
let new_column = old_column.cast_to(dst_type)?;
new_columns.push(new_column);
}

Ok(Block {
Expand Down
16 changes: 8 additions & 8 deletions src/types/column/fixed_string.rs
@@ -1,9 +1,9 @@
use std::{cmp, str};
use std::cmp;

use crate::{
binary::{Encoder, ReadEx},
errors::Error,
types::{Column, SqlType, Value, ValueRef, from_sql::*},
types::{from_sql::*, Column, SqlType, Value, ValueRef},
};

use super::column_data::ColumnData;
Expand Down Expand Up @@ -74,7 +74,7 @@ impl ColumnData for FixedStringColumnData {
fn at(&self, index: usize) -> ValueRef {
let shift = index * self.str_len;
let str_ref = &self.buffer[shift..shift + self.str_len];
ValueRef::String(unsafe { str::from_utf8_unchecked(str_ref) })
ValueRef::String(str_ref)
}
}

Expand All @@ -86,9 +86,9 @@ impl ColumnData for FixedStringAdapter {
fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
let mut buffer = Vec::with_capacity(self.str_len);
for index in start..end {
let string_ref = self.column.at(index).as_str().unwrap();
let string_ref = self.column.at(index).as_bytes().unwrap();
buffer.resize(0, 0);
buffer.extend(string_ref.as_bytes());
buffer.extend(string_ref);
buffer.resize(self.str_len, 0);
encoder.write_bytes(&buffer[..]);
}
Expand All @@ -115,9 +115,9 @@ impl ColumnData for NullableFixedStringAdapter {
fn save(&self, encoder: &mut Encoder, start: usize, end: usize) {
let size = end - start;
let mut nulls = vec![0; size];
let mut values: Vec<Option<&str>> = vec![None; size];
let mut values: Vec<Option<&[u8]>> = vec![None; size];

for (i, index) in (start .. end).enumerate() {
for (i, index) in (start..end).enumerate() {
values[i] = Option::from_sql(self.at(index)).unwrap();
if values[i].is_none() {
nulls[i] = 1;
Expand All @@ -130,7 +130,7 @@ impl ColumnData for NullableFixedStringAdapter {
for value in values {
buffer.resize(0, 0);
if let Some(string_ref) = value {
buffer.extend(string_ref.as_bytes());
buffer.extend(string_ref);
}
buffer.resize(self.str_len, 0);
encoder.write_bytes(buffer.as_ref());
Expand Down
44 changes: 44 additions & 0 deletions src/types/column/mod.rs
Expand Up @@ -14,6 +14,10 @@ pub use self::{
};

pub(crate) use self::string_pool::StringPool;
use crate::{
errors::{Error, FromSqlError},
types::column::fixed_string::{FixedStringAdapter, NullableFixedStringAdapter},
};

mod chunk;
mod column_data;
Expand Down Expand Up @@ -132,6 +136,46 @@ impl Column {
data: Arc::new(data),
}
}

pub fn cast_to(self, dst_type: SqlType) -> ClickhouseResult<Column> {
let src_type = self.sql_type();

if dst_type == src_type {
return Ok(self);
}

match (dst_type, src_type) {
(SqlType::FixedString(str_len), SqlType::String) => {
let name = self.name().to_owned();
let adapter = FixedStringAdapter {
column: self,
str_len,
};
Ok(Column {
name,
data: Arc::new(adapter),
})
}
(
SqlType::Nullable(SqlType::FixedString(str_len)),
SqlType::Nullable(SqlType::String),
) => {
let name = self.name().to_owned();
let adapter = NullableFixedStringAdapter {
column: self,
str_len: *str_len,
};
Ok(Column {
name,
data: Arc::new(adapter),
})
}
_ => Err(Error::FromSql(FromSqlError::InvalidType {
src: src_type.to_string(),
dst: dst_type.to_string(),
})),
}
}
}

pub fn new_column(name: &str, data: Arc<(ColumnData + Sync + Send + 'static)>) -> Column {
Expand Down
33 changes: 31 additions & 2 deletions src/types/column/string.rs
Expand Up @@ -46,6 +46,18 @@ impl<'a> ColumnFrom for Vec<&'a str> {
}
}

impl ColumnFrom for Vec<Vec<u8>> {
fn column_from<W: ColumnWrapper>(data: Self) -> W::Wrapper {
W::wrap(StringColumnData { pool: data.into() })
}
}

impl<'a> ColumnFrom for Vec<&'a [u8]> {
fn column_from<W: ColumnWrapper>(data: Self) -> W::Wrapper {
W::wrap(StringColumnData { pool: data.into() })
}
}

impl ColumnFrom for Vec<Option<String>> {
fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
let inner = Box::new(StringColumnData::with_capacity(source.len()));
Expand All @@ -63,6 +75,23 @@ impl ColumnFrom for Vec<Option<String>> {
}
}

impl ColumnFrom for Vec<Option<Vec<u8>>> {
fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
let inner = Box::new(StringColumnData::with_capacity(source.len()));

let mut data = NullableColumnData {
inner,
nulls: Vec::with_capacity(source.len()),
};

for value in source {
data.push(value.into());
}

W::wrap(data)
}
}

impl ColumnFrom for Vec<Option<&str>> {
fn column_from<W: ColumnWrapper>(source: Self) -> W::Wrapper {
let inner = Box::new(StringColumnData::with_capacity(source.len()));
Expand Down Expand Up @@ -97,9 +126,9 @@ impl ColumnData for StringColumnData {
}

fn push(&mut self, value: Value) {
let s: String = value.into();
let s: Vec<u8> = value.into();
let mut b = self.pool.allocate(s.len());
b.write_all(s.as_bytes()).unwrap();
b.write_all(s.as_ref()).unwrap();
}

fn at(&self, index: usize) -> ValueRef {
Expand Down

0 comments on commit d4a92c9

Please sign in to comment.