Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: chose one index to output delete events #787

Merged
merged 25 commits into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 59 additions & 68 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"math/rand"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -378,11 +376,7 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
if !exist {
return nil, errors.NotFoundf("column info, colID: %d", index)
}
// the judge about `fillWithDefaultValue` is tricky
// if the `fillWithDefaultValue` is true, the event must be deletion
// we should output the generated column in deletion event
// this tricky code will be improve after pingcap/ticdc#787 merged
if !tableInfo.IsColWritable(colInfo) && fillWithDefaultValue {
if !tableInfo.IsColCDCVisible(colInfo) {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
continue
}
colName := colInfo.Name.O
Expand All @@ -401,24 +395,28 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
}
cols[colName] = col
}
if !fillWithDefaultValue {
return cols, nil
}
for _, col := range tableInfo.Columns {
_, ok := cols[col.Name.O]
if !ok && tableInfo.IsColWritable(col) {
column := &model.Column{
Type: col.Tp,
Value: getDefaultOrZeroValue(col),
Flag: transColumnFlag(col),
}
if tableInfo.IsColumnUnique(col.ID) {
whereHandle := true
column.WhereHandle = &whereHandle
if fillWithDefaultValue {
for _, col := range tableInfo.Columns {
_, ok := cols[col.Name.O]
if !ok && tableInfo.IsColCDCVisible(col) {
column := &model.Column{
Type: col.Tp,
Value: getDefaultOrZeroValue(col),
Flag: transColumnFlag(col),
}
if tableInfo.IsColumnUnique(col.ID) {
whereHandle := true
column.WhereHandle = &whereHandle
}
cols[col.Name.O] = column
}
cols[col.Name.O] = column
}
}

err := setHandleKeyFlag(tableInfo, cols)
if err != nil {
return nil, errors.Trace(err)
}
return cols, nil
}

Expand Down Expand Up @@ -480,11 +478,43 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
}, nil
}

func setHandleKeyFlag(tableInfo *model.TableInfo, colValues map[string]*model.Column) error {
switch tableInfo.HandleIndexID {
case model.HandleIndexTableIneligible:
log.Fatal("this table is not eligible", zap.Int64("tableID", tableInfo.ID))
case model.HandleIndexPKIsHandle:
// pk is handle
if !tableInfo.PKIsHandle {
log.Fatal("the pk of this table is not handle", zap.Int64("tableID", tableInfo.ID))
}
for _, colInfo := range tableInfo.Columns {
if mysql.HasPriKeyFlag(colInfo.Flag) {
colValues[colInfo.Name.O].Flag.SetIsHandleKey()
break
}
}
default:
handleIndexInfo, exist := tableInfo.GetIndexInfo(tableInfo.HandleIndexID)
if !exist {
return errors.NotFoundf("handle index info(%d) in table(%d)", tableInfo.HandleIndexID, tableInfo.ID)
}
for _, colInfo := range handleIndexInfo.Columns {
colName := tableInfo.Columns[colInfo.Offset].Name.O
colValues[colName].Flag.SetIsHandleKey()
}
}
return nil
}

func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry) (*model.RowChangedEvent, error) {
// skip set index KV
if !idx.Delete || m.enableOldValue {
return nil, nil
}
// skip any index that is not the handle
if idx.IndexID != tableInfo.HandleIndexID {
return nil, nil
}

indexInfo, exist := tableInfo.GetIndexInfo(idx.IndexID)
if !exist {
Expand All @@ -508,12 +538,14 @@ func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKV
return nil, errors.Trace(err)
}
whereHandle := true
preCols[idxCol.Name.O] = &model.Column{
col := &model.Column{
Type: tableInfo.Columns[idxCol.Offset].Tp,
WhereHandle: &whereHandle,
Value: value,
Flag: transColumnFlag(tableInfo.Columns[idxCol.Offset]),
}
col.Flag.SetIsHandleKey()
preCols[idxCol.Name.O] = col
}
return &model.RowChangedEvent{
StartTs: idx.StartTs,
Expand Down Expand Up @@ -664,55 +696,14 @@ func genMultipleKeys(ti *timodel.TableInfo, preCols, cols map[string]*model.Colu
return multipleKeys
}

func columnValue(value interface{}) string {
var data string
switch v := value.(type) {
case nil:
data = "null"
case bool:
if v {
data = "1"
} else {
data = "0"
}
case int:
data = strconv.FormatInt(int64(v), 10)
case int8:
data = strconv.FormatInt(int64(v), 10)
case int16:
data = strconv.FormatInt(int64(v), 10)
case int32:
data = strconv.FormatInt(int64(v), 10)
case int64:
data = strconv.FormatInt(int64(v), 10)
case uint8:
data = strconv.FormatUint(uint64(v), 10)
case uint16:
data = strconv.FormatUint(uint64(v), 10)
case uint32:
data = strconv.FormatUint(uint64(v), 10)
case uint64:
data = strconv.FormatUint(uint64(v), 10)
case float32:
data = strconv.FormatFloat(float64(v), 'f', -1, 32)
case float64:
data = strconv.FormatFloat(float64(v), 'f', -1, 64)
case string:
data = v
case []byte:
data = string(v)
default:
data = fmt.Sprintf("%v", v)
}

return data
}

func transColumnFlag(col *timodel.ColumnInfo) model.ColumnFlagType {
var flag model.ColumnFlagType
if col.Charset == "binary" {
flag.SetIsBinary()
}
if col.IsGenerated() {
flag.SetIsGeneratedColumn()
}
return flag
}

Expand All @@ -724,7 +715,7 @@ func genKeyList(table string, columns []*timodel.ColumnInfo, values map[string]*
log.L().Debug("ignore null value", zap.String("column", col.Name.O), zap.String("table", table))
continue // ignore `null` value.
}
buf.WriteString(columnValue(val.Value))
buf.WriteString(model.ColumnValueString(val.Value))
}
if buf.Len() == 0 {
log.L().Debug("all value are nil, no key generated", zap.String("table", table))
Expand Down
67 changes: 61 additions & 6 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ import (
"github.com/pingcap/tidb/util/rowcodec"
)

const (
// HandleIndexPKIsHandle represents that the handle index is the pk and the pk is the handle
HandleIndexPKIsHandle = -1
// HandleIndexTableIneligible represents that the table is ineligible
HandleIndexTableIneligible = -2
)

// TableInfo provides meta data describing a DB table.
type TableInfo struct {
*model.TableInfo
Expand All @@ -30,7 +37,15 @@ type TableInfo struct {
columnsOffset map[int64]int
indicesOffset map[int64]int
uniqueColumns map[int64]struct{}
handleColID int64

// only for new row format decoder
handleColID int64

// the mounter will choose this index to output delete events
// special value:
// HandleIndexPKIsHandle(-1) : pk is handle
// HandleIndexTableIneligible(-2) : the table is not eligible
HandleIndexID int64

// if the table of this row only has one unique index(includes primary key),
// IndieMarkCol will be set to the name of the unique index
Expand All @@ -49,6 +64,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
indicesOffset: make(map[int64]int, len(info.Indices)),
uniqueColumns: make(map[int64]struct{}),
handleColID: -1,
HandleIndexID: HandleIndexTableIneligible,
rowColInfos: make([]rowcodec.ColInfo, len(info.Columns)),
}

Expand All @@ -59,6 +75,7 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
isPK := (ti.PKIsHandle && mysql.HasPriKeyFlag(col.Flag)) || col.ID == model.ExtraHandleID
if isPK {
ti.handleColID = col.ID
ti.HandleIndexID = HandleIndexPKIsHandle
ti.uniqueColumns[col.ID] = struct{}{}
uniqueIndexNum++
}
Expand Down Expand Up @@ -90,10 +107,39 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
}
}
}

ti.findHandleIndex()
return ti
}

func (ti *TableInfo) findHandleIndex() {
if ti.HandleIndexID == HandleIndexPKIsHandle {
// pk is handle
return
}
handleIndexOffset := -1
for i, idx := range ti.Indices {
if !ti.IsIndexUnique(idx) {
continue
}
if idx.Primary {
handleIndexOffset = i
break
}
if handleIndexOffset < 0 {
handleIndexOffset = i
} else {
if len(ti.Indices[handleIndexOffset].Columns) > len(ti.Indices[i].Columns) ||
(len(ti.Indices[handleIndexOffset].Columns) == len(ti.Indices[i].Columns) &&
ti.Indices[handleIndexOffset].ID > ti.Indices[i].ID) {
handleIndexOffset = i
}
}
}
if handleIndexOffset >= 0 {
ti.HandleIndexID = ti.Indices[handleIndexOffset].ID
}
}

// GetColumnInfo returns the column info by ID
func (ti *TableInfo) GetColumnInfo(colID int64) (info *model.ColumnInfo, exist bool) {
colOffset, exist := ti.columnsOffset[colID]
Expand Down Expand Up @@ -121,9 +167,13 @@ func (ti *TableInfo) GetRowColInfos() (int64, []rowcodec.ColInfo) {
return ti.handleColID, ti.rowColInfos
}

// IsColWritable returns is the col is writeable
func (ti *TableInfo) IsColWritable(col *model.ColumnInfo) bool {
return col.State == model.StatePublic && !col.IsGenerated()
// IsColCDCVisible returns whether the col is visible for CDC
func (ti *TableInfo) IsColCDCVisible(col *model.ColumnInfo) bool {
// this column is a virtual generated column
if col.IsGenerated() && !col.GeneratedStored {
return false
}
return col.State == model.StatePublic
}

// GetUniqueKeys returns all unique keys of the table as a slice of column names
Expand Down Expand Up @@ -180,7 +230,12 @@ func (ti *TableInfo) IsIndexUnique(indexInfo *model.IndexInfo) bool {
}
if indexInfo.Unique {
for _, col := range indexInfo.Columns {
if !mysql.HasNotNullFlag(ti.Columns[col.Offset].Flag) {
colInfo := ti.Columns[col.Offset]
if !mysql.HasNotNullFlag(colInfo.Flag) {
return false
}
// this column is a virtual generated column
if colInfo.IsGenerated() && !colInfo.GeneratedStored {
return false
}
}
Expand Down
Loading