
pkg/config, sink(ticdc): support output raw change event for mq and cloud storage sink (#11226)

close #11211
CharlesCheung96 committed Jun 11, 2024
1 parent e3b0bc7 commit 3887861
Showing 54 changed files with 1,174 additions and 104 deletions.
51 changes: 30 additions & 21 deletions cdc/api/v2/model.go
@@ -317,6 +317,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
BasicPassword: c.Sink.PulsarConfig.BasicPassword,
AuthTLSCertificatePath: c.Sink.PulsarConfig.AuthTLSCertificatePath,
AuthTLSPrivateKeyPath: c.Sink.PulsarConfig.AuthTLSPrivateKeyPath,
OutputRawChangeEvent: c.Sink.PulsarConfig.OutputRawChangeEvent,
}
if c.Sink.PulsarConfig.OAuth2 != nil {
pulsarConfig.OAuth2 = &config.OAuth2{
@@ -402,6 +403,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
GlueSchemaRegistryConfig: glueSchemaRegistryConfig,
OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent,
}
}
var mysqlConfig *config.MySQLConfig
@@ -427,13 +429,14 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
var cloudStorageConfig *config.CloudStorageConfig
if c.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}
var debeziumConfig *config.DebeziumConfig
@@ -666,6 +669,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
GlueSchemaRegistryConfig: glueSchemaRegistryConfig,
OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent,
}
}
var mysqlConfig *MySQLConfig
Expand Down Expand Up @@ -708,6 +712,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
BasicPassword: cloned.Sink.PulsarConfig.BasicPassword,
AuthTLSCertificatePath: cloned.Sink.PulsarConfig.AuthTLSCertificatePath,
AuthTLSPrivateKeyPath: cloned.Sink.PulsarConfig.AuthTLSPrivateKeyPath,
OutputRawChangeEvent: cloned.Sink.PulsarConfig.OutputRawChangeEvent,
}
if cloned.Sink.PulsarConfig.OAuth2 != nil {
pulsarConfig.OAuth2 = &PulsarOAuth2{
@@ -722,13 +727,14 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
var cloudStorageConfig *CloudStorageConfig
if cloned.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}
var debeziumConfig *DebeziumConfig
@@ -1194,6 +1200,7 @@ type PulsarConfig struct {
AuthTLSCertificatePath *string `json:"auth-tls-certificate-path,omitempty"`
AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty"`
OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty"`
OutputRawChangeEvent *bool `json:"output-raw-change-event,omitempty"`
}

// PulsarOAuth2 is the configuration for OAuth2
@@ -1243,6 +1250,7 @@ type KafkaConfig struct {
CodecConfig *CodecConfig `json:"codec_config,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// MySQLConfig represents a MySQL sink configuration
@@ -1266,13 +1274,14 @@ type MySQLConfig struct {

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
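
The structs above are the open API v2 view of the sink configuration, so the new switch can also be set over the HTTP API. A minimal sketch of how the field serializes, assuming the usual module path for this repository; only the new field is populated and everything else is left at its zero value:

package main

import (
	"encoding/json"
	"fmt"

	v2 "github.com/pingcap/tiflow/cdc/api/v2"
)

func main() {
	raw := true
	// Only the new pointer field is set; all other optional fields are
	// dropped by their omitempty tags.
	cfg := v2.CloudStorageConfig{OutputRawChangeEvent: &raw}
	buf, err := json.Marshal(&cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(buf)) // {"output_raw_change_event":true}
}

KafkaConfig uses the same output_raw_change_event key, while PulsarConfig uses output-raw-change-event, matching the json tags above.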
47 changes: 23 additions & 24 deletions cdc/model/sink.go
@@ -272,7 +272,7 @@ func (r *RedoLog) GetCommitTs() Ts {
}

// TrySplitAndSortUpdateEvent redo log do nothing
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error {
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error {
return nil
}

@@ -444,7 +444,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent do nothing
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error {
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error {
return nil
}

@@ -1140,10 +1140,19 @@ func (t *SingleTableTxn) GetPhysicalTableID() int64 {
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
if !t.shouldSplitUpdateEvent(scheme) {
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error {
if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent {
// For the MySQL sink, all update events will be split into insert and delete at the puller side
// according to whether the changefeed is in safe mode. We don't split update events here (in the sink)
// since there may be OOM issues. For more information, see https://github.com/tikv/tikv/issues/17062.
//
// For the Kafka and Storage sinks, the outputRawChangeEvent parameter is introduced to control
// the split behavior. TiCDC only outputs the original change event if outputRawChangeEvent is true.
return nil
}

// Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false.
// Note it is only for backward compatibility, and we should remove this logic in the future.
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
@@ -1152,21 +1161,6 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
return nil
}

// Whether split a single update event into delete and insert events?
//
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be splitted to insert and delete at puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
return !sink.IsMySQLCompatibleScheme(sinkScheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
// returns true if some update events are split
func trySplitAndSortUpdateEvent(
@@ -1176,8 +1170,7 @@ func trySplitAndSortUpdateEvent(
split := false
for _, e := range events {
if e == nil {
log.Warn("skip emit nil event",
zap.Any("event", e))
log.Warn("skip emit nil event", zap.Any("event", e))
continue
}

@@ -1187,8 +1180,7 @@ func trySplitAndSortUpdateEvent(
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event",
zap.Any("event", e))
log.Warn("skip emit empty row event", zap.Any("event", e))
continue
}

@@ -1222,7 +1214,7 @@ func isNonEmptyUniqueOrHandleCol(col *ColumnData, tableInfo *TableInfo) bool {

// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If it is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
@@ -1265,6 +1257,13 @@ func SplitUpdateEvent(
// NOTICE: clean up pre cols for insert event.
insertEvent.PreColumns = nil

log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs),
zap.Uint64("commitTs", updateEvent.CommitTs),
zap.String("schema", updateEvent.TableInfo.TableName.Schema),
zap.String("table", updateEvent.TableInfo.GetTableName()),
zap.Any("preCols", updateEvent.PreColumns),
zap.Any("cols", updateEvent.Columns))

return &deleteEvent, &insertEvent, nil
}

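Taken together, the new flag works like this: MySQL-compatible sinks and raw-event mode leave the transaction untouched, while the Kafka/Pulsar and storage sinks with the flag off still split a unique-key-changed UPDATE into a DELETE plus an INSERT (per the rationale in the removed comment: Avro and CSV do not carry old column values, and the index-value dispatcher needs the split). A condensed sketch of that decision follows; the helper signatures are assumed from the hunks above, and the nil/empty-row checks and sorting done by the real code are omitted:

package example

import (
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/sink"
)

// maybeSplit restates the decision made by SingleTableTxn.TrySplitAndSortUpdateEvent.
// It is an illustration only, not the actual implementation.
func maybeSplit(rows []*model.RowChangedEvent, scheme string, outputRawChangeEvent bool) ([]*model.RowChangedEvent, error) {
	// MySQL-compatible sinks are split at the puller side; raw mode disables splitting here.
	if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent {
		return rows, nil
	}
	out := make([]*model.RowChangedEvent, 0, len(rows))
	for _, row := range rows {
		if model.ShouldSplitUpdateEvent(row) {
			// A unique-key-changed UPDATE becomes DELETE(old values) + INSERT(new values).
			deleteEvent, insertEvent, err := model.SplitUpdateEvent(row)
			if err != nil {
				return nil, err
			}
			out = append(out, deleteEvent, insertEvent)
			continue
		}
		out = append(out, row)
	}
	return out, nil
}
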
17 changes: 14 additions & 3 deletions cdc/model/sink_test.go
@@ -604,21 +604,32 @@ func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) {
Rows: []*RowChangedEvent{ukUpdatedEvent},
}

err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
outputRawChangeEvent := true
notOutputRawChangeEvent := false
err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn2.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
err = txn2.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -52,8 +52,8 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh
return nil
}

func (m *mockSink) Scheme() string {
return sink.BlackHoleScheme
func (m *mockSink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, false
}

func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] {
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
@@ -47,9 +47,9 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang
return
}

// Scheme return the scheme of the sink.
func (s *DMLSink) Scheme() string {
return sink.BlackHoleScheme
// SchemeOption returns the scheme and the option.
func (s *DMLSink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, true
}

// Close do nothing.
26 changes: 14 additions & 12 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -72,8 +72,9 @@ type eventFragment struct {
// messages to individual dmlWorkers.
// The dmlWorkers will write the encoded messages to external storage in parallel between different tables.
type DMLSink struct {
changefeedID model.ChangeFeedID
scheme string
changefeedID model.ChangeFeedID
scheme string
outputRawChangeEvent bool
// last sequence number
lastSeqNum uint64
// encodingWorkers defines a group of workers for encoding events.
@@ -144,13 +145,14 @@ func NewDMLSink(ctx context.Context,

wgCtx, wgCancel := context.WithCancel(ctx)
s := &DMLSink{
changefeedID: changefeedID,
scheme: strings.ToLower(sinkURI.Scheme),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
changefeedID: changefeedID,
scheme: strings.ToLower(sinkURI.Scheme),
outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
}
s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]()

@@ -295,7 +297,7 @@ func (s *DMLSink) Dead() <-chan struct{} {
return s.dead
}

// Scheme returns the sink scheme.
func (s *DMLSink) Scheme() string {
return s.scheme
// SchemeOption returns the scheme and the option.
func (s *DMLSink) SchemeOption() (string, bool) {
return s.scheme, s.outputRawChangeEvent
}
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/event.go
@@ -23,7 +23,7 @@ type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated
TrySplitAndSortUpdateEvent(scheme string) error
TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

// CallbackFunc is the callback function for callbackable event.
6 changes: 2 additions & 4 deletions cdc/sink/dmlsink/event_sink.go
@@ -18,10 +18,8 @@ type EventSink[E TableEvent] interface {
// WriteEvents writes events to the sink.
// This is an asynchronously and thread-safe method.
WriteEvents(events ...*CallbackableEvent[E]) error

// Scheme returns the sink scheme.
Scheme() string

// SchemeOption returns the sink scheme and whether the sink should output raw change event.
SchemeOption() (scheme string, outputRawChangeEvent bool)
// Close closes the sink. Can be called with `WriteEvents` concurrently.
Close()
// The EventSink meets internal errors and has been dead already.
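With the interface change, the scheme and the raw-event preference travel together, so the caller can feed both straight into TrySplitAndSortUpdateEvent. A hedged sketch of the caller side; the real call site (the table sink) is outside this excerpt, and the Event field name on CallbackableEvent is an assumption:

package example

import (
	"github.com/pingcap/tiflow/cdc/sink/dmlsink"
)

// flushEvent shows how the pieces compose: ask the sink how it wants events,
// let the event split itself accordingly, then hand it over.
func flushEvent[E dmlsink.TableEvent](s dmlsink.EventSink[E], ev *dmlsink.CallbackableEvent[E]) error {
	scheme, outputRawChangeEvent := s.SchemeOption()
	// Event field name assumed; not shown in this excerpt.
	if err := ev.Event.TrySplitAndSortUpdateEvent(scheme, outputRawChangeEvent); err != nil {
		return err
	}
	return s.WriteEvents(ev)
}
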
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -122,8 +122,8 @@ func NewKafkaDMLSink(
metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient)
dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh)
encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager,
eventRouter, trans, encoderGroup, protocol, scheme, errCh)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, eventRouter, trans, encoderGroup,
protocol, scheme, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh)
log.Info("DML sink producer created",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeedID", changefeedID.ID))
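NewKafkaDMLSink above reads the flag through GetOutputRawChangeEvent rather than dereferencing the pointer directly. The pkg/config half of this commit is not in the loaded part of the diff, so the following is only a plausible sketch of such a getter in the usual nil-safe style, with a stand-in struct instead of the real config.KafkaConfig:

package example

// KafkaConfig stands in for the real config.KafkaConfig; only the field
// relevant here is reproduced.
type KafkaConfig struct {
	OutputRawChangeEvent *bool
}

// GetOutputRawChangeEvent sketches the getter referenced above: safe to call
// on a nil receiver or an unset field, defaulting to false (keep splitting).
func (k *KafkaConfig) GetOutputRawChangeEvent() bool {
	if k == nil || k.OutputRawChangeEvent == nil {
		return false
	}
	return *k.OutputRawChangeEvent
}

The CloudStorageConfig.GetOutputRawChangeEvent call in cloud_storage_dml_sink.go above would presumably follow the same pattern.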
(The remaining changed files are not loaded in this view.)
