Skip to content

Commit

Permalink
Merge 3847794 into 97ebc94
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Oct 9, 2019
2 parents 97ebc94 + 3847794 commit c7b5365
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 71 deletions.
50 changes: 47 additions & 3 deletions src/io/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ use std::{
collections::VecDeque,
io::{self, Cursor},
ptr,
sync::{
self,
atomic::{AtomicBool, Ordering},
Arc,
},
};

use chrono_tz::Tz;
Expand All @@ -12,9 +17,9 @@ use crate::{
binary::Parser,
errors::{DriverError, Error},
io::BoxFuture,
pool::PoolBinding,
pool::{Inner, PoolBinding},
types::{Block, Cmd, Context, Packet},
ClientHandle,
ClientHandle, Pool,
};

/// Line transport
Expand All @@ -34,6 +39,7 @@ pub(crate) struct ClickhouseTransport {
// Server time zone
timezone: Option<Tz>,
compress: bool,
status: Arc<TransportStatus>,
}

enum PacketStreamState {
Expand All @@ -43,14 +49,19 @@ enum PacketStreamState {
Done,
}

pub(crate) struct TransportStatus {
inside: AtomicBool,
pool: sync::Weak<sync::Mutex<Inner>>,
}

pub(crate) struct PacketStream {
inner: Option<ClickhouseTransport>,
state: PacketStreamState,
read_block: bool,
}

impl ClickhouseTransport {
pub fn new(inner: TcpStream, compress: bool) -> Self {
pub fn new(inner: TcpStream, compress: bool, pool: Option<Pool>) -> Self {
ClickhouseTransport {
inner,
done: false,
Expand All @@ -60,6 +71,39 @@ impl ClickhouseTransport {
cmds: VecDeque::new(),
timezone: None,
compress,
status: Arc::new(TransportStatus::new(pool)),
}
}

pub(crate) fn set_inside(&self, value: bool) {
self.status.inside.store(value, Ordering::Relaxed);
}
}

impl Drop for TransportStatus {
fn drop(&mut self) {
let inside = self.inside.load(Ordering::Relaxed);

if inside {
return;
}

if let Some(pool_inner) = self.pool.upgrade() {
Inner::release_conn(&pool_inner);
}
}
}

impl TransportStatus {
fn new(pool: Option<Pool>) -> TransportStatus {
let pool = match pool {
None => sync::Weak::new(),
Some(p) => Arc::downgrade(&p.inner),
};

TransportStatus {
inside: AtomicBool::new(true),
pool,
}
}
}
Expand Down
44 changes: 21 additions & 23 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,10 @@ impl fmt::Debug for ClientHandle {
impl Client {
#[deprecated(since = "0.1.4", note = "please use Pool to connect")]
pub fn connect(options: Options) -> BoxFuture<ClientHandle> {
Self::open(&options.into_options_src())
Self::open(&options.into_options_src(), None)
}

pub(crate) fn open(source: &OptionsSource) -> BoxFuture<ClientHandle> {
pub(crate) fn open(source: &OptionsSource, pool: Option<Pool>) -> BoxFuture<ClientHandle> {
let options = try_opt!(source.get()).as_ref().to_owned();
let compress = options.compression;
let timeout = options.connection_timeout;
Expand All @@ -258,7 +258,7 @@ impl Client {
stream.set_nodelay(options.nodelay)?;
stream.set_keepalive(options.keepalive)?;

let transport = ClickhouseTransport::new(stream, compress);
let transport = ClickhouseTransport::new(stream, compress, pool);
Ok(ClientHandle {
inner: Some(transport),
context,
Expand Down Expand Up @@ -365,14 +365,14 @@ impl ClientHandle {
{
let context = self.context.clone();
let pool = self.pool.clone();
let release_pool = self.pool.clone();

let query = Query::from(sql);
self.wrap_future(|mut c| -> BoxFuture<Self> {
info!("[execute] {}", query.get_sql());
let timeout = try_opt!(context.options.get()).execute_timeout;

let future = c.inner
let future = c
.inner
.take()
.unwrap()
.call(Cmd::SendQuery(query, context.clone()))
Expand All @@ -395,7 +395,7 @@ impl ClientHandle {
})
.map(Option::unwrap);

with_timeout(future, timeout, release_pool)
with_timeout(future, timeout)
})
}

Expand All @@ -417,13 +417,13 @@ impl ClientHandle {

let context = self.context.clone();
let pool = self.pool.clone();
let release_pool = self.pool.clone();

self.wrap_future(|mut c| -> BoxFuture<Self> {
info!("[insert] {}", query.get_sql());
let timeout = try_opt!(context.options.get()).insert_timeout;

let future = c.inner
let future = c
.inner
.take()
.unwrap()
.call(Cmd::SendQuery(query, context.clone()))
Expand Down Expand Up @@ -451,7 +451,7 @@ impl ClientHandle {
)
});

with_timeout(future, timeout, release_pool)
with_timeout(future, timeout)
})
}

Expand Down Expand Up @@ -507,7 +507,7 @@ impl ClientHandle {
let reconnect = move || -> BoxFuture<Self> {
warn!("[reconnect]");
match pool.clone() {
None => Client::open(&source),
None => Client::open(&source, None),
Some(p) => Box::new(p.get_handle()),
}
};
Expand All @@ -523,26 +523,24 @@ impl ClientHandle {
),
)
}

pub(crate) fn set_inside(&self, value: bool) {
if let Some(ref inner) = self.inner {
inner.set_inside(value);
} else {
unreachable!()
}
}
}

pub(crate) fn with_timeout<F>(
f: F,
timeout: Option<Duration>,
release_pool: PoolBinding,
) -> BoxFuture<F::Item>
pub(crate) fn with_timeout<F>(f: F, timeout: Option<Duration>) -> BoxFuture<F::Item>
where
F: Future<Error = Error> + Send + 'static,
{
if let Some(timeout) = timeout {
Box::new(f.timeout(timeout).map_err(move |err| {
release_pool.release_conn();
err.into()
}))
Box::new(f.timeout(timeout).map_err(|err| err.into()))
} else {
Box::new(f.map_err(move |err| {
release_pool.release_conn();
err
}))
Box::new(f)
}
}

Expand Down
39 changes: 17 additions & 22 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@ use crate::{

mod futures;

struct Inner {
pub(crate) struct Inner {
new: Option<BoxFuture<ClientHandle>>,
idle: Vec<ClientHandle>,
tasks: Vec<Task>,
ongoing: usize,
}

impl Inner {
pub(crate) fn release_conn(inner: &Mutex<Inner>) {
let mut guard = inner.lock().unwrap();
guard.ongoing -= 1;
while let Some(task) = guard.tasks.pop() {
task.notify()
}
}

fn conn_count(&self) -> usize {
self.new.is_some() as usize + self.idle.len() + self.ongoing
}
Expand Down Expand Up @@ -54,12 +62,6 @@ impl PoolBinding {
}
}

pub(crate) fn release_conn(self) {
if let Some(mut pool) = self.into() {
Pool::release_conn(&mut pool);
}
}

pub(crate) fn is_attached(&self) -> bool {
match self {
PoolBinding::Attached(_) => true,
Expand Down Expand Up @@ -93,7 +95,7 @@ impl PoolBinding {
#[derive(Clone)]
pub struct Pool {
options: OptionsSource,
inner: Arc<Mutex<Inner>>,
pub(crate) inner: Arc<Mutex<Inner>>,
min: usize,
max: usize,
}
Expand Down Expand Up @@ -201,7 +203,7 @@ impl Pool {
}

fn new_connection(&self) -> BoxFuture<ClientHandle> {
Client::open(&self.options)
Client::open(&self.options, Some(self.clone()))
}

fn handle_futures(&mut self) -> ClickhouseResult<()> {
Expand Down Expand Up @@ -231,6 +233,7 @@ impl Pool {
self.with_inner(|mut inner| {
if let Some(mut client) = inner.idle.pop() {
client.pool = PoolBinding::Attached(self.clone());
client.set_inside(false);
inner.ongoing += 1;
Some(client)
} else {
Expand All @@ -243,21 +246,13 @@ impl Pool {
let min = self.min;

self.with_inner(|mut inner| {
inner.ongoing -= 1;
if inner.idle.len() < min && client.pool.is_attached() {
inner.idle.push(client);
} else {
client.pool = PoolBinding::None;
}
let is_attached = client.pool.is_attached();
client.pool = PoolBinding::None;
client.set_inside(true);

while let Some(task) = inner.tasks.pop() {
task.notify()
if inner.idle.len() < min && is_attached {
inner.idle.push(client);
}
})
}

pub(crate) fn release_conn(&mut self) {
self.with_inner(|mut inner| {
inner.ongoing -= 1;

while let Some(task) = inner.tasks.pop() {
Expand Down
1 change: 1 addition & 0 deletions src/retry_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.state.poll() {
RetryPoll::Check(Err(err)) => {
warn!("[check] {}", err);
if self.attempt >= self.max_attempt {
return Err(err);
}
Expand Down
26 changes: 4 additions & 22 deletions src/types/query_result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl QueryResult {
let timeout = try_opt!(self.client.context.options.get()).query_timeout;
let context = self.client.context.clone();
let pool = self.client.pool.clone();
let release_pool = self.client.pool.clone();

let acc = (None, init);

Expand All @@ -130,16 +129,10 @@ impl QueryResult {
future
.map(|(c, t)| (c.unwrap(), t))
.timeout(timeout)
.map_err(move |err| {
release_pool.release_conn();
err.into()
}),
.map_err(move |err| err.into()),
)
} else {
Box::new(future.map(|(c, t)| (c.unwrap(), t)).map_err(move |err| {
release_pool.release_conn();
err
}))
Box::new(future.map(|(c, t)| (c.unwrap(), t)))
}
}

Expand Down Expand Up @@ -201,7 +194,6 @@ impl QueryResult {

let context = c.context.clone();
let pool = c.pool.clone();
let mut release_pool = Some(c.pool.clone());

let stream = BlockStream::new(
c.inner
Expand All @@ -213,19 +205,9 @@ impl QueryResult {
);

if let Some(timeout) = timeout {
Box::new(stream.timeout(timeout).map_err(move |err| {
if let Some(pool) = release_pool.take() {
pool.release_conn();
}
err.into()
}))
Box::new(stream.timeout(timeout).map_err(|err| err.into()))
} else {
Box::new(stream.map_err(move |err| {
if let Some(pool) = release_pool.take() {
pool.release_conn();
}
err
}))
Box::new(stream)
}
})
}
Expand Down
Loading

0 comments on commit c7b5365

Please sign in to comment.