Skip to content

Commit

Permalink
drainer: Refactor translator/kafka.go to make it simpler (#668)
Browse files Browse the repository at this point in the history
* drainer: Refactor, break TiBinlogToSlaveBinlog into multiple functions

* Preallocate slice

* Simplify code

* gofmt

* Rename function
  • Loading branch information
suzaku committed Jul 16, 2019
1 parent 0ad4e52 commit 458a188
Showing 1 changed file with 78 additions and 60 deletions.
138 changes: 78 additions & 60 deletions drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ import (
)

// TiBinlogToSlaveBinlog translates the format to slave binlog
func TiBinlogToSlaveBinlog(infoGetter TableInfoGetter, schema string, table string,
tiBinlog *pb.Binlog, pv *pb.PrewriteValue) (slaveBinlog *obinlog.Binlog, err error) {
func TiBinlogToSlaveBinlog(
infoGetter TableInfoGetter,
schema string,
table string,
tiBinlog *pb.Binlog,
pv *pb.PrewriteValue,
) (*obinlog.Binlog, error) {
if tiBinlog.DdlJobId > 0 { // DDL
slaveBinlog = &obinlog.Binlog{
slaveBinlog := &obinlog.Binlog{
Type: obinlog.BinlogType_DDL,
CommitTs: tiBinlog.GetCommitTs(),
DdlData: &obinlog.DDLData{
Expand All @@ -45,81 +50,51 @@ func TiBinlogToSlaveBinlog(infoGetter TableInfoGetter, schema string, table stri
DdlQuery: tiBinlog.GetDdlQuery(),
},
}
} else {
slaveBinlog = &obinlog.Binlog{
Type: obinlog.BinlogType_DML,
CommitTs: tiBinlog.GetCommitTs(),
DmlData: new(obinlog.DMLData),
}

for _, mut := range pv.GetMutations() {
info, ok := infoGetter.TableByID(mut.GetTableId())
if !ok {
return nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId())
}
return slaveBinlog, nil
}
slaveBinlog := &obinlog.Binlog{
Type: obinlog.BinlogType_DML,
CommitTs: tiBinlog.GetCommitTs(),
DmlData: new(obinlog.DMLData),
}

schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId())
if !ok {
return nil, errors.Errorf("SchemaAndTableName empty table id: %d", mut.GetTableId())
}
for _, mut := range pv.GetMutations() {
info, ok := infoGetter.TableByID(mut.GetTableId())
if !ok {
return nil, errors.Errorf("TableByID empty table id: %d", mut.GetTableId())
}

iter := newSequenceIterator(&mut)
for {
mutType, row, err := iter.next()
if err != nil {
if err == io.EOF {
break
}
return nil, errors.Trace(err)
}
schema, _, ok = infoGetter.SchemaAndTableName(mut.GetTableId())
if !ok {
return nil, errors.Errorf("SchemaAndTableName empty table id: %d", mut.GetTableId())
}

table := genTable(schema, info)
slaveBinlog.DmlData.Tables = append(slaveBinlog.DmlData.Tables, table)
tableMutation := new(obinlog.TableMutation)
table.Mutations = append(table.Mutations, tableMutation)

switch mutType {
case pb.MutationType_Insert:
tableMutation.Type = obinlog.MutationType_Insert.Enum()
tableMutation.Row, err = insertRowToRow(info, row)
if err != nil {
return nil, err
}
case pb.MutationType_Update:
tableMutation.Type = obinlog.MutationType_Update.Enum()
tableMutation.Row, tableMutation.ChangeRow, err = updateRowToRow(info, row)
if err != nil {
return nil, err
}
case pb.MutationType_DeleteRow:
tableMutation.Type = obinlog.MutationType_Delete.Enum()
tableMutation.Row, err = deleteRowToRow(info, row)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("unknown mutation type: %v", mutType)
iter := newSequenceIterator(&mut)
for {
table, err := nextRow(schema, info, iter)
if err != nil {
if errors.Cause(err) == io.EOF {
break
}
return nil, errors.Trace(err)
}
slaveBinlog.DmlData.Tables = append(slaveBinlog.DmlData.Tables, table)
}
}

return
return slaveBinlog, nil
}

func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table) {
table = new(obinlog.Table)
table.SchemaName = proto.String(schema)
table.TableName = proto.String(tableInfo.Name.O)
// get obinlog.ColumnInfo
var columnInfos []*obinlog.ColumnInfo
columnInfos := make([]*obinlog.ColumnInfo, 0, len(tableInfo.Columns))
for _, col := range tableInfo.Columns {
info := new(obinlog.ColumnInfo)
info.Name = col.Name.O
info.MysqlType = types.TypeToStr(col.Tp, col.Charset)
if mysql.HasPriKeyFlag(col.Flag) {
info.IsPrimaryKey = true
}
info.IsPrimaryKey = mysql.HasPriKeyFlag(col.Flag)
columnInfos = append(columnInfos, info)
}
table.ColumnInfo = columnInfos
Expand Down Expand Up @@ -269,3 +244,46 @@ func DatumToColumn(colInfo *model.ColumnInfo, datum types.Datum) (col *obinlog.C

return
}

func createTableMutation(tp pb.MutationType, info *model.TableInfo, row []byte) (*obinlog.TableMutation, error) {
var err error
mut := new(obinlog.TableMutation)
switch tp {
case pb.MutationType_Insert:
mut.Type = obinlog.MutationType_Insert.Enum()
mut.Row, err = insertRowToRow(info, row)
if err != nil {
return nil, err
}
case pb.MutationType_Update:
mut.Type = obinlog.MutationType_Update.Enum()
mut.Row, mut.ChangeRow, err = updateRowToRow(info, row)
if err != nil {
return nil, err
}
case pb.MutationType_DeleteRow:
mut.Type = obinlog.MutationType_Delete.Enum()
mut.Row, err = deleteRowToRow(info, row)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("unknown mutation type: %v", tp)
}
return mut, nil
}

func nextRow(schema string, info *model.TableInfo, iter *sequenceIterator) (*obinlog.Table, error) {
mutType, row, err := iter.next()
if err != nil {
return nil, errors.Trace(err)
}

tableMutation, err := createTableMutation(mutType, info, row)
if err != nil {
return nil, errors.Trace(err)
}
table := genTable(schema, info)
table.Mutations = append(table.Mutations, tableMutation)
return table, nil
}

0 comments on commit 458a188

Please sign in to comment.