Skip to content

Commit

Permalink
codec(ticdc): csv and maxwell unit test use schema test helper to red…
Browse files Browse the repository at this point in the history
…uce code complexity (#11223)

close #11225
  • Loading branch information
3AceShowHand authored Jun 7, 2024
1 parent 277dd7d commit 7dfd11c
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 147 deletions.
48 changes: 11 additions & 37 deletions pkg/sink/codec/csv/csv_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ import (
"context"
"testing"

timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/stretchr/testify/require"
Expand All @@ -32,45 +30,21 @@ func TestCSVBatchDecoder(t *testing.T) {
"U","employee","hr",433305438660591630,102,"Alex","Alice","2018-06-15","Beijing"
`
ctx := context.Background()
tableInfo := &model.TableInfo{
TableName: model.TableName{
Schema: "hr",
Table: "employee",
},
TableInfo: &timodel.TableInfo{
Name: timodel.NewCIStr("employee"),
Columns: []*timodel.ColumnInfo{
{
Name: timodel.NewCIStr("Id"),
FieldType: *types.NewFieldType(mysql.TypeInt24),
},
{
Name: timodel.NewCIStr("LastName"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
{
Name: timodel.NewCIStr("FirstName"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
{
Name: timodel.NewCIStr("HireDate"),
FieldType: *types.NewFieldType(mysql.TypeDate),
},
{
Name: timodel.NewCIStr("OfficeLocation"),
FieldType: *types.NewFieldType(mysql.TypeVarchar),
},
},
},
}
decoder, err := NewBatchDecoder(ctx, &common.Config{
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

_ = helper.DDL2Event("create database hr")
createTableDDL := helper.DDL2Event("create table hr.employee(Id int, LastName varchar(255), FirstName varchar(255), HireDate date, OfficeLocation varchar(255))")

codecConfig := &common.Config{
Delimiter: ",",
Quote: "\"",
Terminator: "\n",
NullString: "\\N",
IncludeCommitTs: true,
}, tableInfo, []byte(csvData))
require.Nil(t, err)
}
decoder, err := NewBatchDecoder(ctx, codecConfig, createTableDDL.TableInfo, []byte(csvData))
require.NoError(t, err)

for i := 0; i < 5; i++ {
tp, hasNext, err := decoder.HasNext()
Expand Down
48 changes: 16 additions & 32 deletions pkg/sink/codec/csv/csv_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,29 @@ package csv
import (
"testing"

"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/stretchr/testify/require"
)

func TestCSVBatchCodec(t *testing.T) {
tableInfo := model.BuildTableInfo("test", "table1", []*model.Column{{
Name: "tiny",
Type: mysql.TypeTiny,
}}, nil)
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

ddl := helper.DDL2Event("create table test.table1(col1 int primary key)")
event1 := helper.DML2Event("insert into test.table1 values (1)", "test", "table1")
event2 := helper.DML2Event("insert into test.table1 values (2)", "test", "table1")

testCases := []*model.SingleTableTxn{
{
Rows: []*model.RowChangedEvent{
{
CommitTs: 1,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas([]*model.Column{{
Name: "tiny",
Value: int64(1),
}}, tableInfo),
},
{
CommitTs: 2,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas([]*model.Column{{
Name: "tiny",
Value: int64(2),
}}, tableInfo),
},
event1,
event2,
},
},
{
TableInfo: tableInfo,
TableInfo: ddl.TableInfo,
Rows: nil,
},
}
Expand Down Expand Up @@ -85,19 +74,14 @@ func TestCSVAppendRowChangedEventWithCallback(t *testing.T) {
require.NotNil(t, encoder)

count := 0
tableInfo := model.BuildTableInfo("test", "table1", []*model.Column{{
Name: "tiny",
Value: int64(1), Type: mysql.TypeTiny,
}}, nil)
row := &model.RowChangedEvent{
CommitTs: 1,
TableInfo: tableInfo,

Columns: model.Columns2ColumnDatas([]*model.Column{{Name: "tiny", Value: int64(1)}}, tableInfo),
}
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

_ = helper.DDL2Event("create table test.table1(col1 int primary key)")
row := helper.DML2Event("insert into test.table1 values (1)", "test", "table1")
txn := &model.SingleTableTxn{
TableInfo: tableInfo,
TableInfo: row.TableInfo,
Rows: []*model.RowChangedEvent{row},
}
callback := func() {
Expand Down
53 changes: 15 additions & 38 deletions pkg/sink/codec/maxwell/maxwell_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,22 @@ import (
"context"
"testing"

timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/stretchr/testify/require"
)

func TestMaxwellBatchCodec(t *testing.T) {
t.Parallel()
newEncoder := newBatchEncoder
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

tableInfo := model.BuildTableInfo("a", "b", []*model.Column{{Name: "col1", Type: mysql.TypeLong}}, nil)
rowCases := [][]*model.RowChangedEvent{{{
CommitTs: 1,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas([]*model.Column{
{
Name: "col1",
Value: 10,
},
}, tableInfo),
}}, {}}
ddlEvent := helper.DDL2Event("create table test.t(col1 int primary key)")
dmlEvent := helper.DML2Event("insert into test.t values (10)", "test", "t")

rowCases := [][]*model.RowChangedEvent{{dmlEvent}, {}}
for _, cs := range rowCases {
encoder := newEncoder(&common.Config{})
encoder := newBatchEncoder(&common.Config{})
for _, row := range cs {
err := encoder.AppendRowChangedEvent(context.Background(), "", row, nil)
require.Nil(t, err)
Expand All @@ -54,19 +46,9 @@ func TestMaxwellBatchCodec(t *testing.T) {
require.Equal(t, len(cs), messages[0].GetRowsCount())
}

ddlCases := [][]*model.DDLEvent{{{
CommitTs: 1,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "a", Table: "b",
},
TableInfo: &timodel.TableInfo{},
},
Query: "create table a",
Type: 1,
}}}
ddlCases := [][]*model.DDLEvent{{ddlEvent}}
for _, cs := range ddlCases {
encoder := newEncoder(&common.Config{})
encoder := newBatchEncoder(&common.Config{})
for _, ddl := range cs {
msg, err := encoder.EncodeDDLEvent(ddl)
require.Nil(t, err)
Expand All @@ -79,17 +61,12 @@ func TestMaxwellAppendRowChangedEventWithCallback(t *testing.T) {
encoder := newBatchEncoder(&common.Config{})
require.NotNil(t, encoder)

count := 0
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

tableInfo := model.BuildTableInfo("a", "b", []*model.Column{{Name: "col1", Type: mysql.TypeVarchar}}, nil)
row := &model.RowChangedEvent{
CommitTs: 1,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas([]*model.Column{{
Name: "col1",
Value: []byte("aa"),
}}, tableInfo),
}
_ = helper.DDL2Event("create table test.t(col1 varchar(255) primary key)")
row := helper.DML2Event("insert into test.t values ('aa')", "test", "t")
count := 0

tests := []struct {
row *model.RowChangedEvent
Expand Down
20 changes: 5 additions & 15 deletions pkg/sink/codec/maxwell/maxwell_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ package maxwell
import (
"testing"

"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/stretchr/testify/require"
)

Expand All @@ -42,20 +41,11 @@ func TestMaxwellFormatCol(t *testing.T) {
}

func TestEncodeBinaryToMaxwell(t *testing.T) {
t.Parallel()

cols := []*model.Column{{
Name: "varbinary",
Type: mysql.TypeVarchar,
Value: []uint8("测试varbinary"),
Flag: model.BinaryFlag,
}}
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

tableInfo := model.BuildTableInfo("a", "b", cols, nil)
e := &model.RowChangedEvent{
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas(cols, tableInfo),
}
_ = helper.DDL2Event("create table test.t(col1 varbinary(255) primary key)")
e := helper.DML2Event("insert into test.t values ('测试varbinary')", "test", "t")

key, msg := rowChangeToMaxwellMsg(e, false)
require.NotNil(t, key)
Expand Down
25 changes: 0 additions & 25 deletions pkg/sink/codec/utils/field_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,8 @@ package utils

import (
"strings"

"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/types"
"github.com/pingcap/tiflow/cdc/model"
)

// SetBinChsClnFlag set the binary charset flag.
func SetBinChsClnFlag(ft *types.FieldType) *types.FieldType {
ft.SetCharset(charset.CharsetBin)
ft.SetCollate(charset.CollationBin)
ft.AddFlag(mysql.BinaryFlag)
return ft
}

// SetUnsigned set the unsigned flag.
func SetUnsigned(ft *types.FieldType) *types.FieldType {
ft.SetFlag(uint(model.UnsignedFlag))
return ft
}

// SetElems set the elems to the ft
func SetElems(ft *types.FieldType, elems []string) *types.FieldType {
ft.SetElems(elems)
return ft
}

// when encoding the canal format, for unsigned mysql type, add `unsigned` keyword.
// it should have the form `t unsigned`, such as `int unsigned`
func withUnsigned4MySQLType(mysqlType string, unsigned bool) string {
Expand Down

0 comments on commit 7dfd11c

Please sign in to comment.