Skip to content

Commit

Permalink
feat(inserter): add force_commit()
Browse files Browse the repository at this point in the history
  • Loading branch information
loyd committed Feb 29, 2024
1 parent e445311 commit 24b39c7
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- insert: apply options set on the client ([#90]).
- inserter: can be limited by size, see `Inserter::with_max_bytes()`.
- inserter: `Inserter::pending()` to get stats about still being inserted data.
- inserter: `Inserter::force_commit()` to commit and insert immediately.

### Changed
- **BREAKING** inserter: move under the `inserter` feature.
Expand Down
27 changes: 17 additions & 10 deletions src/inserter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::{error::Result, insert::Insert, row::Row, ticks::Ticks, Client};
/// Performs multiple consecutive `INSERT`s.
///
/// By default, it doesn't end the current active `INSERT` automatically.
/// Use `with_max_bytes`,`with_max_rows` and `with_period` to set limits.
/// Use `with_max_bytes`, `with_max_rows` and `with_period` to set limits.
/// Alternatively, use `force_commit` to end an active `INSERT` whenever you want.
///
/// Rows are being sent progressively to spread network load.
///
Expand Down Expand Up @@ -213,17 +214,23 @@ where

/// Checks limits and ends the current `INSERT` if they are reached.
pub async fn commit(&mut self) -> Result<Quantities> {
if !self.limits_reached() {
self.in_transaction = false;
return Ok(Quantities::ZERO);
}

self.force_commit().await
}

/// Ends the current `INSERT` unconditionally.
pub async fn force_commit(&mut self) -> Result<Quantities> {
self.in_transaction = false;

Ok(if self.limits_reached() {
let quantities = mem::replace(&mut self.pending, Quantities::ZERO);
let result = self.insert().await;
self.ticks.reschedule();
result?;
quantities
} else {
Quantities::ZERO
})
let quantities = mem::replace(&mut self.pending, Quantities::ZERO);
let result = self.insert().await;
self.ticks.reschedule();
result?;
Ok(quantities)
}

/// Ends the current `INSERT` and whole `Inserter` unconditionally.
Expand Down
55 changes: 43 additions & 12 deletions tests/it/inserter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![cfg(feature = "inserter")]

use std::string::ToString;

use serde::Serialize;

use clickhouse::{inserter::Quantities, Client, Row};
Expand All @@ -9,6 +11,14 @@ struct MyRow {
data: String,
}

impl MyRow {
fn new(data: impl ToString) -> Self {
Self {
data: data.to_string(),
}
}
}

async fn create_table(client: &Client) {
client
.query("CREATE TABLE test(data String) ENGINE = MergeTree ORDER BY data")
Expand All @@ -17,6 +27,35 @@ async fn create_table(client: &Client) {
.unwrap();
}

#[tokio::test]
async fn force_commit() {
let client = prepare_database!();
create_table(&client).await;

let mut inserter = client.inserter("test").unwrap();
let rows = 100;

for i in 1..=rows {
inserter.write(&MyRow::new(i)).unwrap();
assert_eq!(inserter.commit().await.unwrap(), Quantities::ZERO);

if i % 10 == 0 {
assert_eq!(inserter.force_commit().await.unwrap().rows, 10);
}
}

assert_eq!(inserter.end().await.unwrap(), Quantities::ZERO);

let (count, sum) = client
.query("SELECT count(), sum(toUInt64(data)) FROM test")
.fetch_one::<(u64, u64)>()
.await
.unwrap();

assert_eq!(count, rows);
assert_eq!(sum, (1..=rows).sum::<u64>());
}

#[tokio::test]
async fn limited_by_rows() {
let client = prepare_database!();
Expand All @@ -26,13 +65,9 @@ async fn limited_by_rows() {
let rows = 100;

for i in (2..=rows).step_by(2) {
let row = MyRow {
data: (i - 1).to_string(),
};
let row = MyRow::new(i - 1);
inserter.write(&row).unwrap();
let row = MyRow {
data: i.to_string(),
};
let row = MyRow::new(i);
inserter.write(&row).unwrap();

let inserted = inserter.commit().await.unwrap();
Expand Down Expand Up @@ -71,9 +106,7 @@ async fn limited_by_bytes() {
let mut inserter = client.inserter("test").unwrap().with_max_bytes(100);
let rows = 100;

let row = MyRow {
data: "x".repeat(9), // +1 for length
};
let row = MyRow::new("x".repeat(9));

for i in 1..=rows {
inserter.write(&row).unwrap();
Expand Down Expand Up @@ -118,9 +151,7 @@ async fn limited_by_time() {
let rows = 100;

for i in 1..=rows {
let row = MyRow {
data: i.to_string(),
};
let row = MyRow::new(i);
inserter.write(&row).unwrap();

tokio::time::sleep(period / 10).await;
Expand Down

0 comments on commit 24b39c7

Please sign in to comment.