Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mounter: support more column flag #796

Merged
merged 6 commits into from
Aug 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,18 @@ func transColumnFlag(col *timodel.ColumnInfo) model.ColumnFlagType {
if col.IsGenerated() {
flag.SetIsGeneratedColumn()
}
if mysql.HasPriKeyFlag(col.Flag) {
flag.SetIsPrimaryKey()
}
if mysql.HasUniKeyFlag(col.Flag) {
flag.SetIsUniqueKey()
}
if !mysql.HasNotNullFlag(col.Flag) {
flag.SetIsNullable()
}
if mysql.HasMultipleKeyFlag(col.Flag) {
flag.SetIsMultipleKey()
}
return flag
}

Expand Down
68 changes: 68 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ const (
HandleKeyFlag
// GeneratedColumnFlag means the column is a generated column
GeneratedColumnFlag
// PrimaryKeyFlag means the column is primary key
PrimaryKeyFlag
// UniqueKeyFlag means the column is unique key
UniqueKeyFlag
// MultipleKeyFlag means the column is multiple key
MultipleKeyFlag
// NullableFlag means the column is nullable
NullableFlag
)

//SetIsBinary set BinaryFlag
Expand Down Expand Up @@ -94,6 +102,66 @@ func (b *ColumnFlagType) IsGeneratedColumn() bool {
return (*util.Flag)(b).HasAll(util.Flag(GeneratedColumnFlag))
}

//SetIsPrimaryKey set PrimaryKeyFlag
func (b *ColumnFlagType) SetIsPrimaryKey() {
(*util.Flag)(b).Add(util.Flag(PrimaryKeyFlag))
}

//UnsetIsPrimaryKey unset PrimaryKeyFlag
func (b *ColumnFlagType) UnsetIsPrimaryKey() {
(*util.Flag)(b).Remove(util.Flag(PrimaryKeyFlag))
}

//IsPrimaryKey show whether PrimaryKeyFlag is set
func (b *ColumnFlagType) IsPrimaryKey() bool {
return (*util.Flag)(b).HasAll(util.Flag(PrimaryKeyFlag))
}

//SetIsUniqueKey set UniqueKeyFlag
func (b *ColumnFlagType) SetIsUniqueKey() {
(*util.Flag)(b).Add(util.Flag(UniqueKeyFlag))
}

//UnsetIsUniqueKey unset UniqueKeyFlag
func (b *ColumnFlagType) UnsetIsUniqueKey() {
(*util.Flag)(b).Remove(util.Flag(UniqueKeyFlag))
}

//IsUniqueKey show whether UniqueKeyFlag is set
func (b *ColumnFlagType) IsUniqueKey() bool {
return (*util.Flag)(b).HasAll(util.Flag(UniqueKeyFlag))
}

//IsMultipleKey show whether MultipleKeyFlag is set
func (b *ColumnFlagType) IsMultipleKey() bool {
return (*util.Flag)(b).HasAll(util.Flag(MultipleKeyFlag))
}

//SetIsMultipleKey set MultipleKeyFlag
func (b *ColumnFlagType) SetIsMultipleKey() {
(*util.Flag)(b).Add(util.Flag(MultipleKeyFlag))
}

//UnsetIsMultipleKey unset MultipleKeyFlag
func (b *ColumnFlagType) UnsetIsMultipleKey() {
(*util.Flag)(b).Remove(util.Flag(MultipleKeyFlag))
}

//IsNullable show whether NullableFlag is set
func (b *ColumnFlagType) IsNullable() bool {
return (*util.Flag)(b).HasAll(util.Flag(NullableFlag))
}

//SetIsNullable set NullableFlag
func (b *ColumnFlagType) SetIsNullable() {
(*util.Flag)(b).Add(util.Flag(NullableFlag))
}

//UnsetIsNullable unset NullableFlag
func (b *ColumnFlagType) UnsetIsNullable() {
(*util.Flag)(b).Remove(util.Flag(NullableFlag))
}

// TableName represents name of a table, includes table name and schema name.
type TableName struct {
Schema string `toml:"db-name" json:"db-name"`
Expand Down
17 changes: 16 additions & 1 deletion cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type columnFlagTypeSuite struct{}

var _ = check.Suite(&columnFlagTypeSuite{})

func (s *configSuite) TestBinaryFlag(c *check.C) {
func (s *configSuite) TestSetFlag(c *check.C) {
var flag ColumnFlagType
flag.SetIsBinary()
flag.SetIsGeneratedColumn()
Expand All @@ -28,4 +28,19 @@ func (s *configSuite) TestBinaryFlag(c *check.C) {
c.Assert(flag.IsGeneratedColumn(), check.IsTrue)
flag.UnsetIsBinary()
c.Assert(flag.IsBinary(), check.IsFalse)
flag.SetIsMultipleKey()
flag.SetIsUniqueKey()
c.Assert(flag.IsMultipleKey() && flag.IsUniqueKey(), check.IsTrue)
flag.UnsetIsUniqueKey()
c.Assert(flag.IsUniqueKey(), check.IsFalse)
}

func (s *configSuite) TestFlagValue(c *check.C) {
c.Assert(BinaryFlag, check.Equals, ColumnFlagType(0b1))
c.Assert(HandleKeyFlag, check.Equals, ColumnFlagType(0b10))
c.Assert(GeneratedColumnFlag, check.Equals, ColumnFlagType(0b100))
c.Assert(PrimaryKeyFlag, check.Equals, ColumnFlagType(0b1000))
c.Assert(UniqueKeyFlag, check.Equals, ColumnFlagType(0b10000))
c.Assert(MultipleKeyFlag, check.Equals, ColumnFlagType(0b100000))
c.Assert(NullableFlag, check.Equals, ColumnFlagType(0b1000000))
}
2 changes: 1 addition & 1 deletion cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated
}
}

isKey := c.WhereHandle != nil && *c.WhereHandle
isKey := c.Flag.IsPrimaryKey()
isNull := c.Value == nil
value := ""
if !isNull {
Expand Down
5 changes: 2 additions & 3 deletions cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ type canalEntrySuite struct{}
var _ = check.Suite(&canalEntrySuite{})

func (s *canalEntrySuite) TestConvertEntry(c *check.C) {
trueVar := true
testCaseUpdate := &model.RowChangedEvent{
CommitTs: 417318403368288260,
Table: &model.TableName{
Expand All @@ -138,7 +137,7 @@ func (s *canalEntrySuite) TestConvertEntry(c *check.C) {
},
Delete: false,
Columns: map[string]*model.Column{
"id": {Type: mysql.TypeLong, WhereHandle: &trueVar, Value: 1},
"id": {Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 1},
"name": {Type: mysql.TypeVarchar, Value: "Bob"},
"tiny": {Type: mysql.TypeTiny, Value: 255},
"comment": {Type: mysql.TypeBlob, Value: []byte("测试")},
Expand All @@ -153,7 +152,7 @@ func (s *canalEntrySuite) TestConvertEntry(c *check.C) {
},
Delete: true,
PreColumns: map[string]*model.Column{
"id": {Type: mysql.TypeLong, WhereHandle: &trueVar, Value: 1},
"id": {Type: mysql.TypeLong, Flag: model.PrimaryKeyFlag, Value: 1},
},
}
testCaseDdl := &model.DDLEvent{
Expand Down