Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for async insert #66

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

magurotuna
Copy link

@magurotuna magurotuna commented May 21, 2023

Closes #65

This patch adds a support for asynchronous inserts.

In order not to break public interface, a new method named async_insert is added to Client, instead of fixing the existing insert method.

This is my first patch to this repo, so please let me know if there's anything I can improve to make it more consistent with the existing code base. Thanks!

@magurotuna
Copy link
Author

465b1cf has fixed a issue with serializing Datetime data type (there may have been other types that also had the issue). Before this commit, we got the following error message when trying to insert.

thread 'async_insert' panicked at 'called `Result::unwrap()` on an `Err` value: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 17. Bytes expected: 111.: (at row 105)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA) (version 23.4.2.11 (official build))")', tests/test_query.rs:106:24

This happened because FORMAT RowBinary must come last, as the doc says in https://clickhouse.com/docs/en/sql-reference/statements/insert-into:

If you want to specify SETTINGS for INSERT query then you have to do it before FORMAT clause since everything after FORMAT format_name is treated as data.

@adijo
Copy link

adijo commented Jun 12, 2023

Thanks a lot for this. I've used it to ingest fairly large datasets ~200m+ rows

@orlandosh
Copy link

that's awesome, thank you! I hope it gets merged soon

@loyd
Copy link
Owner

loyd commented Oct 5, 2023

Sorry for the late answer.

Async inserts are a nice feature and have become production-ready to have API for them. And suggested PR looks great. However, I have ambivalent feelings about it. It's simply options and can be provided by Client::with_option(..) already. So, should we include it in the library generally?

@adijo

I've used it to ingest fairly large datasets ~200m+ rows

Have you tried to use client.inserter(..) instead, which provides automatic slicing for streaming? Some of our data stream inserts 300k/s rows with 1s slicing, which is very convenient. Async inserts are not available for reading anyway. However, if you have a lot of writing nodes, async inserts can be useful, of course.

@magurotuna
Copy link
Author

However, I have ambivalent feelings about it. It's simply options and can be provided by Client::with_option(..) already. So, should we include it in the library generally?

Does Client::with_option(..) work for async inserts? I just tried the following code to confirm:

use clickhouse::{Client, Row};
use serde::Serialize;

#[tokio::main]
async fn main() {
    let client = Client::default()
        .with_url("http://localhost:8123")
        .with_user("user")
        .with_password("password")
        .with_database("mydb")
        .with_option("async_insert", "1")
        .with_option("wait_for_async_insert", "1");

    #[derive(Debug, Row, Serialize)]
    struct MyRow {
        user_id: u32,
        message: String,
    }

    let mut insert = client.insert("my_table").unwrap();
    insert
        .write(&MyRow {
            user_id: 2,
            message: "I am 2".to_string(),
        })
        .await
        .unwrap();
    insert.end().await.unwrap();
}

and the query actually performed doesn't seem to contain SETTINGS async_insert=1, wait_for_async_insert=1 part.

SELECT
    query,
    event_time
FROM query_log
ORDER BY event_time DESC
LIMIT 5

Query id: 2fbe6de2-45ad-46ed-8a93-aee2b7c53342

┌─query──────────────────────────────────────────────────────┬──────────event_time─┐
│ INSERT INTO my_table(`user_id`,`message`) FORMAT RowBinary
 │ 2023-11-01 15:39:41 │
│ INSERT INTO my_table(`user_id`,`message`) FORMAT RowBinary
 │ 2023-11-01 15:39:41 │
└────────────────────────────────────────────────────────────┴─────────────────────┘
┌─query─────────────────────────────────────────────────────────────────────┬──────────event_time─┐
│ select * from my_table;                                                   │ 2023-11-01 15:38:41 │
│ select * from my_table;                                                   │ 2023-11-01 15:38:41 │
│ select query, event_time from query_log order by event_time desc limit 5; │ 2023-11-01 15:38:22 │
└───────────────────────────────────────────────────────────────────────────┴─────────────────────┘

5 rows in set. Elapsed: 0.008 sec.

While by using my patch:

use clickhouse::{AsyncInsertOptions, Client, Row};
use serde::Serialize;

#[tokio::main]
async fn main() {
    let client = Client::default()
        .with_url("http://localhost:22222")
        .with_user("user")
        .with_password("password")
        .with_database("deploy");

    #[derive(Debug, Row, Serialize)]
    struct MyRow {
        user_id: u32,
        message: String,
    }

    let opts = AsyncInsertOptions::builder()
        .async_insert(true)
        .wait_for_async_insert(true)
        .build();
    let mut insert = client.async_insert("my_table", opts).unwrap();
    insert
        .write(&MyRow {
            user_id: 3,
            message: "I am 3".to_string(),
        })
        .await
        .unwrap();
    insert.end().await.unwrap();
}

the performed query does include SETTINGS async_insert=1, wait_for_async_insert=1 part.

312c0255e2d4 :) select query, event_time from query_log order by event_time desc limit 5;

SELECT
    query,
    event_time
FROM query_log
ORDER BY event_time DESC
LIMIT 5

Query id: 5b095928-da64-4728-a9d0-e1242e9d40ef

┌─query────────────────────────────────────────────────────────────────────────────────────────────────────────┬──────────event_time─┐
│ INSERT INTO my_table (`user_id`,`message`) SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT RowBinary
 │ 2023-11-01 15:54:37 │
│ INSERT INTO my_table (`user_id`,`message`) SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT RowBinary
 │ 2023-11-01 15:54:37 │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────┘
┌─query─────────────────────────────────────────────────────────────────────┬──────────event_time─┐
│ select query, event_time from query_log order by event_time desc limit 5; │ 2023-11-01 15:54:30 │
│ select query, event_time from query_log order by event_time desc limit 5; │ 2023-11-01 15:54:30 │
│ select * from my_table;                                                   │ 2023-11-01 15:54:15 │
└───────────────────────────────────────────────────────────────────────────┴─────────────────────┘

5 rows in set. Elapsed: 0.007 sec.

Also, I'd prefer not to make async_insert settings configured via Client level, because we might want to use the same instance of Client with async_insert enabled sometimes, and with disabled at another time. It would be reasonable to enable async insert on every insert operation in this scenario in my opinion.

There is a conflict in one file, and if there's a chance of getting my patch to be merged I'd be happy to address this conflict :)

@magurotuna
Copy link
Author

@loyd Resolved the conflict and it's now ready for review again. Could I get your review and opinion on whether you think this is a right direction? Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for async insert
4 participants