Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connector): support transaction for single-table CDC #11453

Merged
merged 8 commits into from
Aug 10, 2023

Conversation

BugenZhao
Copy link
Member

@BugenZhao BugenZhao commented Aug 4, 2023

Signed-off-by: Bugen Zhao i@bugenzhao.comI hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Support for handling transaction boundaries in Debezium CDC messages. Users can specify transactional = 'true' in the table definition to enable this feature. Tested locally.

The idea is relatively simple:

  • do not yield a chunk if we're currently in a transaction
  • if we're delayed by a transaction, yield it immediately after the current transaction is committed
  • if a transaction is too large, force commit

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

  • My PR contains user-facing changes.
Click here for Documentation

Types of user-facing changes

Please keep the types that apply to your changes, and remove the others.

  • Connector (sources & sinks)

Release note

EXPERIMENTAL Specify transactional = 'true' in the WITH clause of a CDC table definition to provide atomicity for transactions in the upstream for this single table. For the consideration of performance, transactions involving changes of more than 4096 rows cannot be guaranteed.

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
@BugenZhao BugenZhao marked this pull request as ready for review August 7, 2023 07:25
@github-actions github-actions bot added the user-facing-changes Contains changes that are visible to users label Aug 7, 2023
@codecov
Copy link

codecov bot commented Aug 7, 2023

Codecov Report

Merging #11453 (6243f35) into main (f76cbc5) will decrease coverage by 0.02%.
Report is 15 commits behind head on main.
The diff coverage is 18.33%.

@@            Coverage Diff             @@
##             main   #11453      +/-   ##
==========================================
- Coverage   70.42%   70.41%   -0.02%     
==========================================
  Files        1351     1351              
  Lines      225283   225338      +55     
==========================================
+ Hits       158655   158664       +9     
- Misses      66628    66674      +46     
Flag Coverage Δ
rust 70.41% <18.33%> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Changed Coverage Δ
src/connector/src/parser/mod.rs 45.60% <3.70%> (-3.07%) ⬇️
src/connector/src/parser/unified/debezium.rs 58.09% <17.64%> (-7.82%) ⬇️
...c/connector/src/parser/debezium/debezium_parser.rs 51.25% <40.00%> (-3.17%) ⬇️
src/connector/src/parser/unified/mod.rs 87.50% <100.00%> (+0.83%) ⬆️

... and 2 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest LGTM

split_offset_mapping: Some(split_offset_mapping),
};
// If we are not in a transaction, we should yield the chunk now.
if current_transaction.is_none() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we limit the max size of a transaction? Similar with the limitation for DML transaction. Otherwise, it's possible that a very large transaction from upstream DB causes RW to crash

/// If a transaction's data is less than `MAX_CHUNK_FOR_ATOMICITY` * `CHUNK_SIZE`, we can provide
/// atomicity. Otherwise, it is possible that part of transaction's data is sent to the downstream
/// without barrier boundaries. There are some cases that could cause non-atomicity for large
/// transaction. 1. The system crashes.
/// 2. Actor scale-in or migration.
/// 3. Dml's batch query error occurs at the middle of its execution. (e.g. Remove UDF function
/// server become unavailable).
const MAX_CHUNK_FOR_ATOMICITY: usize = 32;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. PTAL 🥰

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to unify these 2 numbers and document it somewhere in our docs. cc. @chenzl25

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. We need to document it in DML and table CDC section.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenzl25 @fuyufjh @BugenZhao Please see this doc PR for the related doc updates. Please review and make edits as you see fit. Thanks.

Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically LGTM

so we cannot handle case that txn and scale occur at the same time? and how to prevent it?

Comment on lines +130 to +138
#[allow(clippy::unused_async)] // false positive for `async_trait`
async fn parse_one<'a>(
&'a mut self,
_key: Option<Vec<u8>>,
_payload: Option<Vec<u8>>,
_writer: SourceStreamChunkRowWriter<'a>,
) -> Result<WriteGuard> {
unreachable!("should call `parse_one_with_txn` instead")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just remove it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a required method of this trait, used for simplifying the implementation of other connectors that do not care about transactions.

Copy link
Contributor

@StrikeW StrikeW left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest lgtm. I think we should set a maximum events limit for transactions to prevent OOM.

src/connector/src/parser/mod.rs Show resolved Hide resolved
@BugenZhao
Copy link
Member Author

so we cannot handle case that txn and scale occur at the same time? and how to prevent it?

I don't quite get it. 👀 As we never yield a data chunk when currently inside a transaction, the offset persisted by the source executor for recovery and scaling will also respect the transaction. 🤔

Signed-off-by: Bugen Zhao <i@bugenzhao.com>
@BugenZhao BugenZhao added this pull request to the merge queue Aug 10, 2023
Merged via the queue into main with commit 31e68e9 Aug 10, 2023
39 checks passed
@BugenZhao BugenZhao deleted the bz/single-table-txn branch August 10, 2023 06:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants