/
filter_convert_dml_column.go
95 lines (83 loc) · 2.68 KB
/
filter_convert_dml_column.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package filter
import (
"encoding/json"
"fmt"
"github.com/liuxinwang/go-mysql-starrocks/pkg/msg"
"github.com/liuxinwang/go-mysql-starrocks/pkg/utils"
"github.com/pingcap/errors"
"github.com/siddontang/go-log/log"
)
const ConvertDmlColumnFilterName = "convert-dml-column"
type ConvertDmlColumnFilter struct {
name string
matchSchema string
matchTable string
columns []string
castAs []string
}
func (cdcf *ConvertDmlColumnFilter) NewFilter(config map[string]interface{}) error {
columns, ok := config["columns"]
if !ok {
return errors.Trace(errors.New("'columns' is not configured"))
}
castAs, ok := config["cast-as"]
if !ok {
return errors.Trace(errors.New("'cast-as' is not configured"))
}
c, ok := utils.CastToSlice(columns)
if !ok {
return errors.Trace(errors.New("'columns' should be an array"))
}
columnsString, err := utils.CastSliceInterfaceToSliceString(c)
if err != nil {
return errors.Trace(errors.New("'columns' should be an array of string"))
}
ca, ok := utils.CastToSlice(castAs)
if !ok {
return errors.Trace(errors.New("'cast-as' should be an array"))
}
castAsString, err := utils.CastSliceInterfaceToSliceString(ca)
if err != nil {
return errors.Trace(errors.New("'cast-as' should be an array of string"))
}
if len(c) != len(ca) {
return errors.Trace(errors.New("'columns' should have the same length of 'cast-as'"))
}
cdcf.name = ConvertDmlColumnFilterName
cdcf.matchSchema = fmt.Sprintf("%v", config["match-schema"])
cdcf.matchTable = fmt.Sprintf("%v", config["match-table"])
cdcf.columns = columnsString
cdcf.castAs = castAsString
return nil
}
func (cdcf *ConvertDmlColumnFilter) Filter(msg *msg.Msg) bool {
if cdcf.matchSchema == msg.Database && cdcf.matchTable == msg.Table {
for i, column := range cdcf.columns {
value := FindColumn(msg.DmlMsg.Data, column)
if value != nil {
if value == "" {
continue
}
switch cdcf.castAs[i] {
case "json":
var columnJson map[string]interface{}
err := json.Unmarshal([]byte(fmt.Sprintf("%v", value)), &columnJson)
if err != nil {
log.Warnf("%s filter error: %v, column '%s' value: '%v' cast as json error, row event: %v",
cdcf.name, err.Error(), column, value, msg.DmlMsg.Data)
}
msg.DmlMsg.Data[column] = columnJson
case "arrayJson":
var columnArrayJson []map[string]interface{}
err := json.Unmarshal([]byte(fmt.Sprintf("%v", value)), &columnArrayJson)
if err != nil {
log.Warnf("%s filter error: %v, column '%s' value: '%v' cast as json error, row event: %v",
cdcf.name, err.Error(), column, value, msg.DmlMsg.Data)
}
msg.DmlMsg.Data[column] = columnArrayJson
}
}
}
}
return false
}