Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

getting an iterator of the column #60

Merged
merged 1 commit into from
Oct 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "clickhouse-rs"
version = "0.1.16"
version = "0.1.17"
authors = ["Mikhail Sukharev <suharev7@gmail.com>"]
license = "MIT"
homepage = "https://github.com/suharev7/clickhouse-rs"
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ pub fn main() {
) Engine=Memory";

let block = Block::new()
.add_column("customer_id", vec![1_u32, 3, 5, 7, 9])
.add_column("amount", vec![2_u32, 4, 6, 8, 10])
.add_column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
.column("customer_id", vec![1_u32, 3, 5, 7, 9])
.column("amount", vec![2_u32, 4, 6, 8, 10])
.column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);

let pool = Pool::new(database_url);

Expand Down
8 changes: 4 additions & 4 deletions examples/array.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
extern crate clickhouse_rs;
extern crate futures;

use clickhouse_rs::{types::Block, Pool};
use clickhouse_rs::{types::{Block, Complex}, Pool};
use futures::Future;
use std::env;

Expand All @@ -21,16 +21,16 @@ fn main() {
let query = "SELECT nums, text FROM array_table";

let block = Block::new()
.add_column("nums", vec![vec![1_u32, 2, 3], vec![4, 5, 6]])
.add_column("text", vec![vec!["A", "B", "C"], vec!["D", "E"]]);
.column("nums", vec![vec![1_u32, 2, 3], vec![4, 5, 6]])
.column("text", vec![vec!["A", "B", "C"], vec!["D", "E"]]);

let done = pool
.get_handle()
.and_then(move |c| c.execute("DROP TABLE IF EXISTS array_table"))
.and_then(move |c| c.execute(ddl))
.and_then(move |c| c.insert("array_table", block))
.and_then(move |c| c.query(query).fetch_all())
.and_then(move |(_, block): (_, Block)| {
.and_then(move |(_, block): (_, Block<Complex>)| {
for row in block.rows() {
let nums: Vec<u32> = row.get("nums")?;
let text: Vec<&str> = row.get("text")?;
Expand Down
8 changes: 4 additions & 4 deletions examples/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ fn main() {
) Engine=Memory";

let block = Block::new()
.add_column("text", vec![[0, 159, 146, 150].as_ref(), b"ABCD"])
.add_column("fx_text", vec![b"ABCD".as_ref(), &[0, 159, 146, 150]])
.add_column("opt_text", vec![Some(vec![0, 159, 146, 150]), None])
.add_column("fx_opt_text", vec![None, Some(vec![0, 159, 146, 150])]);
.column("text", vec![[0, 159, 146, 150].as_ref(), b"ABCD"])
.column("fx_text", vec![b"ABCD".as_ref(), &[0, 159, 146, 150]])
.column("opt_text", vec![Some(vec![0, 159, 146, 150]), None])
.column("fx_opt_text", vec![None, Some(vec![0, 159, 146, 150])]);

let database_url =
env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
Expand Down
22 changes: 11 additions & 11 deletions src/binary/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use chrono_tz::Tz;

use crate::{
binary::{protocol, ReadEx},
errors::{DriverError, Error, ServerError},
types::{Block, ClickhouseResult, Packet, ProfileInfo, Progress, ServerInfo},
errors::{DriverError, Error, ServerError, Result},
types::{Block, Packet, ProfileInfo, Progress, ServerInfo},
};

/// The internal clickhouse response parser.
Expand All @@ -32,10 +32,10 @@ impl<'a, T: Read> Parser<T> {
}
}

/// parses a single value out of the stream. If there are multiple
/// values you can call this multiple times. If the reader is not yet
/// Parses a single value out of the stream. If there are multiple
/// values you can call this multiple times. If the reader is not yet
/// ready this will block.
pub(crate) fn parse_packet(&mut self) -> ClickhouseResult<Packet<()>> {
pub(crate) fn parse_packet(&mut self) -> Result<Packet<()>> {
let packet = self.reader.read_uvarint()?;
match packet {
protocol::SERVER_HELLO => Ok(self.parse_server_info()?),
Expand All @@ -51,7 +51,7 @@ impl<'a, T: Read> Parser<T> {
}
}

fn parse_block(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_block(&mut self) -> Result<Packet<()>> {
match self.tz {
None => Err(Error::Driver(DriverError::UnexpectedPacket)),
Some(tz) => {
Expand All @@ -62,7 +62,7 @@ impl<'a, T: Read> Parser<T> {
}
}

fn parse_server_info(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_server_info(&mut self) -> Result<Packet<()>> {
let server_info = ServerInfo {
name: self.reader.read_string()?,
major_version: self.reader.read_uvarint()?,
Expand All @@ -78,7 +78,7 @@ impl<'a, T: Read> Parser<T> {
Ok(Packet::Hello((), server_info))
}

fn parse_progress(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_progress(&mut self) -> Result<Packet<()>> {
let progress = Progress {
rows: self.reader.read_uvarint()?,
bytes: self.reader.read_uvarint()?,
Expand All @@ -95,7 +95,7 @@ impl<'a, T: Read> Parser<T> {
Ok(Packet::Progress(progress))
}

fn parse_profile_info(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_profile_info(&mut self) -> Result<Packet<()>> {
let info = Packet::ProfileInfo(ProfileInfo {
rows: self.reader.read_uvarint()?,
blocks: self.reader.read_uvarint()?,
Expand All @@ -109,7 +109,7 @@ impl<'a, T: Read> Parser<T> {
Ok(info)
}

fn parse_exception(&mut self) -> ClickhouseResult<Packet<()>> {
fn parse_exception(&mut self) -> Result<Packet<()>> {
let exception = ServerError {
code: self.reader.read_scalar()?,
name: self.reader.read_string()?,
Expand All @@ -121,7 +121,7 @@ impl<'a, T: Read> Parser<T> {
Ok(Packet::Exception(exception))
}

fn parse_pong(&self) -> ClickhouseResult<Packet<()>> {
fn parse_pong(&self) -> Result<Packet<()>> {
trace!("[process] <- pong");
Ok(Packet::Pong(()))
}
Expand Down
28 changes: 14 additions & 14 deletions src/binary/read_ex.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::{io, mem::MaybeUninit};

use crate::{
errors::{DriverError, Error},
types::{column::StringPool, ClickhouseResult, StatBuffer, Unmarshal},
errors::{DriverError, Error, Result},
types::{column::StringPool, StatBuffer, Unmarshal},
};

pub(crate) trait ReadEx {
fn read_bytes(&mut self, rv: &mut [u8]) -> ClickhouseResult<()>;
fn read_scalar<V>(&mut self) -> ClickhouseResult<V>
fn read_bytes(&mut self, rv: &mut [u8]) -> Result<()>;
fn read_scalar<V>(&mut self) -> Result<V>
where
V: Copy + Unmarshal<V> + StatBuffer;
fn read_string(&mut self) -> ClickhouseResult<String>;
fn skip_string(&mut self) -> ClickhouseResult<()>;
fn read_uvarint(&mut self) -> ClickhouseResult<u64>;
fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> ClickhouseResult<()>;
fn read_string(&mut self) -> Result<String>;
fn skip_string(&mut self) -> Result<()>;
fn read_uvarint(&mut self) -> Result<u64>;
fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> Result<()>;
}

const MAX_STACK_BUFFER_LEN: usize = 1024;
Expand All @@ -22,7 +22,7 @@ impl<T> ReadEx for T
where
T: io::Read,
{
fn read_bytes(&mut self, rv: &mut [u8]) -> ClickhouseResult<()> {
fn read_bytes(&mut self, rv: &mut [u8]) -> Result<()> {
let mut i = 0;
while i < rv.len() {
let res_nread = {
Expand All @@ -41,7 +41,7 @@ where
Ok(())
}

fn read_scalar<V>(&mut self) -> ClickhouseResult<V>
fn read_scalar<V>(&mut self) -> Result<V>
where
V: Copy + Unmarshal<V> + StatBuffer,
{
Expand All @@ -50,14 +50,14 @@ where
Ok(V::unmarshal(buffer.as_ref()))
}

fn read_string(&mut self) -> ClickhouseResult<String> {
fn read_string(&mut self) -> Result<String> {
let str_len = self.read_uvarint()? as usize;
let mut buffer = vec![0_u8; str_len];
self.read_bytes(buffer.as_mut())?;
Ok(String::from_utf8(buffer)?)
}

fn skip_string(&mut self) -> ClickhouseResult<()> {
fn skip_string(&mut self) -> Result<()> {
let str_len = self.read_uvarint()? as usize;

if str_len <= MAX_STACK_BUFFER_LEN {
Expand All @@ -76,7 +76,7 @@ where
Ok(())
}

fn read_uvarint(&mut self) -> ClickhouseResult<u64> {
fn read_uvarint(&mut self) -> Result<u64> {
let mut x = 0_u64;
let mut s = 0_u32;
let mut i = 0_usize;
Expand All @@ -97,7 +97,7 @@ where
}
}

fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> ClickhouseResult<()> {
fn read_str_into_buffer(&mut self, pool: &mut StringPool) -> Result<()> {
let str_len = self.read_uvarint()? as usize;
let buffer = pool.allocate(str_len);
self.read_bytes(buffer)?;
Expand Down
8 changes: 7 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{borrow::Cow, io, mem, str::Utf8Error, string::FromUtf8Error};
use std::{borrow::Cow, io, mem, str::Utf8Error, string::FromUtf8Error, result};

use failure::*;
use tokio::prelude::*;
Expand All @@ -8,6 +8,9 @@ use url::ParseError;

use crate::types::Packet;

/// Result type alias for this library.
pub type Result<T> = result::Result<T, Error>;

/// This type enumerates library errors.
#[derive(Debug, Fail)]
pub enum Error {
Expand Down Expand Up @@ -92,6 +95,9 @@ pub enum FromSqlError {

#[fail(display = "Out of range.")]
OutOfRange,

#[fail(display = "Unsupported operation.")]
UnsupportedOperation,
}

impl From<DriverError> for Error {
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub(crate) use self::{
io_future::{BoxFuture, BoxStream},
box_future::{BoxFuture, BoxStream},
transport::ClickhouseTransport,
};

mod io_future;
mod box_future;
pub(crate) mod transport;
Empty file removed src/io/read_to_end.rs
Empty file.
6 changes: 1 addition & 5 deletions src/io/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,7 @@ impl Stream for ClickhouseTransport {
// Try to parse the new data!
let ret = self.try_parse_msg();

self.buf_is_incomplete = if let Ok(Async::NotReady) = ret {
true
} else {
false
};
self.buf_is_incomplete = if let Ok(Async::NotReady) = ret { true } else { false };

ret
}
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@
//! ) Engine=Memory";
//!
//! let block = Block::new()
//! .add_column("customer_id", vec![1_u32, 3, 5, 7, 9])
//! .add_column("amount", vec![2_u32, 4, 6, 8, 10])
//! .add_column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
//! .column("customer_id", vec![1_u32, 3, 5, 7, 9])
//! .column("amount", vec![2_u32, 4, 6, 8, 10])
//! .column("account_name", vec![Some("foo"), None, None, None, Some("bar")]);
//!
//! # let database_url = env::var("DATABASE_URL").unwrap_or("tcp://localhost:9000?compression=lz4".into());
//! let pool = Pool::new(database_url);
Expand Down Expand Up @@ -137,6 +137,7 @@ use crate::{
types::{Block, Cmd, Context, IntoOptions, Options, OptionsSource, Packet, Query, QueryResult},
};
use failure::_core::time::Duration;
use crate::types::Complex;

mod binary;
mod client_info;
Expand Down Expand Up @@ -351,7 +352,7 @@ impl ClientHandle {

/// Fetch data from table. It returns a block that contains all rows.
#[deprecated(since = "0.1.7", note = "please use query(sql).fetch_all() instead")]
pub fn query_all<Q>(self, sql: Q) -> BoxFuture<(Self, Block)>
pub fn query_all<Q>(self, sql: Q) -> BoxFuture<(Self, Block<Complex>)>
where
Query: From<Q>,
{
Expand Down
7 changes: 4 additions & 3 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use tokio::prelude::{*, task::{self, Task}};
use crate::{
io::BoxFuture,
pool::futures::GetHandle,
types::{ClickhouseResult, IntoOptions, OptionsSource},
errors::Result,
types::{IntoOptions, OptionsSource},
Client, ClientHandle,
};

Expand Down Expand Up @@ -178,7 +179,7 @@ impl Pool {
fun(self.inner.lock().unwrap())
}

fn poll(&mut self) -> ClickhouseResult<Async<ClientHandle>> {
fn poll(&mut self) -> Result<Async<ClientHandle>> {
self.handle_futures()?;

match self.take_conn() {
Expand Down Expand Up @@ -206,7 +207,7 @@ impl Pool {
Client::open(&self.options, Some(self.clone()))
}

fn handle_futures(&mut self) -> ClickhouseResult<()> {
fn handle_futures(&mut self) -> Result<()> {
self.with_inner(|mut inner| {
let result = match inner.new {
None => return Ok(()),
Expand Down
4 changes: 2 additions & 2 deletions src/types/block/block_info.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
errors::Result,
binary::{Encoder, ReadEx},
types::ClickhouseResult,
};

#[allow(dead_code)]
Expand All @@ -26,7 +26,7 @@ impl Default for BlockInfo {
}

impl BlockInfo {
pub(crate) fn read<R: ReadEx>(reader: &mut R) -> ClickhouseResult<Self> {
pub(crate) fn read<R: ReadEx>(reader: &mut R) -> Result<Self> {
let block_info = Self {
num1: reader.read_uvarint()?,
is_overflows: reader.read_scalar()?,
Expand Down
Loading