Skip to content

Commit

Permalink
optional timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Oct 6, 2019
1 parent 922dca7 commit 2ad2a8b
Show file tree
Hide file tree
Showing 21 changed files with 228 additions and 169 deletions.
7 changes: 5 additions & 2 deletions src/binary/read_ex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ where

if str_len <= MAX_STACK_BUFFER_LEN {
unsafe {
let mut buffer: [MaybeUninit<u8>; MAX_STACK_BUFFER_LEN] = MaybeUninit::uninit().assume_init();
self.read_bytes(&mut *(&mut buffer[..str_len] as *mut [MaybeUninit<u8>] as *mut [u8]))?;
let mut buffer: [MaybeUninit<u8>; MAX_STACK_BUFFER_LEN] =
MaybeUninit::uninit().assume_init();
self.read_bytes(
&mut *(&mut buffer[..str_len] as *mut [MaybeUninit<u8>] as *mut [u8]),
)?;
}
} else {
let mut buffer = vec![0_u8; str_len];
Expand Down
4 changes: 1 addition & 3 deletions src/io/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,7 @@ impl ClickhouseTransport {
// Data is consumed
let new_len = self.rd.len() - pos;
unsafe {
ptr::copy(self.rd.as_ptr().add(pos),
self.rd.as_mut_ptr(),
new_len);
ptr::copy(self.rd.as_ptr().add(pos), self.rd.as_mut_ptr(), new_len);
self.rd.set_len(new_len);
}
}
Expand Down
17 changes: 9 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ macro_rules! row {
};
}

#[macro_export]
macro_rules! try_opt {
($expr:expr) => {
match $expr {
Expand Down Expand Up @@ -526,17 +525,19 @@ impl ClientHandle {
}
}

fn with_timeout<F>(f: F, timeout: Option<Duration>, release_pool: PoolBinding) -> BoxFuture<F::Item>
pub(crate) fn with_timeout<F>(
f: F,
timeout: Option<Duration>,
release_pool: PoolBinding,
) -> BoxFuture<F::Item>
where
F: Future<Error = Error> + Send + 'static,
{
if let Some(timeout) = timeout {
Box::new(f
.timeout(timeout)
.map_err(move |err| {
release_pool.release_conn();
err.into()
}))
Box::new(f.timeout(timeout).map_err(move |err| {
release_pool.release_conn();
err.into()
}))
} else {
Box::new(f.map_err(move |err| {
release_pool.release_conn();
Expand Down
25 changes: 14 additions & 11 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,13 @@ mod test {

use tokio::prelude::*;

use crate::{ClientHandle, errors::Error, io::BoxFuture, test_misc::DATABASE_URL, types::{Block, Options}};
use crate::{
errors::Error,
io::BoxFuture,
test_misc::DATABASE_URL,
types::{Block, Options},
ClientHandle,
};

use super::Pool;

Expand Down Expand Up @@ -472,7 +478,8 @@ mod test {
let test_db_url = format!("{}{}", DATABASE_URL.as_str(), "&query_timeout=10ms");
let pool = Pool::new(test_db_url.to_string());

let done = pool.get_handle()
let done = pool
.get_handle()
.and_then(|c| c.query("SELECT sleep(10)").fetch_all());

run(done).unwrap_err();
Expand All @@ -487,12 +494,10 @@ mod test {
fn test_wrong_insert() {
let pool = Pool::new(DATABASE_URL.as_str());

let done = pool
.get_handle()
.and_then(|c| {
let block = Block::new();
c.insert("unexisting", block)
});
let done = pool.get_handle().and_then(|c| {
let block = Block::new();
c.insert("unexisting", block)
});

run(done).unwrap_err();

Expand All @@ -508,9 +513,7 @@ mod test {

let done = pool
.get_handle()
.and_then(|c| {
c.execute("DROP TABLE unexisting")
});
.and_then(|c| c.execute("DROP TABLE unexisting"));

run(done).unwrap_err();

Expand Down
72 changes: 42 additions & 30 deletions src/types/block/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ use std::borrow::Cow;
use chrono_tz::Tz;

use crate::{
Block,
errors::{Error, FromSqlError},
types::{
block::ColumnIdx,
column::{ArcColumnWrapper, ColumnData, Either},
Column,
Value
Column, Value,
},
Block,
};

pub trait RowBuilder {
Expand Down Expand Up @@ -89,7 +88,11 @@ fn put_param(key: Cow<'static, str>, value: Value, block: &mut Block) -> Result<

let column = Column {
name: key.clone().into(),
data: ColumnData::from_type::<ArcColumnWrapper>(sql_type, timezone, block.capacity)?,
data: ColumnData::from_type::<ArcColumnWrapper>(
sql_type,
timezone,
block.capacity,
)?,
};

block.columns.push(column);
Expand All @@ -109,17 +112,15 @@ fn extract_timezone(value: &Value) -> Tz {
match value {
Value::Date(_, tz) => *tz,
Value::DateTime(_, tz) => *tz,
Value::Nullable(Either::Right(d)) => {
extract_timezone(&&d)
}
Value::Nullable(Either::Right(d)) => extract_timezone(&&d),
Value::Array(_, data) => {
if let Some(v) = data.first() {
extract_timezone(v)
} else {
Tz::Zulu
}
}
_ => Tz::Zulu
_ => Tz::Zulu,
}
}

Expand All @@ -128,7 +129,10 @@ mod test {
use chrono::prelude::*;
use chrono_tz::Tz::{self, UTC};

use crate::{row, types::{Decimal, SqlType}};
use crate::{
row,
types::{Decimal, SqlType},
};

use super::*;

Expand All @@ -140,29 +144,31 @@ mod test {
let decimal = Decimal::of(2.0_f64, 4);

let mut block = Block::new();
block.push(row!{
i8_field: 1_i8,
i16_field: 1_i16,
i32_field: 1_i32,
i64_field: 1_i64,
block
.push(row! {
i8_field: 1_i8,
i16_field: 1_i16,
i32_field: 1_i32,
i64_field: 1_i64,

u8_field: 1_u8,
u16_field: 1_u16,
u32_field: 1_u32,
u64_field: 1_u64,
u8_field: 1_u8,
u16_field: 1_u16,
u32_field: 1_u32,
u64_field: 1_u64,

f32_field: 4.66_f32,
f64_field: 2.71_f64,
f32_field: 4.66_f32,
f64_field: 2.71_f64,

str_field: "text",
opt_filed: Some("text"),
nil_filed: Option::<&str>::None,
str_field: "text",
opt_filed: Some("text"),
nil_filed: Option::<&str>::None,

date_field: date_value,
date_time_field: date_time_value,
date_field: date_value,
date_time_field: date_time_value,

decimal_field: decimal
}).unwrap();
decimal_field: decimal
})
.unwrap();

assert_eq!(block.row_count(), 1);

Expand All @@ -180,11 +186,17 @@ mod test {
assert_eq!(block.columns[9].sql_type(), SqlType::Float64);

assert_eq!(block.columns[10].sql_type(), SqlType::String);
assert_eq!(block.columns[11].sql_type(), SqlType::Nullable(SqlType::String.into()));
assert_eq!(block.columns[12].sql_type(), SqlType::Nullable(SqlType::String.into()));
assert_eq!(
block.columns[11].sql_type(),
SqlType::Nullable(SqlType::String.into())
);
assert_eq!(
block.columns[12].sql_type(),
SqlType::Nullable(SqlType::String.into())
);

assert_eq!(block.columns[13].sql_type(), SqlType::Date);
assert_eq!(block.columns[14].sql_type(), SqlType::DateTime);
assert_eq!(block.columns[15].sql_type(), SqlType::Decimal(18, 4));
}
}
}
8 changes: 4 additions & 4 deletions src/types/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ use self::chunk_iterator::ChunkIterator;
pub(crate) use self::row::BlockRef;
pub use self::{
block_info::BlockInfo,
builder::{RCons, RNil, RowBuilder},
row::{Row, Rows},
builder::{RowBuilder, RCons, RNil},
};

mod block_info;
mod builder;
mod chunk_iterator;
mod compressed;
mod row;
mod builder;

const INSERT_BLOCK_SIZE: usize = 1_048_576;

Expand Down Expand Up @@ -116,7 +116,7 @@ impl Block {
Self {
info: Default::default(),
columns: vec![],
capacity
capacity,
}
}

Expand Down Expand Up @@ -315,7 +315,7 @@ impl Block {
Self {
info: first.info,
columns,
capacity: blocks.iter().map(|b| b.capacity).sum()
capacity: blocks.iter().map(|b| b.capacity).sum(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/types/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
binary::{Encoder, protocol},
binary::{protocol, Encoder},
client_info,
types::{Block, ClickhouseResult, Context, Query},
};
Expand Down
2 changes: 1 addition & 1 deletion src/types/column/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
binary::{Encoder, ReadEx},
errors::Error,
types::{
column::{list::List, BoxColumnWrapper, ColumnData, column_data::BoxColumnData},
column::{column_data::BoxColumnData, list::List, BoxColumnWrapper, ColumnData},
SqlType, Value, ValueRef,
},
};
Expand Down
5 changes: 4 additions & 1 deletion src/types/column/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{cmp, ops};

use crate::{
binary::Encoder,
types::{SqlType, Value, ValueRef, column::column_data::{ArcColumnData, BoxColumnData}},
types::{
column::column_data::{ArcColumnData, BoxColumnData},
SqlType, Value, ValueRef,
},
};

use super::ColumnData;
Expand Down
2 changes: 1 addition & 1 deletion src/types/column/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
types::{SqlType, Value, ValueRef},
};

use super::column_data::{ColumnData, ArcColumnData, BoxColumnData};
use super::column_data::{ArcColumnData, BoxColumnData, ColumnData};

pub struct ConcatColumnData {
data: Vec<ArcColumnData>,
Expand Down
17 changes: 8 additions & 9 deletions src/types/column/date.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
use std::{convert, fmt, sync::Arc};

use chrono::{Date, prelude::*};
use chrono::{prelude::*, Date};
use chrono_tz::Tz;

use crate::{
binary::{Encoder, ReadEx},
errors::Error,
types::{DateConverter, Marshal, SqlType, StatBuffer, Unmarshal, Value, ValueRef},
types::column::{
array::ArrayColumnData,
BoxColumnWrapper,
column_data::{BoxColumnData, ColumnData},
ColumnFrom,
ColumnWrapper,
Either,
list::List,
nullable::NullableColumnData,
numeric::save_data
numeric::save_data,
BoxColumnWrapper, ColumnFrom, ColumnWrapper, Either,
},
types::{DateConverter, Marshal, SqlType, StatBuffer, Unmarshal, Value, ValueRef},
};

pub struct DateColumnData<T>
Expand Down Expand Up @@ -63,7 +60,9 @@ where
tz: Tz,
) -> Result<DateColumnData<T>, Error> {
let mut data = List::with_capacity(size);
unsafe { data.set_len(size); }
unsafe {
data.set_len(size);
}
reader.read_bytes(data.as_mut())?;
Ok(DateColumnData { data, tz })
}
Expand Down Expand Up @@ -227,7 +226,7 @@ where
}

fn clone_instance(&self) -> BoxColumnData {
Box::new(Self{
Box::new(Self {
data: self.data.clone(),
tz: self.tz,
})
Expand Down
11 changes: 6 additions & 5 deletions src/types/column/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use crate::{
errors::Error,
types::{
column::{
BoxColumnWrapper, column_data::BoxColumnData, ColumnFrom, ColumnWrapper, Either,
list::List, nullable::NullableColumnData, VectorColumnData, column_data::ColumnData
column_data::BoxColumnData, column_data::ColumnData, list::List,
nullable::NullableColumnData, BoxColumnWrapper, ColumnFrom, ColumnWrapper, Either,
VectorColumnData,
},
Column,
decimal::{Decimal, NoBits},
from_sql::FromSql, SqlType, Value, ValueRef,
from_sql::FromSql,
Column, SqlType, Value, ValueRef,
},
};

Expand Down Expand Up @@ -180,7 +181,7 @@ impl ColumnData for DecimalColumnData {
inner: self.inner.clone_instance(),
precision: self.precision,
scale: self.scale,
nobits: self.nobits
nobits: self.nobits,
})
}
}
Expand Down

0 comments on commit 2ad2a8b

Please sign in to comment.