From 58636ede29de7dac8a003724f9644de23a427668 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Fri, 21 Jun 2024 05:10:18 +0800 Subject: [PATCH] simple (ticdc): support send all tables bootstrap message at changefeed start (#11239) close pingcap/tiflow#11315 --- cdc/api/v2/model.go | 9 +++ cdc/api/v2/model_test.go | 1 + cdc/entry/schema_test_helper.go | 5 ++ cdc/owner/changefeed.go | 4 +- cdc/owner/changefeed_test.go | 11 ++- cdc/owner/ddl_manager.go | 76 +++++++++++++++---- cdc/owner/ddl_manager_test.go | 42 +++++++++- cdc/owner/ddl_sink.go | 14 +++- pkg/cmd/util/helper_test.go | 2 + pkg/config/config_test_data.go | 3 + pkg/config/replica_config.go | 1 + pkg/config/sink.go | 16 +++- pkg/config/sink_test.go | 26 +++++++ pkg/orchestrator/reactor_state_test.go | 3 + .../kafka_simple_basic/conf/changefeed.toml | 3 +- 15 files changed, 190 insertions(+), 26 deletions(-) diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index 6a224ca2ee3..4ce64cb79e5 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -497,6 +497,10 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( if c.Sink.SendBootstrapToAllPartition != nil { res.Sink.SendBootstrapToAllPartition = util.AddressOf(*c.Sink.SendBootstrapToAllPartition) } + + if c.Sink.SendAllBootstrapAtStart != nil { + res.Sink.SendAllBootstrapAtStart = util.AddressOf(*c.Sink.SendAllBootstrapAtStart) + } } if c.Mounter != nil { res.Mounter = &config.MounterConfig{ @@ -792,6 +796,10 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { res.Sink.SendBootstrapToAllPartition = util.AddressOf(*cloned.Sink.SendBootstrapToAllPartition) } + if cloned.Sink.SendAllBootstrapAtStart != nil { + res.Sink.SendAllBootstrapAtStart = util.AddressOf(*cloned.Sink.SendAllBootstrapAtStart) + } + if cloned.Sink.DebeziumDisableSchema != nil { res.Sink.DebeziumDisableSchema = util.AddressOf(*cloned.Sink.DebeziumDisableSchema) } @@ -957,6 +965,7 @@ type SinkConfig struct { 
SendBootstrapIntervalInSec *int64 `json:"send_bootstrap_interval_in_sec,omitempty"` SendBootstrapInMsgCount *int32 `json:"send_bootstrap_in_msg_count,omitempty"` SendBootstrapToAllPartition *bool `json:"send_bootstrap_to_all_partition,omitempty"` + SendAllBootstrapAtStart *bool `json:"send-all-bootstrap-at-start,omitempty"` DebeziumDisableSchema *bool `json:"debezium_disable_schema,omitempty"` DebeziumConfig *DebeziumConfig `json:"debezium,omitempty"` OpenProtocolConfig *OpenProtocolConfig `json:"open,omitempty"` diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index 9afc4e18687..6a6978d5279 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -62,6 +62,7 @@ var defaultAPIConfig = &ReplicaConfig{ SendBootstrapIntervalInSec: util.AddressOf(int64(120)), SendBootstrapInMsgCount: util.AddressOf(int32(10000)), SendBootstrapToAllPartition: util.AddressOf(true), + SendAllBootstrapAtStart: util.AddressOf(false), DebeziumDisableSchema: util.AddressOf(false), OpenProtocolConfig: &OpenProtocolConfig{OutputOldValue: true}, DebeziumConfig: &DebeziumConfig{OutputOldValue: true}, diff --git a/cdc/entry/schema_test_helper.go b/cdc/entry/schema_test_helper.go index 51ae839d43c..fba513ba08c 100644 --- a/cdc/entry/schema_test_helper.go +++ b/cdc/entry/schema_test_helper.go @@ -297,3 +297,8 @@ func (s *SchemaTestHelper) Close() { s.domain.Close() s.storage.Close() //nolint:errcheck } + +// SchemaStorage returns the schema storage +func (s *SchemaTestHelper) SchemaStorage() SchemaStorage { + return s.schemaStorage +} diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 18fc88bdb08..8572cc10236 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -716,7 +716,9 @@ LOOP2: c.schema, c.redoDDLMgr, c.redoMetaMgr, - util.GetOrZero(cfInfo.Config.BDRMode)) + util.GetOrZero(cfInfo.Config.BDRMode), + cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(), + ) // create scheduler cfg := *c.cfg diff --git a/cdc/owner/changefeed_test.go 
b/cdc/owner/changefeed_test.go index c384ecd590a..29b70772d8c 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -89,7 +89,7 @@ type mockDDLSink struct { // whether to record the DDL history, only for rename table recordDDLHistory bool // a slice of DDL history, only for rename table - ddlHistory []string + ddlHistory []*model.DDLEvent mu struct { sync.Mutex checkpointTs model.Ts @@ -117,7 +117,7 @@ func (m *mockDDLSink) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo } }() if m.recordDDLHistory { - m.ddlHistory = append(m.ddlHistory, ddl.Query) + m.ddlHistory = append(m.ddlHistory, ddl) } else { m.ddlHistory = nil } @@ -155,6 +155,13 @@ func (m *mockDDLSink) Barrier(ctx context.Context) error { return nil } +func (m *mockDDLSink) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error { + if m.recordDDLHistory { + m.ddlHistory = append(m.ddlHistory, bootstrap) + } + return nil +} + type mockScheduler struct { currentTables []model.TableID } diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index 205d363e438..0adb7208f71 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -130,6 +130,9 @@ type ddlManager struct { BDRMode bool ddlResolvedTs model.Ts + + shouldSendAllBootstrapAtStart bool + bootstraped bool } func newDDLManager( @@ -143,6 +146,7 @@ func newDDLManager( redoManager redo.DDLManager, redoMetaManager redo.MetaManager, bdrMode bool, + shouldSendAllBootstrapAtStart bool, ) *ddlManager { log.Info("owner create ddl manager", zap.String("namespace", changefeedID.Namespace), @@ -152,19 +156,56 @@ func newDDLManager( zap.Bool("bdrMode", bdrMode)) return &ddlManager{ - changfeedID: changefeedID, - ddlSink: ddlSink, - filter: filter, - ddlPuller: ddlPuller, - schema: schema, - redoDDLManager: redoManager, - redoMetaManager: redoMetaManager, - startTs: startTs, - checkpointTs: checkpointTs, - ddlResolvedTs: startTs, - BDRMode: bdrMode, - pendingDDLs: 
make(map[model.TableName][]*model.DDLEvent), + changfeedID: changefeedID, + ddlSink: ddlSink, + filter: filter, + ddlPuller: ddlPuller, + schema: schema, + redoDDLManager: redoManager, + redoMetaManager: redoMetaManager, + startTs: startTs, + checkpointTs: checkpointTs, + ddlResolvedTs: startTs, + BDRMode: bdrMode, + pendingDDLs: make(map[model.TableName][]*model.DDLEvent), + shouldSendAllBootstrapAtStart: shouldSendAllBootstrapAtStart, + } +} + +func (m *ddlManager) checkAndSendBootstrapMsgs(ctx context.Context) (bool, error) { + if !m.shouldSendAllBootstrapAtStart || m.bootstraped { + return true, nil + } + start := time.Now() + defer func() { + log.Info("send bootstrap messages finished", + zap.Stringer("changefeed", m.changfeedID), + zap.Duration("cost", time.Since(start))) + }() + // Send bootstrap messages to downstream. + tableInfo, err := m.allTables(ctx) + if err != nil { + return false, errors.Trace(err) + } + log.Info("start to send bootstrap messages", + zap.Stringer("changefeed", m.changfeedID), + zap.Int("tables", len(tableInfo))) + + for _, table := range tableInfo { + if table.TableInfo.IsView() { + continue + } + ddlEvent := &model.DDLEvent{ + TableInfo: table, + IsBootstrap: true, + } + err := m.ddlSink.emitBootstrap(ctx, ddlEvent) + if err != nil { + return false, errors.Trace(err) + } } + m.bootstraped = true + return true, nil } // tick the ddlHandler, it does the following things: @@ -183,6 +224,14 @@ func (m *ddlManager) tick( m.justSentDDL = nil m.checkpointTs = checkpointTs + ok, err := m.checkAndSendBootstrapMsgs(ctx) + if err != nil { + return nil, nil, errors.Trace(err) + } + if !ok { + return nil, nil, nil + } + currentTables, err := m.allTables(ctx) if err != nil { return nil, nil, errors.Trace(err) @@ -483,8 +532,7 @@ func (m *ddlManager) barrier() *schedulepb.BarrierWithMinTs { return barrier } -// allTables returns all tables in the schema that -// less or equal than the checkpointTs. 
+// allTables returns all tables in the schema in current checkpointTs. func (m *ddlManager) allTables(ctx context.Context) ([]*model.TableInfo, error) { if m.tableInfoCache == nil { ts := m.getSnapshotTs() diff --git a/cdc/owner/ddl_manager_test.go b/cdc/owner/ddl_manager_test.go index 8287cebe851..e72ecd846d8 100644 --- a/cdc/owner/ddl_manager_test.go +++ b/cdc/owner/ddl_manager_test.go @@ -50,7 +50,9 @@ func createDDLManagerForTest(t *testing.T) *ddlManager { schema, redo.NewDisabledDDLManager(), redo.NewDisabledMetaManager(), - false) + false, + false, + ) return res } @@ -246,9 +248,9 @@ func TestExecRenameTablesDDL(t *testing.T) { } require.Len(t, mockDDLSink.ddlHistory, 2) require.Equal(t, "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`", - mockDDLSink.ddlHistory[0]) + mockDDLSink.ddlHistory[0].Query) require.Equal(t, "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`", - mockDDLSink.ddlHistory[1]) + mockDDLSink.ddlHistory[1].Query) // mock all rename table statements have been done mockDDLSink.resetDDLDone = false @@ -459,3 +461,37 @@ func TestIsGlobalDDL(t *testing.T) { require.Equal(t, c.ret, isGlobalDDL(c.ddl)) } } + +func TestCheckAndSendBootstrapMsgs(t *testing.T) { + helper := entry.NewSchemaTestHelper(t) + defer helper.Close() + ddl1 := helper.DDL2Event("create table test.tb1(id int primary key)") + ddl2 := helper.DDL2Event("create table test.tb2(id int primary key)") + + ctx := context.Background() + dm := createDDLManagerForTest(t) + dm.schema = helper.SchemaStorage() + dm.startTs, dm.checkpointTs = ddl2.CommitTs, ddl2.CommitTs + + mockDDLSink := dm.ddlSink.(*mockDDLSink) + mockDDLSink.recordDDLHistory = true + + // do not send all bootstrap messages + send, err := dm.checkAndSendBootstrapMsgs(ctx) + require.Nil(t, err) + require.True(t, send) + require.False(t, dm.bootstraped) + require.Equal(t, 0, len(mockDDLSink.ddlHistory)) + + // send all bootstrap messages -> tb1 and tb2 + dm.shouldSendAllBootstrapAtStart = true + send, err = 
dm.checkAndSendBootstrapMsgs(ctx) + require.Nil(t, err) + require.True(t, send) + require.True(t, dm.bootstraped) + require.Equal(t, 2, len(mockDDLSink.ddlHistory)) + require.True(t, mockDDLSink.ddlHistory[0].IsBootstrap) + require.True(t, mockDDLSink.ddlHistory[1].IsBootstrap) + require.Equal(t, ddl1.TableInfo.TableName, mockDDLSink.ddlHistory[0].TableInfo.TableName) + require.Equal(t, ddl2.TableInfo.TableName, mockDDLSink.ddlHistory[1].TableInfo.TableName) +} diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 57e804fcf59..4de195ac0b5 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -54,6 +54,9 @@ type DDLSink interface { // the caller of this function can call again and again until a true returned emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error) emitSyncPoint(ctx context.Context, checkpointTs uint64) error + // emitBootstrap emits the table bootstrap event in a blocking way. + // It will return after the bootstrap event is sent. + emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error // close the ddlsink, cancel running goroutine. 
close(ctx context.Context) error } @@ -121,10 +124,6 @@ func ddlSinkInitializer(ctx context.Context, a *ddlSinkImpl) error { return errors.Trace(err) } a.sink = s - - if !util.GetOrZero(a.info.Config.EnableSyncPoint) { - return nil - } return nil } @@ -472,3 +471,10 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) { return result, nil } + +func (s *ddlSinkImpl) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error { + if err := s.makeSinkReady(ctx); err != nil { + return errors.Trace(err) + } + return s.sink.WriteDDLEvent(ctx, bootstrap) +} diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index b6ee7dfef06..b436a0551e5 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -213,6 +213,7 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { SendBootstrapIntervalInSec: util.AddressOf(int64(120)), SendBootstrapInMsgCount: util.AddressOf(int32(10000)), SendBootstrapToAllPartition: util.AddressOf(true), + SendAllBootstrapAtStart: util.AddressOf(false), DebeziumDisableSchema: util.AddressOf(false), OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true}, Debezium: &config.DebeziumConfig{OutputOldValue: true}, @@ -253,6 +254,7 @@ func TestAndWriteStorageSinkTOML(t *testing.T) { SendBootstrapIntervalInSec: util.AddressOf(int64(120)), SendBootstrapInMsgCount: util.AddressOf(int32(10000)), SendBootstrapToAllPartition: util.AddressOf(true), + SendAllBootstrapAtStart: util.AddressOf(false), DebeziumDisableSchema: util.AddressOf(false), OpenProtocol: &config.OpenProtocolConfig{OutputOldValue: true}, Debezium: &config.DebeziumConfig{OutputOldValue: true}, diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 7739addd13b..7f9a5720b90 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -70,6 +70,7 @@ const ( "send-bootstrap-interval-in-sec": 120, "send-bootstrap-in-msg-count": 10000, "send-bootstrap-to-all-partition": true, 
+ "send-all-bootstrap-at-start": false, "debezium-disable-schema": false, "open": { "output-old-value": true @@ -337,6 +338,7 @@ const ( "send-bootstrap-interval-in-sec": 120, "send-bootstrap-in-msg-count": 10000, "send-bootstrap-to-all-partition": true, + "send-all-bootstrap-at-start": false, "debezium-disable-schema": false, "open": { "output-old-value": true @@ -511,6 +513,7 @@ const ( "send-bootstrap-interval-in-sec": 120, "send-bootstrap-in-msg-count": 10000, "send-bootstrap-to-all-partition": true, + "send-all-bootstrap-at-start": false, "debezium-disable-schema": false, "open": { "output-old-value": true diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index c2c436a61ee..bcdd03560f4 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -77,6 +77,7 @@ var defaultReplicaConfig = &ReplicaConfig{ SendBootstrapIntervalInSec: util.AddressOf(DefaultSendBootstrapIntervalInSec), SendBootstrapInMsgCount: util.AddressOf(DefaultSendBootstrapInMsgCount), SendBootstrapToAllPartition: util.AddressOf(DefaultSendBootstrapToAllPartition), + SendAllBootstrapAtStart: util.AddressOf(DefaultSendAllBootstrapAtStart), DebeziumDisableSchema: util.AddressOf(false), OpenProtocol: &OpenProtocolConfig{OutputOldValue: true}, Debezium: &DebeziumConfig{OutputOldValue: true}, diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 9954fa4f51f..915cbd2c097 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -89,6 +89,9 @@ const ( // DefaultSendBootstrapToAllPartition is the default value of // whether to send bootstrap message to all partitions. DefaultSendBootstrapToAllPartition = true + // DefaultSendAllBootstrapAtStart is the default value of whether + // to send all tables bootstrap message at changefeed start. + DefaultSendAllBootstrapAtStart = false // DefaultMaxReconnectToPulsarBroker is the default max reconnect times to pulsar broker. // The pulsar client uses an exponential backoff with jitter to reconnect to the broker. 
@@ -188,7 +191,8 @@ type SinkConfig struct { // If set to false, bootstrap message will only be sent to the first partition of each topic. // Default value is true. SendBootstrapToAllPartition *bool `toml:"send-bootstrap-to-all-partition" json:"send-bootstrap-to-all-partition,omitempty"` - + // SendAllBootstrapAtStart determines whether to send all tables bootstrap message at changefeed start. + SendAllBootstrapAtStart *bool `toml:"send-all-bootstrap-at-start" json:"send-all-bootstrap-at-start,omitempty"` // Debezium only. Whether schema should be excluded in the output. DebeziumDisableSchema *bool `toml:"debezium-disable-schema" json:"debezium-disable-schema,omitempty"` @@ -227,6 +231,16 @@ func (s *SinkConfig) ShouldSendBootstrapMsg() bool { util.GetOrZero(s.SendBootstrapInMsgCount) > 0 } +// ShouldSendAllBootstrapAtStart returns whether bootstrap messages for all tables should be sent at changefeed start. +func (s *SinkConfig) ShouldSendAllBootstrapAtStart() bool { + if s == nil { + return false + } + should := s.ShouldSendBootstrapMsg() && util.GetOrZero(s.SendAllBootstrapAtStart) + log.Info("should send all bootstrap at start", zap.Bool("should", should)) + return should +} + // CSVConfig defines a series of configuration items for csv codec. 
type CSVConfig struct { // delimiter between fields, it can be 1 character or at most 2 characters diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index a0378f3c4d2..84da55bc5ff 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -437,3 +437,29 @@ func TestValidateAndAdjustStorageConfig(t *testing.T) { require.NoError(t, err) require.Equal(t, 16, util.GetOrZero(s.Sink.FileIndexWidth)) } + +func TestShouldSendBootstrapMsg(t *testing.T) { + t.Parallel() + sinkConfig := GetDefaultReplicaConfig().Sink + require.False(t, sinkConfig.ShouldSendBootstrapMsg()) + + protocol := "simple" + sinkConfig.Protocol = &protocol + require.True(t, sinkConfig.ShouldSendBootstrapMsg()) + + count := int32(0) + sinkConfig.SendBootstrapInMsgCount = &count + require.False(t, sinkConfig.ShouldSendBootstrapMsg()) +} + +func TestShouldSendAllBootstrapAtStart(t *testing.T) { + t.Parallel() + sinkConfig := GetDefaultReplicaConfig().Sink + protocol := "simple" + sinkConfig.Protocol = &protocol + require.False(t, sinkConfig.ShouldSendAllBootstrapAtStart()) + + should := true + sinkConfig.SendAllBootstrapAtStart = &should + require.True(t, sinkConfig.ShouldSendAllBootstrapAtStart()) +} diff --git a/pkg/orchestrator/reactor_state_test.go b/pkg/orchestrator/reactor_state_test.go index 362a004df3b..1a8e232dde7 100644 --- a/pkg/orchestrator/reactor_state_test.go +++ b/pkg/orchestrator/reactor_state_test.go @@ -130,6 +130,7 @@ func TestChangefeedStateUpdate(t *testing.T) { SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec, SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount, SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition, + SendAllBootstrapAtStart: config.GetDefaultReplicaConfig().Sink.SendAllBootstrapAtStart, DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema, Debezium: 
config.GetDefaultReplicaConfig().Sink.Debezium, OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol, @@ -199,6 +200,7 @@ func TestChangefeedStateUpdate(t *testing.T) { SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec, SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount, SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition, + SendAllBootstrapAtStart: config.GetDefaultReplicaConfig().Sink.SendAllBootstrapAtStart, DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema, Debezium: config.GetDefaultReplicaConfig().Sink.Debezium, OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol, @@ -274,6 +276,7 @@ func TestChangefeedStateUpdate(t *testing.T) { SendBootstrapIntervalInSec: config.GetDefaultReplicaConfig().Sink.SendBootstrapIntervalInSec, SendBootstrapInMsgCount: config.GetDefaultReplicaConfig().Sink.SendBootstrapInMsgCount, SendBootstrapToAllPartition: config.GetDefaultReplicaConfig().Sink.SendBootstrapToAllPartition, + SendAllBootstrapAtStart: config.GetDefaultReplicaConfig().Sink.SendAllBootstrapAtStart, DebeziumDisableSchema: config.GetDefaultReplicaConfig().Sink.DebeziumDisableSchema, Debezium: config.GetDefaultReplicaConfig().Sink.Debezium, OpenProtocol: config.GetDefaultReplicaConfig().Sink.OpenProtocol, diff --git a/tests/integration_tests/kafka_simple_basic/conf/changefeed.toml b/tests/integration_tests/kafka_simple_basic/conf/changefeed.toml index 8d766ab7c00..970123ce497 100644 --- a/tests/integration_tests/kafka_simple_basic/conf/changefeed.toml +++ b/tests/integration_tests/kafka_simple_basic/conf/changefeed.toml @@ -4,4 +4,5 @@ corruption-handle-level = "error" [sink] send-bootstrap-interval-in-sec = 5 -send-bootstrap-in-msg-count = 100 \ No newline at end of file +send-bootstrap-in-msg-count = 100 +send-all-bootstrap-at-start = true \ No newline at end of file