Skip to content

Commit

Permalink
Merge e16d98e into b52011d
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Oct 6, 2019
2 parents b52011d + e16d98e commit ddb9578
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ parameters:

- `query_timeout` - Timeout for queries (defaults to `180 sec`).
- `query_block_timeout` - Timeout for each block in a query (defaults to `180 sec`).
- `insert_timeout` - Timeout for inserts (defaults to `180 sec`).

example:
```url
Expand Down
35 changes: 28 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
//!
//! - `query_timeout` - Timeout for queries (defaults to `180 sec`).
//! - `query_block_timeout` - Timeout for each block in a query (defaults to `180 sec`).
//! - `insert_timeout` - Timeout for inserts (defaults to `180 sec`).
//!
//! example:
//! ```url
Expand Down Expand Up @@ -134,6 +135,7 @@ use crate::{
retry_guard::RetryGuard,
types::{Block, Cmd, Context, IntoOptions, Options, OptionsSource, Packet, Query, QueryResult},
};
use failure::_core::time::Duration;

mod binary;
mod client_info;
Expand Down Expand Up @@ -417,9 +419,11 @@ impl ClientHandle {
let pool = self.pool.clone();
let release_pool = self.pool.clone();

self.wrap_future(|mut c| {
self.wrap_future(|mut c| -> BoxFuture<Self> {
info!("[insert] {}", query.get_sql());
c.inner
let timeout = try_opt!(context.options.get()).insert_timeout;

let future = c.inner
.take()
.unwrap()
.call(Cmd::SendQuery(query, context.clone()))
Expand All @@ -445,11 +449,9 @@ impl ClientHandle {
.read_block(context, pool)
.map(|(c, _)| c),
)
})
.map_err(move |err| {
release_pool.release_conn();
err
})
});

with_timeout(future, timeout, release_pool)
})
}

Expand Down Expand Up @@ -523,6 +525,25 @@ impl ClientHandle {
}
}

fn with_timeout<F>(f: F, timeout: Option<Duration>, release_pool: PoolBinding) -> BoxFuture<F::Item>
where
F: Future<Error = Error> + Send + 'static,
{
if let Some(timeout) = timeout {
Box::new(f
.timeout(timeout)
.map_err(move |err| {
release_pool.release_conn();
err.into()
}))
} else {
Box::new(f.map_err(move |err| {
release_pool.release_conn();
err
}))
}
}

#[cfg(test)]
mod test_misc {
use std::env;
Expand Down
26 changes: 23 additions & 3 deletions src/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ pub struct Options {

/// Timeout for each block in a query (defaults to `180 sec`)
pub(crate) query_block_timeout: Duration,

/// Timeout for inserts (defaults to `180 sec`)
pub(crate) insert_timeout: Option<Duration>,
}

impl Default for Options {
Expand All @@ -195,6 +198,7 @@ impl Default for Options {
connection_timeout: Duration::from_millis(500),
query_timeout: Duration::from_secs(180),
query_block_timeout: Duration::from_secs(180),
insert_timeout: Some(Duration::from_secs(180)),
}
}
}
Expand Down Expand Up @@ -308,6 +312,11 @@ impl Options {
/// Timeout for each block in a query (defaults to `180,000 ms`).
=> query_block_timeout: Duration
}

property! {
/// Timeout for insert (defaults to `180,000 ms`).
=> insert_timeout: Option<Duration>
}
}

impl FromStr for Options {
Expand Down Expand Up @@ -383,6 +392,9 @@ where
"query_block_timeout" => {
options.query_block_timeout = parse_param(key, value, parse_duration)?
},
"insert_timeout" => {
options.insert_timeout = parse_param(key, value, parse_opt_duration)?
}
"compression" => options.compression = parse_param(key, value, parse_compression)?,
_ => return Err(UrlError::UnknownParameter { param: key.into() }),
};
Expand Down Expand Up @@ -460,6 +472,10 @@ fn parse_duration(source: &str) -> std::result::Result<Duration, ()> {
}

fn parse_opt_duration(source: &str) -> std::result::Result<Option<Duration>, ()> {
if source == "none" {
return Ok(None);
}

let duration = parse_duration(source)?;
Ok(Some(duration))
}
Expand All @@ -474,9 +490,7 @@ fn parse_compression(source: &str) -> std::result::Result<bool, ()> {

#[cfg(test)]
mod test {
use std::time::Duration;

use super::{from_url, parse_compression, parse_duration, Options};
use super::*;

#[test]
fn test_parse_default() {
Expand Down Expand Up @@ -536,6 +550,12 @@ mod test {
assert_eq!(parse_duration("1ss").unwrap_err(), ());
}

#[test]
fn test_parse_opt_duration() {
assert_eq!(parse_opt_duration("3s").unwrap(), Some(Duration::from_secs(3)));
assert_eq!(parse_opt_duration("none").unwrap(), None::<Duration>);
}

#[test]
fn test_parse_compression() {
assert_eq!(parse_compression("none").unwrap(), false);
Expand Down

0 comments on commit ddb9578

Please sign in to comment.