Skip to content

Commit

Permalink
Merge pull request #34 from suharev7/feature/streaming
Browse files Browse the repository at this point in the history
streaming (for #32)
  • Loading branch information
suharev7 committed Apr 6, 2019
2 parents 56e8d97 + c5ddae3 commit b8156da
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 69 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "clickhouse-rs"
version = "0.1.9"
version = "0.1.10"
authors = ["Mikhail Sukharev <suharev7@gmail.com>"]
license = "MIT"
homepage = "https://github.com/suharev7/clickhouse-rs"
Expand Down
7 changes: 5 additions & 2 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
pub(crate) use self::{io_future::{BoxFuture, BoxStream}, transport::ClickhouseTransport};
pub(crate) use self::{
io_future::{BoxFuture, BoxStream},
transport::ClickhouseTransport,
};

mod io_future;
mod transport;
pub(crate) mod transport;
2 changes: 1 addition & 1 deletion src/io/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Stream for ClickhouseTransport {
}

impl PacketStream {
pub fn read_block(
pub(crate) fn read_block(
mut self,
context: Context,
pool: PoolBinding,
Expand Down
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//! * Float32, Float64
//! * String
//! * UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64
//! * Nullable(T)
//!
//! ### DNS
//!
Expand Down Expand Up @@ -426,7 +427,10 @@ impl ClientHandle {
};

if ping_before_query {
let fut = self.check_connection().and_then(move |c| future::ok(Box::new(f(c)))).flatten_stream();
let fut = self
.check_connection()
.and_then(move |c| future::ok(Box::new(f(c))))
.flatten_stream();
Box::new(fut)
} else {
Box::new(f(self))
Expand Down
11 changes: 7 additions & 4 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,13 @@ mod test {
fn test_detach() {
let pool = Pool::new(DATABASE_URL.as_str());

let done = pool.get_handle().and_then(ClientHandle::ping).and_then(|mut c| {
c.pool.detach();
Ok(())
});
let done = pool
.get_handle()
.and_then(ClientHandle::ping)
.and_then(|mut c| {
c.pool.detach();
Ok(())
});

run(done).unwrap();
assert_eq!(pool.info().idle_len, 0);
Expand Down
2 changes: 1 addition & 1 deletion src/types/column/string.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{string::ToString, io::Write};
use std::{io::Write, string::ToString};

use crate::{
binary::{Encoder, ReadEx},
Expand Down
2 changes: 1 addition & 1 deletion src/types/date_converter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{Date, prelude::*};
use chrono::{prelude::*, Date};
use chrono_tz::Tz;

use crate::types::{SqlType, Value, ValueRef};
Expand Down
1 change: 0 additions & 1 deletion src/types/query_result/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ where
}
}
}

15 changes: 10 additions & 5 deletions src/types/query_result/fold_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ where
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut state;
loop {
let mut state = mem::replace(&mut self.state, State::Empty);
state = mem::replace(&mut self.state, State::Empty);

match state {
State::Empty => unreachable!(),
Expand All @@ -64,12 +65,16 @@ where
self.state = State::Run((self.f)(acc, row).into_future());
}
},
State::Run(ref mut inner) => {
let row = try_ready!(inner.poll());
self.state = State::Ready(row);
}
State::Run(ref mut inner) => match inner.poll() {
Ok(Async::Ready(item)) => self.state = State::Ready(item),
Ok(Async::NotReady) => break,
Err(e) => return Err(e),
},
}
}

self.state = state;
Ok(Async::NotReady)
}
}

Expand Down
100 changes: 58 additions & 42 deletions src/types/query_result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ use tokio::prelude::*;
use crate::{
errors::{DriverError, Error},
io::{BoxFuture, BoxStream, ClickhouseTransport},
types::{Block, Cmd, Packet, Query, Row},
types::{
block::BlockRef, query_result::stream_blocks::BlockStream, Block, Cmd, Packet, Query, Row,
Rows,
},
ClientHandle,
};

use self::{either::Either, fold_block::FoldBlock};

mod either;
mod fold_block;
mod stream_blocks;

/// Result of a query or statement execution.
pub struct QueryResult {
Expand All @@ -35,7 +39,7 @@ impl QueryResult {
/// # let done =
/// pool.get_handle()
/// .and_then(|c| {
/// c.query("SELECT * FROM system.numbers LIMIT 10")
/// c.query("SELECT number FROM system.numbers LIMIT 10000000")
/// .fold(0, |acc, row| {
/// let number: u64 = row.get("number")?;
/// Ok(acc + number)
Expand Down Expand Up @@ -132,53 +136,65 @@ impl QueryResult {
}

/// Method that produces a stream of blocks containing rows
pub fn stream_blocks(self) -> BoxStream<Result<Block, Error>>
{
let release_pool = self.client.pool.clone();

Box::new(
self.map_packets(|packet| match packet {
Packet::Block(b) => {
if b.row_count() == 0 {
None
} else {
Some(Ok(b))
}
},
Packet::Eof(_) => None,
Packet::ProfileInfo(_) | Packet::Progress(_) => None,
Packet::Exception(exception) => {
Some(Err(Error::Server(exception)))
}
_ => Some(Err(Error::Driver(DriverError::UnexpectedPacket))),
})
.filter_map(|some_either| some_either)
.map_err(move |err| {
// hwc: not sure why I have to clone again
release_pool.clone().release_conn();
err
})
)
}

fn map_packets<F, T>(self, f: F) -> BoxStream<T>
where
F: Fn(Packet<ClickhouseTransport>) -> T + Send + 'static,
T: Send + 'static,
{
let context = self.client.context.clone();
///
/// example:
/// ```rust
/// # extern crate clickhouse_rs;
/// # extern crate futures;
/// # use futures::{Future, Stream};
/// # use clickhouse_rs::Pool;
/// # use std::env;
/// # let database_url = env::var("DATABASE_URL").unwrap_or("tcp://localhost:9000?compression=lz4".into());
/// # let pool = Pool::new(database_url);
/// # let done =
/// pool.get_handle()
/// .and_then(|c| {
/// # let sql_query = "SELECT number FROM system.numbers LIMIT 100000";
/// c.query(sql_query)
/// .stream_blocks()
/// .for_each(|block| {
/// println!("{:?}\nblock counts: {} rows", block, block.row_count());
/// # Ok(())
/// })
/// })
/// # .map(|_| ())
/// # .map_err(|err| eprintln!("database error: {}", err));
/// # tokio::run(done)
/// ```
pub fn stream_blocks(self) -> BoxStream<Block> {
let query = self.query;

self.client.wrap_stream(move |mut c| {
info!("[send query] {}", query.get_sql());

c.pool.detach();
c.inner
.take()
.unwrap()
.call(Cmd::SendQuery(query, context.clone()))
.map(f)

let context = c.context.clone();
let pool = c.pool.clone();

BlockStream::new(
c.inner
.take()
.unwrap()
.call(Cmd::SendQuery(query, context.clone())),
context,
pool,
)
})
}

/// Method that produces a stream of rows
pub fn stream_rows(self) -> BoxStream<Row<'static>> {
Box::new(
self.stream_blocks()
.map(Arc::new)
.map(|block| {
let block_ref = BlockRef::Owned(block);
stream::iter_ok(Rows { row: 0, block_ref })
})
.flatten(),
)
}
}

fn wrap_future<T, F>(future: F) -> BoxFuture<T>
Expand Down
74 changes: 74 additions & 0 deletions src/types/query_result/stream_blocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use futures::{Async, Poll, Stream};

use crate::{
errors::{DriverError, Error},
io::transport::PacketStream,
pool::PoolBinding,
types::{Block, Context, Packet},
ClientHandle,
};

pub(crate) struct BlockStream {
inner: PacketStream,
rest: Option<(Context, PoolBinding)>,
eof: bool,
block_index: usize,
}

impl BlockStream {
pub(crate) fn new(inner: PacketStream, context: Context, pool: PoolBinding) -> BlockStream {
BlockStream {
inner,
rest: Some((context, pool)),
eof: false,
block_index: 0,
}
}
}

impl Stream for BlockStream {
type Item = Block;
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
if self.eof {
return Ok(Async::Ready(None));
}

let packet = match self.inner.poll() {
Err(err) => return Err(err),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => {
self.eof = true;
continue;
}
Ok(Async::Ready(Some(packet))) => packet,
};

match packet {
Packet::Eof(inner) => {
let (context, pool) = self.rest.take().unwrap();
let mut client = ClientHandle {
inner: Some(inner),
context,
pool,
};
if !client.pool.is_attached() {
client.pool.attach();
}
self.eof = true;
}
Packet::ProfileInfo(_) | Packet::Progress(_) => {}
Packet::Exception(exception) => return Err(Error::Server(exception)),
Packet::Block(block) => {
self.block_index += 1;
if self.block_index > 1 && !block.is_empty() {
return Ok(Async::Ready(Some(block)));
}
}
_ => return Err(Error::Driver(DriverError::UnexpectedPacket)),
}
}
}
}
32 changes: 22 additions & 10 deletions tests/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use chrono::prelude::*;
use chrono_tz::Tz::{self, UTC};
use tokio::prelude::*;

use clickhouse_rs::{errors::Error, types::Block, Pool, ClientHandle};
use clickhouse_rs::{errors::Error, types::Block, ClientHandle, Pool};
use std::f64::EPSILON;

pub type BoxFuture<T> = Box<Future<Item = T, Error = Error> + Send>;
type BoxFuture<T> = Box<Future<Item = T, Error = Error> + Send>;

fn database_url() -> String {
env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into())
Expand Down Expand Up @@ -318,6 +318,22 @@ fn test_with_totals() {
run(done).unwrap();
}

#[test]
fn test_stream_rows() {
let pool = Pool::new(database_url());

let done = pool.get_handle().and_then(|c| {
c.query("SELECT number FROM system.numbers LIMIT 10")
.stream_rows()
.fold(0_u64, |acc, row| -> Result<u64, Error> {
let number: u64 = row.get("number")?;
Ok(acc + number)
})
});

assert_eq!(45, run(done).unwrap());
}

#[test]
fn test_concurrent_queries() {
fn query_sum(n: u64) -> BoxFuture<u64> {
Expand All @@ -327,9 +343,8 @@ fn test_concurrent_queries() {
Box::new(
pool.get_handle()
.and_then(move |c| {
c.query(sql.as_str()).fold(0_u64, |acc, row| {
Ok(acc + row.get::<u64, _>("number")?)
})
c.query(sql.as_str())
.fold(0_u64, |acc, row| Ok(acc + row.get::<u64, _>("number")?))
})
.map(|(_, value)| value),
)
Expand Down Expand Up @@ -368,9 +383,7 @@ fn test_big_block() {
let done = pool
.get_handle()
.and_then(move |c| c.query(sql).fetch_all())
.and_then(move |(_, block)| {
Ok(block.row_count())
});
.and_then(move |(_, block)| Ok(block.row_count()));

let actual = run(done).unwrap();
assert_eq!(actual, 20000)
Expand Down Expand Up @@ -438,7 +451,6 @@ fn test_nullable() {
.and_then(move |c| c.insert("clickhouse_test_nullable", block))
.and_then(move |c| c.query(query).fetch_all())
.and_then(move |(_, block)| {

let int8: Option<i8> = block.get(0, "int8")?;
let int16: Option<i16> = block.get(0, "int16")?;
let int32: Option<i32> = block.get(0, "int32")?;
Expand Down Expand Up @@ -471,4 +483,4 @@ fn test_nullable() {
});

run(done).unwrap();
}
}

0 comments on commit b8156da

Please sign in to comment.