sink(cloudstorage): add use-table-id-as-path option (#4356) #4594
Summary of Changes
This pull request introduces a significant enhancement to the cloud storage sink by allowing users to configure data organization using numeric table IDs instead of table names. This change aims to improve data integrity by preventing issues with residual data when tables are dropped and recreated with the same name, and to optimize storage by skipping unnecessary database-level schema writes. Additionally, it strengthens the system's robustness by adding validation for DDL events and improving error handling in file path generation.
Code Review
This pull request introduces a new configuration option, UseTableIDAsPath, for cloud storage sinks. When enabled, it modifies the file path generation logic to use table IDs instead of schema and table names for organizing schema and data files, and skips writing database-level schema files. Additionally, it adds validation for ActionExchangeTablePartition DDL events. A minor logging inconsistency was noted where shardID was used instead of workerID.
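The path behavior summarized above can be sketched in a few lines of Go. This is a minimal illustration, not the sink's actual code: the `tableRef` type, the `tableDir` and `shouldWriteDBSchema` helpers, and the directory layout are all hypothetical, and the real layout generated in `pkg/sink/cloudstorage/path.go` may include further components.

```go
package main

import (
	"fmt"
	"path"
	"strconv"
)

// tableRef is a hypothetical stand-in for the table metadata a sink sees.
type tableRef struct {
	Schema  string
	Table   string
	TableID int64
}

// tableDir returns the directory prefix under which a table's files are
// grouped. The layout is illustrative only: with the option enabled the
// numeric table ID replaces the schema/table name pair.
func tableDir(t tableRef, useTableIDAsPath bool) string {
	if useTableIDAsPath {
		return strconv.FormatInt(t.TableID, 10)
	}
	return path.Join(t.Schema, t.Table)
}

// shouldWriteDBSchema models the "skip database-level schema files"
// behavior: those files are only written in name-based mode.
func shouldWriteDBSchema(useTableIDAsPath bool) bool {
	return !useTableIDAsPath
}

func main() {
	t := tableRef{Schema: "test", Table: "orders", TableID: 112}
	fmt.Println(tableDir(t, false))        // test/orders
	fmt.Println(tableDir(t, true))         // 112
	fmt.Println(shouldWriteDBSchema(true)) // false
}
```

Because the ID-based directory does not depend on the table name, a table that is dropped and recreated under the same name gets a new directory, which is the residual-data concern the summary mentions.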
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 2
🧹 Nitpick comments (1)
pkg/sink/cloudstorage/config_test.go (1)
81-85: Add one malformed-value test for `use-table-id-as-path`.
Consider adding a case like `?use-table-id-as-path=not-bool` and asserting an error, so parser behavior is locked down for invalid inputs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/cloudstorage/config_test.go` around lines 81 - 85, Add a negative test case to the existing test table (next to the "sink uri with use-table-id-as-path" case) that uses the query param `use-table-id-as-path=not-bool` and sets expectedErr to a non-empty string; ensure the test invokes the same parser used by the file (the table-driven test that calls the config parsing function) and asserts that parsing returns an error (and optionally that the error message contains "use-table-id-as-path" or "invalid boolean") so malformed boolean values are rejected.
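The malformed-value case requested in this nitpick can be illustrated with a standalone sketch. It does not use the repository's config parser; `parseUseTableIDAsPath` is a hypothetical helper built only on the standard library (`net/url`, `strconv.ParseBool`), and the `s3://bucket/prefix` URIs are placeholder values.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// useTableIDAsPathKey is the query parameter name discussed in this PR.
const useTableIDAsPathKey = "use-table-id-as-path"

// parseUseTableIDAsPath is a hypothetical helper. It returns nil when the
// parameter is absent, a pointer to the parsed value when it is present,
// and an error when the value is not a valid boolean.
func parseUseTableIDAsPath(rawURI string) (*bool, error) {
	u, err := url.Parse(rawURI)
	if err != nil {
		return nil, err
	}
	raw := u.Query().Get(useTableIDAsPathKey)
	if raw == "" {
		return nil, nil
	}
	enabled, err := strconv.ParseBool(raw)
	if err != nil {
		return nil, fmt.Errorf("invalid %s value %q: %w", useTableIDAsPathKey, raw, err)
	}
	return &enabled, nil
}

func main() {
	// Table-driven cases, including the malformed value from the nitpick.
	cases := []struct {
		uri     string
		wantErr bool
	}{
		{"s3://bucket/prefix?use-table-id-as-path=true", false},
		{"s3://bucket/prefix", false},
		{"s3://bucket/prefix?use-table-id-as-path=not-bool", true},
	}
	for _, c := range cases {
		_, err := parseUseTableIDAsPath(c.uri)
		fmt.Printf("%s -> error=%t (want %t)\n", c.uri, err != nil, c.wantErr)
	}
}
```

Returning a `*bool` rather than a `bool` keeps "absent" distinguishable from "explicitly false", which matters for the compatibility check discussed later in this review.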
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@downstreamadapter/sink/cloudstorage/sink_test.go`:
- Around line 231-243: The test uses undefined ast.NewCIStr; replace all uses of
ast.NewCIStr with the existing parser_model.NewCIStr so the code
compiles—specifically update occurrences inside the tableInfo construction
(timodel.TableInfo, its Columns entries) and any other spots listed (around the
tableInfo variable and the later assertions at lines noted) to call
parser_model.NewCIStr instead of ast.NewCIStr.
In `@pkg/config/sink.go`:
- Around line 718-737: The compatibility bug: when applyParameterBySinkURI
parses a sink URI with the query param use-table-id-as-path it does not persist
that value into the SinkConfig (CloudStorageConfig.UseTableIDAsPath), so later
CheckUseTableIDAsPathCompatibility sees nil and allows an unintended flip; fix
applyParameterBySinkURI (and the analogous block around lines 1005-1056) to set
oldSinkConfig.CloudStorageConfig.UseTableIDAsPath = pointer(boolValue) (or
initialize CloudStorageConfig if nil) whenever the URI contains
use-table-id-as-path so the parsed boolean is stored in the SinkConfig and
CheckUseTableIDAsPathCompatibility will compare actual values instead of nil.
📒 Files selected for processing (12)
- api/v2/model.go
- api/v2/model_test.go
- downstreamadapter/sink/cloudstorage/sink.go
- downstreamadapter/sink/cloudstorage/sink_test.go
- downstreamadapter/sink/cloudstorage/writer.go
- pkg/config/sink.go
- pkg/sink/cloudstorage/config.go
- pkg/sink/cloudstorage/config_test.go
- pkg/sink/cloudstorage/path.go
- pkg/sink/cloudstorage/path_test.go
- pkg/sink/cloudstorage/table_definition.go
- pkg/sink/cloudstorage/table_definition_test.go
// CheckUseTableIDAsPathCompatibility checks the compatibility between sink config and sink URI.
func CheckUseTableIDAsPathCompatibility(
	sinkConfig *SinkConfig,
	useTableIDAsPathFromURI *bool,
) error {
	if sinkConfig == nil ||
		sinkConfig.CloudStorageConfig == nil ||
		sinkConfig.CloudStorageConfig.UseTableIDAsPath == nil ||
		useTableIDAsPathFromURI == nil {
		return nil
	}
	useTableIDAsPathFromConfig := sinkConfig.CloudStorageConfig.UseTableIDAsPath
	if util.GetOrZero(useTableIDAsPathFromConfig) == util.GetOrZero(useTableIDAsPathFromURI) {
		return nil
	}
	return cerror.ErrIncompatibleSinkConfig.GenWithStackByArgs(
		fmt.Sprintf("%s=%t", UseTableIDAsPathKey, util.GetOrZero(useTableIDAsPathFromURI)),
		fmt.Sprintf("%s=%t", UseTableIDAsPathKey, util.GetOrZero(useTableIDAsPathFromConfig)),
	)
}
Persist use-table-id-as-path in applyParameterBySinkURI too.
This compatibility path only sees the new URI. A changefeed created with ...?use-table-id-as-path=true still leaves oldSinkConfig.CloudStorageConfig.UseTableIDAsPath == nil, so a later update that drops the query param looks compatible here and silently flips the sink back to name-based paths.
💡 Suggested fix
 func (s *SinkConfig) applyParameterBySinkURI(sinkURI *url.URL) error {
 	if sinkURI == nil {
 		return nil
 	}
 	cfgInSinkURI := map[string]string{}
 	cfgInFile := map[string]string{}
 	params := sinkURI.Query()
@@
 	protocolFromURI := params.Get(ProtocolKey)
 	if protocolFromURI != "" {
 		if s.Protocol != nil && util.GetOrZero(s.Protocol) != protocolFromURI {
 			cfgInSinkURI[ProtocolKey] = protocolFromURI
 			cfgInFile[ProtocolKey] = util.GetOrZero(s.Protocol)
 		}
 		s.Protocol = util.AddressOf(protocolFromURI)
 	}
+
+	if IsStorageScheme(sinkURI.Scheme) {
+		useTableIDAsPathFromURI := params.Get(UseTableIDAsPathKey)
+		if useTableIDAsPathFromURI != "" {
+			enabled, err := strconv.ParseBool(useTableIDAsPathFromURI)
+			if err != nil {
+				return cerror.WrapError(cerror.ErrSinkURIInvalid, err)
+			}
+			if s.CloudStorageConfig == nil {
+				s.CloudStorageConfig = &CloudStorageConfig{}
+			}
+			if s.CloudStorageConfig.UseTableIDAsPath != nil &&
+				util.GetOrZero(s.CloudStorageConfig.UseTableIDAsPath) != enabled {
+				cfgInSinkURI[UseTableIDAsPathKey] = strconv.FormatBool(enabled)
+				cfgInFile[UseTableIDAsPathKey] = strconv.FormatBool(util.GetOrZero(s.CloudStorageConfig.UseTableIDAsPath))
+			}
+			s.CloudStorageConfig.UseTableIDAsPath = util.AddressOf(enabled)
+		}
+	}
Also applies to: 1005-1056
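The failure mode described in this comment can be reproduced in isolation. The sketch below is a simplified model, not the repository code: `sinkConfig`, `cloudStorageConfig`, `checkCompat`, and `persistFromURI` are hypothetical stand-ins that only mirror the nil-skipping shape of `CheckUseTableIDAsPathCompatibility`, and a plain sentinel error replaces `cerror.ErrIncompatibleSinkConfig`.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified, hypothetical stand-ins for the real config types.
type cloudStorageConfig struct{ UseTableIDAsPath *bool }
type sinkConfig struct{ CloudStorageConfig *cloudStorageConfig }

var errIncompatible = errors.New("incompatible use-table-id-as-path")

// checkCompat mirrors the nil-skipping shape of the real check: if either
// side is unset, the check passes without comparing anything.
func checkCompat(cfg *sinkConfig, fromURI *bool) error {
	if cfg == nil || cfg.CloudStorageConfig == nil ||
		cfg.CloudStorageConfig.UseTableIDAsPath == nil || fromURI == nil {
		return nil
	}
	if *cfg.CloudStorageConfig.UseTableIDAsPath == *fromURI {
		return nil
	}
	return errIncompatible
}

// persistFromURI models the suggested fix: store the URI value in the
// config so later checks compare real values instead of nil.
func persistFromURI(cfg *sinkConfig, fromURI *bool) {
	if fromURI == nil {
		return
	}
	if cfg.CloudStorageConfig == nil {
		cfg.CloudStorageConfig = &cloudStorageConfig{}
	}
	v := *fromURI
	cfg.CloudStorageConfig.UseTableIDAsPath = &v
}

func main() {
	enabled, disabled := true, false

	// Created with ?use-table-id-as-path=true, but the value was never
	// persisted: a later update to false is not detected.
	notPersisted := &sinkConfig{}
	fmt.Println(checkCompat(notPersisted, &disabled)) // <nil>

	// Same changefeed with the value persisted: the flip is rejected.
	persisted := &sinkConfig{}
	persistFromURI(persisted, &enabled)
	fmt.Println(checkCompat(persisted, &disabled)) // incompatible use-table-id-as-path
}
```

With the value persisted, an update that simply omits the query parameter also keeps the stored `true` in the config rather than falling back to the zero value, which is the silent flip the comment warns about.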
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: flowbehappy, lidezhu
This is an automated cherry-pick of #4356
What problem does this PR solve?
Issue Number: close #4357
What is changed and how it works?
The `use-table-id-as-path` configuration option applies ONLY to TICI.
- Add the `use-table-id-as-path` option to the cloud storage sink.
- Carry `use_table_id_as_path` into API conversion and sink config parsing.
In this mode, cloud storage path generation omits the schema when `use-table-id-as-path` is enabled, and database-level schema writes are skipped, that is:
For example:
The `use-table-id-as-path` option switches the path to use table_id instead of table_name when it is set to `true`.
With `use-table-id-as-path=true` in the sink URI, for example `--sink-uri="s3://cdc?use-table-id-as-path=true"`, the cdc path changes from
to
The reason for this design is:
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
- `use-table-id-as-path` configuration parameter for cloud storage sink, enabling file organization by numeric table IDs instead of table names.
Bug Fixes
Tests
Summary by CodeRabbit
New Features
- `use-table-id-as-path` configuration option for cloud storage sinks to organize schema files by table ID instead of schema and table names.
Bug Fixes