Skip to content

Commit

Permalink
codec(ticdc): simple decoder set column flag incorrectly since miss i…
Browse files Browse the repository at this point in the history
…ndex id (#11235)

close #11236
  • Loading branch information
3AceShowHand committed Jun 4, 2024
1 parent 92e2909 commit 667cea0
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 7 deletions.
56 changes: 56 additions & 0 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,62 @@ func TestEncodeDDLEvent(t *testing.T) {
}
}

func TestColumnFlags(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

createTableDDL := `create table test.t(
a bigint(20) unsigned not null,
b bigint(20) default null,
c varbinary(767) default null,
d int(11) unsigned not null,
e int(11) default null,
primary key (a),
key idx_c(c),
key idx_b(b),
unique key idx_de(d, e))`
createTableDDLEvent := helper.DDL2Event(createTableDDL)

ctx := context.Background()
codecConfig := common.NewConfig(config.ProtocolSimple)
for _, format := range []common.EncodingFormatType{
common.EncodingFormatAvro,
common.EncodingFormatJSON,
} {
codecConfig.EncodingFormat = format
b, err := NewBuilder(ctx, codecConfig)
require.NoError(t, err)
enc := b.Build()

m, err := enc.EncodeDDLEvent(createTableDDLEvent)
require.NoError(t, err)

dec, err := NewDecoder(ctx, codecConfig, nil)
require.NoError(t, err)

err = dec.AddKeyValue(m.Key, m.Value)
require.NoError(t, err)

messageType, hasNext, err := dec.HasNext()
require.NoError(t, err)
require.True(t, hasNext)
require.Equal(t, model.MessageTypeDDL, messageType)

decodedDDLEvent, err := dec.NextDDLEvent()
require.NoError(t, err)

originFlags := createTableDDLEvent.TableInfo.ColumnsFlag
obtainedFlags := decodedDDLEvent.TableInfo.ColumnsFlag

for colID, expected := range originFlags {
name := createTableDDLEvent.TableInfo.ForceGetColumnName(colID)
actualID := decodedDDLEvent.TableInfo.ForceGetColumnIDByName(name)
actual := obtainedFlags[actualID]
require.Equal(t, expected, actual)
}
}
}

func TestEncodeIntegerTypes(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
Expand Down
20 changes: 13 additions & 7 deletions pkg/sink/codec/simple/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,16 @@ func newTiColumnInfo(
}

for _, index := range indexes {
if index.Primary {
for _, name := range index.Columns {
if name == column.Name {
for _, name := range index.Columns {
if name == column.Name {
if index.Primary {
col.AddFlag(mysql.PriKeyFlag)
break
} else if index.Unique {
col.AddFlag(mysql.UniqueKeyFlag)
} else {
col.AddFlag(mysql.MultipleKeyFlag)
}
}
break
}
}

Expand Down Expand Up @@ -242,7 +244,7 @@ func newIndexSchema(index *timodel.IndexInfo, columns []*timodel.ColumnInfo) *In
}

// newTiIndexInfo convert IndexSchema to a tidb index info.
func newTiIndexInfo(indexSchema *IndexSchema, columns []*timodel.ColumnInfo) *timodel.IndexInfo {
func newTiIndexInfo(indexSchema *IndexSchema, columns []*timodel.ColumnInfo, indexID int64) *timodel.IndexInfo {
indexColumns := make([]*timodel.IndexColumn, len(indexSchema.Columns))
for i, col := range indexSchema.Columns {
var offset int
Expand All @@ -259,6 +261,7 @@ func newTiIndexInfo(indexSchema *IndexSchema, columns []*timodel.ColumnInfo) *ti
}

return &timodel.IndexInfo{
ID: indexID,
Name: timodel.NewCIStr(indexSchema.Name),
Columns: indexColumns,
Unique: indexSchema.Unique,
Expand Down Expand Up @@ -344,9 +347,12 @@ func newTableInfo(m *TableSchema) *model.TableInfo {
nextMockID += 100
tidbTableInfo.Columns = append(tidbTableInfo.Columns, tiCol)
}

mockIndexID := int64(1)
for _, idx := range m.Indexes {
index := newTiIndexInfo(idx, tidbTableInfo.Columns)
index := newTiIndexInfo(idx, tidbTableInfo.Columns, mockIndexID)
tidbTableInfo.Indices = append(tidbTableInfo.Indices, index)
mockIndexID += 1
}
}
return model.WrapTableInfo(100, database, schemaVersion, tidbTableInfo)
Expand Down

0 comments on commit 667cea0

Please sign in to comment.