Skip to content

Commit

Permalink
cdc/codec: fix string for unsigned value in canal-json. (#4629)
Browse files Browse the repository at this point in the history
close #4635
  • Loading branch information
3AceShowHand committed Feb 22, 2022
1 parent d4dbf71 commit e4ef2b2
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
39 changes: 26 additions & 13 deletions cdc/sink/codec/canal.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math"
"reflect"
"strconv"
"strings"

Expand Down Expand Up @@ -133,38 +134,47 @@ func (b *canalEntryBuilder) buildHeader(commitTs uint64, schema string, table st
return h
}

func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType) {
func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType, err error) {
javaType := mySQLType2JavaType(c.Type, c.Flag.IsBinary())

switch javaType {
case JavaSQLTypeBINARY, JavaSQLTypeVARBINARY, JavaSQLTypeLONGVARBINARY:
if strings.Contains(mysqlType, "text") {
return JavaSQLTypeCLOB
return JavaSQLTypeCLOB, nil
}
return JavaSQLTypeBLOB
return JavaSQLTypeBLOB, nil
}

// flag `isUnsigned` only for `numerical` and `bit`, `year` data type.
if !c.Flag.IsUnsigned() {
return javaType
return javaType, nil
}

// for year, to `int64`, others to `uint64`.
// no need to promote type for `year` and `bit`
if c.Type == mysql.TypeYear || c.Type == mysql.TypeBit {
return javaType
return javaType, nil
}

if c.Type == mysql.TypeFloat || c.Type == mysql.TypeDouble || c.Type == mysql.TypeNewDecimal {
return javaType
return javaType, nil
}

// for **unsigned** integral types, should have type in `uint64`. see reference:
// https://github.com/pingcap/ticdc/blob/f0a38a7aaf9f3b11a4d807da275b567642733f58/cdc/entry/mounter.go#L493
// for **unsigned** integral types, type would be `uint64` or `string`. see reference:
// https://github.com/pingcap/tiflow/blob/1e3dd155049417e3fd7bf9b0a0c7b08723b33791/cdc/entry/mounter.go#L501
// https://github.com/pingcap/tidb/blob/6495a5a116a016a3e077d181b8c8ad81f76ac31b/types/datum.go#L423-L455
number, ok := c.Value.(uint64)
if !ok {
log.Panic("unsigned value not in type uint64", zap.Any("column", c))
var number uint64
switch v := c.Value.(type) {
case uint64:
number = v
case string:
a, err := strconv.ParseUint(v, 10, 64)
if err != nil {
return javaType, err
}
number = a
default:
return javaType, errors.Errorf("unexpected type for unsigned value: %+v, column: %+v", reflect.TypeOf(v), c)
}

// Some special cases handled in canal
Expand Down Expand Up @@ -193,7 +203,7 @@ func getJavaSQLType(c *model.Column, mysqlType string) (result JavaSQLType) {
}
}

return javaType
return javaType, nil
}

// In the official canal-json implementation, value were extracted from binlog buffer.
Expand Down Expand Up @@ -277,7 +287,10 @@ func getMySQLType(c *model.Column) string {
// see https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java#L756-L872
func (b *canalEntryBuilder) buildColumn(c *model.Column, colName string, updated bool) (*canal.Column, error) {
mysqlType := getMySQLType(c)
javaType := getJavaSQLType(c, mysqlType)
javaType, err := getJavaSQLType(c, mysqlType)
if err != nil {
return nil, cerror.WrapError(cerror.ErrCanalEncodeFailed, err)
}

value, err := b.formatValue(c.Value, javaType)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/codec/canal_test.go
Expand Up @@ -411,6 +411,7 @@ var testColumnsTable = []*testColumnTuple{
{&model.Column{Name: "bigint", Type: mysql.TypeLonglong, Value: int64(9223372036854775807)}, "bigint", JavaSQLTypeBIGINT, "9223372036854775807"},
{&model.Column{Name: "bigint unsigned", Type: mysql.TypeLonglong, Value: uint64(9223372036854775807), Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeBIGINT, "9223372036854775807"},
{&model.Column{Name: "bigint unsigned 2", Type: mysql.TypeLonglong, Value: uint64(9223372036854775808), Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeDECIMAL, "9223372036854775808"},
{&model.Column{Name: "bigint unsigned 3", Type: mysql.TypeLonglong, Value: "0", Flag: model.UnsignedFlag}, "bigint unsigned", JavaSQLTypeBIGINT, "0"},

{&model.Column{Name: "float", Type: mysql.TypeFloat, Value: 3.14}, "float", JavaSQLTypeREAL, "3.14"},
{&model.Column{Name: "double", Type: mysql.TypeDouble, Value: 2.71}, "double", JavaSQLTypeDOUBLE, "2.71"},
Expand Down Expand Up @@ -454,7 +455,8 @@ func (s *canalEntrySuite) TestGetMySQLTypeAndJavaSQLType(c *check.C) {
obtainedMySQLType := getMySQLType(item.column)
c.Assert(obtainedMySQLType, check.Equals, item.expectedMySQLType)

obtainedJavaSQLType := getJavaSQLType(item.column, obtainedMySQLType)
obtainedJavaSQLType, err := getJavaSQLType(item.column, obtainedMySQLType)
c.Assert(err, check.IsNil)
c.Assert(obtainedJavaSQLType, check.Equals, item.expectedJavaSQLType)
}
}

0 comments on commit e4ef2b2

Please sign in to comment.