/
processor.go
145 lines (137 loc) · 4.2 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package sql
import (
"fmt"
"github.com/jitsucom/bulker/bulkerlib/implementations"
"github.com/jitsucom/bulker/bulkerlib/types"
"github.com/jitsucom/bulker/jitsubase/utils"
"strings"
)
const SqlTypePrefix = "__sql_type"
// ProcessEvents flattens a single event and resolves the type of every resulting field.
// SQL type hints embedded in the event (keys starting with SqlTypePrefix) are extracted first,
// which removes those keys from event, and are then overlaid with customTypes: when both define
// the same name the customTypes entry wins.
// Returns the table header (tableName plus the resolved fields) together with the flattened object,
// or the first error reported by hint extraction, flattening or type resolution.
func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, omitNils bool, stringifyObjects bool) (*TypesHeader, types.Object, error) {
	hints, err := extractSQLTypesHints(event)
	if err != nil {
		return nil, nil, err
	}
	// explicitly provided types take precedence over hints found inside the event
	for column, sqlType := range customTypes {
		hints[column] = sqlType
	}

	flattener := implementations.NewFlattener(omitNils, stringifyObjects)
	flattened, err := flattener.FlattenObject(event, hints)
	if err != nil {
		return nil, nil, err
	}

	resolved, err := DefaultTypeResolver.Resolve(flattened, hints)
	if err != nil {
		return nil, nil, err
	}

	return &TypesHeader{TableName: tableName, Fields: resolved}, flattened, nil
}
// extractSQLTypesHints gathers all SQL type hints found in object, including nested objects,
// into a fresh types.SQLTypes. The hint keys are removed from object as a side effect.
// On error the hints collected so far are returned alongside the error.
func extractSQLTypesHints(object map[string]any) (types.SQLTypes, error) {
	hints := make(types.SQLTypes)
	if err := _extractSQLTypesHints("", object, hints); err != nil {
		return hints, err
	}
	return hints, nil
}
// _extractSQLTypesHints walks object recursively, records every SQL type hint it finds in result
// and deletes the hint key from the map it was found in.
//
// A hint is any key that starts with SqlTypePrefix. The remainder of the key, minus one optional
// leading "_", is the column name the hint applies to; an empty column name means the hint is
// meant for the whole enclosing object (e.g. "__sql_type_": "JSON" inside a nested object).
// The name stored in result is built with utils.JoinNonEmptyStrings("_", key, columnName), where
// key is the path accumulated from the enclosing nested objects.
//
// Accepted hint values:
//   - string: the SQL type
//   - []any: the first element is the SQL type, the optional second element is the DDL type
//
// Any other value type, as well as an empty array, produces an error.
func _extractSQLTypesHints(key string, object map[string]any, result types.SQLTypes) error {
	for k, v := range object {
		if !strings.HasPrefix(k, SqlTypePrefix) {
			// not a hint: descend into nested objects, leave every other value alone
			if nested, ok := v.(map[string]any); ok {
				if err := _extractSQLTypesHints(utils.JoinNonEmptyStrings("_", key, k), nested, result); err != nil {
					return err
				}
			}
			continue
		}

		// removing the entry currently being visited is safe while ranging over a map
		delete(object, k)
		columnName := strings.TrimPrefix(k[len(SqlTypePrefix):], "_")
		mappedColumnName := utils.JoinNonEmptyStrings("_", key, columnName)

		switch val := v.(type) {
		case []any:
			// an empty array carries no type at all; reject it instead of panicking on val[0]
			if len(val) == 0 {
				return fmt.Errorf("incorrect value for '__sql_type_' hint: empty array")
			}
			column := types.SQLColumn{Type: fmt.Sprint(val[0]), Override: true}
			if len(val) > 1 {
				column.DdlType = fmt.Sprint(val[1])
			}
			result[mappedColumnName] = column
		case string:
			result[mappedColumnName] = types.SQLColumn{Type: val, Override: true}
		default:
			return fmt.Errorf("incorrect type of value for '__sql_type_' hint: %T", v)
		}
	}
	return nil
}
//
//// foldLongFields replace all column names with truncated values if they exceed the limit
//// uses cutName under the hood
//func foldLongFields(header *TypesHeader, object types.Object) (*TypesHeader, types.Object, error) {
// //TODO: Get maxColumnNameLen from storage config
// maxColumnNameLen := 0
//
// if maxColumnNameLen <= 0 {
// return header, object, nil
// }
//
// changes := map[string]string{}
// for name := range header.Fields {
// if len(name) > maxColumnNameLen {
// newName := cutName(name, maxColumnNameLen)
// if name != newName {
// changes[name] = newName
// }
// }
// }
//
// for oldName, newName := range changes {
// field, _ := header.Fields[oldName]
// delete(header.Fields, oldName)
// header.Fields[newName] = field
//
// if value, ok := object[oldName]; ok {
// delete(object, oldName)
// object[newName] = value
// }
// }
//
// return header, object, nil
//}
//
//// cutName shortens a name that exceeds maxLen by cutting the parts between '_' down to 2 symbols.
//// If the name is still longer than maxLen, the last maxLen symbols of the name are returned.
//func cutName(name string, maxLen int) string {
// if len(name) <= maxLen {
// return name
// }
//
// //just cut from the beginning
// if !strings.Contains(name, "_") {
// return name[len(name)-maxLen:]
// }
//
// var replaced bool
// replace := ""
// for _, part := range strings.Split(name, "_") {
// if replace != "" {
// replace += "_"
// }
//
// if len(part) > 2 {
// newPart := part[:2]
// name = strings.ReplaceAll(name, replace+part, replace+newPart)
// replaced = true
// break
// } else {
// replace += part
// }
// }
//
// if !replaced {
// //case when ab_ac_ad and maxLen = 6
// return name[len(name)-maxLen:]
// }
//
// return cutName(name, maxLen)
//}