@XuPeng-SH
Contributor

@XuPeng-SH XuPeng-SH commented Nov 10, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #22718

What this PR does / why we need it:

CDC stale watermark cleanup 3.0: periodically delete watermark rows whose CDC task no longer exists.


PR Type

Bug fix


Description

  • Add orphan watermark cleanup mechanism with periodic deletion

  • Implement cleanup period and warning threshold configuration

  • Change logging levels from Info to Debug for reduced verbosity

  • Add early context cancellation check in RequestMultipleCn

  • Remove automatic context cancellation on UnRegister


Diagram Walkthrough

flowchart LR
  A["scanTableLoop"] -->|"periodic trigger"| B["cleanupTicker"]
  B -->|"timeout event"| C["cleanupOrphanWatermarks"]
  C -->|"execute SQL"| D["DeleteOrphanWatermarkSQL"]
  D -->|"LEFT JOIN"| E["mo_cdc_watermark & mo_cdc_task"]
  E -->|"identify orphans"| F["Delete orphaned entries"]
  F -->|"log results"| G["Performance metrics"]
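The flow above (a loop that, on a periodic tick, triggers cleanup) can be sketched in isolation. This is a minimal illustration, not the PR's code: `runLoop`, `runDemo`, the callback signatures, and the millisecond intervals are all hypothetical, and the real `scanTableLoop` does more than this.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop is a hypothetical stand-in for scanTableLoop. It drives scan on one
// ticker and cleanup on a second, slower ticker until ctx is done.
func runLoop(ctx context.Context, scanEvery, cleanupEvery time.Duration,
	scan, cleanup func(context.Context)) {
	scanTicker := time.NewTicker(scanEvery)
	defer scanTicker.Stop()
	cleanupTicker := time.NewTicker(cleanupEvery)
	defer cleanupTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-scanTicker.C:
			scan(ctx)
		case <-cleanupTicker.C:
			cleanup(ctx)
		}
	}
}

// runDemo runs the loop briefly with short intervals (assumed values chosen
// only so the demo finishes quickly) and reports how often each callback ran.
func runDemo() (scans, cleanups int) {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	runLoop(ctx, 10*time.Millisecond, 50*time.Millisecond,
		func(context.Context) { scans++ },
		func(context.Context) { cleanups++ })
	return scans, cleanups
}

func main() {
	scans, cleanups := runDemo()
	fmt.Printf("scans=%d cleanups=%d\n", scans, cleanups)
}
```

Because both tickers share one `select`, a slow cleanup delays the next scan; that is one reason a warning threshold on cleanup duration is useful.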

File Walkthrough

Relevant files
Enhancement
sql_builder.go
Add orphan watermark deletion SQL builder                               

pkg/cdc/sql_builder.go

  • Add new DeleteOrphanWatermarkSQL() method to generate SQL for cleaning
    orphaned watermarks
  • SQL performs LEFT JOIN between watermark and task tables to identify
    orphaned entries
+7/-0     
table_scanner.go
Implement periodic orphan watermark cleanup mechanism       

pkg/cdc/table_scanner.go

  • Add two new constants: DefaultWatermarkCleanupPeriod (10 minutes) and
    DefaultCleanupWarnThreshold (5 seconds)
  • Add cleanupPeriod and cleanupWarn fields to TableDetector struct
  • Initialize cleanup period and warning threshold in GetTableDetector
    function
  • Add validation logic for cleanup period with fallback to default value
  • Create new cleanupTicker in scanTableLoop to trigger periodic cleanup
  • Implement cleanupOrphanWatermarks() method to execute orphan watermark
    deletion with error handling and performance logging
  • Change logging levels from Info to Debug for Register, UnRegister,
    scanTableLoop, and callback success messages
  • Remove automatic context cancellation logic from UnRegister method
+92/-12 
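The validation and warning-threshold bullets above could look roughly like the sketch below. The constant names and values come from the walkthrough; the helper names `normalizeCleanupPeriod` and `isSlowCleanup`, and the exact rule (non-positive falls back to the default), are assumptions, since the diff itself is not shown here. The fallback matters because `time.NewTicker` panics on a non-positive duration.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// Names and values as listed in the walkthrough.
	DefaultWatermarkCleanupPeriod = 10 * time.Minute
	DefaultCleanupWarnThreshold   = 5 * time.Second
)

// normalizeCleanupPeriod is a hypothetical helper: a non-positive period
// falls back to the default so that time.NewTicker never receives it.
func normalizeCleanupPeriod(d time.Duration) time.Duration {
	if d <= 0 {
		return DefaultWatermarkCleanupPeriod
	}
	return d
}

// isSlowCleanup is a hypothetical helper: it reports whether a cleanup run
// took at least the warning threshold and so deserves a warn-level log.
func isSlowCleanup(elapsed, warn time.Duration) bool {
	return warn > 0 && elapsed >= warn
}

func main() {
	fmt.Println(normalizeCleanupPeriod(0))                                 // 10m0s
	fmt.Println(normalizeCleanupPeriod(time.Hour))                         // 1h0m0s
	fmt.Println(isSlowCleanup(7*time.Second, DefaultCleanupWarnThreshold)) // true
}
```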
Tests
table_scanner_test.go
Update test fixtures with cleanup configuration                   

pkg/cdc/table_scanner_test.go

  • Initialize cleanupPeriod and cleanupWarn fields in all TableDetector
    test instances
  • Set cleanup period to 1 hour in tests to prevent interference with
    test execution
+8/-0     
Bug fix
query_service.go
Add early context cancellation check                                         

pkg/queryservice/query_service.go

  • Add early context cancellation check at the beginning of
    RequestMultipleCn function
  • Return immediately if context is already canceled before processing
    requests
+5/-0     
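A minimal sketch of the early cancellation check, assuming it is a plain `ctx.Err()` guard at the top of the function. `requestAll`, its signature, and the node names are hypothetical stand-ins; the real `RequestMultipleCn` has a different signature and may report the cancellation differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// requestAll is a hypothetical stand-in for a fan-out request function.
// It checks the context once up front so an already-canceled caller does
// not pay for any per-node work.
func requestAll(ctx context.Context, nodes []string,
	send func(context.Context, string) error) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err // canceled or deadline exceeded before any request
	}
	sent := 0
	for _, n := range nodes {
		if err := send(ctx, n); err != nil {
			return sent, err
		}
		sent++
	}
	return sent, nil
}

// canceledDemo calls requestAll with a context that is canceled beforehand.
func canceledDemo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return requestAll(ctx, []string{"cn-0", "cn-1"},
		func(context.Context, string) error { return nil })
}

func main() {
	sent, err := canceledDemo()
	fmt.Println(sent, errors.Is(err, context.Canceled)) // 0 true
}
```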

@qodo-code-review

qodo-code-review bot commented Nov 10, 2025

PR Compliance Guide 🔍

(Compliance updated until commit 3807943)

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified: no security vulnerabilities detected by AI analysis. Human verification is advised for critical code.
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Action Logging: The orphan watermark cleanup performs a critical delete operation but logs only at
debug/warn levels and does not include user or task actor context, so audit adequacy is
unclear from the diff.

Referred Code
sql := CDCSQLBuilder.DeleteOrphanWatermarkSQL()
cleanupCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

logutil.Debug(
	"cdc.table_detector.cleanup_watermark_execute",
	zap.String("sql", sql),
)
start := s.now()
res, err := s.exec.Exec(
	cleanupCtx,
	sql,
	executor.Options{}.
		WithStatementOption(
			executor.StatementOption{}.
				WithAccountID(catalog.System_Account).
				WithDisableLog(),
		),
)
if err != nil {
	logutil.Error(


 ... (clipped 26 lines)

Compliance status legend
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

Previous compliance checks

Compliance check up to commit 839a1c2
Security Compliance
🟢
No security concerns identified: no security vulnerabilities detected by AI analysis. Human verification is advised for critical code.
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Action Logging: The new orphan watermark cleanup operation executes deletions without emitting structured
audit logs that include actor identity and explicit outcome details required for audit
trails.

Referred Code
if s.exec == nil {
	logutil.Debug("cdc.table_detector.cleanup_watermark_skip", zap.String("reason", "executor nil"))
	return
}

sql := CDCSQLBuilder.DeleteOrphanWatermarkSQL()
cleanupCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

logutil.Debug(
	"cdc.table_detector.cleanup_watermark_execute",
	zap.String("sql", sql),
)
start := time.Now()
res, err := s.exec.Exec(
	cleanupCtx,
	sql,
	executor.Options{}.
		WithStatementOption(
			executor.StatementOption{}.
				WithAccountID(catalog.System_Account).


 ... (clipped 32 lines)

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
SQL Safety: The added SQL builder DeleteOrphanWatermarkSQL returns a static, parameterless DELETE with a
LEFT JOIN. Without visibility into the execution context, it is unclear whether
multi-tenant constraints and authorization are enforced when the statement runs.

Referred Code
func (b cdcSQLBuilder) DeleteOrphanWatermarkSQL() string {
	return "DELETE w FROM `mo_catalog`.`mo_cdc_watermark` AS w " +
		"LEFT JOIN `mo_catalog`.`mo_cdc_task` AS t " +
		"ON t.account_id = w.account_id AND t.task_id = w.task_id " +
		"WHERE t.task_id IS NULL"
}
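
The `LEFT JOIN ... WHERE t.task_id IS NULL` pattern in the statement above is an anti-join: it keeps only watermark rows with no task matching on both `account_id` and `task_id`. The same selection can be sketched in memory as follows. The `key` type, its field types, and the sample rows are assumptions for illustration only; the real tables carry more columns.

```go
package main

import "fmt"

// key is a hypothetical stand-in for the (account_id, task_id) join columns.
type key struct {
	accountID uint64
	taskID    string
}

// orphanWatermarks returns the watermark rows that have no matching task,
// mirroring LEFT JOIN ... WHERE t.task_id IS NULL.
func orphanWatermarks(watermarks, tasks []key) []key {
	live := make(map[key]struct{}, len(tasks))
	for _, t := range tasks {
		live[t] = struct{}{}
	}
	var orphans []key
	for _, w := range watermarks {
		if _, ok := live[w]; !ok {
			orphans = append(orphans, w)
		}
	}
	return orphans
}

func main() {
	tasks := []key{{0, "task-a"}, {1, "task-b"}}
	watermarks := []key{
		{0, "task-a"},    // matches a task: kept
		{1, "task-b"},    // matches a task: kept
		{1, "task-gone"}, // task deleted: orphan
		{2, "task-a"},    // same task_id, different account: orphan
	}
	fmt.Println(orphanWatermarks(watermarks, tasks)) // [{1 task-gone} {2 task-a}]
}
```

The last sample row shows why the join uses both columns: a matching `task_id` under a different account does not protect a watermark from deletion.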

Labels

  • kind/bug: Something isn't working
  • Review effort 3/5
  • size/M: Denotes a PR that changes [100,499] lines

4 participants