Skip to content

Commit

Permalink
getting an iterator of the column
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Oct 10, 2019
1 parent 49e568a commit 8f0d7cf
Show file tree
Hide file tree
Showing 40 changed files with 1,298 additions and 327 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "clickhouse-rs"
version = "0.1.16"
version = "0.1.17"
authors = ["Mikhail Sukharev <suharev7@gmail.com>"]
license = "MIT"
homepage = "https://github.com/suharev7/clickhouse-rs"
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ pub fn main() {
) Engine=Memory";

let block = Block::new()
.add_column("customer_id", vec![1_u32, 3, 5, 7, 9])
.add_column("amount", vec![2_u32, 4, 6, 8, 10])
.add_column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
.column("customer_id", vec![1_u32, 3, 5, 7, 9])
.column("amount", vec![2_u32, 4, 6, 8, 10])
.column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);

let pool = Pool::new(database_url);

Expand Down
8 changes: 4 additions & 4 deletions examples/array.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate clickhouse_rs;
extern crate futures;

use clickhouse_rs::{types::Block, Pool};
use clickhouse_rs::{types::{Block, Complex}, Pool};
use futures::Future;
use std::env;

Expand All @@ -21,16 +21,16 @@ fn main() {
let query = "SELECT nums, text FROM array_table";

let block = Block::new()
.add_column("nums", vec![vec![1_u32, 2, 3], vec![4, 5, 6]])
.add_column("text", vec![vec!["A", "B", "C"], vec!["D", "E"]]);
.column("nums", vec![vec![1_u32, 2, 3], vec![4, 5, 6]])
.column("text", vec![vec!["A", "B", "C"], vec!["D", "E"]]);

let done = pool
.get_handle()
.and_then(move |c| c.execute("DROP TABLE IF EXISTS array_table"))
.and_then(move |c| c.execute(ddl))
.and_then(move |c| c.insert("array_table", block))
.and_then(move |c| c.query(query).fetch_all())
.and_then(move |(_, block): (_, Block)| {
.and_then(move |(_, block): (_, Block<Complex>)| {
for row in block.rows() {
let nums: Vec<u32> = row.get("nums")?;
let text: Vec<&str> = row.get("text")?;
Expand Down
8 changes: 4 additions & 4 deletions examples/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ fn main() {
) Engine=Memory";

let block = Block::new()
.add_column("text", vec![[0, 159, 146, 150].as_ref(), b"ABCD"])
.add_column("fx_text", vec![b"ABCD".as_ref(), &[0, 159, 146, 150]])
.add_column("opt_text", vec![Some(vec![0, 159, 146, 150]), None])
.add_column("fx_opt_text", vec![None, Some(vec![0, 159, 146, 150])]);
.column("text", vec![[0, 159, 146, 150].as_ref(), b"ABCD"])
.column("fx_text", vec![b"ABCD".as_ref(), &[0, 159, 146, 150]])
.column("opt_text", vec![Some(vec![0, 159, 146, 150]), None])
.column("fx_opt_text", vec![None, Some(vec![0, 159, 146, 150])]);

let database_url =
env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
Expand Down
22 changes: 11 additions & 11 deletions src/binary/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use chrono_tz::Tz;

use crate::{
binary::{protocol, ReadEx},
errors::{DriverError, Error, ServerError},
types::{Block, ClickhouseResult, Packet, ProfileInfo, Progress, ServerInfo},
errors::{DriverError, Error, ServerError, Result},
types::{Block, Packet, ProfileInfo, Progress, ServerInfo},
};

/// The internal clickhouse response parser.
Expand All @@ -32,10 +32,10 @@ impl<'a, T: Read> Parser<T> {
}
}

/// parses a single value out of the stream. If there are multiple
/// values you can call this multiple times. If the reader is not yet
/// Parses a single value out of the stream. If there are multiple
/// values you can call this multiple times. If the reader is not yet
/// ready this will block.
pub(crate) fn parse_packet(&mut self) -> ClickhouseResult<Packet<()>> {
pub(crate) fn parse_packet(&mut self) -> Result<Packet<()>> {
let packet = self.reader.read_uvarint()?;
match packet {
protocol::SERVER_HELLO => Ok(self.parse_server_info()?),
Expand All @@ -51,7 +51,7 @@ impl<'a, T: Read> Parser<T> {
}
}

fn parse_block(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_block(&mut self) -> Result<Packet<()>> {
match self.tz {
None => Err(Error::Driver(DriverError::UnexpectedPacket)),
Some(tz) => {
Expand All @@ -62,7 +62,7 @@ impl<'a, T: Read> Parser<T> {
}
}

fn parse_server_info(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_server_info(&mut self) -> Result<Packet<()>> {
let server_info = ServerInfo {
name: self.reader.read_string()?,
major_version: self.reader.read_uvarint()?,
Expand All @@ -78,7 +78,7 @@ impl<'a, T: Read> Parser<T> {
Ok(Packet::Hello((), server_info))
}

fn parse_progress(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_progress(&mut self) -> Result<Packet<()>> {
let progress = Progress {
rows: self.reader.read_uvarint()?,
bytes: self.reader.read_uvarint()?,
Expand All @@ -95,7 +95,7 @@ impl<'a, T: Read> Parser<T> {
Ok(Packet::Progress(progress))
}

fn parse_profile_info(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_profile_info(&mut self) -> Result<Packet<()>> {
let info = Packet::ProfileInfo(ProfileInfo {
rows: self.reader.read_uvarint()?,
blocks: self.reader.read_uvarint()?,
Expand All @@ -109,7 +109,7 @@ impl<'a, T: Read> Parser<T> {
Ok(info)
}

fn parse_exception(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_exception(&mut self) -> Result<Packet<()>> {
let exception = ServerError {
code: self.reader.read_scalar()?,
name: self.reader.read_string()?,
Expand All @@ -121,7 +121,7 @@ impl<'a, T: Read> Parser<T> {
Ok(Packet::Exception(exception))
}

fn parse_pong(&self) -> ClickhouseResult<Packet<()>> {
fn parse_pong(&self) -> Result<Packet<()>> {
trace!("[process] <- pong");
Ok(Packet::Pong(()))
}
Expand Down
28 changes: 14 additions & 14 deletions src/binary/read_ex.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::{io, mem::MaybeUninit};

use crate::{
errors::{DriverError, Error},
types::{column::StringPool, ClickhouseResult, StatBuffer, Unmarshal},
errors::{DriverError, Error, Result},
types::{column::StringPool, StatBuffer, Unmarshal},
};

pub(crate) trait ReadEx {
fn read_bytes(&mut self, rv: &mut [u8]) -> ClickhouseResult<()>;
fn read_scalar<V>(&mut self) -> ClickhouseResult<V>
fn read_bytes(&mut self, rv: &mut [u8]) -> Result<()>;
fn read_scalar<V>(&mut self) -> Result<V>
where
V: Copy + Unmarshal<V> + StatBuffer;
fn read_string(&mut self) -> ClickhouseResult<String>;
fn skip_string(&mut self) -> ClickhouseResult<()>;
fn read_uvarint(&mut self) -> ClickhouseResult<u64>;
fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> ClickhouseResult<()>;
fn read_string(&mut self) -> Result<String>;
fn skip_string(&mut self) -> Result<()>;
fn read_uvarint(&mut self) -> Result<u64>;
fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> Result<()>;
}

const MAX_STACK_BUFFER_LEN: usize = 1024;
Expand All @@ -22,7 +22,7 @@ impl<T> ReadEx for T
where
T: io::Read,
{
fn read_bytes(&mut self, rv: &mut [u8]) -> ClickhouseResult<()> {
fn read_bytes(&mut self, rv: &mut [u8]) -> Result<()> {
let mut i = 0;
while i < rv.len() {
let res_nread = {
Expand All @@ -41,7 +41,7 @@ where
Ok(())
}

fn read_scalar<V>(&mut self) -> ClickhouseResult<V>
fn read_scalar<V>(&mut self) -> Result<V>
where
V: Copy + Unmarshal<V> + StatBuffer,
{
Expand All @@ -50,14 +50,14 @@ where
Ok(V::unmarshal(buffer.as_ref()))
}

fn read_string(&mut self) -> ClickhouseResult<String> {
fn read_string(&mut self) -> Result<String> {
let str_len = self.read_uvarint()? as usize;
let mut buffer = vec![0_u8; str_len];
self.read_bytes(buffer.as_mut())?;
Ok(String::from_utf8(buffer)?)
}

fn skip_string(&mut self) -> ClickhouseResult<()> {
fn skip_string(&mut self) -> Result<()> {
let str_len = self.read_uvarint()? as usize;

if str_len <= MAX_STACK_BUFFER_LEN {
Expand All @@ -76,7 +76,7 @@ where
Ok(())
}

fn read_uvarint(&mut self) -> ClickhouseResult<u64> {
fn read_uvarint(&mut self) -> Result<u64> {
let mut x = 0_u64;
let mut s = 0_u32;
let mut i = 0_usize;
Expand All @@ -97,7 +97,7 @@ where
}
}

fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> ClickhouseResult<()> {
fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> Result<()> {
let str_len = self.read_uvarint()? as usize;
let buffer = pool.allocate(str_len);
self.read_bytes(buffer)?;
Expand Down
8 changes: 7 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{borrow::Cow, io, mem, str::Utf8Error, string::FromUtf8Error};
use std::{borrow::Cow, io, mem, str::Utf8Error, string::FromUtf8Error, result};

use failure::*;
use tokio::prelude::*;
Expand All @@ -8,6 +8,9 @@ use url::ParseError;

use crate::types::Packet;

/// Result type alias for this library.
pub type Result<T> = result::Result<T, Error>;

/// This type enumerates library errors.
#[derive(Debug, Fail)]
pub enum Error {
Expand Down Expand Up @@ -92,6 +95,9 @@ pub enum FromSqlError {

#[fail(display = "Out of range.")]
OutOfRange,

#[fail(display = "Unsupported operation.")]
UnsupportedOperation,
}

impl From<DriverError> for Error {
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub(crate) use self::{
io_future::{BoxFuture, BoxStream},
box_future::{BoxFuture, BoxStream},
transport::ClickhouseTransport,
};

mod io_future;
mod box_future;
pub(crate) mod transport;
Empty file removed src/io/read_to_end.rs
Empty file.
6 changes: 1 addition & 5 deletions src/io/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,7 @@ impl Stream for ClickhouseTransport {
// Try to parse the new data!
let ret = self.try_parse_msg();

self.buf_is_incomplete = if let Ok(Async::NotReady) = ret {
true
} else {
false
};
self.buf_is_incomplete = if let Ok(Async::NotReady) = ret { true } else { false };

ret
}
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@
//! ) Engine=Memory";
//!
//! let block = Block::new()
//! .add_column("customer_id", vec![1_u32, 3, 5, 7, 9])
//! .add_column("amount", vec![2_u32, 4, 6, 8, 10])
//! .add_column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
//! .column("customer_id", vec![1_u32, 3, 5, 7, 9])
//! .column("amount", vec![2_u32, 4, 6, 8, 10])
//! .column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
//!
//! # let database_url = env::var("DATABASE_URL").unwrap_or("tcp://localhost:9000?compression=lz4".into());
//! let pool = Pool::new(database_url);
Expand Down Expand Up @@ -137,6 +137,7 @@ use crate::{
types::{Block, Cmd, Context, IntoOptions, Options, OptionsSource, Packet, Query, QueryResult},
};
use failure::_core::time::Duration;
use crate::types::Complex;

mod binary;
mod client_info;
Expand Down Expand Up @@ -351,7 +352,7 @@ impl ClientHandle {

/// Fetch data from table. It returns a block that contains all rows.
#[deprecated(since = "0.1.7", note = "please use query(sql).fetch_all() instead")]
pub fn query_all<Q>(self, sql: Q) -> BoxFuture<(Self, Block)>
pub fn query_all<Q>(self, sql: Q) -> BoxFuture<(Self, Block<Complex>)>
where
Query: From<Q>,
{
Expand Down
7 changes: 4 additions & 3 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use tokio::prelude::{*, task::{self, Task}};
use crate::{
io::BoxFuture,
pool::futures::GetHandle,
types::{ClickhouseResult, IntoOptions, OptionsSource},
errors::Result,
types::{IntoOptions, OptionsSource},
Client, ClientHandle,
};

Expand Down Expand Up @@ -178,7 +179,7 @@ impl Pool {
fun(self.inner.lock().unwrap())
}

fn poll(&mut self) -> ClickhouseResult<Async<ClientHandle>> {
fn poll(&mut self) -> Result<Async<ClientHandle>> {
self.handle_futures()?;

match self.take_conn() {
Expand Down Expand Up @@ -206,7 +207,7 @@ impl Pool {
Client::open(&self.options, Some(self.clone()))
}

fn handle_futures(&mut self) -> ClickhouseResult<()> {
fn handle_futures(&mut self) -> Result<()> {
self.with_inner(|mut inner| {
let result = match inner.new {
None => return Ok(()),
Expand Down
4 changes: 2 additions & 2 deletions src/types/block/block_info.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
errors::Result,
binary::{Encoder, ReadEx},
types::ClickhouseResult,
};

#[allow(dead_code)]
Expand All @@ -26,7 +26,7 @@ impl Default for BlockInfo {
}

impl BlockInfo {
pub(crate) fn read<R: ReadEx>(reader: &mut R) -> ClickhouseResult<Self> {
pub(crate) fn read<R: ReadEx>(reader: &mut R) -> Result<Self> {
let block_info = Self {
num1: reader.read_uvarint()?,
is_overflows: reader.read_scalar()?,
Expand Down
Loading

0 comments on commit 8f0d7cf

Please sign in to comment.