diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 5bb6e356870..6a224ca2ee3 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -317,6 +317,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( BasicPassword: c.Sink.PulsarConfig.BasicPassword, AuthTLSCertificatePath: c.Sink.PulsarConfig.AuthTLSCertificatePath, AuthTLSPrivateKeyPath: c.Sink.PulsarConfig.AuthTLSPrivateKeyPath, + OutputRawChangeEvent: c.Sink.PulsarConfig.OutputRawChangeEvent, } if c.Sink.PulsarConfig.OAuth2 != nil { pulsarConfig.OAuth2 = &config.OAuth2{ @@ -402,6 +403,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( CodecConfig: codeConfig, LargeMessageHandle: largeMessageHandle, GlueSchemaRegistryConfig: glueSchemaRegistryConfig, + OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent, } } var mysqlConfig *config.MySQLConfig @@ -427,13 +429,14 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( var cloudStorageConfig *config.CloudStorageConfig if c.Sink.CloudStorageConfig != nil { cloudStorageConfig = &config.CloudStorageConfig{ - WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, - FileSize: c.Sink.CloudStorageConfig.FileSize, - OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, - FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, - FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, - FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + WorkerCount: c.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: c.Sink.CloudStorageConfig.FlushInterval, + FileSize: c.Sink.CloudStorageConfig.FileSize, + OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency, + OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent, } } var debeziumConfig *config.DebeziumConfig @@ -666,6 +669,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { CodecConfig: codeConfig, LargeMessageHandle: largeMessageHandle, GlueSchemaRegistryConfig: glueSchemaRegistryConfig, + OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent, } } var mysqlConfig *MySQLConfig @@ -708,6 +712,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { BasicPassword: cloned.Sink.PulsarConfig.BasicPassword, AuthTLSCertificatePath: cloned.Sink.PulsarConfig.AuthTLSCertificatePath, AuthTLSPrivateKeyPath: cloned.Sink.PulsarConfig.AuthTLSPrivateKeyPath, + OutputRawChangeEvent: cloned.Sink.PulsarConfig.OutputRawChangeEvent, } if cloned.Sink.PulsarConfig.OAuth2 != nil { pulsarConfig.OAuth2 = &PulsarOAuth2{ @@ -722,13 +727,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { var cloudStorageConfig *CloudStorageConfig if cloned.Sink.CloudStorageConfig != nil { cloudStorageConfig = &CloudStorageConfig{ - WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, - FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval, - FileSize: cloned.Sink.CloudStorageConfig.FileSize, - OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, - FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, - FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, - FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount, + FlushInterval: 
cloned.Sink.CloudStorageConfig.FlushInterval, + FileSize: cloned.Sink.CloudStorageConfig.FileSize, + OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID, + FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays, + FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec, + FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency, + OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent, } } var debeziumConfig *DebeziumConfig @@ -1194,6 +1200,7 @@ type PulsarConfig struct { AuthTLSCertificatePath *string `json:"auth-tls-certificate-path,omitempty"` AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty"` OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty"` + OutputRawChangeEvent *bool `json:"output-raw-change-event,omitempty"` } // PulsarOAuth2 is the configuration for OAuth2 @@ -1243,6 +1250,7 @@ type KafkaConfig struct { CodecConfig *CodecConfig `json:"codec_config,omitempty"` LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"` GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` } // MySQLConfig represents a MySQL sink configuration @@ -1266,13 +1274,14 @@ type MySQLConfig struct { // CloudStorageConfig represents a cloud storage sink configuration type CloudStorageConfig struct { - WorkerCount *int `json:"worker_count,omitempty"` - FlushInterval *string `json:"flush_interval,omitempty"` - FileSize *int `json:"file_size,omitempty"` - OutputColumnID *bool `json:"output_column_id,omitempty"` - FileExpirationDays *int `json:"file_expiration_days,omitempty"` - FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` - FlushConcurrency *int `json:"flush_concurrency,omitempty"` + WorkerCount *int `json:"worker_count,omitempty"` + FlushInterval *string `json:"flush_interval,omitempty"` + FileSize *int `json:"file_size,omitempty"` + OutputColumnID *bool `json:"output_column_id,omitempty"` + FileExpirationDays *int `json:"file_expiration_days,omitempty"` + FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"` + FlushConcurrency *int `json:"flush_concurrency,omitempty"` + OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"` } // ChangefeedStatus holds common information of a changefeed in cdc diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 217df3ae825..7d12b07d9a0 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -272,7 +272,7 @@ func (r *RedoLog) GetCommitTs() Ts { } // TrySplitAndSortUpdateEvent redo log do nothing -func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error { +func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error { return nil } @@ -444,7 +444,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 { } // TrySplitAndSortUpdateEvent do nothing -func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error { +func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error { return nil } @@ -1140,10 +1140,19 @@ func (t *SingleTableTxn) GetPhysicalTableID() int64 { } // TrySplitAndSortUpdateEvent split update events if unique key is updated -func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { - if !t.shouldSplitUpdateEvent(scheme) { +func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error { + if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent { + // For MySQL Sink, all update events 
will be split into insert and delete at the puller side + // according to whether the changefeed is in safemode. We don't split update events here (in the sink) + // since there may be OOM issues. For more information, ref https://github.com/tikv/tikv/issues/17062. + // + // For the Kafka and Storage sink, the outputRawChangeEvent parameter is introduced to control + // split behavior. TiCDC only outputs the original change events if outputRawChangeEvent is true. return nil } + + // Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false. + // Note that this is only for backward compatibility, and we should remove this logic in the future. newRows, err := trySplitAndSortUpdateEvent(t.Rows) if err != nil { return errors.Trace(err) } @@ -1152,21 +1161,6 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error { return nil } -// Whether split a single update event into delete and insert events? -// -// For the MySQL Sink, we don't split any update event. -// This may cause error like "duplicate entry" when sink to the downstream. -// This kind of error will cause the changefeed to restart, -// and then the related update rows will be splitted to insert and delete at puller side. -// -// For the Kafka and Storage sink, always split a single unique key changed update event, since: -// 1. Avro and CSV does not output the previous column values for the update event, so it would -// cause consumer missing data if the unique key changed event is not split. -// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split. -func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool { - return !sink.IsMySQLCompatibleScheme(sinkScheme) -} - // trySplitAndSortUpdateEvent try to split update events if unique key is updated // returns true if some updated events is split func trySplitAndSortUpdateEvent( @@ -1176,8 +1170,7 @@ func trySplitAndSortUpdateEvent( split := false for _, e := range events { if e == nil { - log.Warn("skip emit nil event", - zap.Any("event", e)) + log.Warn("skip emit nil event", zap.Any("event", e)) continue } @@ -1187,8 +1180,7 @@ func trySplitAndSortUpdateEvent( // begin; insert into t (id) values (1); delete from t where id=1; commit; // Just ignore these row changed events. if colLen == 0 && preColLen == 0 { - log.Warn("skip emit empty row event", - zap.Any("event", e)) + log.Warn("skip emit empty row event", zap.Any("event", e)) continue } @@ -1222,7 +1214,7 @@ func isNonEmptyUniqueOrHandleCol(col *ColumnData, tableInfo *TableInfo) bool { // ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on // whether the handle key column or unique key has been modified. -// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. +// If it is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event. func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool { // nil event will never be split. if updateEvent == nil { @@ -1265,6 +1257,13 @@ func SplitUpdateEvent( // NOTICE: clean up pre cols for insert event.
insertEvent.PreColumns = nil + log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs), + zap.Uint64("commitTs", updateEvent.CommitTs), + zap.String("schema", updateEvent.TableInfo.TableName.Schema), + zap.String("table", updateEvent.TableInfo.GetTableName()), + zap.Any("preCols", updateEvent.PreColumns), + zap.Any("cols", updateEvent.Columns)) + return &deleteEvent, &insertEvent, nil } diff --git a/cdc/model/sink_test.go b/cdc/model/sink_test.go index cd285b3fff3..f77e0f2dff0 100644 --- a/cdc/model/sink_test.go +++ b/cdc/model/sink_test.go @@ -604,21 +604,32 @@ func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) { Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme) + outputRawChangeEvent := true + notOutputRawChangeEvent := false + err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn.Rows, 2) txn = &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn.Rows, 1) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn.Rows, 1) txn2 := &SingleTableTxn{ Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent}, } - err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent) + require.NoError(t, err) + require.Len(t, txn2.Rows, 2) + err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent) require.NoError(t, err) require.Len(t, txn2.Rows, 2) } diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 2a986ec133f..f94f889f8a5 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -52,8 +52,8 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh return nil } -func (m *mockSink) Scheme() string { - return sink.BlackHoleScheme +func (m *mockSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, false } func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] { diff --git a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go index be470b6d357..83f321dcdfd 100644 --- a/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go +++ b/cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go @@ -47,9 +47,9 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang return } -// Scheme return the scheme of the sink. -func (s *DMLSink) Scheme() string { - return sink.BlackHoleScheme +// SchemeOption returns the scheme and the option. +func (s *DMLSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, true } // Close do nothing. diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index e1dca4941af..f93315305e6 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -72,8 +72,9 @@ type eventFragment struct { // messages to individual dmlWorkers. 
// The dmlWorkers will write the encoded messages to external storage in parallel between different tables. type DMLSink struct { - changefeedID model.ChangeFeedID - scheme string + changefeedID model.ChangeFeedID + scheme string + outputRawChangeEvent bool // last sequence number lastSeqNum uint64 // encodingWorkers defines a group of workers for encoding events. @@ -144,13 +145,14 @@ func NewDMLSink(ctx context.Context, wgCtx, wgCancel := context.WithCancel(ctx) s := &DMLSink{ - changefeedID: changefeedID, - scheme: strings.ToLower(sinkURI.Scheme), - encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), - workers: make([]*dmlWorker, cfg.WorkerCount), - statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink), - cancel: wgCancel, - dead: make(chan struct{}), + changefeedID: changefeedID, + scheme: strings.ToLower(sinkURI.Scheme), + outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(), + encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), + workers: make([]*dmlWorker, cfg.WorkerCount), + statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink), + cancel: wgCancel, + dead: make(chan struct{}), } s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]() @@ -295,7 +297,7 @@ func (s *DMLSink) Dead() <-chan struct{} { return s.dead } -// Scheme returns the sink scheme. -func (s *DMLSink) Scheme() string { - return s.scheme +// SchemeOption returns the scheme and the option. +func (s *DMLSink) SchemeOption() (string, bool) { + return s.scheme, s.outputRawChangeEvent } diff --git a/cdc/sink/dmlsink/event.go b/cdc/sink/dmlsink/event.go index 8df85cab249..7126be06bbe 100644 --- a/cdc/sink/dmlsink/event.go +++ b/cdc/sink/dmlsink/event.go @@ -23,7 +23,7 @@ type TableEvent interface { // GetCommitTs returns the commit timestamp of the event. GetCommitTs() uint64 // TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated - TrySplitAndSortUpdateEvent(scheme string) error + TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error } // CallbackFunc is the callback function for callbackable event. diff --git a/cdc/sink/dmlsink/event_sink.go b/cdc/sink/dmlsink/event_sink.go index 76a179ecc9c..660ed190019 100644 --- a/cdc/sink/dmlsink/event_sink.go +++ b/cdc/sink/dmlsink/event_sink.go @@ -18,10 +18,8 @@ type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. WriteEvents(events ...*CallbackableEvent[E]) error - - // Scheme returns the sink scheme. - Scheme() string - + // SchemeOption returns the sink scheme and whether the sink should output raw change event. + SchemeOption() (scheme string, outputRawChangeEvent bool) // Close closes the sink. Can be called with `WriteEvents` concurrently. Close() // The EventSink meets internal errors and has been dead already. 
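For readers skimming the interface change above, here is a minimal, self-contained sketch (not the actual tiflow code) of what an EventSink implementation looks like once Scheme() is replaced by SchemeOption(); the trimmed-down interfaces and the rawEventSink type are simplified stand-ins invented for illustration:

package main

import "fmt"

// TableEvent is a stand-in for dmlsink.TableEvent after this change: the split
// hook now receives both the sink scheme and the output-raw-change-event flag.
type TableEvent interface {
	GetCommitTs() uint64
	TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

// EventSink mirrors the reshaped sink interface: SchemeOption reports the scheme
// together with whether the sink should emit raw (unsplit) change events.
type EventSink interface {
	SchemeOption() (scheme string, outputRawChangeEvent bool)
	Close()
}

// rawEventSink is a hypothetical blackhole-style sink that opts into raw events,
// similar to how the blackhole DML sink returns (sink.BlackHoleScheme, true).
type rawEventSink struct{}

func (s *rawEventSink) SchemeOption() (string, bool) { return "blackhole", true }
func (s *rawEventSink) Close()                       {}

func main() {
	var s EventSink = &rawEventSink{}
	scheme, outputRaw := s.SchemeOption()
	fmt.Printf("scheme=%s outputRawChangeEvent=%t\n", scheme, outputRaw)
}

In the diff itself, the MQ and cloud-storage sinks derive the second return value from their sink config via GetOutputRawChangeEvent, while the txn (MySQL) sink hard-codes false and the blackhole sink hard-codes true.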
diff --git a/cdc/sink/dmlsink/mq/kafka_dml_sink.go b/cdc/sink/dmlsink/mq/kafka_dml_sink.go index 3841c2a7298..a0b0c4193cc 100644 --- a/cdc/sink/dmlsink/mq/kafka_dml_sink.go +++ b/cdc/sink/dmlsink/mq/kafka_dml_sink.go @@ -122,8 +122,8 @@ func NewKafkaDMLSink( metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient) dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh) encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID) - s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, - eventRouter, trans, encoderGroup, protocol, scheme, errCh) + s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, eventRouter, trans, encoderGroup, + protocol, scheme, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh) log.Info("DML sink producer created", zap.String("namespace", changefeedID.Namespace), zap.String("changefeedID", changefeedID.ID)) diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index e6d2105b69f..5bed1893625 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -70,7 +70,8 @@ type dmlSink struct { wg sync.WaitGroup dead chan struct{} - scheme string + scheme string + outputRawChangeEvent bool } func newDMLSink( @@ -84,6 +85,7 @@ func newDMLSink( encoderGroup codec.EncoderGroup, protocol config.Protocol, scheme string, + outputRawChangeEvent bool, errCh chan error, ) *dmlSink { ctx, cancel := context.WithCancelCause(ctx) @@ -91,13 +93,14 @@ func newDMLSink( worker := newWorker(changefeedID, protocol, producer, encoderGroup, statistics) s := &dmlSink{ - id: changefeedID, - protocol: protocol, - adminClient: adminClient, - ctx: ctx, - cancel: cancel, - dead: make(chan struct{}), - scheme: scheme, + id: changefeedID, + protocol: protocol, + adminClient: adminClient, + ctx: ctx, + cancel: cancel, + dead: make(chan struct{}), + scheme: scheme, + outputRawChangeEvent: outputRawChangeEvent, } s.alive.transformer = transformer s.alive.eventRouter = eventRouter @@ -236,6 +239,6 @@ func (s *dmlSink) Dead() <-chan struct{} { } // Scheme returns the scheme of this sink. 
-func (s *dmlSink) Scheme() string { - return s.scheme +func (s *dmlSink) SchemeOption() (string, bool) { + return s.scheme, s.outputRawChangeEvent } diff --git a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go index 089f521fd37..f9934b59147 100644 --- a/cdc/sink/dmlsink/mq/pulsar_dml_sink.go +++ b/cdc/sink/dmlsink/mq/pulsar_dml_sink.go @@ -123,8 +123,8 @@ func NewPulsarDMLSink( encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID) - s := newDMLSink(ctx, changefeedID, p, nil, topicManager, - eventRouter, trans, encoderGroup, protocol, scheme, errCh) + s := newDMLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, trans, encoderGroup, + protocol, scheme, pConfig.GetOutputRawChangeEvent(), errCh) return s, nil } diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink.go b/cdc/sink/dmlsink/txn/txn_dml_sink.go index 964d11b6eca..f5b2cb78edb 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink.go @@ -180,6 +180,6 @@ func (s *dmlSink) Dead() <-chan struct{} { return s.dead } -func (s *dmlSink) Scheme() string { - return s.scheme +func (s *dmlSink) SchemeOption() (string, bool) { + return s.scheme, false } diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index 167f30fc948..7e6b7d7154c 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -138,7 +138,7 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err resolvedCallbackableEvents := make([]*dmlsink.CallbackableEvent[E], 0, len(resolvedEvents)) for _, ev := range resolvedEvents { - if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.Scheme()); err != nil { + if err := ev.TrySplitAndSortUpdateEvent(e.backendSink.SchemeOption()); err != nil { return SinkInternalError{err} } // We have to record the event ID for the callback. diff --git a/cdc/sink/tablesink/table_sink_impl_test.go b/cdc/sink/tablesink/table_sink_impl_test.go index 87fd1af4abc..b7c55d38f52 100644 --- a/cdc/sink/tablesink/table_sink_impl_test.go +++ b/cdc/sink/tablesink/table_sink_impl_test.go @@ -50,8 +50,8 @@ func (m *mockEventSink) Dead() <-chan struct{} { return m.dead } -func (m *mockEventSink) Scheme() string { - return sink.BlackHoleScheme +func (m *mockEventSink) SchemeOption() (string, bool) { + return sink.BlackHoleScheme, false } // acknowledge the txn events by call the callback function. 
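The table sink call site above compiles because Go forwards a function's two return values directly as the argument list of another call, i.e. ev.TrySplitAndSortUpdateEvent(e.backendSink.SchemeOption()). Below is a small sketch of the resulting split decision; shouldSplit, schemeOption, and the scheme strings are illustrative stand-ins, not tiflow APIs:

package main

import "fmt"

// shouldSplit mirrors the decision TrySplitAndSortUpdateEvent now encodes:
// MySQL-compatible sinks never split in the sink, and other sinks skip
// splitting when output-raw-change-event is enabled.
func shouldSplit(scheme string, outputRawChangeEvent bool) bool {
	// Simplified stand-in for sink.IsMySQLCompatibleScheme.
	isMySQLCompatible := scheme == "mysql" || scheme == "tidb"
	return !isMySQLCompatible && !outputRawChangeEvent
}

// schemeOption stands in for backendSink.SchemeOption().
func schemeOption() (string, bool) { return "kafka", false }

func main() {
	// Two-value return forwarded as two arguments, as in the table sink above.
	fmt.Println(shouldSplit(schemeOption())) // true: Kafka sink with raw output disabled
}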
diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 4f605257db2..ad976303486 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1447,6 +1447,10 @@ var doc = `{ "output-column-id": { "type": "boolean" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "worker-count": { "type": "integer" } @@ -1602,6 +1606,10 @@ var doc = `{ "max-message-bytes": { "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "partition-num": { "type": "integer" }, @@ -1817,6 +1825,10 @@ var doc = `{ "description": "Set the operation timeout (default: 30 seconds)\nProducer-create, subscribe and unsubscribe operations will be retried until this interval, after which the\noperation will be marked as failed", "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "pulsar-producer-cache-size": { "description": "PulsarProducerCacheSize is the size of the cache of pulsar producers", "type": "integer" @@ -2452,6 +2464,9 @@ var doc = `{ "output_column_id": { "type": "boolean" }, + "output_raw_change_event": { + "type": "boolean" + }, "worker_count": { "type": "integer" } @@ -2723,6 +2738,9 @@ var doc = `{ "max_message_bytes": { "type": "integer" }, + "output_raw_change_event": { + "type": "boolean" + }, "partition_num": { "type": "integer" }, @@ -2944,6 +2962,9 @@ var doc = `{ "operation-timeout": { "type": "integer" }, + "output-raw-change-event": { + "type": "boolean" + }, "pulsar-producer-cache-size": { "type": "integer" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 38f0c2008e5..8d4f5d1907a 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1428,6 +1428,10 @@ "output-column-id": { "type": "boolean" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "worker-count": { "type": "integer" } @@ -1583,6 +1587,10 @@ "max-message-bytes": { "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "partition-num": { "type": "integer" }, @@ -1798,6 +1806,10 @@ "description": "Set the operation timeout (default: 30 seconds)\nProducer-create, subscribe and unsubscribe operations will be retried until this interval, after which the\noperation will be marked as failed", "type": "integer" }, + "output-raw-change-event": { + "description": "OutputRawChangeEvent controls whether to split the update pk/uk events.", + "type": "boolean" + }, "pulsar-producer-cache-size": { "description": "PulsarProducerCacheSize is the size of the cache of pulsar producers", "type": "integer" @@ -2433,6 +2445,9 @@ "output_column_id": { "type": "boolean" }, + "output_raw_change_event": { + "type": "boolean" + }, "worker_count": { "type": "integer" } @@ -2704,6 +2719,9 @@ "max_message_bytes": { "type": "integer" }, + "output_raw_change_event": { + "type": "boolean" + }, "partition_num": { "type": "integer" }, @@ -2925,6 +2943,9 @@ "operation-timeout": { "type": "integer" }, + "output-raw-change-event": { + "type": "boolean" + }, "pulsar-producer-cache-size": { "type": "integer" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 
11113b710e8..9cb8b63d7de 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -40,6 +40,10 @@ definitions: type: string output-column-id: type: boolean + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean worker-count: type: integer type: object @@ -147,6 +151,10 @@ definitions: $ref: '#/definitions/config.LargeMessageHandleConfig' max-message-bytes: type: integer + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean partition-num: type: integer read-timeout: @@ -306,6 +314,10 @@ definitions: Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the operation will be marked as failed type: integer + output-raw-change-event: + description: OutputRawChangeEvent controls whether to split the update pk/uk + events. + type: boolean pulsar-producer-cache-size: description: PulsarProducerCacheSize is the size of the cache of pulsar producers type: integer @@ -764,6 +776,8 @@ definitions: type: string output_column_id: type: boolean + output_raw_change_event: + type: boolean worker_count: type: integer type: object @@ -943,6 +957,8 @@ definitions: $ref: '#/definitions/v2.LargeMessageHandleConfig' max_message_bytes: type: integer + output_raw_change_event: + type: boolean partition_num: type: integer read_timeout: @@ -1088,6 +1104,8 @@ definitions: $ref: '#/definitions/v2.PulsarOAuth2' operation-timeout: type: integer + output-raw-change-event: + type: boolean pulsar-producer-cache-size: type: integer pulsar-version: diff --git a/errors.toml b/errors.toml index ac4413f463f..8ae2a3abbf1 100755 --- a/errors.toml +++ b/errors.toml @@ -916,6 +916,11 @@ error = ''' cdc server is not ready ''' +["CDC:ErrSinkIncompatibleConfig"] +error = ''' +incompatible configuration %s +''' + ["CDC:ErrSinkInvalidConfig"] error = ''' sink config invalid diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 44e1a3dbcd3..9954fa4f51f 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -134,8 +134,6 @@ type SinkConfig struct { // DispatchRules is only available when the downstream is MQ. DispatchRules []*DispatchRule `toml:"dispatchers" json:"dispatchers,omitempty"` - // CSVConfig is only available when the downstream is Storage. - CSVConfig *CSVConfig `toml:"csv" json:"csv,omitempty"` ColumnSelectors []*ColumnSelector `toml:"column-selectors" json:"column-selectors,omitempty"` // SchemaRegistry is only available when the downstream is MQ using avro protocol. @@ -194,9 +192,10 @@ type SinkConfig struct { // Debezium only. Whether schema should be excluded in the output. DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"` + // CSVConfig is only available when the downstream is Storage. 
+ CSVConfig *CSVConfig `toml:"csv" json:"csv,omitempty"` // OpenProtocol related configurations OpenProtocol *OpenProtocolConfig `toml:"open" json:"open,omitempty"` - // DebeziumConfig related configurations Debezium *DebeziumConfig `toml:"debezium" json:"debezium,omitempty"` } @@ -434,6 +433,17 @@ type KafkaConfig struct { CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"` + + // OutputRawChangeEvent controls whether to split the update pk/uk events. + OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` +} + +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (k *KafkaConfig) GetOutputRawChangeEvent() bool { + if k == nil || k.OutputRawChangeEvent == nil { + return false + } + return *k.OutputRawChangeEvent } // MaskSensitiveData masks sensitive data in KafkaConfig @@ -588,6 +598,9 @@ type PulsarConfig struct { // and 'type' always use 'client_credentials' OAuth2 *OAuth2 `toml:"oauth2" json:"oauth2,omitempty"` + // OutputRawChangeEvent controls whether to split the update pk/uk events. + OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` + // BrokerURL is used to configure service brokerUrl for the Pulsar service. // This parameter is a part of the `sink-uri`. Internal use only. BrokerURL string `toml:"-" json:"-"` @@ -595,6 +608,14 @@ type PulsarConfig struct { SinkURI *url.URL `toml:"-" json:"-"` } +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (c *PulsarConfig) GetOutputRawChangeEvent() bool { + if c == nil || c.OutputRawChangeEvent == nil { + return false + } + return *c.OutputRawChangeEvent +} + // MaskSensitiveData masks sensitive data in PulsarConfig func (c *PulsarConfig) MaskSensitiveData() { if c.AuthenticationToken != nil { @@ -653,6 +674,17 @@ type CloudStorageConfig struct { FileExpirationDays *int `toml:"file-expiration-days" json:"file-expiration-days,omitempty"` FileCleanupCronSpec *string `toml:"file-cleanup-cron-spec" json:"file-cleanup-cron-spec,omitempty"` FlushConcurrency *int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + + // OutputRawChangeEvent controls whether to split the update pk/uk events. + OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` +} + +// GetOutputRawChangeEvent returns the value of OutputRawChangeEvent +func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool { + if c == nil || c.OutputRawChangeEvent == nil { + return false + } + return *c.OutputRawChangeEvent } func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { @@ -796,21 +828,64 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { return err } - // Adjust that protocol is compatible with the scheme. For testing purposes, - // any protocol should be legal for blackhole. 
- if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { - _, err := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) - if err != nil { - return err - } - } else if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != nil { + log.Info("succeed to parse parameter from sink uri", + zap.String("protocol", util.GetOrZero(s.Protocol)), + zap.String("txnAtomicity", string(util.GetOrZero(s.TxnAtomicity)))) + + // Check that protocol config is compatible with the scheme. + if sink.IsMySQLCompatibleScheme(sinkURI.Scheme) && s.Protocol != nil { return cerror.ErrSinkURIInvalid.GenWithStackByArgs(fmt.Sprintf("protocol %s "+ "is incompatible with %s scheme", util.GetOrZero(s.Protocol), sinkURI.Scheme)) } + // For testing purposes, any protocol should be legal for blackhole. + if sink.IsMQScheme(sinkURI.Scheme) || sink.IsStorageScheme(sinkURI.Scheme) { + return s.ValidateProtocol(sinkURI.Scheme) + } + return nil +} - log.Info("succeed to parse parameter from sink uri", - zap.String("protocol", util.GetOrZero(s.Protocol)), - zap.String("txnAtomicity", string(util.GetOrZero(s.TxnAtomicity)))) +// ValidateProtocol validates the protocol configuration. +func (s *SinkConfig) ValidateProtocol(scheme string) error { + protocol, err := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) + if err != nil { + return err + } + outputOldValue := false + switch protocol { + case ProtocolOpen: + if s.OpenProtocol != nil { + outputOldValue = s.OpenProtocol.OutputOldValue + } + case ProtocolDebezium: + if s.Debezium != nil { + outputOldValue = s.Debezium.OutputOldValue + } + case ProtocolCsv: + if s.CSVConfig != nil { + outputOldValue = s.CSVConfig.OutputOldValue + } + case ProtocolAvro: + outputOldValue = false + default: + return nil + } + + outputRawChangeEvent := false + switch scheme { + case sink.KafkaScheme, sink.KafkaSSLScheme: + outputRawChangeEvent = s.KafkaConfig.GetOutputRawChangeEvent() + case sink.PulsarScheme, sink.PulsarSSLScheme: + outputRawChangeEvent = s.PulsarConfig.GetOutputRawChangeEvent() + default: + outputRawChangeEvent = s.CloudStorageConfig.GetOutputRawChangeEvent() + } + + if outputRawChangeEvent && !outputOldValue { + // TODO: return error if we do not need to keep backward compatibility. 
+ log.Warn(fmt.Sprintf("TiCDC will not split the update pk/uk events if output-raw-change-event is true(scheme: %s).", scheme) + + fmt.Sprintf("It is not recommended to set output-old-value to false(protocol: %s) in this case.", protocol) + + "Otherwise, there may be data consistency issues in update pk/uk scenarios due to lack of old value information.") + } return nil } diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go index e3a2fada8b5..cba2637a105 100644 --- a/pkg/errors/cdc_errors.go +++ b/pkg/errors/cdc_errors.go @@ -400,6 +400,10 @@ var ( "sink config invalid", errors.RFCCodeText("CDC:ErrSinkInvalidConfig"), ) + ErrSinkIncompatibleConfig = errors.Normalize( + "incompatible configuration %s", + errors.RFCCodeText("CDC:ErrSinkIncompatibleConfig"), + ) ErrCraftCodecInvalidData = errors.Normalize( "craft codec invalid data", errors.RFCCodeText("CDC:ErrCraftCodecInvalidData"), diff --git a/pkg/sink/codec/csv/csv_message.go b/pkg/sink/codec/csv/csv_message.go index ccea08f280f..32486b2fe82 100644 --- a/pkg/sink/codec/csv/csv_message.go +++ b/pkg/sink/codec/csv/csv_message.go @@ -174,6 +174,11 @@ func (c *csvMessage) decode(datums []types.Datum) error { } else { c.commitTs = 0 } + if c.config.OutputOldValue { + // When c.config.OutputOldValue, we need an extra column "is-updated". + // TODO: use this flag to guarantee data consistency in update uk/pk scenario. + dataColIdx++ + } c.columns = c.columns[:0] for i := dataColIdx; i < len(datums); i++ { diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml new file mode 100644 index 00000000000..ff4fe5d6765 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed1.toml @@ -0,0 +1,27 @@ +# Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false` +# Split and sort update pk/uk events in table sink. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml new file mode 100644 index 00000000000..96057f4dd48 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed2.toml @@ -0,0 +1,26 @@ +# Case 2: Split all update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. 
+terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml new file mode 100644 index 00000000000..033e2bd1a4f --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed3.toml @@ -0,0 +1,26 @@ +# Case 3: Don't split any update event + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml new file mode 100644 index 00000000000..112cfe4a012 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/changefeed4.toml @@ -0,0 +1,26 @@ +# Case 4: Split and sort update pk/uk events in table sink. Split other update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. 
+output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml new file mode 100644 index 00000000000..8edf2368fa4 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. + +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_clustered/sync_diff/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql new file mode 100644 index 00000000000..506a6e75765 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/data/prepare.sql @@ -0,0 +1,27 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +CREATE TABLE `update_pk` ( + `id` int PRIMARY KEY CLUSTERED, + `pad` varchar(100) NOT NULL +); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); + +SHOW INDEX FROM update_pk; + +CREATE TABLE `update_uk` ( + `id` int PRIMARY KEY CLUSTERED, + `uk` int NOT NULL, + `pad` varchar(100) NOT NULL, + UNIQUE KEY `uk` (`uk`) +); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); + +SHOW INDEX FROM update_uk; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql new file mode 100644 index 00000000000..86e7d7e7d77 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/data/run.sql @@ -0,0 +1,40 @@ +USE `test`; + +-- update_pk -- + +BEGIN; -- Note: multi-row exchange +UPDATE update_pk SET id = 3 WHERE id = 1; +UPDATE update_pk SET id = 1 WHERE id = 2; +UPDATE update_pk SET id = 2 WHERE id = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_pk SET id = 30 WHERE id = 10; +UPDATE update_pk SET id = 40 WHERE id = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_pk SET id = 200 WHERE id = 100; +COMMIT; + +-- Normal update +UPDATE update_pk SET pad='example1001' WHERE id = 1000; + +-- update_uk -- +BEGIN; -- Note: multi-row exchange +UPDATE update_uk SET uk = 3 WHERE uk = 1; +UPDATE update_uk SET uk = 1 WHERE uk = 2; +UPDATE update_uk SET uk = 2 WHERE uk = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_uk SET uk = 30 WHERE uk = 10; +UPDATE update_uk SET uk = 40 WHERE uk = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_uk SET uk = 200 WHERE uk = 100; +COMMIT; + +-- 
Normal update +UPDATE update_uk SET pad='example1001' WHERE uk = 1000; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res new file mode 100644 index 00000000000..ad6016c8059 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_pk.res @@ -0,0 +1,22 @@ +"I","update_pk","test",450253245302439944,1,"example1" +"I","update_pk","test",450253245302439944,2,"example2" +"I","update_pk","test",450253245302439946,10,"example10" +"I","update_pk","test",450253245302439946,20,"example20" +"I","update_pk","test",450253245302439947,100,"example100" +"I","update_pk","test",450253245302439948,1000,"example1000" + +# translate to normal update in upstream +"U","update_pk","test",450253245485940746,1,"example2" +"U","update_pk","test",450253245485940746,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,10,"example10" +"D","update_pk","test",450253245485940749,20,"example20" +"I","update_pk","test",450253245485940749,30,"example10" +"I","update_pk","test",450253245485940749,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,100,"example100" +"I","update_pk","test",450253245485940752,200,"example100" + +"U","update_pk","test",450253245485940753,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res new file mode 100644 index 00000000000..ebe3a635252 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed1_uk.res @@ -0,0 +1,24 @@ +"I","update_uk","test",450253245446619144,1,1,"example1" +"I","update_uk","test",450253245446619144,2,2,"example2" +"I","update_uk","test",450253245446619146,10,10,"example10" +"I","update_uk","test",450253245446619146,20,20,"example20" +"I","update_uk","test",450253245446619147,100,100,"example100" +"I","update_uk","test",450253245446619148,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450253245499047940,1,1,"example1" +"D","update_uk","test",450253245499047940,2,2,"example2" +"I","update_uk","test",450253245499047940,1,2,"example1" +"I","update_uk","test",450253245499047940,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450253245499047943,10,10,"example10" +"D","update_uk","test",450253245499047943,20,20,"example20" +"I","update_uk","test",450253245499047943,10,30,"example10" +"I","update_uk","test",450253245499047943,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450253245499047946,100,100,"example100" +"I","update_uk","test",450253245499047946,100,200,"example100" + +"U","update_uk","test",450253245512155140,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res new file mode 100644 index 00000000000..fc3ea45b65d --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450253245302439944,false,1,"example1" +"I","update_pk","test",450253245302439944,false,2,"example2" +"I","update_pk","test",450253245302439946,false,10,"example10" 
+"I","update_pk","test",450253245302439946,false,20,"example20" +"I","update_pk","test",450253245302439947,false,100,"example100" +"I","update_pk","test",450253245302439948,false,1000,"example1000" + +# translate to normal update in upstream, split in csv encoder +"D","update_pk","test",450253245485940746,true,1,"example1" +"I","update_pk","test",450253245485940746,true,1,"example2" +"D","update_pk","test",450253245485940746,true,2,"example2" +"I","update_pk","test",450253245485940746,true,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,false,10,"example10" +"D","update_pk","test",450253245485940749,false,20,"example20" +"I","update_pk","test",450253245485940749,false,30,"example10" +"I","update_pk","test",450253245485940749,false,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,false,100,"example100" +"I","update_pk","test",450253245485940752,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450253245485940753,true,1000,"example1000" +"I","update_pk","test",450253245485940753,true,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res new file mode 100644 index 00000000000..5e7f2ce0e71 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed2_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450253245446619144,false,1,1,"example1" +"I","update_uk","test",450253245446619144,false,2,2,"example2" +"I","update_uk","test",450253245446619146,false,10,10,"example10" +"I","update_uk","test",450253245446619146,false,20,20,"example20" +"I","update_uk","test",450253245446619147,false,100,100,"example100" +"I","update_uk","test",450253245446619148,false,1000,1000,"example1000" + +# split in csv encoder, data is consistent since delete by pk +"D","update_uk","test",450253245499047940,true,1,1,"example1" +"I","update_uk","test",450253245499047940,true,1,2,"example1" +"D","update_uk","test",450253245499047940,true,2,2,"example2" +"I","update_uk","test",450253245499047940,true,2,1,"example2" + +# split in csv encoder +"D","update_uk","test",450253245499047943,true,10,10,"example10" +"I","update_uk","test",450253245499047943,true,10,30,"example10" +"D","update_uk","test",450253245499047943,true,20,20,"example20" +"I","update_uk","test",450253245499047943,true,20,40,"example20" + +# split in csv encoder +"D","update_uk","test",450253245499047946,true,100,100,"example100" +"I","update_uk","test",450253245499047946,true,100,200,"example100" + +# normal update event, also split in csv encoder +"D","update_uk","test",450253245512155140,true,1000,1000,"example1000" +"I","update_uk","test",450253245512155140,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res new file mode 100644 index 00000000000..9f4448e5d9a --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_pk.res @@ -0,0 +1,24 @@ +"I","update_pk","test",450253245302439944,1,"example1" +"I","update_pk","test",450253245302439944,2,"example2" +"I","update_pk","test",450253245302439946,10,"example10" +"I","update_pk","test",450253245302439946,20,"example20" +"I","update_pk","test",450253245302439947,100,"example100" 
+"I","update_pk","test",450253245302439948,1000,"example1000" + +# output raw change event +# translate to normal update in upstream +"U","update_pk","test",450253245485940746,1,"example2" +"U","update_pk","test",450253245485940746,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,10,"example10" +"D","update_pk","test",450253245485940749,20,"example20" +"I","update_pk","test",450253245485940749,30,"example10" +"I","update_pk","test",450253245485940749,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,100,"example100" +"I","update_pk","test",450253245485940752,200,"example100" + +# normal update event +"U","update_pk","test",450253245485940753,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res new file mode 100644 index 00000000000..66348746ded --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed3_uk.res @@ -0,0 +1,14 @@ +"I","update_uk","test",450253245446619144,1,1,"example1" +"I","update_uk","test",450253245446619144,2,2,"example2" +"I","update_uk","test",450253245446619146,10,10,"example10" +"I","update_uk","test",450253245446619146,20,20,"example20" +"I","update_uk","test",450253245446619147,100,100,"example100" +"I","update_uk","test",450253245446619148,1000,1000,"example1000" + +# output raw change event, data is consistent since replace by pk/uk +"U","update_uk","test",450253245499047940,1,2,"example1" +"U","update_uk","test",450253245499047940,2,1,"example2" +"U","update_uk","test",450253245499047943,10,30,"example10" +"U","update_uk","test",450253245499047943,20,40,"example20" +"U","update_uk","test",450253245499047946,100,200,"example100" +"U","update_uk","test",450253245512155140,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res new file mode 100644 index 00000000000..fc3ea45b65d --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450253245302439944,false,1,"example1" +"I","update_pk","test",450253245302439944,false,2,"example2" +"I","update_pk","test",450253245302439946,false,10,"example10" +"I","update_pk","test",450253245302439946,false,20,"example20" +"I","update_pk","test",450253245302439947,false,100,"example100" +"I","update_pk","test",450253245302439948,false,1000,"example1000" + +# translate to normal update in upstream, split in csv encoder +"D","update_pk","test",450253245485940746,true,1,"example1" +"I","update_pk","test",450253245485940746,true,1,"example2" +"D","update_pk","test",450253245485940746,true,2,"example2" +"I","update_pk","test",450253245485940746,true,2,"example1" + +# split and sort in upstream +"D","update_pk","test",450253245485940749,false,10,"example10" +"D","update_pk","test",450253245485940749,false,20,"example20" +"I","update_pk","test",450253245485940749,false,30,"example10" +"I","update_pk","test",450253245485940749,false,40,"example20" + +# split and sort in upstream +"D","update_pk","test",450253245485940752,false,100,"example100" +"I","update_pk","test",450253245485940752,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450253245485940753,true,1000,"example1000" 
+"I","update_pk","test",450253245485940753,true,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res new file mode 100644 index 00000000000..ea644868640 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/result/changefeed4_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450253245446619144,false,1,1,"example1" +"I","update_uk","test",450253245446619144,false,2,2,"example2" +"I","update_uk","test",450253245446619146,false,10,10,"example10" +"I","update_uk","test",450253245446619146,false,20,20,"example20" +"I","update_uk","test",450253245446619147,false,100,100,"example100" +"I","update_uk","test",450253245446619148,false,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450253245499047940,false,1,1,"example1" +"D","update_uk","test",450253245499047940,false,2,2,"example2" +"I","update_uk","test",450253245499047940,false,1,2,"example1" +"I","update_uk","test",450253245499047940,false,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450253245499047943,false,10,10,"example10" +"D","update_uk","test",450253245499047943,false,20,20,"example20" +"I","update_uk","test",450253245499047943,false,10,30,"example10" +"I","update_uk","test",450253245499047943,false,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450253245499047946,false,100,100,"example100" +"I","update_uk","test",450253245499047946,false,100,200,"example100" + +# normal update event, split in csv encoder +"D","update_uk","test",450253245512155140,true,1000,1000,"example1000" +"I","update_uk","test",450253245512155140,true,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_clustered/run.sh b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh new file mode 100644 index 00000000000..8606518da0a --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_clustered/run.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run_changefeed() { + local changefeed_id=$1 + local start_ts=$2 + local expected_split_count=$3 + SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id" + + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id + sleep 8 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100 + + real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) + if [[ $real_split_count -ne $expected_split_count ]]; then + echo "expected split count $expected_split_count, real split count $real_split_count" + exit 1 + fi + run_sql "drop database if exists test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} +} + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_changefeed "changefeed1" $start_ts 5 + run_changefeed 
"changefeed2" $start_ts 5 + run_changefeed "changefeed3" $start_ts 5 + run_changefeed "changefeed4" $start_ts 10 +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml new file mode 100644 index 00000000000..ff4fe5d6765 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed1.toml @@ -0,0 +1,27 @@ +# Case 1: default configuration where `csv.output-old-value=false` and `sink.cloud-storage-config.output-raw-change-event=false` +# Split and sort update pk/uk events in table sink. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml new file mode 100644 index 00000000000..96057f4dd48 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed2.toml @@ -0,0 +1,26 @@ +# Case 2: Split all update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml new file mode 100644 index 00000000000..033e2bd1a4f --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed3.toml @@ -0,0 +1,26 @@ +# Case 3: Don't split any update event + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. 
+terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = true + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = false \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml new file mode 100644 index 00000000000..112cfe4a012 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/changefeed4.toml @@ -0,0 +1,26 @@ +# Case 4: Split and sort update pk/uk events in table sink. Split other update events in csv encoder. + +[filter] +rules = ['test.*'] + +[sink] +protocol = "csv" +# Line terminator. Empty value means "\r\n" (CRLF) is line terminators. The default value is empty. +terminator = "\n" +# Directory date separator, Optional values are `none`, `year`, `month`, `date`. The default value is none. +date-separator = 'day' + +[sink.cloud-storage-config] +output-raw-change-event = false + +[sink.csv] +# Delimiter between fields. Must be ASCII characters. The default value is ','. +delimiter = ',' +# Quoting character. Empty value means no quoting. The default value is '"'. +quote = '"' +# Representation of null values in CSV files, the default value is '\N' +null = '\N' +# Include commit-ts in the row data. The default value is false. +include-commit-ts = true +# Whether to output the value before the row data changes. The default value is false. +output-old-value = true \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml new file mode 100644 index 00000000000..0714c0c18d9 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/conf/diff_config.toml @@ -0,0 +1,29 @@ +# diff Configuration. 
+ +check-thread-count = 4 + +export-fix-sql = true + +check-struct-only = false + +[task] + output-dir = "/tmp/tidb_cdc_test/csv_storage_update_pk_nonclustered/sync_diff-/output" + + source-instances = ["mysql1"] + + target-instance = "tidb0" + + target-check-tables = ["test.?*"] + +[data-sources] +[data-sources.mysql1] + host = "127.0.0.1" + port = 4000 + user = "root" + password = "" + +[data-sources.tidb0] + host = "127.0.0.1" + port = 3306 + user = "root" + password = "" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql new file mode 100644 index 00000000000..f3cd4ca4d24 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/prepare.sql @@ -0,0 +1,28 @@ +drop database if exists `test`; +create database `test`; +use `test`; + +CREATE TABLE `update_pk` ( + `id` int PRIMARY KEY NONCLUSTERED, + `pad` varchar(100) NOT NULL +); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1, 'example1'), (2, 'example2'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (10, 'example10'), (20, 'example20'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (100, 'example100'); +INSERT INTO `update_pk` (`id`, `pad`) VALUES (1000, 'example1000'); + + +SHOW INDEX FROM update_pk; + +CREATE TABLE `update_uk` ( + `id` int PRIMARY KEY NONCLUSTERED, + `uk` int NOT NULL, + `pad` varchar(100) NOT NULL, + UNIQUE KEY `uk` (`uk`) +); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1, 1, 'example1'), (2, 2, 'example2'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (10, 10, 'example10'), (20, 20, 'example20'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (100, 100, 'example100'); +INSERT INTO `update_uk` (`id`, `uk`, `pad`) VALUES (1000, 1000, 'example1000'); + +SHOW INDEX FROM update_uk; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql new file mode 100644 index 00000000000..86e7d7e7d77 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/data/run.sql @@ -0,0 +1,40 @@ +USE `test`; + +-- update_pk -- + +BEGIN; -- Note: multi-row exchange +UPDATE update_pk SET id = 3 WHERE id = 1; +UPDATE update_pk SET id = 1 WHERE id = 2; +UPDATE update_pk SET id = 2 WHERE id = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_pk SET id = 30 WHERE id = 10; +UPDATE update_pk SET id = 40 WHERE id = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_pk SET id = 200 WHERE id = 100; +COMMIT; + +-- Normal update +UPDATE update_pk SET pad='example1001' WHERE id = 1000; + +-- update_uk -- +BEGIN; -- Note: multi-row exchange +UPDATE update_uk SET uk = 3 WHERE uk = 1; +UPDATE update_uk SET uk = 1 WHERE uk = 2; +UPDATE update_uk SET uk = 2 WHERE uk = 3; +COMMIT; + +BEGIN; -- Note: multi-row update with no order dependency +UPDATE update_uk SET uk = 30 WHERE uk = 10; +UPDATE update_uk SET uk = 40 WHERE uk = 20; +COMMIT; + +BEGIN; -- Single row update +UPDATE update_uk SET uk = 200 WHERE uk = 100; +COMMIT; + +-- Normal update +UPDATE update_uk SET pad='example1001' WHERE uk = 1000; \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res new file mode 100644 index 00000000000..08f6eedb804 --- /dev/null +++ 
b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_pk.res @@ -0,0 +1,24 @@ +"I","update_pk","test",450250823741472787,1,"example1" +"I","update_pk","test",450250823741472787,2,"example2" +"I","update_pk","test",450250823741472790,10,"example10" +"I","update_pk","test",450250823741472790,20,"example20" +"I","update_pk","test",450250823741472791,100,"example100" +"I","update_pk","test",450250823741472792,1000,"example1000" + +# split and sort in table sink +"D","update_pk","test",450250823807270922,1,"example1" +"D","update_pk","test",450250823807270922,2,"example2" +"I","update_pk","test",450250823807270922,2,"example1" +"I","update_pk","test",450250823807270922,1,"example2" + +# split and sort in table sink +"D","update_pk","test",450250823807270925,10,"example10" +"D","update_pk","test",450250823807270925,20,"example20" +"I","update_pk","test",450250823807270925,30,"example10" +"I","update_pk","test",450250823807270925,40,"example20" + +# split and sort in table sink +"D","update_pk","test",450250823807270927,100,"example100" +"I","update_pk","test",450250823807270927,200,"example100" + +"U","update_pk","test",450250823807270928,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res new file mode 100644 index 00000000000..b26f2219af2 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed1_uk.res @@ -0,0 +1,24 @@ +"I","update_uk","test",450250823780794385,1,1,"example1" +"I","update_uk","test",450250823780794385,2,2,"example2" +"I","update_uk","test",450250823780794387,10,10,"example10" +"I","update_uk","test",450250823780794387,20,20,"example20" +"I","update_uk","test",450250823780794389,100,100,"example100" +"I","update_uk","test",450250823780794390,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450250823807270931,1,1,"example1" +"D","update_uk","test",450250823807270931,2,2,"example2" +"I","update_uk","test",450250823807270931,1,2,"example1" +"I","update_uk","test",450250823807270931,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450250823820115970,10,10,"example10" +"D","update_uk","test",450250823820115970,20,20,"example20" +"I","update_uk","test",450250823820115970,10,30,"example10" +"I","update_uk","test",450250823820115970,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450250823820115973,100,100,"example100" +"I","update_uk","test",450250823820115973,100,200,"example100" + +"U","update_uk","test",450250823820115977,1000,1000,"example1001" diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res new file mode 100644 index 00000000000..e2713a94f63 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_pk.res @@ -0,0 +1,28 @@ +"I","update_pk","test",450250823741472787,false,1,"example1" +"I","update_pk","test",450250823741472787,false,2,"example2" +"I","update_pk","test",450250823741472790,false,10,"example10" +"I","update_pk","test",450250823741472790,false,20,"example20" +"I","update_pk","test",450250823741472791,false,100,"example100" +"I","update_pk","test",450250823741472792,false,1000,"example1000" + +# split in csv encoder +# DIFF_RES: REPLACE INTO `test`.`update_pk`(`id`,`pad`) VALUES (2,'example1'); +# 
lost id=2 since delete are not sorted before insert within single txn +"D","update_pk","test",450250823807270922,true,1,"example1" +"I","update_pk","test",450250823807270922,true,2,"example1" +"D","update_pk","test",450250823807270922,true,2,"example2" +"I","update_pk","test",450250823807270922,true,1,"example2" + +# split in csv encoder +"D","update_pk","test",450250823807270925,true,10,"example10" +"I","update_pk","test",450250823807270925,true,30,"example10" +"D","update_pk","test",450250823807270925,true,20,"example20" +"I","update_pk","test",450250823807270925,true,40,"example20" + +# split in csv encoder +"D","update_pk","test",450250823807270927,true,100,"example100" +"I","update_pk","test",450250823807270927,true,200,"example100" + +# normal update event, also split in csv encoder +"D","update_pk","test",450250823807270928,true,1000,"example1000" +"I","update_pk","test",450250823807270928,true,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res new file mode 100644 index 00000000000..1783ee5a0dd --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed2_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450250823780794385,false,1,1,"example1" +"I","update_uk","test",450250823780794385,false,2,2,"example2" +"I","update_uk","test",450250823780794387,false,10,10,"example10" +"I","update_uk","test",450250823780794387,false,20,20,"example20" +"I","update_uk","test",450250823780794389,false,100,100,"example100" +"I","update_uk","test",450250823780794390,false,1000,1000,"example1000" + +# split in csv encoder, data is consistent since delete by pk +"D","update_uk","test",450250823807270931,true,1,1,"example1" +"I","update_uk","test",450250823807270931,true,1,2,"example1" +"D","update_uk","test",450250823807270931,true,2,2,"example2" +"I","update_uk","test",450250823807270931,true,2,1,"example2" + +# split in csv encoder +"D","update_uk","test",450250823820115970,true,10,10,"example10" +"I","update_uk","test",450250823820115970,true,10,30,"example10" +"D","update_uk","test",450250823820115970,true,20,20,"example20" +"I","update_uk","test",450250823820115970,true,20,40,"example20" + +# split in csv encoder +"D","update_uk","test",450250823820115973,true,100,100,"example100" +"I","update_uk","test",450250823820115973,true,100,200,"example100" + +# normal update event, also split in csv encoder +"D","update_uk","test",450250823820115977,true,1000,1000,"example1000" +"I","update_uk","test",450250823820115977,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res new file mode 100644 index 00000000000..54595b6f49b --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_pk.res @@ -0,0 +1,22 @@ +"I","update_pk","test",450250823741472787,1,"example1" +"I","update_pk","test",450250823741472787,2,"example2" +"I","update_pk","test",450250823741472790,10,"example10" +"I","update_pk","test",450250823741472790,20,"example20" +"I","update_pk","test",450250823741472791,100,"example100" +"I","update_pk","test",450250823741472792,1000,"example1000" + +# output raw change event +"U","update_pk","test",450250823807270922,2,"example1" 
+"U","update_pk","test",450250823807270922,1,"example2" + + +# DIFF_RES: +# DELETE FROM `test`.`update_pk` WHERE `id` = 10 AND `pad` = 'example10' LIMIT 1; +# DELETE FROM `test`.`update_pk` WHERE `id` = 20 AND `pad` = 'example20' LIMIT 1; +# DELETE FROM `test`.`update_pk` WHERE `id` = 100 AND `pad` = 'example100' LIMIT 1; +# old row is not deleted since lack of old value +"U","update_pk","test",450250823807270925,30,"example10" +"U","update_pk","test",450250823807270925,40,"example20" +"U","update_pk","test",450250823807270927,200,"example100" + +"U","update_pk","test",450250823807270928,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res new file mode 100644 index 00000000000..ed01246a1ed --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed3_uk.res @@ -0,0 +1,14 @@ +"I","update_uk","test",450250823780794385,1,1,"example1" +"I","update_uk","test",450250823780794385,2,2,"example2" +"I","update_uk","test",450250823780794387,10,10,"example10" +"I","update_uk","test",450250823780794387,20,20,"example20" +"I","update_uk","test",450250823780794389,100,100,"example100" +"I","update_uk","test",450250823780794390,1000,1000,"example1000" + +# output raw change event, data is consistent since replace by pk/uk +"U","update_uk","test",450250823807270931,1,2,"example1" +"U","update_uk","test",450250823807270931,2,1,"example2" +"U","update_uk","test",450250823820115970,10,30,"example10" +"U","update_uk","test",450250823820115970,20,40,"example20" +"U","update_uk","test",450250823820115973,100,200,"example100" +"U","update_uk","test",450250823820115977,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res new file mode 100644 index 00000000000..79dc50dbf3c --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_pk.res @@ -0,0 +1,26 @@ +"I","update_pk","test",450250823741472787,false,1,"example1" +"I","update_pk","test",450250823741472787,false,2,"example2" +"I","update_pk","test",450250823741472790,false,10,"example10" +"I","update_pk","test",450250823741472790,false,20,"example20" +"I","update_pk","test",450250823741472791,false,100,"example100" +"I","update_pk","test",450250823741472792,false,1000,"example1000" + +# split and sort in table sink +"D","update_pk","test",450250823807270922,false,1,"example1" +"D","update_pk","test",450250823807270922,false,2,"example2" +"I","update_pk","test",450250823807270922,false,2,"example1" +"I","update_pk","test",450250823807270922,false,1,"example2" + +# split and sort in table sink +"D","update_pk","test",450250823807270925,false,10,"example10" +"D","update_pk","test",450250823807270925,false,20,"example20" +"I","update_pk","test",450250823807270925,false,30,"example10" +"I","update_pk","test",450250823807270925,false,40,"example20" + +# split and sort in table sink +"D","update_pk","test",450250823807270927,false,100,"example100" +"I","update_pk","test",450250823807270927,false,200,"example100" + +# normal update event, split in csv encoder +"D","update_pk","test",450250823807270928,true,1000,"example1000" +"I","update_pk","test",450250823807270928,true,1000,"example1001" \ No newline at end of file diff --git 
a/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res new file mode 100644 index 00000000000..2225f55ff95 --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/result/changefeed4_uk.res @@ -0,0 +1,26 @@ +"I","update_uk","test",450250823780794385,false,1,1,"example1" +"I","update_uk","test",450250823780794385,false,2,2,"example2" +"I","update_uk","test",450250823780794387,false,10,10,"example10" +"I","update_uk","test",450250823780794387,false,20,20,"example20" +"I","update_uk","test",450250823780794389,false,100,100,"example100" +"I","update_uk","test",450250823780794390,false,1000,1000,"example1000" + +# split and sort in table sink +"D","update_uk","test",450250823807270931,false,1,1,"example1" +"D","update_uk","test",450250823807270931,false,2,2,"example2" +"I","update_uk","test",450250823807270931,false,1,2,"example1" +"I","update_uk","test",450250823807270931,false,2,1,"example2" + +# split and sort in table sink +"D","update_uk","test",450250823820115970,false,10,10,"example10" +"D","update_uk","test",450250823820115970,false,20,20,"example20" +"I","update_uk","test",450250823820115970,false,10,30,"example10" +"I","update_uk","test",450250823820115970,false,20,40,"example20" + +# split and sort in table sink +"D","update_uk","test",450250823820115973,false,100,100,"example100" +"I","update_uk","test",450250823820115973,false,100,200,"example100" + +# normal update event, split in csv encoder +"D","update_uk","test",450250823820115977,true,1000,1000,"example1000" +"I","update_uk","test",450250823820115977,true,1000,1000,"example1001" \ No newline at end of file diff --git a/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh new file mode 100644 index 00000000000..adbf3392ffd --- /dev/null +++ b/tests/integration_tests/csv_storage_update_pk_nonclustered/run.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run_changefeed() { + local changefeed_id=$1 + local start_ts=$2 + local expected_split_count=$3 + local should_pass_check=$4 + SINK_URI="file://$WORK_DIR/storage_test/$changefeed_id?flush-interval=5s" + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --config=$CUR/conf/$changefeed_id.toml -c "$changefeed_id" + + run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/$changefeed_id.toml $changefeed_id + sleep 8 + + cp $CUR/conf/diff_config.toml $WORK_DIR/diff_config.toml + sed -i "s//$changefeed_id/" $WORK_DIR/diff_config.toml + if [[ $should_pass_check == true ]]; then + check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 100 + else + check_sync_diff $WORK_DIR $WORK_DIR/diff_config.toml 30 && exit 1 || echo "check_sync_diff failed as expected for $changefeed_id" + fi + + real_split_count=$(grep "split update event" $WORK_DIR/cdc.log | wc -l) + if [[ $real_split_count -ne $expected_split_count ]]; then + echo "expected split count $expected_split_count, real split count $real_split_count" + exit 1 + fi + run_sql "drop database if exists test" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} +} + +function run() { + if [ "$SINK_TYPE" != "storage" ]; then + return + fi + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + run_cdc_server --workdir 
$WORK_DIR --binary $CDC_BINARY + + start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1}) + + run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql_file $CUR/data/run.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + run_changefeed "changefeed1" $start_ts 10 true + # changefeed2 fails since delete events are not sorted before inserts + run_changefeed "changefeed2" $start_ts 10 false + # changefeed3 fails since update pk/uk events are not split + run_changefeed "changefeed3" $start_ts 10 false + run_changefeed "changefeed4" $start_ts 20 true +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/integration_tests/run_group.sh b/tests/integration_tests/run_group.sh index bbed3165515..ebcb416811e 100755 --- a/tests/integration_tests/run_group.sh +++ b/tests/integration_tests/run_group.sh @@ -19,7 +19,7 @@ kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error kafka_only_protocol="kafka_simple_basic kafka_simple_basic_avro kafka_simple_handle_key_only kafka_simple_handle_key_only_avro kafka_simple_claim_check kafka_simple_claim_check_avro canal_json_adapter_compatibility canal_json_basic canal_json_content_compatible multi_topics avro_basic canal_json_handle_key_only open_protocol_handle_key_only canal_json_claim_check open_protocol_claim_check" kafka_only_v2="kafka_big_messages_v2 multi_tables_ddl_v2 multi_topics_v2" -storage_only="lossy_ddl storage_csv_update" +storage_only="lossy_ddl storage_csv_update csv_storage_update_pk_clustered csv_storage_update_pk_nonclustered" storage_only_csv="storage_cleanup csv_storage_basic csv_storage_multi_tables_ddl csv_storage_partition_table" storage_only_canal_json="canal_json_storage_basic canal_json_storage_partition_table"
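A note for readers comparing the .res fixtures above: the extra true/false field that shows up in the changefeed2 and changefeed4 results (but not in changefeed1/changefeed3) is emitted because those changefeeds set output-old-value = true in [sink.csv]; judging from the file comments, it sits right after the commit-ts column (include-commit-ts = true in all four configs) and appears to mark whether a "D"/"I" row came from an update event split in the csv encoder. The sketch below is not part of the patch; it is only a minimal illustration of that column layout as seen in the result files, and the parseLine helper and row struct are hypothetical names.

package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// row mirrors the layout visible in the result files above:
// operation, table, schema, commit-ts (include-commit-ts = true),
// an optional is-update flag (output-old-value = true), then the data columns.
type row struct {
	Op       string // "I", "U" or "D"
	Table    string
	Schema   string
	CommitTs string
	IsUpdate *bool    // nil when output-old-value = false
	Columns  []string // id, uk, pad, ...
}

// parseLine decodes one result line; hasIsUpdate says whether the changefeed
// was created with output-old-value = true.
func parseLine(line string, hasIsUpdate bool) (row, error) {
	fields, err := csv.NewReader(strings.NewReader(line)).Read()
	if err != nil {
		return row{}, err
	}
	if len(fields) < 5 {
		return row{}, fmt.Errorf("unexpected field count %d", len(fields))
	}
	r := row{Op: fields[0], Table: fields[1], Schema: fields[2], CommitTs: fields[3]}
	rest := fields[4:]
	if hasIsUpdate {
		isUpdate := rest[0] == "true"
		r.IsUpdate = &isUpdate
		rest = rest[1:]
	}
	r.Columns = rest
	return r, nil
}

func main() {
	// A line taken from changefeed2_pk.res, where output-old-value = true.
	r, err := parseLine(`"D","update_pk","test",450250823807270922,true,1,"example1"`, true)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s %s.%s commit-ts=%s is-update=%v columns=%v\n",
		r.Op, r.Schema, r.Table, r.CommitTs, *r.IsUpdate, r.Columns)
}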