-
Notifications
You must be signed in to change notification settings - Fork 0
/
delete.go
165 lines (142 loc) · 3.95 KB
/
delete.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package delete
import (
"encoding/json"
"fmt"
"strconv"
"github.com/zeiss/typhoon/pkg/flow/adapter/transformation/common/storage"
"github.com/zeiss/typhoon/pkg/flow/adapter/transformation/transformer"
)
// Compile-time assertion that *Delete satisfies transformer.Transformer.
var _ transformer.Transformer = (*Delete)(nil)
// Delete implements the transformer.Transformer interface. It describes
// what to remove from a JSON document: nodes are selected by Path, by
// Value, or by both when both are set.
type Delete struct {
// Path is the location of the node to remove. It is compared, prefixed
// with Separator, against the path of each visited node.
Path string
// Value is the value to remove, or the name of a Pipeline variable that
// holds it; Apply resolves it against the shared storage before filtering.
Value string
// Type is not read or written anywhere in this file.
// NOTE(review): presumably kept for external users — confirm before removing.
Type string
// Separator is prepended to Path when a node path is compared.
Separator string
// variables is the shared storage with Pipeline variables, set through
// SetStorage and handed on to instances created by New.
variables *storage.Storage
}
// InitStep tells whether this operation has to run before the main
// Transformations (the Store operation, for example, must run first so
// that all Pipeline variables are loaded). Delete is not an init step.
var InitStep = false
// operationName is the key under which this transformation is registered.
var operationName = "delete"
// Register stores an empty Delete under operationName in m, the map
// from which the Transformation pipeline is built.
func Register(m map[string]transformer.Transformer) {
	m[operationName] = new(Delete)
}
// SetStorage sets the shared Storage with Pipeline variables that this
// transformation reads from when resolving its Value.
func (d *Delete) SetStorage(s *storage.Storage) {
	// The parameter is deliberately not called "storage": that name would
	// shadow the imported storage package inside the method body.
	d.variables = s
}
// InitStep reports whether this Transformation should run as an init
// step. It returns the value of the package-level InitStep variable.
func (d *Delete) InitStep() bool {
return InitStep
}
// New builds a fresh Delete from the given key (stored as Path), value
// and separator. The new instance shares the receiver's variable storage.
func (d *Delete) New(key, value, separator string) transformer.Transformer {
	next := new(Delete)
	next.Path = key
	next.Value = value
	next.Separator = separator
	next.variables = d.variables
	return next
}
// Apply is the main method of the Transformation. It removes from the JSON
// document in data every node selected by the configured Path and/or Value
// and returns the re-encoded document.
//
// Value is first resolved through retrieveString for the given eventID.
// The resolved value lives only on a per-call copy of the receiver, so the
// configured Value is left untouched and is resolved afresh for every event.
//
// If parsing or encoding fails, the original data is returned together
// with the error.
func (d *Delete) Apply(eventID string, data []byte) ([]byte, error) {
	// Work on a shallow copy: writing the resolved value back into d would
	// replace the configured variable name with this event's value, so later
	// events would look up the wrong key (and concurrent calls would race).
	resolved := *d
	resolved.Value = d.retrieveString(eventID, d.Value)

	result, err := resolved.parse(data, "", "")
	if err != nil {
		return data, err
	}

	output, err := json.Marshal(result)
	if err != nil {
		return data, err
	}

	return output, nil
}
// retrieveString looks key up in the variable storage for eventID and
// returns the stored value when it is a string. When nothing is stored
// under key, or the stored value is not a string, key itself is returned.
func (d *Delete) retrieveString(eventID, key string) string {
	// The comma-ok assertion is false for a nil result as well, so a single
	// check covers both "missing" and "not a string".
	if resolved, isString := d.variables.Get(eventID, key).(string); isString {
		return resolved
	}
	return key
}
// parse walks data recursively and returns a copy of it without the nodes
// selected by filter.
//
// data is either raw JSON ([]byte, decoded first) or an already decoded
// JSON value. path is the location of data inside the document: the root
// is "", an object member is the parent path followed by the Separator and
// the member name, and an array element is the parent path followed by
// "[index]". key is only handed through to the recursive calls.
//
// Filtered object members are dropped from the result. A filtered array
// element, like a filtered root, is returned as nil, so the array keeps its
// length and the element is encoded as null. A value of any other type
// yields an empty object.
//
// nolint:gocyclo
func (d *Delete) parse(data interface{}, key, path string) (interface{}, error) {
	// This check covers the document root and array elements. Object members
	// are also checked in the loop below, because a nil result alone cannot
	// tell "filtered" apart from a JSON null that must be kept.
	if d.filter(path, data) {
		return nil, nil
	}

	switch value := data.(type) {
	case []byte:
		var m interface{}
		if err := json.Unmarshal(value, &m); err != nil {
			return nil, fmt.Errorf("unmarshal err: %w", err)
		}
		o, err := d.parse(m, key, path)
		if err != nil {
			return nil, fmt.Errorf("recursive call in []bytes case: %w", err)
		}
		return o, nil
	case float64, bool, string, nil:
		return value, nil
	case []interface{}:
		// Non-nil even when empty, so it is encoded as [] and not as null.
		slice := make([]interface{}, 0, len(value))
		for i, v := range value {
			o, err := d.parse(v, key, fmt.Sprintf("%s[%d]", path, i))
			if err != nil {
				return nil, fmt.Errorf("recursive call in []interface case: %w", err)
			}
			slice = append(slice, o)
		}
		return slice, nil
	case map[string]interface{}:
		output := make(map[string]interface{}, len(value))
		for k, v := range value {
			// Join with the configured Separator: the filter compares against
			// Separator+Path, so a hard-coded "." would never match a Path
			// written with any other separator.
			subPath := path + d.Separator + k
			if d.filter(subPath, v) {
				continue
			}
			o, err := d.parse(v, k, subPath)
			if err != nil {
				return nil, fmt.Errorf("recursive call in map[]interface case: %w", err)
			}
			output[k] = o
		}
		return output, nil
	}

	// Value of an unsupported type: keep returning an empty object.
	return map[string]interface{}{}, nil
}
// filter reports whether the node at path holding value must be removed.
// Which check applies depends on what is configured: Path and Value
// together, Path alone, or Value alone. When neither is set, every node
// matches ("delete any").
func (d *Delete) filter(path string, value interface{}) bool {
	hasPath := d.Path != ""
	hasValue := d.Value != ""

	if hasPath && hasValue {
		return d.filterPathAndValue(path, value)
	}
	if hasPath {
		return d.filterPath(path)
	}
	if hasValue {
		return d.filterValue(value)
	}

	// Neither Path nor Value is configured.
	return true
}
// filterPath reports whether path is exactly the configured Path with the
// Separator prepended.
func (d *Delete) filterPath(path string) bool {
	target := d.Separator + d.Path
	return path == target
}
// filterValue reports whether value, rendered as text, equals the
// configured Value. Strings are compared as they are, float64 numbers in
// their shortest decimal form without exponent, and booleans as "true" or
// "false". Values of any other type never match.
func (d *Delete) filterValue(value interface{}) bool {
	var text string

	switch typed := value.(type) {
	case string:
		text = typed
	case float64:
		text = strconv.FormatFloat(typed, 'f', -1, 64)
	case bool:
		text = fmt.Sprintf("%t", typed)
	default:
		return false
	}

	return text == d.Value
}
// filterPathAndValue reports whether both the path and the value match.
// The value is only inspected once the path has matched.
func (d *Delete) filterPathAndValue(path string, value interface{}) bool {
	if !d.filterPath(path) {
		return false
	}
	return d.filterValue(value)
}