Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sink): add cassandra batch size and fix bigquery array null #15516

Merged
merged 5 commits into from
Mar 8, 2024

Conversation

xxhZs
Copy link
Contributor

@xxhZs xxhZs commented Mar 7, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

in this pr, we add bigquery's max_batch_rows and cassandra's max_batch_rows request_timeout to fix problems of excessive data size.
And we fix bigquery array null can't insert error,(the array null will be converted to []. )
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

In cassandra sink. Add two optional option.
cassandra.max_batch_rows: Number of batch rows sent at a time defaults is 512,
It should be less than 65535 according to cassandra, and ensures that the size of each batch(max_batch_rows * size of each row) is less than 50kb (Configuration items for cassandra: batch_size_fail_threshold_in_kb)
cassandra.request_timeout_ms: Waiting time for each batch, default is 2000ms
it not recommended to change, if you encounter timeout time is not enough, prioritize to reduce batch size

In bigquery sink, Add optional option
bigquery.max_batch_rows: Number of batch rows sent at a time defaults is 1024,

Also due to this document, we will convert an array with a null value to an empty array
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

@github-actions github-actions bot added the type/fix Bug fix label Mar 7, 2024
@xxhZs xxhZs requested review from wenym1 and hzxa21 March 7, 2024 09:50
@xxhZs xxhZs force-pushed the xxh/fix-cassandra-max-batch-size branch from 64911ba to e10dad1 Compare March 7, 2024 09:54
@hzxa21 hzxa21 added the user-facing-changes Contains changes that are visible to users label Mar 7, 2024
Copy link
Collaborator

@hzxa21 hzxa21 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM!

Please don't forget to update the "Documentation" + "Release Notes" sections in the PR description and add user-facing-change label for PR including user facing changes. Otherwise, the doc can become outdated.

Also, please link relevant issues to the PR.

public CassandraConfig withMaxBatchRows(Integer maxBatchRows) {
if (maxBatchRows > 65536 || maxBatchRows < 1) {
throw new IllegalArgumentException(
"cassandra.max_batch_rows must be <= 65535 and >= 1");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits: better to include the maxBatchRows in the error message.


public CassandraConfig withRequestTimeoutMs(Integer requestTimeoutMs) {
if (requestTimeoutMs < 1) {
throw new IllegalArgumentException("cassandra.request_timeout_ms must be >= 1");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits: better to include requestTimeoutMs in the error message.

src/connector/src/sink/big_query.rs Show resolved Hide resolved
Comment on lines 207 to 210
if let CustomJsonType::BigQuery = custom_json_type
&& matches!(field.data_type(), DataType::List(_))
{
return Ok(Value::Array(vec![]));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you include the BigQuery doc link as a comment here to explain why we need to do this?

@xxhZs xxhZs enabled auto-merge March 8, 2024 03:34
@xxhZs xxhZs added this pull request to the merge queue Mar 8, 2024
@@ -17,6 +17,10 @@ BigQueryConfig:
- name: bigquery.table
field_type: String
required: true
- name: bigquery.max_batch_rows
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that with options for cassandra sink are not included in this yaml. Let's revisit other sinks as well to ensure all sinks are covered. We can merge this PR first.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/fix Bug fix user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants