Skip to content

Commit

Permalink
fix inserting block of size bigger than INSERT_BLOCK_SIZE (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Nov 1, 2023
1 parent 409a406 commit b1f627b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 45 deletions.
98 changes: 66 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,15 @@ use crate::{
pool::PoolBinding,
retry_guard::retry_guard,
types::{
query_result::stream_blocks::BlockStream, Cmd, Context, IntoOptions, OptionsSource, Packet,
Query, QueryResult, SqlType,
block::{ChunkIterator, INSERT_BLOCK_SIZE},
query_result::stream_blocks::BlockStream,
Cmd, Context, IntoOptions, OptionsSource, Packet, Query, QueryResult, SqlType,
},
};
pub use crate::{
errors::ConnectionError,
pool::Pool,
types::{block::Block, Options},
types::{block::Block, Options, Simple},
};

mod binary;
Expand Down Expand Up @@ -437,26 +438,16 @@ impl ClientHandle {
Query: From<Q>,
B: AsRef<Block>,
{
let transport = self.insert_(table, block.as_ref()).await?;
let query = Self::make_query(table, block.as_ref())?;
let transport = self.insert_(query.clone(), block.as_ref()).await?;
self.inner = Some(transport);
Ok(())
}

async fn insert_<Q>(&mut self, table: Q, block: &Block) -> Result<ClickhouseTransport>
where
Query: From<Q>,
{
async fn insert_(&mut self, query: Query, block: &Block) -> Result<ClickhouseTransport> {
let timeout = try_opt!(self.context.options.get())
.insert_timeout
.unwrap_or_else(|| Duration::from_secs(0));
let mut names: Vec<_> = Vec::with_capacity(block.column_count());
for column in block.columns() {
names.push(try_opt!(column_name_to_string(column.name())));
}
let fields = names.join(", ");

let query =
Query::from(table).map_sql(|table| format!("INSERT INTO {table} ({fields}) VALUES"));

let context = self.context.clone();

Expand All @@ -468,22 +459,15 @@ impl ClientHandle {

async move {
let transport = transport?.clear().await?;
let stream = transport.call(Cmd::SendQuery(query, context.clone()));
let (transport, b) = stream.read_block().await?;
let dst_block = b.unwrap();

let casted_block = match block.cast_to(&dst_block) {
Ok(value) => value,
Err(err) => return Err(err),
};

let send_cmd = Cmd::Union(
Box::new(Cmd::SendData(casted_block, context.clone())),
Box::new(Cmd::SendData(Block::default(), context.clone())),
);

let (transport, _) = transport.call(send_cmd).read_block().await?;
Ok(transport)
let (transport, dst_block) =
Self::send_insert_query_(transport, context.clone(), query.clone())
.await?;
let casted_block = block.cast_to(&dst_block)?;
let mut chunks = casted_block.chunks(INSERT_BLOCK_SIZE);
let transport =
Self::insert_block_(transport, context.clone(), chunks.next().unwrap())
.await?;
Self::insert_tail_(transport, context, query, chunks).await
}
})
.await
Expand All @@ -493,6 +477,56 @@ impl ClientHandle {
.await
}

/// Sends every remaining chunk of an oversized insert, one `INSERT` round
/// trip per chunk, after the first chunk has already been written.
///
/// Each iteration re-issues the same `INSERT INTO … VALUES` query (the
/// server expects a fresh query header before each data block) and then
/// streams the chunk. The transport is threaded through sequentially, so
/// chunks are sent strictly in order.
async fn insert_tail_(
    mut transport: ClickhouseTransport,
    context: Context,
    query: Query,
    chunks: ChunkIterator<Simple>,
) -> Result<ClickhouseTransport> {
    for chunk in chunks {
        // Re-send the insert query; the returned header block is ignored
        // because the chunk was already cast to the destination schema.
        let (transport_, _) =
            Self::send_insert_query_(transport, context.clone(), query.clone()).await?;
        transport = Self::insert_block_(transport_, context.clone(), chunk).await?;
    }
    Ok(transport)
}

/// Issues the `INSERT` query on `transport` and reads the server's reply.
///
/// The server answers an insert query with a header block describing the
/// destination table's schema; that block is returned alongside the
/// transport so the caller can cast its data to the expected column types.
async fn send_insert_query_(
    transport: ClickhouseTransport,
    context: Context,
    query: Query,
) -> Result<(ClickhouseTransport, Block)> {
    let (transport, header) = transport
        .call(Cmd::SendQuery(query, context))
        .read_block()
        .await?;
    // The server is required to answer an INSERT with a schema block.
    Ok((transport, header.unwrap()))
}

/// Streams one data block to the server, terminated by an empty block.
///
/// ClickHouse's native protocol treats an empty data block as the
/// end-of-data marker for an insert, so the payload and the terminator are
/// sent as a single combined command before waiting for the server's ack.
async fn insert_block_(
    transport: ClickhouseTransport,
    context: Context,
    block: Block,
) -> Result<ClickhouseTransport> {
    let payload = Cmd::SendData(block, context.clone());
    let terminator = Cmd::SendData(Block::default(), context);
    let stream = transport.call(Cmd::Union(Box::new(payload), Box::new(terminator)));
    let (transport, _) = stream.read_block().await?;
    Ok(transport)
}

/// Builds the `INSERT INTO <table> (<columns>) VALUES` query for `block`.
///
/// Column names are listed in the block's column order (which must match
/// the order the data is later serialized in) and are escaped/validated by
/// `column_name_to_string`, whose failure is propagated via `try_opt!`.
fn make_query<Q>(table: Q, block: &Block) -> Result<Query>
where
    Query: From<Q>,
{
    // `block` is already `&Block`; the previous `block.as_ref()` calls were
    // redundant no-op conversions (clippy: useless_asref).
    let mut names: Vec<_> = Vec::with_capacity(block.column_count());
    for column in block.columns() {
        names.push(try_opt!(column_name_to_string(column.name())));
    }
    let fields = names.join(", ");
    Ok(Query::from(table).map_sql(|table| format!("INSERT INTO {table} ({fields}) VALUES")))
}

pub(crate) async fn wrap_future<T, R, F>(&mut self, f: F) -> Result<T>
where
F: FnOnce(&mut Self) -> R + Send,
Expand Down
10 changes: 5 additions & 5 deletions src/types/block/chunk_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::cmp;

use crate::types::{Block, ColumnType};

pub struct ChunkIterator<'a, K: ColumnType> {
pub(crate) struct ChunkIterator<K: ColumnType> {
position: usize,
size: usize,
block: &'a Block<K>,
block: Block<K>,
}

impl<'a, K: ColumnType> Iterator for ChunkIterator<'a, K> {
impl<K: ColumnType> Iterator for ChunkIterator<K> {
type Item = Block;

fn next(&mut self) -> Option<Block> {
Expand Down Expand Up @@ -37,8 +37,8 @@ impl<'a, K: ColumnType> Iterator for ChunkIterator<'a, K> {
}
}

impl<'a, K: ColumnType> ChunkIterator<'a, K> {
pub fn new(size: usize, block: &Block<K>) -> ChunkIterator<K> {
impl<K: ColumnType> ChunkIterator<K> {
pub fn new(size: usize, block: Block<K>) -> ChunkIterator<K> {
ChunkIterator {
position: 0,
size,
Expand Down
13 changes: 5 additions & 8 deletions src/types/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,20 @@ use crate::{
},
};

use self::chunk_iterator::ChunkIterator;
pub(crate) use self::row::BlockRef;
pub use self::{
block_info::BlockInfo,
builder::{RCons, RNil, RowBuilder},
row::{Row, Rows},
};
pub(crate) use self::{chunk_iterator::ChunkIterator, row::BlockRef};

mod block_info;
mod builder;
mod chunk_iterator;
mod compressed;
mod row;

const INSERT_BLOCK_SIZE: usize = 1_048_576;
pub(crate) const INSERT_BLOCK_SIZE: usize = 1_048_576;

const DEFAULT_CAPACITY: usize = 100;

Expand Down Expand Up @@ -372,12 +371,10 @@ impl<K: ColumnType> Block<K> {
pub(crate) fn send_data(&self, encoder: &mut Encoder, compress: bool) {
encoder.uvarint(protocol::CLIENT_DATA);
encoder.string(""); // temporary table
for chunk in self.chunks(INSERT_BLOCK_SIZE) {
chunk.write(encoder, compress);
}
self.write(encoder, compress);
}

pub(crate) fn chunks(&self, n: usize) -> ChunkIterator<K> {
pub(crate) fn chunks(self, n: usize) -> ChunkIterator<K> {
ChunkIterator::new(n, self)
}
}
Expand Down Expand Up @@ -571,7 +568,7 @@ mod test {
#[test]
fn test_chunks_of_empty_block() {
let block = Block::default();
assert_eq!(1, block.chunks(100_500).count());
assert_eq!(1, block.clone().chunks(100_500).count());
assert_eq!(Some(block.clone()), block.chunks(100_500).next());
}

Expand Down
28 changes: 28 additions & 0 deletions tests/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2378,3 +2378,31 @@ async fn test_iter_int_128() -> Result<(), Error> {

Ok(())
}

#[cfg(feature = "tokio_io")]
#[tokio::test]
async fn test_insert_big_block() -> Result<(), Error> {
    let ddl = r"
        CREATE TABLE clickhouse_test_insert_big_block (
            int8 Int8
        ) Engine=Memory";

    // One row more than INSERT_BLOCK_SIZE (1_048_576), forcing the client
    // to split the insert into multiple blocks (regression test for #204).
    let big_block_size = 1024 * 1024 + 1;
    let block = Block::new().column("int8", vec![-1_i8; big_block_size]);
    let expected = block.clone();

    let pool = Pool::new(database_url());
    let mut c = pool.get_handle().await?;
    c.execute("DROP TABLE IF EXISTS clickhouse_test_insert_big_block")
        .await?;
    c.execute(ddl).await?;
    c.insert("clickhouse_test_insert_big_block", block).await?;

    let actual = c
        .query("SELECT * FROM clickhouse_test_insert_big_block")
        .fetch_all()
        .await?;

    // Compare via Debug output since the round-tripped block carries a
    // different column-type parameter than the one we built locally.
    assert_eq!(format!("{:?}", expected.as_ref()), format!("{:?}", &actual));
    Ok(())
}

0 comments on commit b1f627b

Please sign in to comment.