/
processor.go
109 lines (84 loc) · 3.51 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package sumologicprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/sumologicprocessor"
import (
"context"
"fmt"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// sumologicSubprocessor is the contract every step held by
// sumologicProcessor has to satisfy.
type sumologicSubprocessor interface {
	// ConfigPropertyName returns the name that identifies this subprocessor.
	// It is used as the field key in the startup log entry and is embedded
	// in the error returned when one of the process* methods fails.
	ConfigPropertyName() string
	// isEnabled returns the flag that is logged next to
	// ConfigPropertyName when the processor starts.
	isEnabled() bool

	// processLogs handles one batch of logs. A non-nil error stops the
	// remaining subprocessors from running on that batch.
	processLogs(plog.Logs) error
	// processMetrics handles one batch of metrics. A non-nil error stops
	// the remaining subprocessors from running on that batch.
	processMetrics(pmetric.Metrics) error
	// processTraces handles one batch of traces. A non-nil error stops the
	// remaining subprocessors from running on that batch.
	processTraces(ptrace.Traces) error
}
// sumologicProcessor runs an ordered list of subprocessors over logs,
// metrics and traces.
type sumologicProcessor struct {
// logger receives the startup and shutdown messages.
logger *zap.Logger
// subprocessors are invoked in slice order for every batch.
subprocessors []sumologicSubprocessor
}
// newsumologicProcessor builds a sumologicProcessor that logs through
// set.Logger and owns one subprocessor per config option read below.
//
// The subprocessors are stored in the order they are listed here, which is
// also the order in which processLogs, processMetrics and processTraces
// run them.
func newsumologicProcessor(set processor.CreateSettings, config *Config) *sumologicProcessor {
	return &sumologicProcessor{
		logger: set.Logger,
		subprocessors: []sumologicSubprocessor{
			newCloudNamespaceProcessor(config.AddCloudNamespace),
			newTranslateAttributesProcessor(config.TranslateAttributes),
			newTranslateTelegrafMetricsProcessor(config.TranslateTelegrafAttributes),
			newNestingProcessor(config.NestAttributes),
			newAggregateAttributesProcessor(config.AggregateAttributes),
			newLogFieldConversionProcessor(config.LogFieldsAttributes),
			newTranslateDockerMetricsProcessor(config.TranslateDockerMetrics),
		},
	}
}
// start emits a single info-level startup message carrying one boolean
// field per subprocessor: the key is the subprocessor's ConfigPropertyName
// and the value is its isEnabled result. The context and host arguments are
// ignored and the returned error is always nil.
func (sp *sumologicProcessor) start(_ context.Context, _ component.Host) error {
	fields := make([]zapcore.Field, len(sp.subprocessors))
	for i, sub := range sp.subprocessors {
		name := sub.ConfigPropertyName()
		enabled := sub.isEnabled()
		fields[i] = zap.Bool(name, enabled)
	}

	sp.logger.Info("Sumo Logic Processor has started.", fields...)
	return nil
}
// shutdown emits an info-level message noting that the processor stopped.
// The context argument is ignored and the returned error is always nil.
func (sp *sumologicProcessor) shutdown(_ context.Context) error {
	sp.logger.Info("Sumo Logic Processor has shut down.")
	return nil
}
// processLogs hands logs to every subprocessor in slice order and returns
// the logs value it was given.
//
// It stops at the first subprocessor that reports an error and returns that
// error wrapped together with the subprocessor's ConfigPropertyName;
// subprocessors later in the list are not run. The context argument is
// ignored.
func (sp *sumologicProcessor) processLogs(_ context.Context, logs plog.Logs) (plog.Logs, error) {
	for i := range sp.subprocessors {
		sub := sp.subprocessors[i]
		err := sub.processLogs(logs)
		if err == nil {
			continue
		}
		return logs, fmt.Errorf("failed to process logs for property %s: %w", sub.ConfigPropertyName(), err)
	}
	return logs, nil
}
// processMetrics hands metrics to every subprocessor in slice order and
// returns the metrics value it was given.
//
// It stops at the first subprocessor that reports an error and returns that
// error wrapped together with the subprocessor's ConfigPropertyName;
// subprocessors later in the list are not run. The context argument is
// ignored.
func (sp *sumologicProcessor) processMetrics(_ context.Context, metrics pmetric.Metrics) (pmetric.Metrics, error) {
	for i := range sp.subprocessors {
		sub := sp.subprocessors[i]
		err := sub.processMetrics(metrics)
		if err == nil {
			continue
		}
		return metrics, fmt.Errorf("failed to process metrics for property %s: %w", sub.ConfigPropertyName(), err)
	}
	return metrics, nil
}
// processTraces hands traces to every subprocessor in slice order and
// returns the traces value it was given.
//
// It stops at the first subprocessor that reports an error and returns that
// error wrapped together with the subprocessor's ConfigPropertyName;
// subprocessors later in the list are not run. The context argument is
// ignored.
func (sp *sumologicProcessor) processTraces(_ context.Context, traces ptrace.Traces) (ptrace.Traces, error) {
	for i := range sp.subprocessors {
		sub := sp.subprocessors[i]
		err := sub.processTraces(traces)
		if err == nil {
			continue
		}
		return traces, fmt.Errorf("failed to process traces for property %s: %w", sub.ConfigPropertyName(), err)
	}
	return traces, nil
}