diff --git a/cmd/storage-consumer/consumer.go b/cmd/storage-consumer/consumer.go index e3ee2d6eb5..ce2fec450b 100644 --- a/cmd/storage-consumer/consumer.go +++ b/cmd/storage-consumer/consumer.go @@ -35,6 +35,9 @@ import ( "github.com/pingcap/ticdc/pkg/sink/codec/csv" putil "github.com/pingcap/ticdc/pkg/util" "github.com/pingcap/tidb/br/pkg/storage" + timodel "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -510,6 +513,42 @@ func (c *consumer) mustGetTableDef(key cloudstorage.SchemaPathKey) cloudstorage. return *tableDef } +func getRenameTableOldTableKey(tableDef cloudstorage.TableDefinition) (string, bool) { + if tableDef.Type != byte(timodel.ActionRenameTable) { + return "", false + } + schemaName := tableDef.Schema + tableName := tableDef.Table + stmt, err := parser.New().ParseOneStmt(tableDef.Query, "", "") + if err != nil { + log.Panic("parse statement failed", zap.Any("DDL", tableDef.Query), zap.Error(err)) + } + // The query in job maybe "RENAME TABLE table1 to table2" + renameStmt, ok := stmt.(*ast.RenameTableStmt) + if !ok || len(renameStmt.TableToTables) == 0 { + log.Panic("invalid rename table statement", zap.Any("DDL", tableDef.Query)) + } + oldTable := renameStmt.TableToTables[0].OldTable + if oldTable.Schema.O != "" { + schemaName = oldTable.Schema.O + } + tableName = oldTable.Name.O + return commonType.QuoteSchema(schemaName, tableName), true +} + +func (c *consumer) updateTableDDLWatermark(tableDef cloudstorage.TableDefinition) string { + key := commonType.QuoteSchema(tableDef.Schema, tableDef.Table) + if c.tableDDLWatermark[key] < tableDef.TableVersion { + c.tableDDLWatermark[key] = tableDef.TableVersion + } + if oldTableKey, ok := getRenameTableOldTableKey(tableDef); ok { + if c.tableDDLWatermark[oldTableKey] < tableDef.TableVersion { + c.tableDDLWatermark[oldTableKey] = tableDef.TableVersion + } + } + return key +} + func (c *consumer) handleNewFiles( ctx context.Context, dmlFileMap map[cloudstorage.DmlPathKey]fileIndexRange, @@ -582,12 +621,13 @@ func (c *consumer) handleNewFiles( if err := c.sink.WriteBlockEvent(ddlEvent); err != nil { return errors.Trace(err) } - c.tableDDLWatermark[tableKey] = key.TableVersion + watermarkKey := c.updateTableDDLWatermark(tableDef) // TODO: need to cleanup tableDefMap in the future. log.Info("execute ddl event successfully", zap.String("query", tableDef.Query), zap.String("schema", key.Schema), zap.String("table", key.Table), - zap.Uint64("ddlWatermark", c.tableDDLWatermark[tableKey])) + zap.Uint64("ddlWatermark", c.tableDDLWatermark[tableKey]), + zap.String("watermarkKey", watermarkKey)) continue }