Skip to content

Commit

Permalink
sink(ticdc): add hex encode to csv (#9366) (#9379)
Browse files Browse the repository at this point in the history
close #9373
  • Loading branch information
ti-chi-bot committed Jul 27, 2023
1 parent f2dd298 commit 623962a
Show file tree
Hide file tree
Showing 15 changed files with 298 additions and 65 deletions.
27 changes: 15 additions & 12 deletions cdc/api/v2/model.go
Expand Up @@ -261,10 +261,11 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
var csvConfig *config.CSVConfig
if c.Sink.CSVConfig != nil {
csvConfig = &config.CSVConfig{
Delimiter: c.Sink.CSVConfig.Delimiter,
Quote: c.Sink.CSVConfig.Quote,
NullString: c.Sink.CSVConfig.NullString,
IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs,
Delimiter: c.Sink.CSVConfig.Delimiter,
Quote: c.Sink.CSVConfig.Quote,
NullString: c.Sink.CSVConfig.NullString,
IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs,
BinaryEncodingMethod: c.Sink.CSVConfig.BinaryEncodingMethod,
}
}

Expand Down Expand Up @@ -378,10 +379,11 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
var csvConfig *CSVConfig
if cloned.Sink.CSVConfig != nil {
csvConfig = &CSVConfig{
Delimiter: cloned.Sink.CSVConfig.Delimiter,
Quote: cloned.Sink.CSVConfig.Quote,
NullString: cloned.Sink.CSVConfig.NullString,
IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs,
Delimiter: cloned.Sink.CSVConfig.Delimiter,
Quote: cloned.Sink.CSVConfig.Quote,
NullString: cloned.Sink.CSVConfig.NullString,
IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs,
BinaryEncodingMethod: cloned.Sink.CSVConfig.BinaryEncodingMethod,
}
}

Expand Down Expand Up @@ -557,10 +559,11 @@ type KafkaConfig struct {
// CSVConfig denotes the csv config
// This is the same as config.CSVConfig
type CSVConfig struct {
Delimiter string `json:"delimiter"`
Quote string `json:"quote"`
NullString string `json:"null"`
IncludeCommitTs bool `json:"include_commit_ts"`
Delimiter string `json:"delimiter"`
Quote string `json:"quote"`
NullString string `json:"null"`
IncludeCommitTs bool `json:"include_commit_ts"`
BinaryEncodingMethod string `json:"binary_encoding_method"`
}

// DispatchRule represents partition rule for a table
Expand Down
7 changes: 4 additions & 3 deletions cdc/api/v2/model_test.go
Expand Up @@ -43,9 +43,10 @@ var defaultAPIConfig = &ReplicaConfig{
},
Sink: &SinkConfig{
CSVConfig: &CSVConfig{
Quote: string(config.DoubleQuoteChar),
Delimiter: config.Comma,
NullString: config.NULL,
Quote: string(config.DoubleQuoteChar),
Delimiter: config.Comma,
NullString: config.NULL,
BinaryEncodingMethod: config.BinaryEncodingBase64,
},
EncoderConcurrency: 16,
Terminator: config.CRLF,
Expand Down
12 changes: 7 additions & 5 deletions cdc/sink/codec/common/config.go
Expand Up @@ -46,11 +46,12 @@ type Config struct {
AvroBigintUnsignedHandlingMode string

// for sinking to cloud storage
Delimiter string
Quote string
NullString string
IncludeCommitTs bool
Terminator string
Delimiter string
Quote string
NullString string
IncludeCommitTs bool
Terminator string
BinaryEncodingMethod string
}

// NewConfig return a Config for codec
Expand Down Expand Up @@ -134,6 +135,7 @@ func (c *Config) Apply(sinkURI *url.URL, config *config.ReplicaConfig) error {
c.Quote = config.Sink.CSVConfig.Quote
c.NullString = config.Sink.CSVConfig.NullString
c.IncludeCommitTs = config.Sink.CSVConfig.IncludeCommitTs
c.BinaryEncodingMethod = config.Sink.CSVConfig.BinaryEncodingMethod
}
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/csv/csv_decoder.go
Expand Up @@ -104,7 +104,7 @@ func (b *batchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed, errors.New("no csv row can be found"))
}

e, err := csvMsg2RowChangedEvent(b.msg, b.tableInfo.Columns)
e, err := csvMsg2RowChangedEvent(b.codecConfig, b.msg, b.tableInfo.Columns)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
47 changes: 33 additions & 14 deletions cdc/sink/codec/csv/csv_message.go
Expand Up @@ -15,6 +15,7 @@ package csv

import (
"encoding/base64"
"encoding/hex"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -233,7 +234,7 @@ func (c *csvMessage) formatValue(value any, strBuilder *strings.Builder) {
}
}

func fromCsvValToColValue(csvVal any, ft types.FieldType) (any, error) {
func fromCsvValToColValue(csvConfig *common.Config, csvVal any, ft types.FieldType) (any, error) {
str, ok := csvVal.(string)
if !ok {
return csvVal, nil
Expand All @@ -243,9 +244,18 @@ func fromCsvValToColValue(csvVal any, ft types.FieldType) (any, error) {
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob,
mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
if ft.GetCharset() == charset.CharsetBin {
blob, err := base64.StdEncoding.DecodeString(str)
return blob, err
switch csvConfig.BinaryEncodingMethod {
case config.BinaryEncodingBase64:
return base64.StdEncoding.DecodeString(str)
case config.BinaryEncodingHex:
return hex.DecodeString(str)
default:
return nil, cerror.WrapError(cerror.ErrCSVEncodeFailed,
errors.Errorf("unsupported binary encoding method %s",
csvConfig.BinaryEncodingMethod))
}
}

return []byte(str), nil
case mysql.TypeFloat:
val, err := strconv.ParseFloat(str, 32)
Expand All @@ -269,7 +279,7 @@ func fromCsvValToColValue(csvVal any, ft types.FieldType) (any, error) {
}

// fromColValToCsvVal converts column from TiDB type to csv type.
func fromColValToCsvVal(col *model.Column, ft *types.FieldType) (any, error) {
func fromColValToCsvVal(csvConfig *common.Config, col *model.Column, ft *types.FieldType) (any, error) {
if col.Value == nil {
return nil, nil
}
Expand All @@ -279,7 +289,16 @@ func fromColValToCsvVal(col *model.Column, ft *types.FieldType) (any, error) {
mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
if col.Flag.IsBinary() {
if v, ok := col.Value.([]byte); ok {
return base64.StdEncoding.EncodeToString(v), nil
switch csvConfig.BinaryEncodingMethod {
case config.BinaryEncodingBase64:
return base64.StdEncoding.EncodeToString(v), nil
case config.BinaryEncodingHex:
return hex.EncodeToString(v), nil
default:
return nil, cerror.WrapError(cerror.ErrCSVEncodeFailed,
errors.Errorf("unsupported binary encoding method %s",
csvConfig.BinaryEncodingMethod))
}
}
return col.Value, nil
}
Expand Down Expand Up @@ -323,7 +342,7 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent)
}
if e.IsDelete() {
csvMsg.opType = operationDelete
csvMsg.columns, err = rowChangeColumns2CSVColumns(e.PreColumns, e.ColInfos)
csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.PreColumns, e.ColInfos)
if err != nil {
return nil, err
}
Expand All @@ -334,15 +353,15 @@ func rowChangedEvent2CSVMsg(csvConfig *common.Config, e *model.RowChangedEvent)
csvMsg.opType = operationUpdate
}
// for insert and update operation, we only record the after columns.
csvMsg.columns, err = rowChangeColumns2CSVColumns(e.Columns, e.ColInfos)
csvMsg.columns, err = rowChangeColumns2CSVColumns(csvConfig, e.Columns, e.ColInfos)
if err != nil {
return nil, err
}
}
return csvMsg, nil
}

func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*model.RowChangedEvent, error) {
func csvMsg2RowChangedEvent(csvConfig *common.Config, csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*model.RowChangedEvent, error) {
var err error
if len(csvMsg.columns) != len(ticols) {
return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed,
Expand All @@ -357,9 +376,9 @@ func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*
Table: csvMsg.tableName,
}
if csvMsg.opType == operationDelete {
e.PreColumns, err = csvColumns2RowChangeColumns(csvMsg.columns, ticols)
e.PreColumns, err = csvColumns2RowChangeColumns(csvConfig, csvMsg.columns, ticols)
} else {
e.Columns, err = csvColumns2RowChangeColumns(csvMsg.columns, ticols)
e.Columns, err = csvColumns2RowChangeColumns(csvConfig, csvMsg.columns, ticols)
}

if err != nil {
Expand All @@ -369,7 +388,7 @@ func csvMsg2RowChangedEvent(csvMsg *csvMessage, ticols []*timodel.ColumnInfo) (*
return e, nil
}

func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColInfo) ([]any, error) {
func rowChangeColumns2CSVColumns(csvConfig *common.Config, cols []*model.Column, colInfos []rowcodec.ColInfo) ([]any, error) {
var csvColumns []any
for i, column := range cols {
// column could be nil in a condition described in
Expand All @@ -378,7 +397,7 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn
continue
}

converted, err := fromColValToCsvVal(column, colInfos[i].Ft)
converted, err := fromColValToCsvVal(csvConfig, column, colInfos[i].Ft)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -388,7 +407,7 @@ func rowChangeColumns2CSVColumns(cols []*model.Column, colInfos []rowcodec.ColIn
return csvColumns, nil
}

func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) ([]*model.Column, error) {
func csvColumns2RowChangeColumns(csvConfig *common.Config, csvCols []any, ticols []*timodel.ColumnInfo) ([]*model.Column, error) {
cols := make([]*model.Column, 0, len(csvCols))
for idx, csvCol := range csvCols {
col := new(model.Column)
Expand All @@ -402,7 +421,7 @@ func csvColumns2RowChangeColumns(csvCols []any, ticols []*timodel.ColumnInfo) ([
col.Flag.SetIsPrimaryKey()
}

val, err := fromCsvValToColValue(csvCol, ticol.FieldType)
val, err := fromCsvValToColValue(csvConfig, csvCol, ticol.FieldType)
if err != nil {
return cols, err
}
Expand Down

0 comments on commit 623962a

Please sign in to comment.