diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index a545e24acd72c..639e09e1c619f 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -123,6 +123,7 @@ go_library( "//util/ranger", "//util/resourcegrouptag", "//util/rowDecoder", + "//util/rowcodec", "//util/set", "//util/slice", "//util/sqlexec", diff --git a/ddl/column.go b/ddl/column.go index 843a4c4a0b72a..664fe480465e4 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -20,6 +20,7 @@ import ( "encoding/hex" "fmt" "math/bits" + "sort" "strings" "sync/atomic" "time" @@ -49,6 +50,7 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" decoder "github.com/pingcap/tidb/util/rowDecoder" + "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" ) @@ -1180,6 +1182,9 @@ type updateColumnWorker struct { rowDecoder *decoder.RowDecoder rowMap map[int64]types.Datum + + checksumBuffer rowcodec.RowData + checksumNeeded bool } func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker { @@ -1197,12 +1202,33 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT } } rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) + checksumNeeded := false + // The failpoint forces the checksum path regardless of the global switch. + forceChecksum := false + failpoint.Inject("forceRowLevelChecksumOnUpdateColumnBackfill", func() { + forceChecksum = true + }) + // We use the global `EnableRowLevelChecksum` to detect whether checksum is enabled in the DDL backfill worker because + // `SessionVars.IsRowLevelChecksumEnabled` will filter out internal sessions. + if variable.EnableRowLevelChecksum.Load() || forceChecksum { + if numNonPubCols := len(t.DeletableCols()) - len(t.Cols()); numNonPubCols > 1 { + cols := make([]*model.ColumnInfo, len(t.DeletableCols())) + for i, col := range t.DeletableCols() { + cols[i] = col.ToInfo() + } + logutil.BgLogger().Warn("skip checksum in update-column backfill since the number of non-public columns is greater than 1", + zap.String("jobQuery", reorgInfo.Query), zap.String("reorgInfo", reorgInfo.String()), zap.Any("cols", cols)) + } else { + checksumNeeded = true + } + } return &updateColumnWorker{ - backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false), - oldColInfo: oldCol, - newColInfo: newCol, - rowDecoder: rowDecoder, - rowMap: make(map[int64]types.Datum, len(decodeColMap)), + backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false), + oldColInfo: oldCol, + newColInfo: newCol, + rowDecoder: rowDecoder, + rowMap: make(map[int64]types.Datum, len(decodeColMap)), + checksumNeeded: checksumNeeded, } } @@ -1332,15 +1358,15 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra if err != nil { return errors.Trace(err) } - newColumnIDs := make([]int64, 0, len(w.rowMap)) newRow := make([]types.Datum, 0, len(w.rowMap)) for colID, val := range w.rowMap { newColumnIDs = append(newColumnIDs, colID) newRow = append(newRow, val) } + checksums := w.calcChecksums() sctx, rd := w.sessCtx.GetSessionVars().StmtCtx, &w.sessCtx.GetSessionVars().RowEncoder - newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd) + newRowVal, err := tablecodec.EncodeRow(sctx, newRow, newColumnIDs, nil, nil, rd, checksums...)
if err != nil { return errors.Trace(err) } @@ -1350,6 +1376,38 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra return nil } +func (w *updateColumnWorker) calcChecksums() []uint32 { + if !w.checksumNeeded { + return nil + } + // When w.checksumNeeded is true, there is only one write-reorg column (the new column) and all other + // columns are public, so we have to calculate two checksums: one covers the old column but not the new one, and + // the other covers the new column but not the old one. + var checksums [2]uint32 + for i, id := range []int64{w.newColInfo.ID, w.oldColInfo.ID} { + if len(w.checksumBuffer.Cols) > 0 { + w.checksumBuffer.Cols = w.checksumBuffer.Cols[:0] + } + for _, col := range w.table.DeletableCols() { + if col.ID == id || (col.IsGenerated() && !col.GeneratedStored) { + continue + } + d := w.rowMap[col.ID] + w.checksumBuffer.Cols = append(w.checksumBuffer.Cols, rowcodec.ColData{ColumnInfo: col.ToInfo(), Datum: &d}) + } + if !sort.IsSorted(w.checksumBuffer) { + sort.Sort(w.checksumBuffer) + } + checksum, err := w.checksumBuffer.Checksum() + if err != nil { + logutil.BgLogger().Warn("skip checksum in update-column backfill due to encode error", zap.Error(err)) + return nil + } + checksums[i] = checksum + } + return checksums[:] +} + // reformatErrors recasts the error because the `convertTo` function couldn't package the column name and datum value for some errors. func (w *updateColumnWorker) reformatErrors(err error) error { // Since row count is not precise in concurrent reorganization, here we substitute row count with the datum value. diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 95b615f069d1c..0879993a984b1 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -43,6 +43,7 @@ import ( func TestColumnTypeChangeBetweenInteger(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") // Modify column from null to not null.
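The scheme in calcChecksums above reads as: checksums[0] covers every deletable column except the new one (the old version of the row), and checksums[1] covers every deletable column except the old one (the new version). Below is a condensed, self-contained sketch of the same idea; it mirrors the rowcodec.RowData usage in the patch, but the function name and arguments are hypothetical stand-ins for the worker's state, not part of this change.

// Sketch only: for each of the two column versions, build a RowData that
// excludes the other version, keep it sorted, and hash it.
package sketch

import (
	"sort"

	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/rowcodec"
)

func checksumsForReorg(cols []*model.ColumnInfo, rowMap map[int64]types.Datum, oldColID, newColID int64) ([]uint32, error) {
	var out [2]uint32
	for i, excluded := range []int64{newColID, oldColID} {
		var row rowcodec.RowData
		for _, col := range cols {
			// Virtual generated columns never enter the checksum.
			if col.ID == excluded || (col.IsGenerated() && !col.GeneratedStored) {
				continue
			}
			d := rowMap[col.ID]
			row.Cols = append(row.Cols, rowcodec.ColData{ColumnInfo: col, Datum: &d})
		}
		if !sort.IsSorted(row) {
			sort.Sort(row)
		}
		checksum, err := row.Checksum()
		if err != nil {
			return nil, err
		}
		out[i] = checksum
	}
	return out[:], nil
}

Sorting before hashing keeps the checksum independent of the order in which callers append columns.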
@@ -108,6 +109,7 @@ func TestColumnTypeChangeBetweenInteger(t *testing.T) { func TestColumnTypeChangeStateBetweenInteger(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (c1 int, c2 int)") @@ -176,6 +178,7 @@ func TestColumnTypeChangeStateBetweenInteger(t *testing.T) { func TestRollbackColumnTypeChangeBetweenInteger(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (c1 bigint, c2 bigint)") @@ -255,6 +258,7 @@ func init() { func TestColumnTypeChangeFromIntegerToOthers(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") prepare := func(tk *testkit.TestKit) { @@ -406,6 +410,7 @@ func TestColumnTypeChangeFromIntegerToOthers(t *testing.T) { func TestColumnTypeChangeBetweenVarcharAndNonVarchar(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop database if exists col_type_change_char;") tk.MustExec("create database col_type_change_char;") @@ -434,6 +439,7 @@ func TestColumnTypeChangeBetweenVarcharAndNonVarchar(t *testing.T) { func TestColumnTypeChangeFromStringToOthers(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") // Set time zone to UTC. @@ -665,6 +671,7 @@ func TestColumnTypeChangeFromStringToOthers(t *testing.T) { func TestColumnTypeChangeFromNumericToOthers(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") // Set time zone to UTC. @@ -929,6 +936,7 @@ func TestColumnTypeChangeFromNumericToOthers(t *testing.T) { func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") var assertResult bool @@ -970,6 +978,7 @@ func TestColumnTypeChangeIgnoreDisplayLength(t *testing.T) { func TestColumnTypeChangeFromDateTimeTypeToOthers(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") // Set time zone to UTC. @@ -1148,6 +1157,7 @@ func TestColumnTypeChangeFromDateTimeTypeToOthers(t *testing.T) { func TestColumnTypeChangeFromJsonToOthers(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") // Set time zone to UTC. 
@@ -1541,6 +1551,7 @@ func TestColumnTypeChangeFromJsonToOthers(t *testing.T) { func TestUpdateDataAfterChangeTimestampToDate(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop table if exists t, t1") tk.MustExec("create table t (col timestamp default '1971-06-09' not null, col1 int default 1, unique key(col1));") @@ -1580,6 +1591,50 @@ func TestRowFormat(t *testing.T) { tk.MustExec("drop table if exists t") } +func TestRowFormatWithChecksums(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key, v varchar(10))") + tk.MustExec("insert into t values (1, \"123\");") + tk.MustExec("alter table t modify column v varchar(5);") + + tbl := external.GetTableByName(t, tk, "test", "t") + encodedKey := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1)) + + h := helper.NewHelper(store.(helper.Storage)) + data, err := h.GetMvccByEncodedKey(encodedKey) + require.NoError(t, err) + // row value with checksums + require.Equal(t, []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x8, 0x52, 0x78, 0xc9, 0x28, 0x52, 0x78, 0xc9, 0x28}, data.Info.Writes[0].ShortValue) + tk.MustExec("drop table if exists t") +} + +func TestRowLevelChecksumWithMultiSchemaChange(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key, v varchar(10))") + tk.MustExec("insert into t values (1, \"123\")") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/forceRowLevelChecksumOnUpdateColumnBackfill", "return")) + defer failpoint.Disable("github.com/pingcap/tidb/ddl/forceRowLevelChecksumOnUpdateColumnBackfill") + tk.MustExec("alter table t add column vv int, modify column v varchar(5)") + + tbl := external.GetTableByName(t, tk, "test", "t") + encodedKey := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1)) + + h := helper.NewHelper(store.(helper.Storage)) + data, err := h.GetMvccByEncodedKey(encodedKey) + require.NoError(t, err) + // checksum skipped and with a null col vv + require.Equal(t, []byte{0x80, 0x0, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33}, data.Info.Writes[0].ShortValue) + tk.MustExec("drop table if exists t") +} + // Close issue #22395 // Background: // Since the changing column is implemented as adding a new column and substitute the old one when it finished. 
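A note on the bytes asserted in TestRowFormatWithChecksums above: the value ends with 0x8 followed by the same four bytes twice. Below is a minimal decoding sketch, under the assumption (not spelled out in this patch) that the v2 row value tail is a one-byte checksum header whose 0x8 bit flags an extra checksum, followed by one or two little-endian uint32 checksums; the two values coincide here because the old and new columns both hold the casted value "123".

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// Tail of the ShortValue asserted in TestRowFormatWithChecksums:
	// a one-byte checksum header followed by two 4-byte values.
	tail := []byte{0x8, 0x52, 0x78, 0xc9, 0x28, 0x52, 0x78, 0xc9, 0x28}
	hdr := tail[0]
	hasExtra := hdr&0x8 != 0 // assumed: bit 3 flags a second (extra) checksum
	checksum := binary.LittleEndian.Uint32(tail[1:5])
	fmt.Printf("hasExtra=%v checksum=%#x\n", hasExtra, checksum) // 0x28c97852
	if hasExtra {
		fmt.Printf("extra=%#x\n", binary.LittleEndian.Uint32(tail[5:9]))
	}
}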
@@ -1589,6 +1644,7 @@ func TestRowFormat(t *testing.T) { func TestChangingColOriginDefaultValue(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk1 := testkit.NewTestKit(t, store) @@ -1666,6 +1722,7 @@ func TestChangingColOriginDefaultValue(t *testing.T) { func TestChangingColOriginDefaultValueAfterAddColAndCastSucc(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk1 := testkit.NewTestKit(t, store) @@ -1753,6 +1810,7 @@ func TestChangingColOriginDefaultValueAfterAddColAndCastSucc(t *testing.T) { func TestChangingColOriginDefaultValueAfterAddColAndCastFail(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk1 := testkit.NewTestKit(t, store) @@ -1810,6 +1868,7 @@ func TestChangingColOriginDefaultValueAfterAddColAndCastFail(t *testing.T) { func TestChangingAttributeOfColumnWithFK(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") prepare := func() { @@ -1845,6 +1904,7 @@ func TestChangingAttributeOfColumnWithFK(t *testing.T) { func TestAlterPrimaryKeyToNull(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop table if exists t, t1") @@ -1881,6 +1941,7 @@ func TestChangeUnsignedIntToDatetime(t *testing.T) { func TestDDLExitWhenCancelMeetPanic(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -1923,6 +1984,7 @@ func TestDDLExitWhenCancelMeetPanic(t *testing.T) { func TestChangeIntToBitWillPanicInBackfillIndexes(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -1953,6 +2015,7 @@ func TestChangeIntToBitWillPanicInBackfillIndexes(t *testing.T) { func TestCancelCTCInReorgStateWillCauseGoroutineLeak(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockInfiniteReorgLogic", `return(true)`)) @@ -2005,6 +2068,7 @@ func TestCancelCTCInReorgStateWillCauseGoroutineLeak(t *testing.T) { func TestCTCShouldCastTheDefaultValue(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -2036,6 +2100,7 @@ func TestCTCShouldCastTheDefaultValue(t *testing.T) { func TestCTCCastBitToBinary(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") // For point 1: @@ -2200,6 +2265,7 @@ func TestChangeFromTimeToYear(t *testing.T) { func 
TestCastDateToTimestampInReorgAttribute(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 600*time.Millisecond) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("CREATE TABLE `t` (`a` DATE NULL DEFAULT '8497-01-06')") @@ -2263,6 +2329,7 @@ func TestForIssue24621(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t(a char(250));") @@ -2274,6 +2341,7 @@ func TestForIssue24621(t *testing.T) { func TestChangeNullValueFromOtherTypeToTimestamp(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") // Some ddl cases. @@ -2316,6 +2384,7 @@ func TestColumnTypeChangeBetweenFloatAndDouble(t *testing.T) { store := testkit.CreateMockStore(t) // issue #31372 tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") prepare := func(createTableStmt string) { @@ -2342,6 +2411,7 @@ func TestColumnTypeChangeBetweenFloatAndDouble(t *testing.T) { func TestColumnTypeChangeTimestampToInt(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") // 1. modify a timestamp column to bigint @@ -2430,6 +2500,7 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) { func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_row_level_checksum = 1") tk.MustExec("use test") tk.MustExec("create table t (a int)") diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index f9d5039b41351..eb2e20df13b63 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3310,6 +3310,16 @@ func ResolveAlterTableSpec(ctx sessionctx.Context, specs []*ast.AlterTableSpec) return validSpecs, nil } +func isMultiSchemaChanges(specs []*ast.AlterTableSpec) bool { + if len(specs) > 1 { + return true + } + if len(specs) == 1 && len(specs[0].NewColumns) > 1 && specs[0].Tp == ast.AlterTableAddColumns { + return true + } + return false +} + func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast.AlterTableStmt) (err error) { ident := ast.Ident{Schema: stmt.Table.Schema, Name: stmt.Table.Name} validSpecs, err := ResolveAlterTableSpec(sctx, stmt.Specs) @@ -3333,6 +3343,9 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast return dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Alter Table") } } + if isMultiSchemaChanges(validSpecs) && (sctx.GetSessionVars().EnableRowLevelChecksum || variable.EnableRowLevelChecksum.Load()) { + return dbterror.ErrRunMultiSchemaChanges.GenWithStack("Unsupported multi schema change when row level checksum is enabled") + } // set name for anonymous foreign key. 
maxForeignKeyID := tb.Meta().MaxForeignKeyID for _, spec := range validSpecs { diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 1f6a52bcce244..2c9a1751b2b29 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1260,6 +1261,27 @@ func TestMultiSchemaChangeMixedWithUpdate(t *testing.T) { dom.DDL().SetHook(originHook) } +func TestMultiSchemaChangeBlockedByRowLevelChecksum(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomain(t) + + orig := variable.EnableRowLevelChecksum.Load() + defer variable.EnableRowLevelChecksum.Store(orig) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (c int)") + + variable.EnableRowLevelChecksum.Store(true) + tk.Session().GetSessionVars().EnableRowLevelChecksum = false + tk.MustGetErrCode("alter table t add column c1 int, add column c2 int", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t add (c1 int, c2 int)", errno.ErrUnsupportedDDLOperation) + + variable.EnableRowLevelChecksum.Store(false) + tk.Session().GetSessionVars().EnableRowLevelChecksum = true + tk.MustGetErrCode("alter table t add column c1 int, add column c2 int", errno.ErrUnsupportedDDLOperation) + tk.MustGetErrCode("alter table t add (c1 int, c2 int)", errno.ErrUnsupportedDDLOperation) +} + type cancelOnceHook struct { store kv.Storage triggered bool diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 7e75202ee5a4d..540815919fa98 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1415,6 +1415,9 @@ type SessionVars struct { // Enable late materialization: push down some selection condition to tablescan. EnableLateMaterialization bool + + // EnableRowLevelChecksum indicates whether row level checksum is enabled. + EnableRowLevelChecksum bool + // TiFlashComputeDispatchPolicy indicates how to dispatch task to tiflash_compute nodes. // Only for disaggregated-tiflash mode. TiFlashComputeDispatchPolicy tiflashcompute.DispatchPolicy @@ -1617,6 +1620,12 @@ func (s *SessionVars) IsDynamicPartitionPruneEnabled() bool { return PartitionPruneMode(s.PartitionPruneMode.Load()) == Dynamic } +// IsRowLevelChecksumEnabled indicates whether row level checksum is enabled for the current session, that is, +// tidb_enable_row_level_checksum is on, tidb_row_format_version is 2, and it's not an internal session.
+func (s *SessionVars) IsRowLevelChecksumEnabled() bool { + return s.EnableRowLevelChecksum && s.RowEncoder.Enable && !s.InRestrictedSQL +} + // BuildParserConfig generate parser.ParserConfig for initial parser func (s *SessionVars) BuildParserConfig() parser.ParserConfig { return parser.ParserConfig{ @@ -1932,6 +1941,9 @@ func NewSessionVars(hctx HookContext) *SessionVars { if !EnableLocalTxn.Load() { vars.TxnScope = kv.NewGlobalTxnScopeVar() } + if EnableRowLevelChecksum.Load() { + vars.EnableRowLevelChecksum = true + } vars.systems[CharacterSetConnection], vars.systems[CollationConnection] = charset.GetDefaultCharsetAndCollate() return vars } diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index b5f774384348d..d43bfd274ded7 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -59,6 +59,7 @@ func TestSetSystemVariable(t *testing.T) { {variable.TiDBMemQuotaQuery, "1024", false}, {variable.TiDBMemQuotaApplyCache, "1024", false}, {variable.TiDBEnableStmtSummary, "1", true}, // now global only + {variable.TiDBEnableRowLevelChecksum, "1", true}, } for _, tc := range testCases { diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 0492be84acee9..015d2f33b1e01 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1304,6 +1304,15 @@ var defaultSysVars = []*SysVar{ } return nil }}, + {Scope: ScopeGlobal, Name: TiDBEnableRowLevelChecksum, Value: BoolToOnOff(DefTiDBEnableRowLevelChecksum), Type: TypeBool, + GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) { + return BoolToOnOff(EnableRowLevelChecksum.Load()), nil + }, + SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error { + EnableRowLevelChecksum.Store(TiDBOptOn(s)) + return nil + }, + }, {Scope: ScopeGlobal | ScopeSession, Name: SQLSelectLimit, Value: "18446744073709551615", Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64, SetSession: func(s *SessionVars, val string) error { result, err := strconv.ParseUint(val, 10, 64) if err != nil { diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index 339f6e1d637a5..568d7a45691dd 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -1211,3 +1211,30 @@ func TestTiDBEnableResourceControl(t *testing.T) { require.Equal(t, On, val) require.Equal(t, enable, true) } + +func TestTiDBEnableRowLevelChecksum(t *testing.T) { + ctx := context.Background() + vars := NewSessionVars(nil) + mock := NewMockGlobalAccessor4Tests() + mock.SessionVars = vars + vars.GlobalVarsAccessor = mock + + // default to false + val, err := mock.GetGlobalSysVar(TiDBEnableRowLevelChecksum) + require.NoError(t, err) + require.Equal(t, Off, val) + + // enable + err = mock.SetGlobalSysVar(ctx, TiDBEnableRowLevelChecksum, On) + require.NoError(t, err) + val, err = mock.GetGlobalSysVar(TiDBEnableRowLevelChecksum) + require.NoError(t, err) + require.Equal(t, On, val) + + // disable + err = mock.SetGlobalSysVar(ctx, TiDBEnableRowLevelChecksum, Off) + require.NoError(t, err) + val, err = mock.GetGlobalSysVar(TiDBEnableRowLevelChecksum) + require.NoError(t, err) + require.Equal(t, Off, val) +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 63d74e62cc209..cd3aa1018ad77 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -175,6 +175,9 @@ const ( // TiDBRowFormatVersion is used to control tidb row format version current. 
TiDBRowFormatVersion = "tidb_row_format_version" + // TiDBEnableRowLevelChecksum is used to control whether to append checksum to row values. + TiDBEnableRowLevelChecksum = "tidb_enable_row_level_checksum" + // TiDBEnableTablePartition is used to control table partition feature. // The valid value include auto/on/off: // on or auto: enable table partition if the partition type is implemented. @@ -1264,6 +1267,7 @@ const ( DefTiDBOptEnableLateMaterialization = true DefTiDBOptOrderingIdxSelThresh = 0.0 DefTiDBPlanCacheInvalidationOnFreshStats = true + DefTiDBEnableRowLevelChecksum = false ) // Process global variables. @@ -1317,6 +1321,8 @@ var ( // EnableForeignKey indicates whether to enable foreign key feature. EnableForeignKey = atomic.NewBool(true) EnableRCReadCheckTS = atomic.NewBool(false) + // EnableRowLevelChecksum indicates whether to append checksum to row values. + EnableRowLevelChecksum = atomic.NewBool(DefTiDBEnableRowLevelChecksum) // DefTiDBServerMemoryLimit indicates the default value of TiDBServerMemoryLimit(TotalMem * 80%). // It should be a const and shouldn't be modified after tidb is started. diff --git a/table/tables/BUILD.bazel b/table/tables/BUILD.bazel index a23c69e4e7239..b64133e7c471f 100644 --- a/table/tables/BUILD.bazel +++ b/table/tables/BUILD.bazel @@ -94,9 +94,11 @@ go_test( "//sessionctx/stmtctx", "//sessionctx/variable", "//sessiontxn", + "//store/helper", "//table", "//tablecodec", "//testkit", + "//testkit/external", "//testkit/testsetup", "//types", "//util", @@ -111,6 +113,7 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_tipb//go-binlog", "@com_github_prometheus_client_model//go", + "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", "@org_golang_google_grpc//:grpc", diff --git a/table/tables/tables.go b/table/tables/tables.go index 2d702b79b5c0a..98af2ae53a35a 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "math" + "sort" "strconv" "strings" "sync" @@ -47,6 +48,7 @@ import ( "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/generatedexpr" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/tracing" @@ -71,6 +73,7 @@ type TableCommon struct { meta *model.TableInfo allocs autoid.Allocators sequence *sequenceCommon + dependencyColumnOffsets []int // recordPrefix and indexPrefix are generated using physicalTableID. recordPrefix kv.Key @@ -180,6 +183,11 @@ func initTableCommon(t *TableCommon, tblInfo *model.TableInfo, physicalTableID i if tblInfo.IsSequence() { t.sequence = &sequenceCommon{meta: tblInfo.Sequence} } + for _, col := range cols { + if col.ChangeStateInfo != nil { + t.dependencyColumnOffsets = append(t.dependencyColumnOffsets, col.ChangeStateInfo.DependencyColumnOffset) + } + } } // initTableIndices initializes the indices of the TableCommon. @@ -387,6 +395,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, var colIDs, binlogColIDs []int64 var row, binlogOldRow, binlogNewRow []types.Datum + var checksums []uint32 numColsCap := len(newData) + 1 // +1 for the extra handle column that we may need to append. 
colIDs = make([]int64, 0, numColsCap) row = make([]types.Datum, 0, numColsCap) @@ -395,6 +404,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, binlogOldRow = make([]types.Datum, 0, numColsCap) binlogNewRow = make([]types.Datum, 0, numColsCap) } + checksumData := t.initChecksumData(sctx, h) + needChecksum := len(checksumData) > 0 for _, col := range t.Columns { var value types.Datum @@ -410,6 +421,22 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, oldData = append(oldData, value) touched = append(touched, touched[col.DependencyColumnOffset]) } + if needChecksum { + if col.ChangeStateInfo != nil { + // TODO: Check overflow or ignoreTruncate. + v, err := table.CastValue(sctx, newData[col.DependencyColumnOffset], col.ColumnInfo, false, false) + if err != nil { + return err + } + checksumData = t.appendInChangeColForChecksum(sctx, h, checksumData, col.ToInfo(), &newData[col.DependencyColumnOffset], &v) + } else { + v, err := table.GetColOriginDefaultValue(sctx, col.ToInfo()) + if err != nil { + return err + } + checksumData = t.appendNonPublicColForChecksum(sctx, h, checksumData, col.ToInfo(), &v) + } + } continue } if col.State != model.StatePublic { @@ -425,9 +452,13 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, } newData[col.Offset] = value touched[col.Offset] = touched[col.DependencyColumnOffset] + checksumData = t.appendInChangeColForChecksum(sctx, h, checksumData, col.ToInfo(), &newData[col.DependencyColumnOffset], &value) + } else if needChecksum { + checksumData = t.appendNonPublicColForChecksum(sctx, h, checksumData, col.ToInfo(), &value) } } else { value = newData[col.Offset] + checksumData = t.appendPublicColForChecksum(sctx, h, checksumData, col.ToInfo(), &value) } if !t.canSkip(col, &value) { colIDs = append(colIDs, col.ID) @@ -458,13 +489,16 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, } } + writeBufs := sessVars.GetWriteStmtBufs() + adjustRowValuesBuf(writeBufs, len(row)) key := t.RecordKey(h) sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder - value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil, rd) + checksums, writeBufs.RowValBuf = t.calcChecksums(sctx, h, checksumData, writeBufs.RowValBuf) + writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...) if err != nil { return err } - if err = memBuffer.Set(key, value); err != nil { + if err = memBuffer.Set(key, writeBufs.RowValBuf); err != nil { return err } @@ -798,6 +832,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . var colIDs, binlogColIDs []int64 var row, binlogRow []types.Datum + var checksums []uint32 if recordCtx, ok := sctx.Value(addRecordCtxKey).(*CommonAddRecordCtx); ok { colIDs = recordCtx.colIDs[:0] row = recordCtx.row[:0] @@ -810,9 +845,30 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . defer memBuffer.Cleanup(sh) sessVars := sctx.GetSessionVars() + checksumData := t.initChecksumData(sctx, recordID) + needChecksum := len(checksumData) > 0 - for _, col := range t.WritableCols() { + for _, col := range t.Columns { var value types.Datum + if col.State == model.StateDeleteOnly || col.State == model.StateDeleteReorganization { + if needChecksum { + if col.ChangeStateInfo != nil { + // TODO: Check overflow or ignoreTruncate. 
+ v, err := table.CastValue(sctx, r[col.DependencyColumnOffset], col.ColumnInfo, false, false) + if err != nil { + return nil, err + } + checksumData = t.appendInChangeColForChecksum(sctx, recordID, checksumData, col.ToInfo(), &r[col.DependencyColumnOffset], &v) + } else { + v, err := table.GetColOriginDefaultValue(sctx, col.ToInfo()) + if err != nil { + return nil, err + } + checksumData = t.appendNonPublicColForChecksum(sctx, recordID, checksumData, col.ToInfo(), &v) + } + } + continue + } // In column type change, since we have set the origin default value for changing col, but // for the new insert statement, we should use the casted value of relative column to insert. if col.ChangeStateInfo != nil && col.State != model.StatePublic { @@ -828,26 +884,36 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . } row = append(row, value) colIDs = append(colIDs, col.ID) + checksumData = t.appendInChangeColForChecksum(sctx, recordID, checksumData, col.ToInfo(), &r[col.DependencyColumnOffset], &value) continue } - if col.State != model.StatePublic && - // Update call `AddRecord` will already handle the write only column default value. - // Only insert should add default value for write only column. - !opt.IsUpdate { - // If col is in write only or write reorganization state, we must add it with its default value. - value, err = table.GetColOriginDefaultValue(sctx, col.ToInfo()) - if err != nil { - return nil, err - } - // add value to `r` for dirty db in transaction. - // Otherwise when update will panic cause by get value of column in write only state from dirty db. - if col.Offset < len(r) { - r[col.Offset] = value + if col.State == model.StatePublic { + value = r[col.Offset] + checksumData = t.appendPublicColForChecksum(sctx, recordID, checksumData, col.ToInfo(), &value) + } else { + // col.ChangeStateInfo must be nil here. + // because `col.State != model.StatePublic` is true here, if col.ChangeStateInfo is not nil, the col should + // be handled by the previous if-block. + + if opt.IsUpdate { + // If `AddRecord` is called by an update, the default value should be handled by the update. + value = r[col.Offset] } else { - r = append(r, value) + // If `AddRecord` is called by an insert and the col is in write only or write reorganization state, we must + // add it with its default value. + value, err = table.GetColOriginDefaultValue(sctx, col.ToInfo()) + if err != nil { + return nil, err + } + // add value to `r` for dirty db in transaction. + // Otherwise update will panic because it would get the value of a column in write-only state from the dirty db. + if col.Offset < len(r) { + r[col.Offset] = value + } else { + r = append(r, value) + } } - } else { - value = r[col.Offset] + checksumData = t.appendNonPublicColForChecksum(sctx, recordID, checksumData, col.ToInfo(), &value) } if !t.canSkip(col, &value) { colIDs = append(colIDs, col.ID) @@ -861,7 +927,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . logutil.BgLogger().Debug("addRecord", zap.Stringer("key", key)) sc, rd := sessVars.StmtCtx, &sessVars.RowEncoder - writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd) + checksums, writeBufs.RowValBuf = t.calcChecksums(sctx, recordID, checksumData, writeBufs.RowValBuf) + writeBufs.RowValBuf, err = tablecodec.EncodeRow(sc, row, colIDs, writeBufs.RowValBuf, writeBufs.AddRowValues, rd, checksums...)
if err != nil { return nil, err } @@ -1649,6 +1716,153 @@ func (t *TableCommon) getMutation(ctx sessionctx.Context) *binlog.TableMutation return ctx.StmtGetMutation(t.tableID) } +// initChecksumData allocates data for checksum calculation and returns nil if checksum is disabled or unavailable. The +// length of the returned data is the number of checksums we need to write. +func (t *TableCommon) initChecksumData(sctx sessionctx.Context, h kv.Handle) [][]rowcodec.ColData { + if !sctx.GetSessionVars().IsRowLevelChecksumEnabled() { + return nil + } + numNonPubCols := len(t.Columns) - len(t.Cols()) + if numNonPubCols > 1 { + logWithContext(sctx, logutil.BgLogger().Warn, + "skip checksum since the number of non-public columns is greater than 1", + zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Any("cols", t.meta.Columns)) + return nil + } + return make([][]rowcodec.ColData, 1+numNonPubCols) +} + +// calcChecksums calculates the checksums of input data. The arg `buf` is used to hold the temporary encoded col data +// and it will be reset for each col, so do NOT pass a buf that contains data you may use later. If the capacity of +// `buf` is enough, it gets returned directly, otherwise a new buffer with larger capacity will be returned, and you can +// hold the returned buf for later use (to avoid memory allocation). +func (t *TableCommon) calcChecksums(sctx sessionctx.Context, h kv.Handle, data [][]rowcodec.ColData, buf []byte) ([]uint32, []byte) { + if len(data) == 0 { + return nil, buf + } + checksums := make([]uint32, len(data)) + for i, cols := range data { + row := rowcodec.RowData{Cols: cols, Data: buf} + if !sort.IsSorted(row) { + sort.Sort(row) + } + checksum, err := row.Checksum() + buf = row.Data + if err != nil { + logWithContext(sctx, logutil.BgLogger().Error, + "skip checksum due to encode error", + zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Error(err)) + return nil, buf + } + checksums[i] = checksum + } + return checksums, buf +} + +// appendPublicColForChecksum appends a public column's data for checksum. If the column is being changed, that is, it's the +// old column of an on-going modify-column DDL, then skip it since it will be handled by `appendInChangeColForChecksum`. +func (t *TableCommon) appendPublicColForChecksum( + sctx sessionctx.Context, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, d *types.Datum, +) [][]rowcodec.ColData { + if len(data) == 0 { // no need for checksum + return nil + } + if c.State != model.StatePublic { // assert col is public + logWithContext(sctx, logutil.BgLogger().Error, + "skip checksum due to inconsistent column state", + zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Any("col", c)) + return nil + } + for _, offset := range t.dependencyColumnOffsets { + if c.Offset == offset { + // the col is being changed, skip it. + return data + } + } + // calculate the checksum with this col + data[0] = appendColForChecksum(data[0], t, c, d) + if len(data) > 1 { + // calculate the extra checksum with this col + data[1] = appendColForChecksum(data[1], t, c, d) + } + return data +} + +// appendNonPublicColForChecksum appends a non-public (but not in-changing) column's data for checksum. Two checksums are +// required because there is a non-public column. The first checksum should be calculated with the original (or default) +// value of this column. The extra checksum shall be calculated without this non-public column, so there is nothing to do with +// data[1].
+func (t *TableCommon) appendNonPublicColForChecksum( + sctx sessionctx.Context, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, d *types.Datum, +) [][]rowcodec.ColData { + if size := len(data); size == 0 { // no need for checksum + return nil + } else if size == 1 { // assert that 2 checksums are required + logWithContext(sctx, logutil.BgLogger().Error, + "skip checksum due to inconsistent length of column data", + zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID)) + return nil + } + if c.State == model.StatePublic || c.ChangeStateInfo != nil { // assert col is not public and not in-changing + logWithContext(sctx, logutil.BgLogger().Error, + "skip checksum due to inconsistent column state", + zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Any("col", c)) + return nil + } + data[0] = appendColForChecksum(data[0], t, c, d) + + return data +} + +// appendInChangeColForChecksum appends an in-changing column's data for checksum. Two checksums are required because +// there is a non-public column. The first checksum should be calculated with the old version of this column, and the extra +// checksum should be calculated with the new version of the column. +func (t *TableCommon) appendInChangeColForChecksum( + sctx sessionctx.Context, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, oldVal *types.Datum, newVal *types.Datum, +) [][]rowcodec.ColData { + if size := len(data); size == 0 { // no need for checksum + return nil + } else if size == 1 { // assert that 2 checksums are required + logWithContext(sctx, logutil.BgLogger().Error, + "skip checksum due to inconsistent length of column data", + zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID)) + return nil + } + if c.State == model.StatePublic || c.ChangeStateInfo == nil { // assert col is not public and is in-changing + logWithContext(sctx, logutil.BgLogger().Error, + "skip checksum due to inconsistent column state", + zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Any("col", c)) + return nil + } + // calculate the checksum with the old version of col + data[0] = appendColForChecksum(data[0], t, t.meta.Columns[c.DependencyColumnOffset], oldVal) + // calculate the extra checksum with the new version of col + data[1] = appendColForChecksum(data[1], t, c, newVal) + + return data +} + +func appendColForChecksum(dst []rowcodec.ColData, t *TableCommon, c *model.ColumnInfo, d *types.Datum) []rowcodec.ColData { + if c.IsGenerated() && !c.GeneratedStored { + return dst + } + if dst == nil { + dst = make([]rowcodec.ColData, 0, len(t.Columns)) + } + return append(dst, rowcodec.ColData{ColumnInfo: c, Datum: d}) +} + +func logWithContext(sctx sessionctx.Context, log func(msg string, fields ...zap.Field), msg string, fields ...zap.Field) { + sessVars := sctx.GetSessionVars() + ctxFields := make([]zap.Field, 0, len(fields)+2) + ctxFields = append(ctxFields, zap.Uint64("conn", sessVars.ConnectionID)) + if sessVars.TxnCtx != nil { + ctxFields = append(ctxFields, zap.Uint64("txnStartTS", sessVars.TxnCtx.StartTS)) + } + ctxFields = append(ctxFields, fields...) + log(msg, ctxFields...)
+} + func (t *TableCommon) canSkip(col *table.Column, value *types.Datum) bool { return CanSkip(t.Meta(), col, value) } diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index 46981958fbeb5..5d857d5adedfb 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -18,7 +18,9 @@ import ( "context" "fmt" "math" + "sort" "strconv" + "sync/atomic" "testing" "time" @@ -33,13 +35,17 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/testkit/external" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tipb/go-binlog" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" ) @@ -943,3 +949,836 @@ func TestTxnAssertion(t *testing.T) { testUntouchedIndexImpl("OFF", false) testUntouchedIndexImpl("OFF", true) } + +func TestWriteWithChecksums(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + h := helper.NewHelper(store.(helper.Storage)) + + tkDDL := testkit.NewTestKit(t, store) + tkDDL.MustExec("set global tidb_enable_row_level_checksum = 1") + tkDDL.MustExec("use test") + + tkDML := testkit.NewTestKit(t, store) + tkDML.MustExec("use test") + + type col struct { + ID int64 + Type byte + Data types.Datum + } + isDMLBeforeDDL := func(seq int64) bool { return seq == -1 } + isDMLAfterDDL := func(seq int64) bool { return seq == -2 } + + for _, tt := range []struct { + name string + init []string + schema []col + ddl string + dml func(seq int64, job *model.Job) ([]byte, [][]col) + }{ + { + name: "AddRecord/AddColumn", + init: []string{"create table t (id int primary key, c1 int)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeLong}, + {ID: 3, Type: mysql.TypeLong}, + }, + ddl: "alter table t add column c2 int", + dml: func(seq int64, job *model.Job) ([]byte, [][]col) { + tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1) + tbl := external.GetTableByName(t, tkDML, "test", "t") + key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq)) + col1 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + } + col2 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(nil)}, + } + if isDMLBeforeDDL(seq) { + return key, [][]col{col1} + } + if isDMLAfterDDL(seq) { + return key, [][]col{col2} + } + return key, [][]col{col2, col1} + }, + }, + { + name: "AddRecord/AddColumnWithDefault", + init: []string{"create table t (id int primary key, c1 int)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeLong}, + {ID: 3, Type: mysql.TypeLong}, + }, + ddl: "alter table t add column c2 int default 42", + dml: func(seq int64, job *model.Job) ([]byte, [][]col) { + tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1) + tbl := external.GetTableByName(t, tkDML, "test", "t") + key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq)) + col1 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + } + col2 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, 
types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(42)}, + } + if isDMLBeforeDDL(seq) { + return key, [][]col{col1} + } + if isDMLAfterDDL(seq) { + return key, [][]col{col2} + } + return key, [][]col{col2, col1} + }, + }, + { + name: "AddRecord/AddColumnNotNull", + init: []string{"create table t (id int primary key, c1 int)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeLong}, + {ID: 3, Type: mysql.TypeLong}, + }, + ddl: "alter table t add column c2 int not null", + dml: func(seq int64, job *model.Job) ([]byte, [][]col) { + if isDMLAfterDDL(seq) { + tkDML.MustExec("insert into t (id, c1, c2) values (?, ?, ?)", seq, seq+1, seq+2) + } else { + tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1) + } + tbl := external.GetTableByName(t, tkDML, "test", "t") + key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq)) + col1 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + } + col2 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(0)}, + } + col3 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(seq + 2)}, + } + if isDMLBeforeDDL(seq) { + return key, [][]col{col1} + } + if isDMLAfterDDL(seq) { + return key, [][]col{col3} + } + return key, [][]col{col2, col1} + }, + }, + { + name: "AddRecord/DropColumn", + init: []string{"create table t (id int primary key, c1 int, c2 int)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeLong}, + {ID: 3, Type: mysql.TypeLong}, + }, + ddl: "alter table t drop column c2", + dml: func(seq int64, job *model.Job) ([]byte, [][]col) { + tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1) + tbl := external.GetTableByName(t, tkDML, "test", "t") + key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq)) + col1 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(nil)}, + } + col2 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + } + if isDMLBeforeDDL(seq) { + return key, [][]col{col1} + } + if isDMLAfterDDL(seq) { + return key, [][]col{col2} + } + return key, [][]col{col1, col2} + }, + }, + { + name: "AddRecord/DropColumnWithDefault", + init: []string{"create table t (id int primary key, c1 int, c2 int default 42)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeLong}, + {ID: 3, Type: mysql.TypeLong}, + }, + ddl: "alter table t drop column c2", + dml: func(seq int64, job *model.Job) ([]byte, [][]col) { + tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1) + tbl := external.GetTableByName(t, tkDML, "test", "t") + key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq)) + col1 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(42)}, + } + col2 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(nil)}, + } + col3 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + } + if isDMLBeforeDDL(seq) { + return key, [][]col{col1} + } + if isDMLAfterDDL(seq) { + return key, [][]col{col3} + } + return 
key, [][]col{col2, col3} + }, + }, + { + name: "AddRecord/DropColumnNotNull", + init: []string{"create table t (id int primary key, c1 int, c2 int not null)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeLong}, + {ID: 3, Type: mysql.TypeLong}, + }, + ddl: "alter table t drop column c2", + dml: func(seq int64, job *model.Job) ([]byte, [][]col) { + if isDMLBeforeDDL(seq) { + tkDML.MustExec("insert into t (id, c1, c2) values (?, ?, ?)", seq, seq+1, seq+2) + } else { + tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1) + } + tbl := external.GetTableByName(t, tkDML, "test", "t") + key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq)) + col1 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(seq + 2)}, + } + col2 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + {3, mysql.TypeLong, types.NewDatum(0)}, + } + col3 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + } + if isDMLBeforeDDL(seq) { + return key, [][]col{col1} + } + if isDMLAfterDDL(seq) { + return key, [][]col{col3} + } + return key, [][]col{col2, col3} + }, + }, + { + name: "AddRecord/ChangeColumnType", + init: []string{"create table t (id int primary key, c1 int)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeLong}, + {ID: 3, Type: mysql.TypeVarchar}, + }, + ddl: "alter table t change column c1 c1 varchar(10)", + dml: func(seq int64, job *model.Job) ([]byte, [][]col) { + tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1) + tbl := external.GetTableByName(t, tkDML, "test", "t") + key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq)) + col1 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeLong, types.NewDatum(seq + 1)}, + } + col2 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {3, mysql.TypeVarchar, types.NewDatum(strconv.FormatInt(seq+1, 10))}, + } + if isDMLBeforeDDL(seq) { + return key, [][]col{col1} + } + if isDMLAfterDDL(seq) { + return key, [][]col{col2} + } + return key, [][]col{col1, col2} + }, + }, + { + name: "AddRecord/ChangeColumnTypeFloat", + init: []string{"create table t (id int primary key, c1 float)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeFloat}, + {ID: 3, Type: mysql.TypeDouble}, + }, + ddl: "alter table t change column c1 c1 double", + dml: func(seq int64, job *model.Job) ([]byte, [][]col) { + v := float64(seq) * 3.14 + tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, v) + tbl := external.GetTableByName(t, tkDML, "test", "t") + key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq)) + col1 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeFloat, types.NewDatum(float32(v))}, + } + col2 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {2, mysql.TypeFloat, types.NewDatum(float64(float32(v)))}, + } + col3 := []col{ + {1, mysql.TypeLong, types.NewDatum(seq)}, + {3, mysql.TypeDouble, types.NewDatum(v)}, + } + if isDMLBeforeDDL(seq) { + return key, [][]col{col1} + } + if isDMLAfterDDL(seq) { + return key, [][]col{col3} + } + return key, [][]col{col1, col2} + }, + }, + { + name: "AddRecord/ChangeColumnTypeDouble", + init: []string{"create table t (id int primary key, c1 double)"}, + schema: []col{ + {ID: 1, Type: mysql.TypeLong}, + {ID: 2, Type: mysql.TypeDouble}, + {ID: 3, 
Type: mysql.TypeFloat},
+			},
+			ddl: "alter table t change column c1 c1 float",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				v := float64(seq) * 3.14
+				tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, v)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(seq)},
+					{2, mysql.TypeDouble, types.NewDatum(v)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeFloat, types.NewDatum(float32(v))},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col2}
+				}
+				return key, [][]col{col1, col2}
+			},
+		},
+		{
+			name: "AddRecord/SetColumnDefault",
+			init: []string{"create table t (id int primary key, c1 int, c2 int default 1)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeLong},
+			},
+			ddl: "alter table t alter column c2 set default 42",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(seq)},
+					{2, mysql.TypeLong, types.NewDatum(seq + 1)},
+					{3, mysql.TypeLong, types.NewDatum(1)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(seq)},
+					{2, mysql.TypeLong, types.NewDatum(seq + 1)},
+					{3, mysql.TypeLong, types.NewDatum(42)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col2}
+				}
+				return key, nil
+			},
+		},
+		{
+			name: "AddRecord/DropColumnDefault",
+			init: []string{"create table t (id int primary key, c1 int, c2 int default 42)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeLong},
+			},
+			ddl: "alter table t alter column c2 drop default",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				if isDMLAfterDDL(seq) {
+					tkDML.MustExec("insert into t (id, c1, c2) values (?, ?, ?)", seq, seq+1, seq+2)
+				} else {
+					tkDML.MustExec("insert into t (id, c1) values (?, ?)", seq, seq+1)
+				}
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(seq))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(seq)},
+					{2, mysql.TypeLong, types.NewDatum(seq + 1)},
+					{3, mysql.TypeLong, types.NewDatum(42)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(seq)},
+					{2, mysql.TypeLong, types.NewDatum(seq + 1)},
+					{3, mysql.TypeLong, types.NewDatum(seq + 2)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col2}
+				}
+				return key, nil
+			},
+		},
+		{
+			name: "UpdateRecord/AddColumn",
+			init: []string{"create table t (id int primary key, c1 int)", "insert into t values (1, 0)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeLong},
+			},
+			ddl: "alter table t add column c2 int",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				tkDML.MustExec("update t set c1 = ? where id = 1", seq)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(nil)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col2}
+				}
+				return key, [][]col{col2, col1}
+			},
+		},
+		{
+			name: "UpdateRecord/AddColumnWithDefault",
+			init: []string{"create table t (id int primary key, c1 int)", "insert into t values (1, 0)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeLong},
+			},
+			ddl: "alter table t add column c2 int default 42",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				tkDML.MustExec("update t set c1 = ? where id = 1", seq)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(42)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col2}
+				}
+				return key, [][]col{col2, col1}
+			},
+		},
+		{
+			name: "UpdateRecord/AddColumnNotNull",
+			init: []string{"create table t (id int primary key, c1 int)", "insert into t values (1, 0)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeLong},
+			},
+			ddl: "alter table t add column c2 int not null",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				tkDML.MustExec("update t set c1 = ? where id = 1", seq)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(0)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col2}
+				}
+				return key, [][]col{col2, col1}
+			},
+		},
+		{
+			name: "UpdateRecord/DropColumn",
+			init: []string{"create table t (id int primary key, c1 int, c2 int)", "insert into t values (1, 0, 0)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeLong},
+			},
+			ddl: "alter table t drop column c2",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				tkDML.MustExec("update t set c1 = ? where id = 1", seq)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(0)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(nil)},
+				}
+				col3 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col3}
+				}
+				if job.SchemaState == model.StateWriteOnly {
+					return key, [][]col{col1, col3}
+				}
+				return key, [][]col{col2, col3}
+			},
+		},
+		{
+			name: "UpdateRecord/DropColumnWithDefault",
+			init: []string{"create table t (id int primary key, c1 int, c2 int default 42)", "insert into t values (1, 0, 0)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeLong},
+			},
+			ddl: "alter table t drop column c2",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				tkDML.MustExec("update t set c1 = ? where id = 1", seq)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(0)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(nil)},
+				}
+				col3 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col3}
+				}
+				if job.SchemaState == model.StateWriteOnly {
+					return key, [][]col{col1, col3}
+				}
+				return key, [][]col{col2, col3}
+			},
+		},
+		{
+			name: "UpdateRecord/DropColumnNotNull",
+			init: []string{"create table t (id int primary key, c1 int, c2 int not null)", "insert into t values (1, 0, 10)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeLong},
+			},
+			ddl: "alter table t drop column c2",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				tkDML.MustExec("update t set c1 = ? where id = 1", seq)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(10)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+					{3, mysql.TypeLong, types.NewDatum(0)},
+				}
+				col3 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col3}
+				}
+				if job.SchemaState == model.StateWriteOnly {
+					return key, [][]col{col1, col3}
+				}
+				return key, [][]col{col2, col3}
+			},
+		},
+		{
+			name: "UpdateRecord/ChangeColumnType",
+			init: []string{"create table t (id int primary key, c1 int)", "insert into t values (1, 0)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeLong},
+				{ID: 3, Type: mysql.TypeVarchar},
+			},
+			ddl: "alter table t change column c1 c1 varchar(10)",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				tkDML.MustExec("update t set c1 = ? where id = 1", seq)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeLong, types.NewDatum(seq)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{3, mysql.TypeVarchar, types.NewDatum(strconv.FormatInt(seq, 10))},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col2}
+				}
+				return key, [][]col{col1, col2}
+			},
+		},
+		{
+			name: "UpdateRecord/ChangeColumnTypeFloat",
+			init: []string{"create table t (id int primary key, c1 float)", "insert into t values (1, 3.14)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeFloat},
+				{ID: 3, Type: mysql.TypeDouble},
+			},
+			ddl: "alter table t change column c1 c1 double",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				v := float64(seq) * 3.14
+				tkDML.MustExec("update t set c1 = ? where id = 1", v)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeFloat, types.NewDatum(float32(v))},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeFloat, types.NewDatum(float64(float32(v)))},
+				}
+				col3 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{3, mysql.TypeDouble, types.NewDatum(v)},
+				}
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col3}
+				}
+				return key, [][]col{col1, col2}
+			},
+		},
+		{
+			name: "UpdateRecord/ChangeColumnTypeDouble",
+			init: []string{"create table t (id int primary key, c1 double)", "insert into t values (1, 3.14)"},
+			schema: []col{
+				{ID: 1, Type: mysql.TypeLong},
+				{ID: 2, Type: mysql.TypeDouble},
+				{ID: 3, Type: mysql.TypeFloat},
+			},
+			ddl: "alter table t change column c1 c1 float",
+			dml: func(seq int64, job *model.Job) ([]byte, [][]col) {
+				v := float64(seq) * 3.14
+				tkDML.MustExec("update t set c1 = ? where id = 1", v)
+				tbl := external.GetTableByName(t, tkDML, "test", "t")
+				key := tablecodec.EncodeRowKeyWithHandle(tbl.Meta().ID, kv.IntHandle(1))
+				col1 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeDouble, types.NewDatum(v)},
+				}
+				col2 := []col{
+					{1, mysql.TypeLong, types.NewDatum(1)},
+					{2, mysql.TypeFloat, types.NewDatum(float32(v))},
+				}
+
+				if isDMLBeforeDDL(seq) {
+					return key, [][]col{col1}
+				}
+				if isDMLAfterDDL(seq) {
+					return key, [][]col{col2}
+				}
+				return key, [][]col{col1, col2}
+			},
+		},
+	} {
+		// build row decoder for extracting checksums from row
+		cols := make([]rowcodec.ColInfo, len(tt.schema))
+		for i, col := range tt.schema {
+			cols[i] = rowcodec.ColInfo{ID: col.ID, Ft: types.NewFieldType(col.Type)}
+		}
+		dec := rowcodec.NewDatumMapDecoder(cols, time.UTC)
+		// build a function for executing dml and validating results
+		doDML := func(t *testing.T, seq int64, job *model.Job) {
+			key, rows := tt.dml(seq, job)
+			// get actualChecksums in row value
+			actualChecksums := make([]uint32, 0, 2)
+			data, err := h.GetMvccByEncodedKey(key)
+			assert.NoError(t, err)
+			_, err = dec.DecodeToDatumMap(data.Info.Writes[0].ShortValue, nil)
+			assert.NoError(t, err)
+			if checksum, ok := dec.GetChecksum(); ok {
+				actualChecksums = append(actualChecksums, checksum)
+				if checksum, ok := dec.GetExtraChecksum(); ok {
+					actualChecksums = append(actualChecksums, checksum)
+				}
+			}
+			// calc expected checksums from row data
+			expectChecksums := make([]uint32, 0, 2)
+			for _, row := range rows {
+				cols := make([]rowcodec.ColData, len(row))
+				for i := range row {
+					ft := types.NewFieldType(row[i].Type)
+					cols[i] = rowcodec.ColData{
+						ColumnInfo: &model.ColumnInfo{ID: row[i].ID, FieldType: *ft},
+						Datum:      &row[i].Data,
+					}
+				}
+				data := rowcodec.RowData{Cols: cols}
+				sort.Sort(data)
+				checksum, err := data.Checksum()
+				assert.NoError(t, err)
+				expectChecksums = append(expectChecksums, checksum)
+			}
+			// validate checksums
+			assert.Equal(t, expectChecksums, actualChecksums)
+		}
+
+		// init and run sub test
+		tkDDL.MustExec("drop table if exists t")
+		for _, sql := range tt.init {
+			tkDDL.MustExec(sql)
+		}
+		t.Run(tt.name, func(t *testing.T) {
+			origHook := dom.DDL().GetHook()
+			defer dom.DDL().SetHook(origHook)
+
+			var seq int64
+			dom.DDL().SetHook(&ddlCallback{
+				onJobUpdated: func(job *model.Job) {
+					if job.State != model.JobStateRunning {
+						return
+					}
+					doDML(t, atomic.AddInt64(&seq, 1), job)
+				},
+			})
+
+			doDML(t, -1, nil)
+			tkDDL.MustExec(tt.ddl)
+			doDML(t, -2, nil)
+		})
+		tkDDL.MustExec("admin check table t")
+	}
+}
+
+type ddlCallback struct {
+	onChanged            func(err error) error
+	onSchemaStateChanged func(schemaVer int64)
+	onJobRunBefore       func(job *model.Job)
+	onJobRunAfter        func(job *model.Job)
+	onJobUpdated         func(job *model.Job)
+	onWatched            func(ctx context.Context)
+	onGetJobBefore       func(jobType string)
+	onGetJobAfter        func(jobType string, job *model.Job)
+}
+
+func (cb *ddlCallback) OnChanged(err error) error {
+	if cb.onChanged != nil {
+		return cb.onChanged(err)
+	}
+	return err
+}
+
+func (cb *ddlCallback) OnSchemaStateChanged(schemaVer int64) {
+	if cb.onSchemaStateChanged != nil {
+		cb.onSchemaStateChanged(schemaVer)
+	}
+}
+
+func (cb *ddlCallback) OnJobRunBefore(job *model.Job) {
+	if cb.onJobRunBefore != nil {
+		cb.onJobRunBefore(job)
+	}
+}
+
+func (cb *ddlCallback) OnJobRunAfter(job *model.Job) {
+	if cb.onJobRunAfter != nil {
+		cb.onJobRunAfter(job)
+	}
+}
+func (cb *ddlCallback) OnJobUpdated(job *model.Job) {
+	if cb.onJobUpdated != nil {
+		cb.onJobUpdated(job)
+	}
+}
+func (cb *ddlCallback) OnWatched(ctx context.Context) {
+	if cb.onWatched != nil {
+		cb.onWatched(ctx)
+	}
+}
+func (cb *ddlCallback) OnGetJobBefore(jobType string) {
+	if cb.onGetJobBefore != nil {
+		cb.onGetJobBefore(jobType)
+	}
+}
+
+func (cb *ddlCallback) OnGetJobAfter(jobType string, job *model.Job) {
+	if cb.onGetJobAfter != nil {
+		cb.onGetJobAfter(jobType, job)
+	}
+}
diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go
index 8b01fb3ca8962..c4061ec5c89b1 100644
--- a/tablecodec/tablecodec.go
+++ b/tablecodec/tablecodec.go
@@ -322,12 +322,12 @@ func EncodeValue(sc *stmtctx.StatementContext, b []byte, raw types.Datum) ([]byt
 // EncodeRow encode row data and column ids into a slice of byte.
 // valBuf and values pass by caller, for reducing EncodeRow allocates temporary bufs. If you pass valBuf and values as nil,
 // EncodeRow will allocate it.
-func EncodeRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int64, valBuf []byte, values []types.Datum, e *rowcodec.Encoder) ([]byte, error) {
+func EncodeRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int64, valBuf []byte, values []types.Datum, e *rowcodec.Encoder, checksums ...uint32) ([]byte, error) {
 	if len(row) != len(colIDs) {
 		return nil, errors.Errorf("EncodeRow error: data and columnID count not match %d vs %d", len(row), len(colIDs))
 	}
 	if e.Enable {
-		return e.Encode(sc, colIDs, row, valBuf)
+		return e.Encode(sc, colIDs, row, valBuf, checksums...)
 	}
 	return EncodeOldRow(sc, row, colIDs, valBuf, values)
 }
diff --git a/util/rowcodec/common.go b/util/rowcodec/common.go
index 33cb2dee7760b..7cd31a32ebc22 100644
--- a/util/rowcodec/common.go
+++ b/util/rowcodec/common.go
@@ -309,7 +309,7 @@ func appendDatumForChecksum(buf []byte, dat *types.Datum, typ byte) (out []byte,
 	defer func() {
 		if x := recover(); x != nil {
 			// catch panic when datum and type mismatch
-			err = errors.Annotate(x.(error), "encode datum for checksum")
+			err = errors.Annotatef(x.(error), "encode datum(%s) as %s for checksum", dat.String(), types.TypeStr(typ))
 		}
 	}()
 	if dat.IsNull() {
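
Note for reviewers: the expected checksums in the test above are computed with the same rowcodec primitives the patch itself relies on. The sketch below restates that computation as a standalone helper; the function name rowChecksum and its inputs are invented for illustration and are not part of this patch. It only uses calls that appear in the test: wrap each (column, datum) pair in rowcodec.ColData, sort by column ID as RowData expects, then fold the row into a single uint32.

	package main

	import (
		"sort"

		"github.com/pingcap/tidb/parser/model"
		"github.com/pingcap/tidb/types"
		"github.com/pingcap/tidb/util/rowcodec"
	)

	// rowChecksum is an illustrative helper (not part of the patch). It mirrors
	// the expected-checksum computation in the test: build a RowData from the
	// given columns and datums, sort it by column ID, and checksum it.
	func rowChecksum(cols []*model.ColumnInfo, datums []types.Datum) (uint32, error) {
		data := rowcodec.RowData{Cols: make([]rowcodec.ColData, len(cols))}
		for i, col := range cols {
			data.Cols[i] = rowcodec.ColData{ColumnInfo: col, Datum: &datums[i]}
		}
		if !sort.IsSorted(data) {
			sort.Sort(data)
		}
		return data.Checksum()
	}

The sort step matters: RowData defines its ordering by column ID, and an unsorted buffer would checksum the same datums in a different byte order, so two logically identical rows would disagree.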
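On the read side, a row rewritten while the column change is in flight can carry up to two checksums, which is why the test collects both before comparing against the per-schema-version expectations. A rough sketch of that extraction, again using only the decoder calls exercised in the test (the helper name readChecksums and its parameters are placeholders):

	// readChecksums is an illustrative sketch (not part of the patch): decode a
	// row value with a DatumMapDecoder, then read the primary checksum and, if
	// present, the extra checksum written for the adjacent schema version.
	func readChecksums(dec *rowcodec.DatumMapDecoder, rowValue []byte) ([]uint32, error) {
		if _, err := dec.DecodeToDatumMap(rowValue, nil); err != nil {
			return nil, err
		}
		checksums := make([]uint32, 0, 2)
		if c, ok := dec.GetChecksum(); ok {
			checksums = append(checksums, c)
			if extra, ok := dec.GetExtraChecksum(); ok {
				checksums = append(checksums, extra)
			}
		}
		return checksums, nil
	}

The test's assert.Equal then compares this ordered pair against the checksums recomputed from the expected row images, so both the values and their order are validated.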