Skip to content

Conversation

@XuPeng-SH
Copy link
Contributor

@XuPeng-SH XuPeng-SH commented Nov 9, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #22718

What this PR does / why we need it:

The new implementation has undergone systemic upgrades in architecture, testing, monitoring, and error handling, bringing it much closer to production readiness: issues can be detected faster, errors are corrected or downgraded automatically, diagnosis and recovery are easier, and there is room for future feature expansion and performance tuning.


PR Type

Bug fix, Enhancement, Tests


Description

  • Complete CDC v2 architecture reimplementation with systemic upgrades in architecture, testing, monitoring, and error handling for production readiness

  • New state machine-based executor (ExecutorStateMachine) replacing simple boolean flags with explicit state transitions (Starting, Running, Paused, Failed, Cancelled) and comprehensive error handling

  • V2 MySQL sinker (mysqlSinker2) implementing producer-consumer pattern with structured commands, transaction state machine (IDLE, ACTIVE, COMMITTED, ROLLED_BACK), and atomic error handling

  • Table change stream pipeline (TableChangeStream) with dual-layer transaction safety via TransactionManager and DataProcessor, handling StaleRead errors gracefully with watermark reset

  • Enhanced watermark updater with three-tier cache architecture, circuit breaker pattern for commit failures, error metadata caching, and retry logic with automatic circuit breaker

  • Comprehensive observability framework (ProgressTracker, ProgressMonitor) with detailed metrics tracking, watermark lag measurement, and stuck table detection

  • Executor with transaction management implementing circuit breaker pattern, retry mechanism with exponential backoff, and automatic reconnection logic

  • Extensive test coverage including regression tests for critical bug fixes, comprehensive integration tests, and end-to-end scenarios with mocked dependencies

  • Improved error handling with unified error metadata parsing, retry decision making, and proper resource cleanup with sync.Once pattern

  • Enhanced observability with structured logging, state information tracking, and comprehensive CDC metrics collection


Diagram Walkthrough

flowchart LR
  A["CDC Executor<br/>State Machine"] -->|manages| B["Table Change Stream<br/>V2 Pipeline"]
  B -->|collects changes| C["Transaction Manager<br/>& Data Processor"]
  C -->|processes| D["MySQL Sinker V2<br/>Producer-Consumer"]
  D -->|executes SQL| E["Executor<br/>Circuit Breaker"]
  E -->|updates| F["Watermark Updater<br/>Error Handling"]
  F -->|tracks| G["Progress Tracker<br/>Observability"]
  A -->|monitors| G
Loading

File Walkthrough

Relevant files
Tests
9 files
sinker_v2_sql_builder_test.go
New comprehensive SQL builder test suite for CDC v2           

pkg/cdc/sinker_v2_sql_builder_test.go

  • Comprehensive test suite for CDCStatementBuilder covering INSERT and
    DELETE SQL generation
  • Tests for single and composite primary keys with MySQL and MO format
    support
  • Validation of SQL size limits, string escaping, NULL handling, and
    various data types
  • Tests for batch splitting when SQL exceeds maximum size constraints
+878/-0 
table_change_stream_test.go
New comprehensive integration tests for table change stream

pkg/cdc/table_change_stream_test.go

  • Comprehensive integration tests for TableChangeStream covering
    initialization and lifecycle
  • Tests for single and composite primary key handling
  • Stale read retry logic validation with context cancellation scenarios
  • Tests for pause/resume behavior and duplicate reader detection
  • End-to-end tests with mocked dependencies using gostub
+1032/-0
sinker_test.go
Simplified sinker tests with v2 architecture mocking         

pkg/cdc/sinker_test.go

  • Simplified test setup by mocking CreateMysqlSinker2 instead of
    low-level sink operations
  • Removed SQL mock expectations and database connection setup complexity
  • Updated test to focus on high-level sinker creation logic rather than
    SQL execution details
+11/-923
bug_fix_tests_test.go
New regression tests for critical bug fixes                           

pkg/cdc/bug_fix_tests_test.go

  • Added regression tests for TransactionTracker.UpdateToTs() ensuring
    watermark advances correctly
  • Tests validating that CommitTransaction uses latest toTs after
    multiple updates
  • Validation tests for empty batch detection in command processing
  • Tests for AtomicBatch.Close() behavior and fail-fast principles
+125/-0 
watermark_updater_retry_test.go
Tests for watermark retry count tracking and error metadata

pkg/cdc/watermark_updater_retry_test.go

  • New test file validating retry count tracking and persistence in
    watermark updater
  • Tests verify retry count increments correctly (1→2→3→4) and converts
    to non-retryable when exceeding MaxRetryCount
  • Validates non-retryable errors don't track retry count and error type
    changes reset the count
  • Uses helper function waitForErrorMetadata() to poll database for error
    metadata with timeout
+254/-0 
cdc_test.go
Comprehensive CDC test refactoring with state machine and mock
improvements

pkg/frontend/cdc_test.go

  • Added global stub for GetTableDetector to prevent panics across all
    tests
  • Enhanced test setup with createMockTableDetectorForTest() factory
    function
  • Updated test cases to use state machine transitions instead of
    isRunning flag
  • Added setupCDCTestStubs() helper to stub CDC operations for table
    stream tests
  • Renamed mockTableReader to mockChangeReader with updated interface
    methods
  • Improved mock sinker implementations to remove panic-inducing TODO
    stubs
  • Added proper cleanup and synchronization in executor lifecycle tests
+271/-81
sinker_v2_test.go
New MySQL sinker v2 comprehensive test coverage                   

pkg/cdc/sinker_v2_test.go

  • New comprehensive test suite for mysqlSinker2 error handling and
    transaction lifecycle
  • Tests cover error state management, transaction operations
    (begin/commit/rollback)
  • Validates command processing pipeline and reset behavior
  • Tests send methods and panic-safe close operations
  • Includes batch insert handling with success and failure scenarios
+626/-0 
sinker_v2_executor_test.go
New executor transaction and retry mechanism test suite   

pkg/cdc/sinker_v2_executor_test.go

  • New test suite for Executor transaction management and SQL execution
  • Tests cover transaction lifecycle (begin/commit/rollback) with
    idempotent operations
  • Validates retry logic with exponential backoff and circuit breaker
    patterns
  • Tests connection reestablishment and transaction cleanup on errors
  • Comprehensive coverage of error handling and state consistency
+685/-0 
reader_v2_data_processor_test.go
New data processor change handling test suite                       

pkg/cdc/reader_v2_data_processor_test.go

  • New test suite for DataProcessor covering change processing workflows
  • Tests snapshot, tail-in-progress, and tail-done change types
  • Validates transaction range management and atomic batch handling
  • Tests error handling and cleanup operations
  • Includes mock sinker implementation for data processor testing
+418/-0 
Enhancement
8 files
watermark_updater.go
Enhanced watermark updater with circuit breaker and error handling

pkg/cdc/watermark_updater.go

  • Added extensive documentation on watermark consistency model
    (lag-acceptable, advance-forbidden)
  • Implemented three-tier cache architecture with circuit breaker pattern
    for commit failures
  • Added error metadata caching to avoid synchronous SQL queries during
    error updates
  • Enhanced metrics collection and observability with detailed logging
    for watermark operations
  • Improved error handling with retry logic and automatic circuit breaker
    for persistent failures
+459/-42
cdc_exector.go
CDC executor state machine and error handling improvements

pkg/frontend/cdc_exector.go

  • Replaced simple isRunning boolean flag with ExecutorStateMachine for
    robust state management with explicit transitions (Starting, Running,
    Paused, Failed, Cancelled)
  • Implemented comprehensive error handling with state transitions,
    metrics recording, and proper cleanup of resources (readers, routines,
    table detector registrations)
  • Added stopAllReaders() method to synchronously stop all running
    readers with timeout protection before Pause/Cancel operations
  • Enhanced logging with structured fields and state information; updated
    log message format from "CDC-Task-*" to "cdc.frontend.task.*" pattern
  • Improved handleNewTables() to continue processing other tables on
    individual failures instead of returning immediately, with detailed
    error tracking and metrics
  • Added clearAllTableErrors() method to reset error messages during
    Resume, allowing retry of tables with non-retryable errors after user
    fixes
  • Refactored error checking logic to use unified ParseErrorMetadata()
    and ShouldRetry() functions with improved retry decision making
  • Changed from NewTableReader to NewTableChangeStream (V2 architecture)
    with frequency parameter support
+471/-58
sinker_v2.go
New V2 MySQL sinker with structured commands and state machine

pkg/cdc/sinker_v2.go

  • New file implementing mysqlSinker2 with improved architecture:
    structured commands, explicit transaction state machine, and atomic
    error handling
  • Implements producer-consumer pattern with command channel for async
    communication between reader and sinker goroutines
  • Transaction state machine with four states (IDLE, ACTIVE, COMMITTED,
    ROLLED_BACK) ensuring no transaction leaks
  • Comprehensive error handling with atomic error storage, preventing
    panics and ensuring thread-safe error propagation
  • Command handlers for BEGIN, COMMIT, ROLLBACK, INSERT, DELETE, and
    FLUSH operations with detailed logging and metrics
  • Proper resource cleanup with sync.Once pattern and graceful shutdown
    of consumer goroutine
  • Integration with ProgressTracker for observability and metrics
    recording (SQL execution, duration, throughput)
+886/-0 
table_change_stream.go
New table change stream V2 implementation for CDC pipeline

pkg/cdc/table_change_stream.go

  • New file implementing TableChangeStream as complete CDC data pipeline
    replacing old TableReader
  • Manages continuous table change monitoring with dual-layer transaction
    safety via TransactionManager and DataProcessor
  • Implements change collection, processing, and watermark management
    with detailed progress tracking
  • Handles StaleRead errors gracefully by resetting watermark and sinker
    state with retryable flag
  • Registers/unregisters with CDC detector and observability manager for
    monitoring active runners
  • Lazy error clearing on first successful data processing to preserve
    eventual consistency design
  • Comprehensive lifecycle management with cleanup of watermarks, error
    messages, and resource deregistration
+786/-0 
observability.go
New observability framework for CDC progress tracking       

pkg/cdc/observability.go

  • New file implementing ProgressTracker for detailed observability of
    CDC processing per table with state tracking, watermark management,
    and round-based metrics
  • Tracks initial sync state (not started, running, success, failed) with
    detailed metrics (rows, bytes, SQL count, duration)
  • Records processing rounds with success/failure tracking, throughput
    calculation, and watermark lag measurement
  • Implements ProgressMonitor for monitoring multiple trackers and
    detecting stuck tables based on state age and watermark update lag
  • Provides comprehensive statistics collection and periodic progress
    logging with configurable intervals
  • Integrates with Prometheus metrics (CdcInitialSyncStatusGauge,
    CdcThroughputRowsPerSecond, CdcWatermarkLagSeconds, etc.)
+698/-0 
sinker_v2_executor.go
New executor with transaction and retry management             

pkg/cdc/sinker_v2_executor.go

  • New Executor struct managing database connections, transactions, and
    SQL execution
  • Implements circuit breaker pattern for fault tolerance with
    configurable thresholds
  • Provides retry mechanism with exponential backoff and error
    classification
  • Ensures idempotent transaction operations (begin/commit/rollback)
  • Includes transaction state tracking and automatic reconnection logic
+633/-0 
cdc_metrics.go
New comprehensive CDC monitoring and observability metrics

pkg/util/metric/v2/cdc_metrics.go

  • New comprehensive CDC metrics collection covering task lifecycle,
    table streams, and data processing
  • Defines watermark metrics including lag tracking and circuit breaker
    events
  • Includes sinker metrics for transaction operations, SQL execution, and
    retry attempts
  • Adds health metrics for heartbeats and stuck table detection
  • Covers initial sync and performance metrics with appropriate histogram
    buckets
+413/-0 
cdc_handle.go
Enhanced CDC request logging and observability                     

pkg/frontend/cdc_handle.go

  • Added operation tracking variable to identify CDC request types
    (drop/pause/resume/restart)
  • Enhanced logging with structured fields including operation type,
    target status, and task name
  • Improved observability by logging CDC task requests with full context
    information
+22/-9   
Code refactoring
1 files
sinker.go
Refactored sinker to use new v2 architecture                         

pkg/cdc/sinker.go

  • Removed legacy MySQL sink initialization code and SQL construction
    logic
  • Simplified NewSinker to delegate to new CreateMysqlSinker2
    implementation
  • Updated console sinker logging to use structured logging format
  • Removed unused imports and constants related to old implementation
+19/-911
Additional files
46 files
CDC_USER_GUIDE.md +2188/-0
error_consumption_test.go +289/-0 
error_handler.go +203/-0 
error_handler_test.go +416/-0 
observability_manager.go +96/-0   
reader.go +0/-632 
reader_test.go +0/-995 
reader_test_helpers.go +180/-0 
reader_v2_change_collector.go +260/-0 
reader_v2_change_collector_test.go +390/-0 
reader_v2_data_processor.go +448/-0 
reader_v2_data_processor.go.tmp +444/-0 
reader_v2_data_processor_empty_snapshot_test.go +67/-0   
reader_v2_state.go +245/-0 
reader_v2_state_test.go +199/-0 
reader_v2_txn_manager.go +357/-0 
reader_v2_txn_manager_test.go +451/-0 
classifier.go +60/-0   
mysql_classifier.go +60/-0   
policy.go +197/-0 
policy_test.go +173/-0 
sinker_v2_command.go +225/-0 
sinker_v2_command_test.go +318/-0 
sinker_v2_sql_builder.go +447/-0 
sql_builder.go +25/-1   
sql_builder_clear_errors_test.go +77/-0   
table_change_stream_error_clear_test.go +378/-0 
table_scanner.go +68/-24 
types.go +42/-8   
util.go +31/-3   
watermark_updater_test.go +328/-1 
cause.go +7/-1     
cdc_cancel_concurrent_test.go +390/-0 
cdc_dao.go +13/-3   
cdc_start_cleanup_test.go +347/-0 
cdc_state_machine.go +291/-0 
cdc_state_machine_bug_test.go +99/-0   
cdc_state_machine_test.go +330/-0 
cdc_util.go +6/-1     
mysql_protocol_test.go +44/-25 
resultset.go +3/-1     
routine_test.go +28/-18 
show_account_test.go +31/-6   
daemon_task.go +75/-10 
mysql_task_storage.go +20/-1   
appender.go +0/-10   

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kind/bug Something isn't working kind/enhancement kind/refactor Code refactor Review effort 5/5 size/XXL Denotes a PR that changes 2000+ lines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants