Commit f760b26

codec(ticdc): simple decoder set the table id on handle key only event and make consumer more strict on the ts order (#11089)

close #11088
3AceShowHand committed May 17, 2024
1 parent 61efa5f commit f760b26
Showing 3 changed files with 72 additions and 86 deletions.
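For readers skimming the diff: the consumer used to track a global resolved ts and merely warned about and skipped events whose ts fell behind it; after this change each partition's resolved ts is the only watermark, and any DDL, row, or resolved-ts event that falls behind it is treated as a fatal ordering bug. A minimal sketch of that rule, with `checkTsOrder` and its parameters as illustrative names rather than the actual functions in `cmd/kafka-consumer/main.go`:

```go
package main

import "log"

// checkTsOrder panics when an event's ts falls behind the partition's
// resolved ts; previously such "fallback" events were only logged and skipped.
func checkTsOrder(kind string, ts, partitionResolvedTs uint64) {
	if ts < partitionResolvedTs {
		log.Panicf("%s ts %d is less than the partition resolved ts %d",
			kind, ts, partitionResolvedTs)
	}
}

func main() {
	checkTsOrder("row commit", 450, 400) // fine: the commit-ts is not behind the resolved ts
	// checkTsOrder("resolved", 380, 400) // would panic: the watermark moved backwards
}
```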
113 changes: 56 additions & 57 deletions cmd/kafka-consumer/main.go
@@ -211,7 +211,7 @@ func main() {
flag.StringVar(&consumerOption.upstreamTiDBDSN, "upstream-tidb-dsn", "", "upstream TiDB DSN")
flag.StringVar(&consumerOption.groupID, "consumer-group-id", groupID, "consumer group id")
flag.StringVar(&consumerOption.logPath, "log-file", "cdc_kafka_consumer.log", "log file path")
flag.StringVar(&consumerOption.logLevel, "log-level", "debug", "log file path")
flag.StringVar(&consumerOption.logLevel, "log-level", "info", "log file path")
flag.StringVar(&consumerOption.timezone, "tz", "System", "Specify time zone of Kafka consumer")
flag.StringVar(&consumerOption.ca, "ca", "", "CA certificate path for Kafka SSL connection")
flag.StringVar(&consumerOption.cert, "cert", "", "Certificate path for Kafka SSL connection")
@@ -426,9 +426,6 @@ type Consumer struct {
sinks []*partitionSinks
sinksMu sync.Mutex

// initialize to 0 by default
globalResolvedTs uint64

eventRouter *dispatcher.EventRouter

option *consumerOption
@@ -628,14 +625,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if simple, ok := decoder.(*simple.Decoder); ok {
cachedEvents := simple.GetCachedEvents()
for _, row := range cachedEvents {
var partitionID int64
if row.TableInfo.IsPartitionTable() {
partitionID = row.PhysicalTableID
}
tableID := c.fakeTableIDGenerator.
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), partitionID)
row.TableInfo.TableName.TableID = tableID

tableID := row.PhysicalTableID
group, ok := eventGroups[tableID]
if !ok {
group = newEventsGroup()
@@ -646,11 +636,26 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}

// the Query may be empty when using the simple protocol, since it comes from the `bootstrap` event.
if partition == 0 && ddl.Query != "" {
c.appendDDL(ddl)
if partition != 0 || ddl.Query == "" {
continue
}

partitionResolvedTs := atomic.LoadUint64(&sink.resolvedTs)
if ddl.CommitTs < partitionResolvedTs {
log.Panic("DDL event commit-ts less than the resolved ts",
zap.Int32("partition", partition),
zap.Int64("offset", message.Offset),
zap.Uint64("partitionResolvedTs", partitionResolvedTs),
zap.Uint64("commitTs", ddl.CommitTs),
zap.String("DDL", ddl.Query))
}
// todo: mark the offset after the DDL is fully synced to the downstream mysql.
session.MarkMessage(message, "")
atomic.StoreUint64(&sink.resolvedTs, ddl.CommitTs)
log.Info("partition resolved ts updated by the DDL event",
zap.Int32("partition", partition),
zap.Int64("offset", message.Offset),
zap.Uint64("oldResolvedTs", partitionResolvedTs),
zap.Uint64("resolvedTs", ddl.CommitTs))
c.appendDDL(ddl)
case model.MessageTypeRow:
row, err := decoder.NextRowChangedEvent()
if err != nil {
@@ -660,7 +665,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}
// when using simple protocol, the row may be nil, since it's table info not received yet,
// it's cached in the decoder, so just continue here.
if row == nil {
if c.option.protocol == config.ProtocolSimple && row == nil {
continue
}
target, _, err := c.eventRouter.GetPartitionForRowChange(row, c.option.partitionNum)
@@ -669,43 +674,42 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}
if partition != target {
log.Panic("RowChangedEvent dispatched to wrong partition",
zap.Int32("partition", partition),
zap.Int64("offset", message.Offset),
zap.Int32("obtained", partition),
zap.Int32("expected", target),
zap.Int32("partitionNum", c.option.partitionNum),
zap.Any("row", row),
)
}

globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
partitionResolvedTs := atomic.LoadUint64(&sink.resolvedTs)
if row.CommitTs <= globalResolvedTs || row.CommitTs <= partitionResolvedTs {
log.Warn("RowChangedEvent fallback row, ignore it",
if row.CommitTs < partitionResolvedTs {
log.Panic("RowChangedEvent commit-ts less than the resolved ts",
zap.Int32("partition", partition),
zap.Int64("offset", message.Offset),
zap.Uint64("commitTs", row.CommitTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Uint64("partitionResolvedTs", partitionResolvedTs),
zap.Int32("partition", partition),
zap.Any("row", row))
// todo: mark the offset after the DDL is fully synced to the downstream mysql.
session.MarkMessage(message, "")
continue
}
var partitionID int64
if row.TableInfo.IsPartitionTable() {
partitionID = row.PhysicalTableID
}
tableID := c.fakeTableIDGenerator.
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), partitionID)
row.TableInfo.TableName.TableID = tableID

tableID := row.PhysicalTableID
// simple protocol decoder should have set the table id already.
if c.option.protocol != config.ProtocolSimple {
var partitionID int64
if row.TableInfo.IsPartitionTable() {
partitionID = row.PhysicalTableID
}
tableID = c.fakeTableIDGenerator.
generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), partitionID)
row.TableInfo.TableName.TableID = tableID
}
group, ok := eventGroups[tableID]
if !ok {
group = newEventsGroup()
eventGroups[tableID] = group
}

group.Append(row)
// todo: mark the offset after the DDL is fully synced to the downstream mysql.
session.MarkMessage(message, "")
case model.MessageTypeResolved:
ts, err := decoder.NextResolvedEvent()
if err != nil {
@@ -714,17 +718,15 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
zap.Error(err))
}

globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
partitionResolvedTs := atomic.LoadUint64(&sink.resolvedTs)
if ts < globalResolvedTs || ts < partitionResolvedTs {
log.Warn("partition resolved ts fallback, skip it",
if ts < partitionResolvedTs {
log.Panic("partition resolved ts fallback",
zap.Int32("partition", partition),
zap.Int64("offset", message.Offset),
zap.Uint64("ts", ts),
zap.Uint64("partitionResolvedTs", partitionResolvedTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Int32("partition", partition))
session.MarkMessage(message, "")
continue
zap.Uint64("partitionResolvedTs", partitionResolvedTs))
}
atomic.StoreUint64(&sink.resolvedTs, ts)

for tableID, group := range eventGroups {
events := group.Resolve(ts)
@@ -746,12 +748,8 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
sink.tablesCommitTsMap.Store(tableID, commitTs)
}
}
atomic.StoreUint64(&sink.resolvedTs, ts)
// todo: mark the offset after the DDL is fully synced to the downstream mysql.
session.MarkMessage(message, "")

}

session.MarkMessage(message, "")
}

if counter > c.option.maxBatchSize {
@@ -838,6 +836,8 @@ func (c *Consumer) getMinPartitionResolvedTs() (result uint64, err error) {
func (c *Consumer) Run(ctx context.Context) error {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

var globalResolvedTs uint64
for {
select {
case <-ctx.Done():
@@ -854,14 +854,14 @@ func (c *Consumer) Run(ctx context.Context) error {
todoDDL := c.getFrontDDL()
if todoDDL != nil && todoDDL.CommitTs <= minPartitionResolvedTs {
// flush DMLs
if err := c.forEachSink(func(sink *partitionSinks) error {
if err = c.forEachSink(func(sink *partitionSinks) error {
return syncFlushRowChangedEvents(ctx, sink, todoDDL.CommitTs)
}); err != nil {
return cerror.Trace(err)
}

// DDL can be executed, do it first.
if err := c.ddlSink.WriteDDLEvent(ctx, todoDDL); err != nil {
if err = c.ddlSink.WriteDDLEvent(ctx, todoDDL); err != nil {
return cerror.Trace(err)
}
c.popDDL()
@@ -874,19 +874,18 @@ func (c *Consumer) Run(ctx context.Context) error {
minPartitionResolvedTs = todoDDL.CommitTs
}

// update global resolved ts
if c.globalResolvedTs > minPartitionResolvedTs {
if globalResolvedTs > minPartitionResolvedTs {
log.Panic("global ResolvedTs fallback",
zap.Uint64("globalResolvedTs", c.globalResolvedTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Uint64("minPartitionResolvedTs", minPartitionResolvedTs))
}

if c.globalResolvedTs < minPartitionResolvedTs {
c.globalResolvedTs = minPartitionResolvedTs
if globalResolvedTs < minPartitionResolvedTs {
globalResolvedTs = minPartitionResolvedTs
}

if err := c.forEachSink(func(sink *partitionSinks) error {
return syncFlushRowChangedEvents(ctx, sink, c.globalResolvedTs)
if err = c.forEachSink(func(sink *partitionSinks) error {
return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
}); err != nil {
return cerror.Trace(err)
}
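To restate the table-ID handling in `ConsumeClaim` above: with the simple protocol the decoder now supplies the physical table ID itself, so the consumer only synthesizes a fake table ID for the other protocols. A self-contained sketch of that branch under that reading; `rowEvent` and the generator closure are simplified stand-ins for `model.RowChangedEvent` and `fakeTableIDGenerator`, not the real types:

```go
package main

import "fmt"

// rowEvent is a simplified stand-in for model.RowChangedEvent, for illustration only.
type rowEvent struct {
	schema, table    string
	physicalTableID  int64
	isPartitionTable bool
}

// resolveTableID mirrors the branch in ConsumeClaim: trust the decoder-provided
// physical table ID for the simple protocol, otherwise synthesize a fake one.
func resolveTableID(row rowEvent, isSimpleProtocol bool,
	generateFakeTableID func(schema, table string, partitionID int64) int64) int64 {
	if isSimpleProtocol {
		// The simple protocol decoder has already set the table id.
		return row.physicalTableID
	}
	var partitionID int64
	if row.isPartitionTable {
		partitionID = row.physicalTableID
	}
	return generateFakeTableID(row.schema, row.table, partitionID)
}

func main() {
	gen := func(schema, table string, partitionID int64) int64 { return 1 } // dummy fake-ID generator
	fmt.Println(resolveTableID(rowEvent{physicalTableID: 42}, true, gen))         // 42
	fmt.Println(resolveTableID(rowEvent{schema: "test", table: "t"}, false, gen)) // 1
}
```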
1 change: 1 addition & 0 deletions pkg/sink/codec/simple/decoder.go
@@ -210,6 +210,7 @@ func (d *Decoder) assembleHandleKeyOnlyRowChangedEvent(m *message) (*model.RowCh
Version: defaultVersion,
Schema: m.Schema,
Table: m.Table,
TableID: m.TableID,
Type: m.Type,
CommitTs: m.CommitTs,
SchemaVersion: m.SchemaVersion,
44 changes: 15 additions & 29 deletions pkg/sink/codec/simple/message.go
@@ -319,42 +319,28 @@ func newTableSchema(tableInfo *model.TableInfo) *TableSchema {
func newTableInfo(m *TableSchema) *model.TableInfo {
var (
database string
table string
tableID int64
schemaVersion uint64
)

tidbTableInfo := &timodel.TableInfo{}
if m != nil {
database = m.Schema
table = m.Table
tableID = m.TableID
schemaVersion = m.Version
}
tidbTableInfo := &timodel.TableInfo{
ID: tableID,
Name: timodel.NewCIStr(table),
UpdateTS: schemaVersion,
}

if m == nil {
return &model.TableInfo{
TableName: model.TableName{
Schema: database,
Table: table,
TableID: tableID,
},
TableInfo: tidbTableInfo,
}
}
tidbTableInfo.ID = m.TableID
tidbTableInfo.Name = timodel.NewCIStr(m.Table)
tidbTableInfo.UpdateTS = m.Version

nextMockID := int64(100)
for _, col := range m.Columns {
tiCol := newTiColumnInfo(col, nextMockID, m.Indexes)
nextMockID += 100
tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol)
}
for _, idx := range m.Indexes {
index := newTiIndexInfo(idx)
tidbTableInfo.Indices = append(tidbTableInfo.Indices, index)
nextMockID := int64(100)
for _, col := range m.Columns {
tiCol := newTiColumnInfo(col, nextMockID, m.Indexes)
nextMockID += 100
tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol)
}
for _, idx := range m.Indexes {
index := newTiIndexInfo(idx)
tidbTableInfo.Indices = append(tidbTableInfo.Indices, index)
}
}
return model.WrapTableInfo(100, database, schemaVersion, tidbTableInfo)
}
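Taken together with the `TableID: m.TableID` line added in `decoder.go`, the reshaped `newTableInfo` above means the table ID carried in the message now flows into the wrapped table info, and column/index data is only populated when a `TableSchema` is actually present. A stripped-down sketch of that flow; the `TableSchema` and `tableInfo` types below are simplified stand-ins for the real ones in `pkg/sink/codec/simple`:

```go
package main

import "fmt"

// TableSchema and tableInfo are stripped-down stand-ins for the real types in
// pkg/sink/codec/simple and the TiDB model, used only to show how the table ID flows.
type TableSchema struct {
	Schema  string
	Table   string
	TableID int64
	Version uint64
	Columns []string
}

type tableInfo struct {
	ID       int64
	Name     string
	UpdateTS uint64
	Columns  []string
}

// newTableInfo mirrors the reshaped flow: only populate the table info when a
// schema is present, and carry the message's table ID into it.
func newTableInfo(m *TableSchema) (database string, info tableInfo) {
	if m == nil {
		// No schema in the message: return an empty placeholder, as the real
		// code wraps an empty TiDB table info in this case.
		return "", tableInfo{}
	}
	database = m.Schema
	info = tableInfo{ID: m.TableID, Name: m.Table, UpdateTS: m.Version}
	info.Columns = append(info.Columns, m.Columns...)
	return database, info
}

func main() {
	db, ti := newTableInfo(&TableSchema{Schema: "test", Table: "t", TableID: 109, Version: 7})
	fmt.Println(db, ti.ID, ti.Name) // test 109 t
}
```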