Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion downstreamadapter/sink/eventrouter/partition/index_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ func (r *IndexValuePartitionGenerator) GeneratePartitionIndexAndKey(
continue
}
if tableInfo.IsHandleKey(col.ID) {
r.hasher.Write([]byte(col.Name.O), []byte(common.ColumnValueString(common.ExtractColVal(&rowData, col, idx))))
val := common.ExtractColVal(&rowData, col, idx)
if val == nil {
continue
}
r.hasher.Write([]byte(col.Name.O), []byte(common.ColumnValueString(val)))
}
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strconv"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/util"
)

Expand All @@ -39,7 +40,7 @@ func ColumnValueString(c interface{}) string {
var data string
switch v := c.(type) {
case nil:
data = "null"
log.Panic("should not reach here when column value is nil")
case bool:
if v {
data = "1"
Expand Down
6 changes: 1 addition & 5 deletions pkg/common/table_info_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,11 +693,7 @@ func (s *columnSchema) initIndexColumns() {
indexColOffset := make([]int64, 0, len(idx.Columns))
for _, idxCol := range idx.Columns {
colInfo := s.Columns[idxCol.Offset]
if IsColCDCVisible(colInfo) {
indexColOffset = append(indexColOffset, colInfo.ID)
} else {
hasNotNullUK = false
}
indexColOffset = append(indexColOffset, colInfo.ID)
if !mysql.HasNotNullFlag(colInfo.GetFlag()) {
hasNotNullUK = false
}
Expand Down
23 changes: 19 additions & 4 deletions pkg/sink/mysql/mysql_writer_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err
for _, sortedEventGroups := range eventsGroupSortedByUpdateTs {
for _, eventsInGroup := range sortedEventGroups {
tableInfo := eventsInGroup[0].TableInfo
if !w.shouldGenBatchSQL(tableInfo.HasPKOrNotNullUK, tableInfo.HasVirtualColumns(), eventsInGroup) {
if !w.shouldGenBatchSQL(tableInfo, eventsInGroup) {
queryList, argsList = w.generateNormalSQLs(eventsInGroup)
} else {
queryList, argsList = w.generateBatchSQL(eventsInGroup)
Expand All @@ -129,17 +129,32 @@ func (w *Writer) prepareDMLs(events []*commonEvent.DMLEvent) (*preparedDMLs, err
// Batch SQL generation is used when:
// 1. BatchDMLEnable = true, and rows > 1
// 2. The table has a pk or not null unique key
// 3. The table doesn't have virtual columns
// 3. The handle key contains no virtual generated columns
// 4. There's more than one row in the group
// 5. All events have the same safe mode status
func (w *Writer) shouldGenBatchSQL(hasPKOrNotNullUK bool, hasVirtualCols bool, events []*commonEvent.DMLEvent) bool {
func (w *Writer) shouldGenBatchSQL(tableInfo *common.TableInfo, events []*commonEvent.DMLEvent) bool {
if !w.cfg.BatchDMLEnable {
return false
}

if !hasPKOrNotNullUK || hasVirtualCols {
if !tableInfo.HasPKOrNotNullUK {
return false
}
// If the table has a pk or uk but the handle key contains virtual generated columns,
// we can't batch the events by pk or uk,
// because the value of a virtual generated column is calculated from other columns,
// and we can't guarantee that it is the same for rows with the same pk or uk.
colIDs := tableInfo.GetOrderedHandleKeyColumnIDs()
for _, colID := range colIDs {
info, exist := tableInfo.GetColumnInfo(colID)
if !exist {
continue
}
if info.IsVirtualGenerated() {
return false
}
}
// if tableInfo.HasVirtualColumns()
if len(events) == 1 && events[0].Len() == 1 {
return false
}
Expand Down
233 changes: 177 additions & 56 deletions pkg/sink/mysql/mysql_writer_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,109 +20,230 @@ import (
"testing"
"time"

"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
tidbmodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
tidbmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/stretchr/testify/require"
)

// TestShouldGenBatchSQL tests the shouldGenBatchSQL function
func TestShouldGenBatchSQL(t *testing.T) {
t.Parallel()

writer, _, _ := newTestMysqlWriter(t)
defer writer.db.Close()
buildTableInfo := func(tableName string, pkIsHandle bool) *common.TableInfo {
idFieldType := types.NewFieldType(tidbmysql.TypeLong)
nameFieldType := types.NewFieldType(tidbmysql.TypeVarchar)
if pkIsHandle {
idFieldType.AddFlag(tidbmysql.PriKeyFlag)
idFieldType.AddFlag(tidbmysql.NotNullFlag)
}
ti := &tidbmodel.TableInfo{
ID: 100,
Name: ast.NewCIStr(tableName),
PKIsHandle: pkIsHandle,
Columns: []*tidbmodel.ColumnInfo{
{
ID: 1,
Name: ast.NewCIStr("id"),
Offset: 0,
FieldType: *idFieldType,
State: tidbmodel.StatePublic,
},
{
ID: 2,
Name: ast.NewCIStr("name"),
Offset: 1,
FieldType: *nameFieldType,
State: tidbmodel.StatePublic,
},
},
}
info := common.NewTableInfo4Decoder("test", ti)
require.NotNil(t, info)
return info
}

pkTableInfo := buildTableInfo("t_with_pk", true)
noPKTableInfo := buildTableInfo("t_no_pk", false)

buildTableInfoWithVirtualGeneratedHandleKey := func(tableName string) *common.TableInfo {
aFieldType := types.NewFieldType(tidbmysql.TypeLong)
aFieldType.AddFlag(tidbmysql.NotNullFlag)
bFieldType := types.NewFieldType(tidbmysql.TypeLong)
bFieldType.AddFlag(tidbmysql.NotNullFlag)
cFieldType := types.NewFieldType(tidbmysql.TypeLong)
cFieldType.AddFlag(tidbmysql.NotNullFlag)

ti := &tidbmodel.TableInfo{
ID: 101,
Name: ast.NewCIStr(tableName),
Columns: []*tidbmodel.ColumnInfo{
{
ID: 1,
Name: ast.NewCIStr("a"),
Offset: 0,
FieldType: *aFieldType,
State: tidbmodel.StatePublic,
},
{
ID: 2,
Name: ast.NewCIStr("b"),
Offset: 1,
FieldType: *bFieldType,
State: tidbmodel.StatePublic,
GeneratedExprString: "a + c",
GeneratedStored: false,
},
{
ID: 3,
Name: ast.NewCIStr("c"),
Offset: 2,
FieldType: *cFieldType,
State: tidbmodel.StatePublic,
},
},
Indices: []*tidbmodel.IndexInfo{
{
ID: 1,
Name: ast.NewCIStr("idx1"),
Unique: true,
State: tidbmodel.StatePublic,
Columns: []*tidbmodel.IndexColumn{
{
Name: ast.NewCIStr("b"),
Offset: 1,
Length: types.UnspecifiedLength,
},
{
Name: ast.NewCIStr("c"),
Offset: 2,
Length: types.UnspecifiedLength,
},
},
},
},
}
info := common.NewTableInfo4Decoder("test", ti)
require.NotNil(t, info)
return info
}
virtualHandleKeyTableInfo := buildTableInfoWithVirtualGeneratedHandleKey("t_virtual_uk")

tests := []struct {
name string
hasPK bool
hasVirtualCols bool
events []*commonEvent.DMLEvent
config *Config
safemode bool
want bool
name string
tableInfo *common.TableInfo
events []*commonEvent.DMLEvent
adjustWriter func(w *Writer)
want bool
}{
{
name: "table without primary key should not use batch SQL",
hasPK: false,
hasVirtualCols: false,
events: []*commonEvent.DMLEvent{newDMLEvent(t, 1, 1, 1)},
config: &Config{SafeMode: false, BatchDMLEnable: true},
want: false,
},
{
name: "table with virtual columns should not use batch SQL",
hasPK: true,
hasVirtualCols: true,
events: []*commonEvent.DMLEvent{newDMLEvent(t, 1, 1, 1)},
config: &Config{SafeMode: false, BatchDMLEnable: true},
want: false,
name: "table without primary key should not use batch SQL",
tableInfo: noPKTableInfo,
events: []*commonEvent.DMLEvent{newDMLEvent(t, 1, 1, 1), newDMLEvent(t, 1, 1, 1)},
adjustWriter: func(w *Writer) {
w.cfg.SafeMode = false
w.cfg.BatchDMLEnable = true
},
want: false,
},
{
name: "single row event should not use batch SQL",
hasPK: true,
hasVirtualCols: false,
name: "single row event should not use batch SQL",
tableInfo: pkTableInfo,
events: []*commonEvent.DMLEvent{
newDMLEvent(t, 1, 1, 1),
},
config: &Config{SafeMode: false, BatchDMLEnable: true},
want: false,
adjustWriter: func(w *Writer) {
w.cfg.SafeMode = false
w.cfg.BatchDMLEnable = true
},
want: false,
},
{
name: "all rows in same safe mode should use batch SQL",
hasPK: true,
hasVirtualCols: false,
name: "all rows in same safe mode should use batch SQL",
tableInfo: pkTableInfo,
events: []*commonEvent.DMLEvent{
newDMLEvent(t, 1, 2, 2),
newDMLEvent(t, 2, 3, 2),
},
config: &Config{SafeMode: false, BatchDMLEnable: true},
want: true,
adjustWriter: func(w *Writer) {
w.cfg.SafeMode = false
w.cfg.BatchDMLEnable = true
},
want: true,
},
{
name: "multiple rows with primary key in different safe mode should not use batch SQL",
hasPK: true,
hasVirtualCols: false,
name: "multiple rows with primary key in different safe mode should not use batch SQL",
tableInfo: pkTableInfo,
events: []*commonEvent.DMLEvent{
newDMLEvent(t, 2, 1, 2),
newDMLEvent(t, 1, 2, 2),
},
config: &Config{SafeMode: false, BatchDMLEnable: true},
want: false,
adjustWriter: func(w *Writer) {
w.cfg.SafeMode = false
w.cfg.BatchDMLEnable = true
},
want: false,
},
{
name: "multiple rows with primary key in unsafe mode should use batch SQL",
hasPK: true,
hasVirtualCols: false,
name: "multiple rows with primary key in unsafe mode should use batch SQL",
tableInfo: pkTableInfo,
events: []*commonEvent.DMLEvent{
newDMLEvent(t, 2, 1, 2),
newDMLEvent(t, 3, 1, 2),
},
config: &Config{SafeMode: false, BatchDMLEnable: true},
want: true,
adjustWriter: func(w *Writer) {
w.cfg.SafeMode = false
w.cfg.BatchDMLEnable = true
},
want: true,
},
{
name: "global safe mode should use batch SQL",
hasPK: true,
hasVirtualCols: false,
name: "global safe mode should use batch SQL when multiple rows",
tableInfo: pkTableInfo,
events: []*commonEvent.DMLEvent{
newDMLEvent(t, 2, 1, 2),
newDMLEvent(t, 2, 1, 1),
newDMLEvent(t, 1, 2, 1),
},
config: &Config{SafeMode: true, BatchDMLEnable: true},
want: true,
adjustWriter: func(w *Writer) {
w.cfg.SafeMode = true
w.cfg.BatchDMLEnable = true
},
want: true,
},
{
name: "batch dml is disabled",
tableInfo: pkTableInfo,
events: []*commonEvent.DMLEvent{newDMLEvent(t, 2, 1, 1), newDMLEvent(t, 3, 1, 1)},
adjustWriter: func(w *Writer) {
w.cfg.SafeMode = false
w.cfg.BatchDMLEnable = false
},
want: false,
},
{
name: "batch dml is disabled",
hasPK: true,
hasVirtualCols: false,
events: []*commonEvent.DMLEvent{newDMLEvent(t, 1, 1, 1)},
config: &Config{SafeMode: false, BatchDMLEnable: false},
want: false,
name: "handle key contains virtual generated column should not use batch SQL",
tableInfo: virtualHandleKeyTableInfo,
events: []*commonEvent.DMLEvent{newDMLEvent(t, 2, 1, 1), newDMLEvent(t, 3, 1, 1)},
adjustWriter: func(w *Writer) {
w.cfg.SafeMode = false
w.cfg.BatchDMLEnable = true
},
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got := writer.shouldGenBatchSQL(tt.hasPK, tt.hasVirtualCols, tt.events)
writer, db, _ := newTestMysqlWriter(t)
defer db.Close()
if tt.adjustWriter != nil {
tt.adjustWriter(writer)
}
got := writer.shouldGenBatchSQL(tt.tableInfo, tt.events)
require.Equal(t, tt.want, got)
})
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/sink/mysql/sql_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interf
colNames := make([]string, 0, len(tableInfo.GetColumns()))
// Try to use unique key values when available
for i, col := range tableInfo.GetColumns() {
if col == nil || !tableInfo.IsHandleKey(col.ID) {
if col == nil || !tableInfo.IsHandleKey(col.ID) || col.IsVirtualGenerated() {
continue
}
colNames = append(colNames, col.Name.O)
Expand All @@ -269,9 +269,11 @@ func whereSlice(row *chunk.Row, tableInfo *common.TableInfo) ([]string, []interf
// If no handle key column was collected above, fall back to using the values of all non-virtual columns in the WHERE condition.
if len(colNames) == 0 {
for i, col := range tableInfo.GetColumns() {
colNames = append(colNames, col.Name.O)
v := common.ExtractColVal(row, col, i)
args = append(args, v)
if col != nil && !col.IsVirtualGenerated() {
colNames = append(colNames, col.Name.O)
v := common.ExtractColVal(row, col, i)
args = append(args, v)
}
}
}
return colNames, args
Expand Down
Loading
Loading