Enhancement Proposal: Simplifying Token Calculation for High-Frequency Append-Log Style Operations #974

Closed
lvboudre opened this issue Apr 3, 2024 · 11 comments · May be fixed by #738
Labels
performance Improves performance of existing features
Milestone
0.15.0

Comments

@lvboudre

lvboudre commented Apr 3, 2024

Enhancement Proposal: Simplifying Token Calculation for High-Frequency Append-Log Style Operations

Overview:

In our current project utilizing ScyllaDB, we're implementing a high-frequency, append-log style architecture to handle concurrent append requests. To optimize performance and minimize network traffic, we're batching these requests, similar to how the Kafka API operates, sending batches to ScyllaDB every 10 milliseconds.
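
To make that concrete, here is a minimal sketch of such a 10 ms flush loop (tokio-based, with a hypothetical Append type; the token grouping and the actual ScyllaDB write are elided here and sketched further below):

```rust
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time;

// Hypothetical append request; the real payload type is application-specific.
struct Append {
    partition_key: i64,
    payload: Vec<u8>,
}

// Collect incoming appends and flush them as a batch every 10 ms.
async fn batching_loop(mut rx: mpsc::Receiver<Append>) {
    let mut pending: Vec<Append> = Vec::new();
    let mut ticker = time::interval(Duration::from_millis(10));

    loop {
        tokio::select! {
            maybe_req = rx.recv() => match maybe_req {
                Some(req) => pending.push(req),
                None => break, // all senders dropped, stop flushing
            },
            _ = ticker.tick() => {
                if !pending.is_empty() {
                    // Group by token/node and send the batches to ScyllaDB here
                    // (sketched further below).
                    let to_flush = std::mem::take(&mut pending);
                    println!("flushing {} appends", to_flush.len());
                }
            }
        }
    }
}
```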

To ensure efficient batching and minimize network overhead, it's crucial to group insert requests that will ultimately end up on the same node within ScyllaDB. This necessitates the computation of tokens for each insert statement, enabling us to determine their placement within the token ring.

Current Challenge:

Presently, the existing API poses challenges in efficiently computing the token of a PreparedStatement without incurring significant performance overhead. The process involves invoking Session::calculate_token, which necessitates serializing a row (resulting in memory allocation), extracting the partition key, and then computing the token. Subsequently, when batching these statements using Session::batch, each row undergoes serialization again, effectively doubling memory allocation and serialization overhead.
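
For illustration, a rough sketch of this two-pass flow (assuming the current PreparedStatement::calculate_token / Session::batch API; the row type, error handling, and exact signatures are simplified and may differ from the released driver):

```rust
use std::collections::HashMap;

use scylla::batch::Batch;
use scylla::prepared_statement::PreparedStatement;
use scylla::Session;

// Hypothetical row type: (partition key, payload blob).
type Row = (i64, Vec<u8>);

async fn flush_grouped_by_token(
    session: &Session,
    prepared: &PreparedStatement,
    rows: Vec<Row>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Pass 1: calculate_token serializes every row internally just to
    // extract the partition key and hash it.
    let mut groups: HashMap<i64, Vec<Row>> = HashMap::new();
    for row in rows {
        let token = prepared.calculate_token(&row)?; // first serialization
        let key = token.map(|t| t.value).unwrap_or(0);
        groups.entry(key).or_default().push(row);
    }

    // Pass 2: Session::batch serializes every row again while building
    // the batch frame, doubling the allocation/serialization work.
    for (_token, group) in groups {
        let mut batch = Batch::default();
        for _ in &group {
            batch.append_statement(prepared.clone());
        }
        session.batch(&batch, group).await?; // second serialization
    }
    Ok(())
}
```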

Immediate Solution

To streamline this process and enhance performance, we propose making Session::calculate_token_untyped public instead of keeping it pub(crate). By exposing this method publicly, we can pre-serialize every row, thereby reusing the serialization results to compute tokens and seamlessly integrate them into our batching process.

Additional Note

In addition to the proposed enhancement of making Session::calculate_token_untyped public, we suggest making PartitionHasher publicly accessible as well. This would empower users to compute tokens in advance without having to go through the serialization process of SerializeRow and PreparedStatement.

Considering that many ScyllaDB use cases involve key-value stores where the partition key is often known early on, exposing PartitionHasher would facilitate more efficient pre-computation of tokens, enhancing overall performance and developer experience.

lvboudre added a commit to lvboudre/scylla-rust-driver that referenced this issue Apr 3, 2024
@Lorak-mmk
Collaborator

I'm reluctant to expose such functions publicly - I'd rather think about the solution that avoids the allocation while preserving type safety.
BoundStatement could possibly be such a solution ( #941 ) - it would allow the values to be serialized only once and you could compute token of such statement without serializing again.

In order to decrease the impact for now you could use ClusterData::compute_token - that way you'll only need to allocate and serialize partition key twice, not all values for the query.

By the way, did you benchmark the performance impact of batches? Does it actually improve performance compared to executing the queries in parallel?
If so, did you benchmark grouping queries on the client side vs skipping grouping and letting Scylla deal with it?

Presently, the existing API poses challenges in efficiently computing the token of a PreparedStatement without incurring significant performance overhead. The process involves invoking Session::calculate_token, which necessitates serializing a row (resulting in memory allocation), extracting the partition key, and then computing the token. Subsequently, when batching these statements using Session::batch, each row undergoes serialization again, effectively doubling memory allocation and serialization overhead.

You are talking about PreparedStatement::calculate_token, not Session::calculate_token (there is no such thing), right?

@Lorak-mmk
Collaborator

One more thing: does your proposed change actually let you do what you want? In order to use calculate_token_untyped you need SerializedValues which don't implement SerializeRow and so can't be passed as values to a query.

@lvboudre
Author

lvboudre commented Apr 3, 2024

One more thing: does your proposed change actually let you do what you want? In order to use calculate_token_untyped you need SerializedValues which don't implement SerializeRow and so can't be passed as values to a query.

@Lorak-mmk Yes: what I do is serialize my rows first, and I want to pass the result to calculate_token_untyped.

By the way, did you benchmark the performance impact of batches? Does it actually improve performance compared to executing the queries in parallel?
If so, did you benchmark grouping queries on the client side vs skipping grouping and letting Scylla deal with it?

In our system, we are tasked with achieving a throughput requirement of 125MB/s. It is imperative for us to maintain low latency throughout our operations. We have identified that any delays introduced by ScyllaDB in rerouting have a significant impact on our overall latency performance.

To address this concern and optimize our system for efficiency, we are exploring the implementation of parallel batch processing. Our strategy involves assigning dedicated threads to handle batch inserts for specific nodes. By segmenting the workload in this manner, we aim to minimize the impact of rerouting on latency and ensure consistent performance across the system.

In the ScyllaDB protocol, each batch frame can accommodate up to 256MB of data and supports LZ4 compression. Batching reduces entropy, since we batch related data together, thus enhancing compression ratios when utilizing algorithms such as LZ4.

@lvboudre
Author

lvboudre commented Apr 3, 2024

@Lorak-mmk

In order to decrease the impact for now you could use ClusterData::compute_token - that way you'll only need to allocate and serialize partition key twice, not all values for the query

I tried to use this function; however, it requires a serialized partition key, which is only accessible through a PreparedStatement. The overall driver API makes partition key handling quite convoluted:

I have to go through a PreparedStatement because I cannot forge my own RowSerializationContext (since everything is pub(crate)), which is required for serialization.

@Lorak-mmk
Collaborator

@Lorak-mmk

In order to decrease the impact for now you could use ClusterData::compute_token - that way you'll only need to allocate and serialize partition key twice, not all values for the query

I tried to use this function; however, it requires a serialized partition key, which is only accessible through a PreparedStatement. The overall driver API makes partition key handling quite convoluted:

I have to go through a PreparedStatement because I cannot forge my own RowSerializationContext (since everything is pub(crate)), which is required for serialization.

ClusterData::compute_token needs SerializedValues. SerializedValues can be created manually, without a PreparedStatement / RowSerializationContext: first you create it with the new method and then add partition key values using the add_value method.
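
A minimal sketch of that workaround (module paths, signatures, and error types are approximate; it assumes a single text partition-key column on a hypothetical my_keyspace.my_table):

```rust
use scylla::frame::response::result::ColumnType;
use scylla::serialize::row::SerializedValues;
use scylla::Session;

// Compute the token for a known partition key without a PreparedStatement.
// Assumes a single text partition-key column in my_keyspace.my_table.
fn token_for_pk(session: &Session, pk: &str) -> Result<(), Box<dyn std::error::Error>> {
    let mut serialized_pk = SerializedValues::new();
    serialized_pk.add_value(&pk, &ColumnType::Text)?;

    let cluster_data = session.get_cluster_data();
    let token = cluster_data.compute_token("my_keyspace", "my_table", &serialized_pk)?;
    println!("token = {:?}", token);
    Ok(())
}
```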

@mykaul
Contributor

mykaul commented Apr 4, 2024

In the ScyllaDB protocol, each batch frame can accommodate up to 256MB of data and supports LZ4 compression. Batching reduces entropy, since we batch related data together, thus enhancing compression ratios when utilizing algorithms such as LZ4.

nit: LZ4 does not have entropy encoding.

@lvboudre
Author

lvboudre commented Apr 4, 2024

In the ScyllaDB protocol, each batch frame can accommodate up to 256MB of data and supports LZ4 compression. Batching reduces entropy, since we batch related data together, thus enhancing compression ratios when utilizing algorithms such as LZ4.

nit: LZ4 does not have entropy encoding.

I meant that when the data has less entropy, the compression ratio will be higher; I was not referring to LZ4's lack of entropy encoding.
In my use case, data in the same partition will be extremely similar (low entropy), so I will benefit from compression.

@Ten0
Contributor

Ten0 commented May 25, 2024

To ensure efficient batching and minimize network overhead, it's crucial to group insert requests that will ultimately end up on the same node within ScyllaDB. This necessitates the computation of tokens for each insert statement, enabling us to determine their placement within the token ring.

It looks like #738 may be a more direct API than #975 for what you're trying to achieve.
Could you please confirm that is indeed the case?
(See the tests/integration/shard_aware_batching.rs file for a complete usage example.)

@wprzytula wprzytula added this to the 0.15.0 milestone Jun 20, 2024
@wprzytula
Collaborator

I'm reluctant to expose such functions publicly - I'd rather think about the solution that avoids the allocation while preserving type safety.
BoundStatement could possibly be such a solution ( #941 ) - it would allow the values to be serialized only once and you could compute token of such statement without serializing again.

As said here, we are not going to expose an unsafe (in the sense of not type-safe) API.
Instead, we're going to introduce BoundStatement, which will pre-serialize values and thus know its token before execution.

@wprzytula wprzytula added the performance Improves performance of existing features label Jun 20, 2024
@lvboudre
Author

I've implemented the approach proposed by @Lorak-mmk using ClusterData::compute_token.

However, I would like to share my perspective on the exclusion of lower-level /unsafe APIs. I believe that not providing access to these APIs may hinder power users from fully leveraging ScyllaDB for their specific use cases.

While this is a topic open for debate, I think a database designed for low latency and high scalability should cater to a "hacker" mindset by offering a less opinionated, more flexible API (aka less type safe API).

For instance, the serde API provided by the driver can sometimes lead to performance drawbacks. In my use case, I frequently handle small blob data with a known fixed length. The driver, however, requires me to deserialize blob data as Vec<u8>, resulting in unnecessary memory allocations.
I also cannot provide my own allocator: for example, I flush a fixed-size buffer at specific intervals, so I could reuse the same memory between flushes. A bump allocator could do the job.

Attempting to work around this by using the serde API is challenging, if not impossible.

In my opinion, this approach might not align well with the target audience of the database.

@Lorak-mmk
Collaborator

Lorak-mmk commented Jun 21, 2024

I've implemented the approach proposed by @Lorak-mmk using ClusterData::compute_token.

However, I would like to share my perspective on the exclusion of lower-level /unsafe APIs. I believe that not providing access to these APIs may hinder power users from fully leveraging ScyllaDB for their specific use cases.

While this is a topic open for debate, I think a database designed for low latency and high scalability should cater to a "hacker" mindset by offering a less opinionated, more flexible API (aka less type safe API).

I think that exposing some less safe, low-level APIs may make sense if a given functionality can't be implemented in a safe way. That is not the case for this issue, because implementing BoundStatement solves it.

I think you could also use a slightly more hacky workaround (until BoundStatement is introduced) that would not require you to serialize anything twice (the current solution requires serializing the partition key twice, iiuc). You can create a newtype around SerializedValues and implement SerializeRow for this wrapper.
Then you can serialize the values for the query once, use the inner SerializedValues in ClusterData::compute_token, and pass the outer structure as the values to Session. I'm not 100% sure about it, but I think it should work. If you have any trouble implementing it, let me know and I'll try to prepare a PoC.
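
A rough, untested sketch of that wrapper idea; the raw-value iteration API (SerializedValues::iter, RawValue) and the CellWriter methods used below are assumptions about the current serialization framework and may need adjusting:

```rust
use scylla::serialize::row::{RowSerializationContext, SerializeRow, SerializedValues};
use scylla::serialize::writers::RowWriter;
use scylla::serialize::{RawValue, SerializationError};

// Newtype over pre-serialized values: serialize once up front, then replay
// the already-serialized cells whenever the driver asks for them.
struct PreSerialized(pub SerializedValues);

impl SerializeRow for PreSerialized {
    fn serialize(
        &self,
        _ctx: &RowSerializationContext<'_>,
        writer: &mut RowWriter<'_>,
    ) -> Result<(), SerializationError> {
        // Assumed API: SerializedValues::iter() yields RawValue items
        // (Null / Unset / Value(&[u8])).
        for value in self.0.iter() {
            let cell = writer.make_cell_writer();
            match value {
                RawValue::Null => {
                    cell.set_null();
                }
                RawValue::Unset => {
                    cell.set_unset();
                }
                RawValue::Value(bytes) => {
                    cell.set_value(bytes).map_err(SerializationError::new)?;
                }
            }
        }
        Ok(())
    }

    fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
}
```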

For instance, the serde API provided by the driver can sometimes lead to performance drawbacks. In my use case, I frequently handle small blob data with a known fixed length. The driver, however, requires me to deserialize blob data as Vec<u8>, resulting in unnecessary memory allocations. I also cannot provide my own allocator: for example, I flush a fixed-size buffer at specific intervals, so I could reuse the same memory between flushes. A bump allocator could do the job.

The ongoing deserialization refactor will make the API more flexible. You will be able to deserialize to borrowed types, so you will be able to get rid of this allocation. @wprzytula can tell you more. I'm not sure about custom allocators; perhaps your custom implementation of the DeserializeValue trait could use one.
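
As a tentative sketch of what that could look like once the refactor lands (the derive name and lifetime handling below are assumptions, not the final API), a fixed-length blob could be borrowed straight from the received frame:

```rust
use scylla::DeserializeRow;

// Tentative post-refactor sketch: the derive is expected to allow borrowing
// column values straight from the received frame, so a fixed-length blob can
// be read as &[u8] instead of an owned Vec<u8>.
#[derive(DeserializeRow)]
struct LogEntry<'frame> {
    partition_key: i64,
    payload: &'frame [u8], // borrowed, no per-row allocation
}
```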

Attempting to work around this by using the serde API is challenging, if not impossible.

In my opinion, this approach might not align well with the target audience of the database.
