/
processor.go
224 lines (187 loc) · 8.39 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package component // import "go.opentelemetry.io/collector/component"
import (
"context"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
)
// Processor defines the common functions that must be implemented by TracesProcessor
// and MetricsProcessor.
type Processor interface {
Component
}
// TracesProcessor is a processor that can consume traces.
type TracesProcessor interface {
Processor
consumer.Traces
}
// MetricsProcessor is a processor that can consume metrics.
type MetricsProcessor interface {
Processor
consumer.Metrics
}
// LogsProcessor is a processor that can consume logs.
type LogsProcessor interface {
Processor
consumer.Logs
}
// ProcessorCreateSettings is passed to Create* functions in ProcessorFactory.
type ProcessorCreateSettings struct {
TelemetrySettings
// BuildInfo can be used by components for informational purposes
BuildInfo BuildInfo
}
// ProcessorFactory is Factory interface for processors.
//
// This interface cannot be directly implemented. Implementations must
// use the NewProcessorFactory to implement it.
type ProcessorFactory interface {
Factory
// CreateDefaultConfig creates the default configuration for the Processor.
// This method can be called multiple times depending on the pipeline
// configuration and should not cause side-effects that prevent the creation
// of multiple instances of the Processor.
// The object returned by this method needs to pass the checks implemented by
// 'configtest.CheckConfigStruct'. It is recommended to have these checks in the
// tests of any implementation of the Factory interface.
CreateDefaultConfig() config.Processor
// CreateTracesProcessor creates a TracesProcessor based on this config.
// If the processor type does not support tracing or if the config is not valid,
// an error will be returned instead.
CreateTracesProcessor(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Traces) (TracesProcessor, error)
// TracesProcessorStability gets the stability level of the TracesProcessor.
TracesProcessorStability() StabilityLevel
// CreateMetricsProcessor creates a MetricsProcessor based on this config.
// If the processor type does not support metrics or if the config is not valid,
// an error will be returned instead.
CreateMetricsProcessor(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Metrics) (MetricsProcessor, error)
// MetricsProcessorStability gets the stability level of the MetricsProcessor.
MetricsProcessorStability() StabilityLevel
// CreateLogsProcessor creates a LogsProcessor based on the config.
// If the processor type does not support logs or if the config is not valid,
// an error will be returned instead.
CreateLogsProcessor(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Logs) (LogsProcessor, error)
// LogsProcessorStability gets the stability level of the LogsProcessor.
LogsProcessorStability() StabilityLevel
}
// ProcessorCreateDefaultConfigFunc is the equivalent of ProcessorFactory.CreateDefaultConfig().
type ProcessorCreateDefaultConfigFunc func() config.Processor
// CreateDefaultConfig implements ProcessorFactory.CreateDefaultConfig().
func (f ProcessorCreateDefaultConfigFunc) CreateDefaultConfig() config.Processor {
return f()
}
// ProcessorFactoryOption apply changes to ProcessorOptions.
type ProcessorFactoryOption interface {
// applyProcessorFactoryOption applies the option.
applyProcessorFactoryOption(o *processorFactory)
}
var _ ProcessorFactoryOption = (*processorFactoryOptionFunc)(nil)
// processorFactoryOptionFunc is an ProcessorFactoryOption created through a function.
type processorFactoryOptionFunc func(*processorFactory)
func (f processorFactoryOptionFunc) applyProcessorFactoryOption(o *processorFactory) {
f(o)
}
// CreateTracesProcessorFunc is the equivalent of ProcessorFactory.CreateTracesProcessor().
type CreateTracesProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Traces) (TracesProcessor, error)
// CreateTracesProcessor implements ProcessorFactory.CreateTracesProcessor().
func (f CreateTracesProcessorFunc) CreateTracesProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces) (TracesProcessor, error) {
if f == nil {
return nil, ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}
// CreateMetricsProcessorFunc is the equivalent of ProcessorFactory.CreateMetricsProcessor().
type CreateMetricsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Metrics) (MetricsProcessor, error)
// CreateMetricsProcessor implements ProcessorFactory.CreateMetricsProcessor().
func (f CreateMetricsProcessorFunc) CreateMetricsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (MetricsProcessor, error) {
if f == nil {
return nil, ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}
// CreateLogsProcessorFunc is the equivalent of ProcessorFactory.CreateLogsProcessor().
type CreateLogsProcessorFunc func(context.Context, ProcessorCreateSettings, config.Processor, consumer.Logs) (LogsProcessor, error)
// CreateLogsProcessor implements ProcessorFactory.CreateLogsProcessor().
func (f CreateLogsProcessorFunc) CreateLogsProcessor(
ctx context.Context,
set ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (LogsProcessor, error) {
if f == nil {
return nil, ErrDataTypeIsNotSupported
}
return f(ctx, set, cfg, nextConsumer)
}
type processorFactory struct {
baseFactory
ProcessorCreateDefaultConfigFunc
CreateTracesProcessorFunc
tracesStabilityLevel StabilityLevel
CreateMetricsProcessorFunc
metricsStabilityLevel StabilityLevel
CreateLogsProcessorFunc
logsStabilityLevel StabilityLevel
}
func (p processorFactory) TracesProcessorStability() StabilityLevel {
return p.tracesStabilityLevel
}
func (p processorFactory) MetricsProcessorStability() StabilityLevel {
return p.metricsStabilityLevel
}
func (p processorFactory) LogsProcessorStability() StabilityLevel {
return p.logsStabilityLevel
}
// WithTracesProcessor overrides the default "error not supported" implementation for CreateTracesProcessor and the default "undefined" stability level.
func WithTracesProcessor(createTracesProcessor CreateTracesProcessorFunc, sl StabilityLevel) ProcessorFactoryOption {
return processorFactoryOptionFunc(func(o *processorFactory) {
o.tracesStabilityLevel = sl
o.CreateTracesProcessorFunc = createTracesProcessor
})
}
// WithMetricsProcessor overrides the default "error not supported" implementation for CreateMetricsProcessor and the default "undefined" stability level.
func WithMetricsProcessor(createMetricsProcessor CreateMetricsProcessorFunc, sl StabilityLevel) ProcessorFactoryOption {
return processorFactoryOptionFunc(func(o *processorFactory) {
o.metricsStabilityLevel = sl
o.CreateMetricsProcessorFunc = createMetricsProcessor
})
}
// WithLogsProcessor overrides the default "error not supported" implementation for CreateLogsProcessor and the default "undefined" stability level.
func WithLogsProcessor(createLogsProcessor CreateLogsProcessorFunc, sl StabilityLevel) ProcessorFactoryOption {
return processorFactoryOptionFunc(func(o *processorFactory) {
o.logsStabilityLevel = sl
o.CreateLogsProcessorFunc = createLogsProcessor
})
}
// NewProcessorFactory returns a ProcessorFactory.
func NewProcessorFactory(cfgType config.Type, createDefaultConfig ProcessorCreateDefaultConfigFunc, options ...ProcessorFactoryOption) ProcessorFactory {
f := &processorFactory{
baseFactory: baseFactory{cfgType: cfgType},
ProcessorCreateDefaultConfigFunc: createDefaultConfig,
}
for _, opt := range options {
opt.applyProcessorFactoryOption(f)
}
return f
}