Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): Add MongoDB CDC Source #14966

Merged
merged 27 commits into from
Feb 29, 2024
Merged

feat(cdc): Add MongoDB CDC Source #14966

merged 27 commits into from
Feb 29, 2024

Conversation

StrikeW
Copy link
Contributor

@StrikeW StrikeW commented Feb 4, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

  1. Add mongodb-cdc connector to support capturing cdc events from MongoDB
  2. Extend the DebeziumMongoJsonParser to handle message key, because we need to extract the _id field for DELETE event
  3. Refactor cdc split implementation
  4. User can use following query to ingest and capture collections in the dev database
CREATE TABLE test (
   _id varchar PRIMARY KEY,
   payload jsonb
) WITH (
   connector='mongodb-cdc',
   mongodb.url='mongodb://localhost:27017/?replicaSet=rs0',
   collection.name='dev.*'
);

Unit test and e2e tests will be added in follow up #15003

#14759

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

  • Add mongodb-cdc connector to support capturing cdc events from MongoDB

@StrikeW StrikeW added the user-facing-changes Contains changes that are visible to users label Feb 4, 2024
Copy link

gitguardian bot commented Feb 4, 2024

⚠️ GitGuardian has uncovered 1 secret following the scan of your pull request.

Please consider investigating the findings and remediating the incidents. Failure to do so may lead to compromising the associated services or software components.

🔎 Detected hardcoded secret in your pull request
GitGuardian id GitGuardian status Secret Commit Filename
9425213 Triggered Generic Password 8433b84 ci/scripts/regress-test.sh View secret
🛠 Guidelines to remediate hardcoded secrets
  1. Understand the implications of revoking this secret by investigating where it is used in your code.
  2. Replace and store your secret safely. Learn here the best practices.
  3. Revoke and rotate this secret.
  4. If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.

To avoid such incidents in the future consider


🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.

Our GitHub checks need improvements? Share your feedbacks!

@neverchanje
Copy link
Contributor

neverchanje commented Feb 4, 2024

Good job!

  • I wonder if we support multiple mongodb addresses? It'd be common to have multiple mongodb nodes in real cases.
  • Is FORMAT DEBEZIUM_MONGO ENCODE JSON; still needed for the new source?

@StrikeW
Copy link
Contributor Author

StrikeW commented Feb 4, 2024

Good job!

  • I wonder if we support multiple mongodb addresses? It'd be common to have multiple mongodb nodes in real cases.

I think we do. The underlying debezium connector can support replica set and shared cluster deployment. User just need to provide the corresponding connection string to us:

  • replica set
    mongodb://user:password@mongodb0.example.com:27017,mongodb1.example.com:27017,mongodb2.example.com:27017/?replicaSet=myRepl

  • sharded cluster
    mongodb://user:password@mongos0.example.com:27017,mongos1.example.com:27017,mongos2.example.com:27017/

  • Is FORMAT DEBEZIUM_MONGO ENCODE JSON; still needed for the new source?

Yeah, we can omit it from the sql.

@neverchanje
Copy link
Contributor

Please forbid unnecessary attributes in the final syntax, since a direct mongodb source does not require them.

Copy link
Contributor

@neverchanje neverchanje left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried my best reviewing the code. Still some code to refine, but overall LGTM. Please find engineers to approve this PR.

@@ -218,9 +218,12 @@ impl Parser {
if connector.contains("-cdc") {
Copy link
Member

@fuyufjh fuyufjh Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend parsing all possible values (mongodb-cdc, postgres-cdc, mysql-cdc) to enum and use equal conditions here. This code looks too lossy, and it's hard to say what will happen if I pass some strange values ending with -cdc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leave to the PR that adding e2e tests.

@StrikeW
Copy link
Contributor Author

StrikeW commented Feb 26, 2024

@fuyufjh @hzxa21 @tabVersion PTAL again, I‘d like to get the basic functionality merge first.

LOG.error(
"engine#{} terminated with error. message: {}",
sourceId,
message,
error);
if (error != null) {
responseObserver.onError(error);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any motivation for this change? In what situation will it failed with error == null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For defensive of nullptr and match the implementation of the following create(DbzConnectorConfig config, long channelPtr) method.
Actually newCdcEngineRunner only used in unit test right now, since we replaced with embeded jvm.

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks!

@StrikeW StrikeW mentioned this pull request Feb 29, 2024
2 tasks
@StrikeW StrikeW added this pull request to the merge queue Feb 29, 2024
Merged via the queue into main with commit 0c2c2b9 Feb 29, 2024
27 of 29 checks passed
@StrikeW StrikeW deleted the siyuan/mongodb-source branch February 29, 2024 07:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants