Skip to content

Commit

Permalink
test(ticdc): add more unit test to cover checksum functionality (#8990)
Browse files Browse the repository at this point in the history
close #8991
  • Loading branch information
3AceShowHand committed May 23, 2023
1 parent fbb363a commit 899a96a
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 123 deletions.
247 changes: 196 additions & 51 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/spanz"
Expand Down Expand Up @@ -461,6 +462,23 @@ func walkTableSpanInStore(t *testing.T, store tidbkv.Storage, tableID int64, f f
}
}

// getLastKeyValueInStore iterates over the key range of the given table inside
// a fresh transaction and returns the key and value of the final entry the
// iterator yields. Both results are nil when the range holds no entries.
// The transaction is only read from and is always rolled back.
func getLastKeyValueInStore(t *testing.T, store tidbkv.Storage, tableID int64) (key, value []byte) {
	txn, err := store.Begin()
	require.NoError(t, err)
	defer txn.Rollback() //nolint:errcheck

	rangeStart, rangeEnd := spanz.GetTableRange(tableID)
	it, err := txn.Iter(rangeStart, rangeEnd)
	require.NoError(t, err)
	defer it.Close()

	// Keep overwriting the results so that only the last visited pair survives.
	for it.Valid() {
		key, value = it.Key(), it.Value()
		require.NoError(t, it.Next())
	}
	return key, value
}

// We use OriginDefaultValue instead of DefaultValue in the unit tests; please refer to
// https://github.com/pingcap/tiflow/issues/4048
// FIXME: OriginDefaultValue always seems to be a string; test more corner cases.
Expand Down Expand Up @@ -990,9 +1008,141 @@ func TestGetDefaultZeroValue(t *testing.T) {
}
}

// TestDecodeRowEnableChecksum mounts rows written to table `t` with the
// integrity check level set to "correctness" and verifies the checksum
// information attached to each mounted row:
//   - a row written with row-level checksum disabled carries no checksum;
//   - a row written with row-level checksum enabled carries a checksum that
//     matches the decoder's and is not flagged as corrupted;
//   - after the column type is altered, the row checksum matches either the
//     decoder's checksum or its extra checksum;
//   - after the table info is tampered with, the row is flagged as corrupted,
//     and mounting fails once the corruption handle level is raised to error.
func TestDecodeRowEnableChecksum(t *testing.T) {
	helper := NewSchemaTestHelper(t)
	defer helper.Close()

	tk := helper.Tk()

	tk.MustExec("set global tidb_enable_row_level_checksum = 1")
	helper.Tk().MustExec("use test")

	replicaConfig := config.GetDefaultReplicaConfig()
	replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
	eventFilter, err := filter.NewFilter(replicaConfig, "")
	require.NoError(t, err)

	ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
	require.NoError(t, err)

	changefeed := model.DefaultChangeFeedID("changefeed-test-decode-row")
	schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(),
		ver.Ver, false, changefeed, util.RoleTester, eventFilter)
	require.NoError(t, err)
	require.NotNil(t, schemaStorage)

	job := helper.DDL2Job("create table t (id int primary key, a int)")
	require.NoError(t, schemaStorage.HandleDDLJob(job))

	ts := schemaStorage.GetLastSnapshot().CurrentTs()
	schemaStorage.AdvanceResolvedTs(ver.Ver)

	m := NewMounter(schemaStorage, changefeed, time.Local,
		eventFilter, true, replicaConfig.Integrity).(*mounter)

	ctx := context.Background()

	tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "t")
	require.True(t, ok)

	// lastRawKV wraps the most recently stored key/value pair of the table
	// into a put entry whose timestamps surround the snapshot ts.
	lastRawKV := func() *model.RawKVEntry {
		key, value := getLastKeyValueInStore(t, helper.Storage(), tableInfo.ID)
		return &model.RawKVEntry{
			OpType:  model.OpTypePut,
			Key:     key,
			Value:   value,
			StartTs: ts - 1,
			CRTs:    ts + 1,
		}
	}

	// Case 1: row written while the session has row-level checksum disabled.
	tk.Session().GetSessionVars().EnableRowLevelChecksum = false
	tk.MustExec("insert into t values (1, 10)")

	row, err := m.unmarshalAndMountRowChanged(ctx, lastRawKV())
	require.NoError(t, err)
	require.NotNil(t, row)
	// the upstream did not write a checksum, so none is attached to the row
	require.Nil(t, row.Checksum)

	// Case 2: row written with a single checksum.
	tk.Session().GetSessionVars().EnableRowLevelChecksum = true
	tk.MustExec("insert into t values (2, 20)")

	row, err = m.unmarshalAndMountRowChanged(ctx, lastRawKV())
	require.NoError(t, err)
	require.NotNil(t, row)
	require.NotNil(t, row.Checksum)

	expected, ok := m.decoder.GetChecksum()
	require.True(t, ok)
	require.Equal(t, expected, row.Checksum.Current)
	require.False(t, row.Checksum.Corrupted)

	// Case 3: row with two checksums. The row is inserted first and the
	// column type is changed afterwards, before the row is mounted.
	tk.MustExec("insert into t values (3, 30)")
	job = helper.DDL2Job("alter table t change column a a varchar(10)")
	require.NoError(t, schemaStorage.HandleDDLJob(job))

	rawKV := lastRawKV()
	row, err = m.unmarshalAndMountRowChanged(ctx, rawKV)
	require.NoError(t, err)
	require.NotNil(t, row)
	require.NotNil(t, row.Checksum)

	first, ok := m.decoder.GetChecksum()
	require.True(t, ok)

	extra, ok := m.decoder.GetExtraChecksum()
	require.True(t, ok)

	// The row checksum has to be one of the two checksums held by the decoder.
	if current := row.Checksum.Current; current == first {
		require.Equal(t, first, current)
	} else {
		require.Equal(t, extra, current)
	}
	require.False(t, row.Checksum.Corrupted)

	// Case 4: tamper with the table info so that checksum verification fails.
	tableInfo.Columns[0].ID = 3

	// The corruption handle level defaults to warn: mounting still succeeds,
	// but the row is marked as corrupted.
	row, err = m.unmarshalAndMountRowChanged(ctx, rawKV)
	require.NoError(t, err)
	require.NotNil(t, row.Checksum)
	require.True(t, row.Checksum.Corrupted)

	// With the handle level raised to error, mounting the same entry fails.
	m.integrity.CorruptionHandleLevel = integrity.CorruptionHandleLevelError
	_, err = m.unmarshalAndMountRowChanged(ctx, rawKV)
	require.Error(t, err)
	require.ErrorIs(t, err, cerror.ErrCorruptedDataMutation)

	job = helper.DDL2Job("drop table t")
	require.NoError(t, schemaStorage.HandleDDLJob(job))
}

func TestDecodeRow(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("set @@tidb_enable_clustered_index=1;")
helper.Tk().MustExec("use test;")

Expand All @@ -1003,72 +1153,67 @@ func TestDecodeRow(t *testing.T) {

cfg := config.GetDefaultReplicaConfig()

cfgWithChecksumEnabled := config.GetDefaultReplicaConfig()
cfgWithChecksumEnabled.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness

for _, c := range []*config.ReplicaConfig{cfg, cfgWithChecksumEnabled} {
filter, err := filter.NewFilter(c, "")
require.NoError(t, err)

schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(),
ver.Ver, false, changefeed, util.RoleTester, filter)
require.NoError(t, err)
filter, err := filter.NewFilter(cfg, "")
require.NoError(t, err)

// apply ddl to schemaStorage
ddl := "create table test.student(id int primary key, name char(50), age int, gender char(10))"
job := helper.DDL2Job(ddl)
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)
schemaStorage, err := NewSchemaStorage(helper.GetCurrentMeta(),
ver.Ver, false, changefeed, util.RoleTester, filter)
require.NoError(t, err)

ts := schemaStorage.GetLastSnapshot().CurrentTs()
// apply ddl to schemaStorage
ddl := "create table test.student(id int primary key, name char(50), age int, gender char(10))"
job := helper.DDL2Job(ddl)
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)

schemaStorage.AdvanceResolvedTs(ver.Ver)
ts := schemaStorage.GetLastSnapshot().CurrentTs()

mounter := NewMounter(
schemaStorage, changefeed, time.Local, filter, true, cfg.Integrity).(*mounter)
schemaStorage.AdvanceResolvedTs(ver.Ver)

helper.Tk().MustExec(`insert into student values(1, "dongmen", 20, "male")`)
helper.Tk().MustExec(`update student set age = 27 where id = 1`)
mounter := NewMounter(
schemaStorage, changefeed, time.Local, filter, true, cfg.Integrity).(*mounter)

ctx := context.Background()
decodeAndCheckRowInTable := func(tableID int64, f func(key []byte, value []byte) *model.RawKVEntry) {
walkTableSpanInStore(t, helper.Storage(), tableID, func(key []byte, value []byte) {
rawKV := f(key, value)
helper.Tk().MustExec(`insert into student values(1, "dongmen", 20, "male")`)
helper.Tk().MustExec(`update student set age = 27 where id = 1`)

row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.NoError(t, err)
require.NotNil(t, row)
ctx := context.Background()
decodeAndCheckRowInTable := func(tableID int64, f func(key []byte, value []byte) *model.RawKVEntry) {
walkTableSpanInStore(t, helper.Storage(), tableID, func(key []byte, value []byte) {
rawKV := f(key, value)

if row.Columns != nil {
require.NotNil(t, mounter.decoder)
}
row, err := mounter.unmarshalAndMountRowChanged(ctx, rawKV)
require.NoError(t, err)
require.NotNil(t, row)

if row.PreColumns != nil {
require.NotNil(t, mounter.preDecoder)
}
})
}
if row.Columns != nil {
require.NotNil(t, mounter.decoder)
}

toRawKV := func(key []byte, value []byte) *model.RawKVEntry {
return &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
StartTs: ts - 1,
CRTs: ts + 1,
if row.PreColumns != nil {
require.NotNil(t, mounter.preDecoder)
}
})
}

toRawKV := func(key []byte, value []byte) *model.RawKVEntry {
return &model.RawKVEntry{
OpType: model.OpTypePut,
Key: key,
Value: value,
StartTs: ts - 1,
CRTs: ts + 1,
}
}

tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "student")
require.True(t, ok)
tableInfo, ok := schemaStorage.GetLastSnapshot().TableByName("test", "student")
require.True(t, ok)

decodeAndCheckRowInTable(tableInfo.ID, toRawKV)
decodeAndCheckRowInTable(tableInfo.ID, toRawKV)
decodeAndCheckRowInTable(tableInfo.ID, toRawKV)
decodeAndCheckRowInTable(tableInfo.ID, toRawKV)

job = helper.DDL2Job("drop table student")
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)
}
job = helper.DDL2Job("drop table student")
err = schemaStorage.HandleDDLJob(job)
require.NoError(t, err)
}

// TestDecodeEventIgnoreRow tests a PolymorphicEvent.Row is nil
Expand Down

0 comments on commit 899a96a

Please sign in to comment.