/
attraction.go
283 lines (248 loc) · 10.3 KB
/
attraction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package processorhelper
import (
"fmt"
"regexp"
"strings"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/processor/filterhelper"
)
// Settings is the configuration consumed by NewAttrProc. It lists the
// attribute actions to validate and later apply.
type Settings struct {
// Actions specifies the list of attributes to act on.
// The set of actions are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
// This is a required field.
Actions []ActionKeyValue `mapstructure:"actions"`
}
// ActionKeyValue specifies the attribute key to act upon and the action to
// perform on it.
type ActionKeyValue struct {
// Key specifies the attribute to act upon.
// This is a required field.
Key string `mapstructure:"key"`
// Value specifies the value to populate for the key.
// The type of the value is inferred from the configuration.
Value interface{} `mapstructure:"value"`
// RegexPattern is the regular expression used by the EXTRACT action. It
// must be specified for EXTRACT and must not be specified for any other
// action.
// The pattern is matched against the value of the attribute specified by
// `key`. The target keys are the names of the matcher groups in the
// pattern, and each target key receives the text matched by its group.
// Note: All subexpressions must have a name.
// Note: The value type of the source key must be a string. If it isn't,
// no extraction will occur.
RegexPattern string `mapstructure:"pattern"`
// FromAttribute specifies the attribute to use to populate
// the value. If the attribute doesn't exist, no action is performed.
FromAttribute string `mapstructure:"from_attribute"`
// Action specifies the type of action to perform.
// The set of values are {INSERT, UPDATE, UPSERT, DELETE, HASH, EXTRACT}.
// Both lower case and upper case are supported.
// INSERT - Inserts the key/value to attributes when the key does not exist.
// No action is applied to attributes where the key already exists.
// Either Value or FromAttribute must be set.
// UPDATE - Updates an existing key with a value. No action is applied
// to attributes where the key does not exist.
// Either Value or FromAttribute must be set.
// UPSERT - Performs insert or update action depending on the attributes
// containing the key. The key/value is inserted into attributes
// that did not originally have the key. The key/value is updated
// for attributes where the key already existed.
// Either Value or FromAttribute must be set.
// DELETE - Deletes the attribute. If the key doesn't exist,
// no action is performed.
// HASH - Calculates the SHA-1 hash of an existing value and overwrites the
// value with its SHA-1 hash result.
// EXTRACT - Extracts values using a regular expression from the input
// 'key' to target keys named by the matcher groups of 'pattern'.
// If a target key already exists, it will be overridden.
// This is a required field.
Action Action `mapstructure:"action"`
}
// Action is the enum capturing the types of actions that can be performed on
// an attribute: INSERT, UPDATE, UPSERT, DELETE, HASH and EXTRACT.
type Action string
const (
// INSERT adds the key/value to attributes when the key does not exist.
// No action is applied to attributes where the key already exists.
INSERT Action = "insert"
// UPDATE updates an existing key with a value. No action is applied
// to attributes where the key does not exist.
UPDATE Action = "update"
// UPSERT performs the INSERT or UPDATE action. The key/value is
// inserted into attributes that did not originally have the key. The
// key/value is updated for attributes where the key already existed.
UPSERT Action = "upsert"
// DELETE deletes the attribute. If the key doesn't exist, no action is performed.
DELETE Action = "delete"
// HASH calculates the SHA-1 hash of an existing value and overwrites the
// value with its SHA-1 hash result.
HASH Action = "hash"
// EXTRACT extracts values using a regular expression from the input
// 'key' to target keys named by the matcher groups of 'pattern'. If a
// target key already exists, it will be overridden.
EXTRACT Action = "extract"
)
// attributeAction is the validated, internal form of one ActionKeyValue, as
// built by NewAttrProc.
type attributeAction struct {
// Key is the attribute the action operates on.
Key string
// FromAttribute names the attribute whose value is copied to Key. It is
// only set for INSERT/UPDATE/UPSERT when no literal value was configured.
FromAttribute string
// Compiled regex if provided
Regex *regexp.Regexp
// Attribute names extracted from the regexp's subexpressions. Index 0
// corresponds to the whole match and is never used as a target key.
AttrNames []string
// Action is the lower-cased action to perform.
// TODO https://go.opentelemetry.io/collector/issues/296
// Do benchmark testing between having action be of type string vs integer.
// The reason is attributes processor will most likely be commonly used
// and could impact performance.
Action Action
// AttributeValue is the literal value taken from the configuration. It is
// nil when the value comes from FromAttribute or the action takes no value.
AttributeValue *pdata.AttributeValue
}
// AttrProc holds the validated list of attribute actions. It is created by
// NewAttrProc and applied to an attribute map by Process.
type AttrProc struct {
// actions are applied by Process in slice order.
actions []attributeAction
}
// NewAttrProc validates that the input configuration has all of the required fields for the processor
// and returns an AttrProc to be used to process attributes.
//
// Validation rules, per action:
//   - "key" is always required.
//   - INSERT, UPDATE, UPSERT require exactly one of "value" or
//     "from_attribute" and must not set "pattern".
//   - HASH, DELETE must not set "value", "from_attribute" or "pattern".
//   - EXTRACT requires "pattern", must not set "value" or "from_attribute",
//     and the pattern must compile and contain only named matcher groups
//     (at least one).
//
// An error is returned if there are any invalid inputs. When the failure has
// an underlying cause (a value that cannot be converted, or a pattern that
// does not compile) the returned error wraps that cause, so it is both shown
// in the message and reachable through errors.Is / errors.As.
func NewAttrProc(settings *Settings) (*AttrProc, error) {
	var attributeActions []attributeAction
	for i, a := range settings.Actions {
		// `key` is a required field
		if a.Key == "" {
			return nil, fmt.Errorf("error creating AttrProc due to missing required field \"key\" at the %d-th actions", i)
		}

		// Convert `action` to lowercase for comparison. `a` is a copy of the
		// slice element, so the caller's settings are not modified.
		a.Action = Action(strings.ToLower(string(a.Action)))
		action := attributeAction{
			Key:    a.Key,
			Action: a.Action,
		}

		switch a.Action {
		case INSERT, UPDATE, UPSERT:
			if a.Value == nil && a.FromAttribute == "" {
				return nil, fmt.Errorf("error creating AttrProc. Either field \"value\" or \"from_attribute\" setting must be specified for %d-th action", i)
			}
			if a.Value != nil && a.FromAttribute != "" {
				return nil, fmt.Errorf("error creating AttrProc due to both fields \"value\" and \"from_attribute\" being set at the %d-th actions", i)
			}
			if a.RegexPattern != "" {
				return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use the \"pattern\" field. This must not be specified for %d-th action", a.Action, i)
			}
			// Convert the raw value from the configuration to the internal trace representation of the value.
			if a.Value != nil {
				val, err := filterhelper.NewAttributeValueRaw(a.Value)
				if err != nil {
					// Add the action index so the failing entry can be
					// located, like every other error in this function.
					return nil, fmt.Errorf("error creating AttrProc. Field \"value\" is invalid at the %d-th actions: %w", i, err)
				}
				action.AttributeValue = &val
			} else {
				action.FromAttribute = a.FromAttribute
			}
		case HASH, DELETE:
			if a.Value != nil || a.FromAttribute != "" || a.RegexPattern != "" {
				return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use \"value\", \"pattern\" or \"from_attribute\" field. These must not be specified for %d-th action", a.Action, i)
			}
		case EXTRACT:
			if a.Value != nil || a.FromAttribute != "" {
				return nil, fmt.Errorf("error creating AttrProc. Action \"%s\" does not use \"value\" or \"from_attribute\" field. These must not be specified for %d-th action", a.Action, i)
			}
			if a.RegexPattern == "" {
				return nil, fmt.Errorf("error creating AttrProc due to missing required field \"pattern\" for action \"%s\" at the %d-th action", a.Action, i)
			}
			re, err := regexp.Compile(a.RegexPattern)
			if err != nil {
				// Keep the compiler's diagnostic: without it the user only
				// learns that the pattern is invalid, not why.
				return nil, fmt.Errorf("error creating AttrProc. Field \"pattern\" has invalid pattern: \"%s\" to be set at the %d-th actions: %w", a.RegexPattern, i, err)
			}
			// SubexpNames()[0] stands for the whole match, so a pattern with
			// no capture groups yields a slice of length 1.
			attrNames := re.SubexpNames()
			if len(attrNames) <= 1 {
				return nil, fmt.Errorf("error creating AttrProc. Field \"pattern\" contains no named matcher groups at the %d-th actions", i)
			}
			for _, name := range attrNames[1:] {
				if name == "" {
					return nil, fmt.Errorf("error creating AttrProc. Field \"pattern\" contains at least one unnamed matcher group at the %d-th actions", i)
				}
			}
			action.Regex = re
			action.AttrNames = attrNames
		default:
			return nil, fmt.Errorf("error creating AttrProc due to unsupported action %q at the %d-th actions", a.Action, i)
		}

		attributeActions = append(attributeActions, action)
	}
	return &AttrProc{actions: attributeActions}, nil
}
// Process runs each configured action against attrs, in the order the actions
// were configured. INSERT, UPDATE and UPSERT are skipped when no source value
// is available for them.
func (ap *AttrProc) Process(attrs pdata.AttributeMap) {
	for _, act := range ap.actions {
		// TODO https://go.opentelemetry.io/collector/issues/296
		// Do benchmark testing between having action be of type string vs integer.
		// The reason is attributes processor will most likely be commonly used
		// and could impact performance.
		switch act.Action {
		case DELETE:
			attrs.Delete(act.Key)
		case INSERT:
			if src, ok := getSourceAttributeValue(act, attrs); ok {
				attrs.Insert(act.Key, src)
			}
		case UPDATE:
			if src, ok := getSourceAttributeValue(act, attrs); ok {
				attrs.Update(act.Key, src)
			}
		case UPSERT:
			if src, ok := getSourceAttributeValue(act, attrs); ok {
				attrs.Upsert(act.Key, src)
			}
		case HASH:
			hashAttribute(act, attrs)
		case EXTRACT:
			extractAttributes(act, attrs)
		}
	}
}
// getSourceAttributeValue resolves the value an INSERT/UPDATE/UPSERT action
// should write. The value configured on the action wins; when none was
// configured, the value is looked up in attrs under action.FromAttribute.
// The boolean reports whether a value is available.
func getSourceAttributeValue(action attributeAction, attrs pdata.AttributeMap) (pdata.AttributeValue, bool) {
	configured := action.AttributeValue
	if configured == nil {
		// No literal value in the configuration: copy from another attribute.
		return attrs.Get(action.FromAttribute)
	}
	return *configured, true
}
// hashAttribute passes the value stored under action.Key to sha1Hasher.
// Nothing happens when attrs has no such key.
func hashAttribute(action attributeAction, attrs pdata.AttributeMap) {
	current, present := attrs.Get(action.Key)
	if !present {
		return
	}
	sha1Hasher(current)
}
// extractAttributes matches action.Regex against the string value stored
// under action.Key and upserts one string attribute per capture group, keyed
// by the corresponding entry of action.AttrNames. It does nothing when the
// key is absent, its value is not a string, or the regex does not match.
func extractAttributes(action attributeAction, attrs pdata.AttributeMap) {
	src, ok := attrs.Get(action.Key)
	if !ok {
		return
	}
	// Extraction is only defined for string values.
	if src.Type() != pdata.AttributeValueSTRING {
		return
	}

	// On a match the result has one entry per subexpression plus the whole
	// match at index 0, so it lines up index-for-index with AttrNames.
	groups := action.Regex.FindStringSubmatch(src.StringVal())
	if groups == nil {
		return
	}

	for idx, captured := range groups {
		if idx == 0 {
			// Index 0 is the entire match, not a capture group.
			continue
		}
		attrs.UpsertString(action.AttrNames[idx], captured)
	}
}