/
processor.go
171 lines (155 loc) · 5.75 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package datadogprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/datadogprocessor"
import (
"context"
"fmt"
pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes"
"github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/datadog"
)
// datadogProcessor is a traces processor that feeds every incoming batch to a
// Datadog agent, exports the resulting APM Stats as metrics through a
// configured metrics exporter, and forwards the traces to the next consumer.
type datadogProcessor struct {
// logger is the logger used for the processor's diagnostic output.
logger *zap.Logger
// nextConsumer is the consumer that traces are forwarded to after ingestion.
nextConsumer consumer.Traces
// cfg holds the processor's configuration.
cfg *Config
// started reports whether Start completed successfully; Shutdown is a no-op
// while it is false.
started bool
// metricsExporter specifies the metrics exporter used to export APM Stats
// as metrics.
metricsExporter exporter.Metrics
// agent specifies the agent used to ingest traces and output APM Stats.
// It is implemented by the traceagent structure; replaced in tests.
agent datadog.Ingester
// translator specifies the translator used to transform APM Stats Payloads
// from the agent to OTLP Metrics.
translator *metrics.Translator
// in specifies the channel through which the agent will output Stats Payloads
// resulting from ingested traces.
in chan *pb.StatsPayload
// exit specifies the exit channel. Shutdown sends on it to ask the run loop
// to stop, and the run loop closes it when it returns.
exit chan struct{}
}
// newProcessor builds a datadogProcessor that forwards traces to nextConsumer.
//
// config must be a *Config; any other type yields an error instead of a panic.
// An error is also returned when either the attributes translator or the
// metrics translator cannot be created. The returned processor is not running
// yet: the metrics exporter is resolved and the agent started in Start.
func newProcessor(ctx context.Context, set component.TelemetrySettings, config component.Config, nextConsumer consumer.Traces) (*datadogProcessor, error) {
	cfg, ok := config.(*Config)
	if !ok {
		return nil, fmt.Errorf("unexpected configuration type %T, expected %T", config, cfg)
	}
	// Buffered channel on which the agent publishes the stats payloads it
	// computes; it is drained by the processor's run loop.
	in := make(chan *pb.StatsPayload, 100)
	// set is this function's own copy of the settings, so the override below
	// only affects what is handed to the translators.
	set.MeterProvider = noop.NewMeterProvider() // disable metrics for the processor
	attributesTranslator, err := attributes.NewTranslator(set)
	if err != nil {
		return nil, err
	}
	trans, err := metrics.NewTranslator(set, attributesTranslator)
	if err != nil {
		return nil, err
	}
	// Surface the deprecation once per processor instance.
	set.Logger.Warn(
		"The datadogprocessor has been deprecated in favor of the datadogconnector",
		zap.String(
			"documentation",
			"https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/datadogprocessor/README.md#deprecated-in-favor-of-datadogconnector",
		),
	)
	return &datadogProcessor{
		logger:       set.Logger,
		nextConsumer: nextConsumer,
		agent:        datadog.NewAgent(ctx, in),
		translator:   trans,
		in:           in,
		cfg:          cfg,
		exit:         make(chan struct{}),
	}, nil
}
// Start implements the component.Component interface. It picks the metrics
// exporter that will receive APM Stats out of the host's metrics exporters,
// then starts the agent and the goroutine that forwards the agent's output.
//
// An error is returned when a registered metrics exporter does not implement
// exporter.Metrics, when the configured exporter cannot be found, or when the
// default "datadog" component is requested and more than one exporter of that
// type exists.
func (p *datadogProcessor) Start(_ context.Context, host component.Host) error {
	// candidates holds exporters that share the default Datadog component type
	// without being an exact ID match for the configured exporter.
	var candidates []exporter.Metrics
	for id, e := range host.GetExporters()[component.DataTypeMetrics] { //nolint:staticcheck
		me, isMetrics := e.(exporter.Metrics)
		if !isMetrics {
			return fmt.Errorf("the exporter %q isn't a metrics exporter", id.String())
		}
		if p.cfg.MetricsExporter == id {
			// Exact match with the configured exporter: no need to look further.
			p.metricsExporter = me
			break
		}
		if p.cfg.MetricsExporter == datadogComponent && id.Type() == datadogComponent.Type() {
			// The default "datadog" component was requested and this exporter has
			// the right type but a different name. Remember it: if it turns out to
			// be the only Datadog exporter it is safe to use, otherwise the user
			// has to choose explicitly.
			candidates = append(candidates, me)
		}
	}
	if p.metricsExporter == nil {
		// No exact match; decide based on the type-only matches.
		if len(candidates) == 0 {
			// Nothing to fall back to.
			return fmt.Errorf("failed to find metrics exporter %q; please specify a valid processor::datadog::metrics_exporter", p.cfg.MetricsExporter)
		}
		if len(candidates) > 1 {
			// Ambiguous fallback: make the user pick one.
			return fmt.Errorf("too many exporters of type %q; please choose one using processor::datadog::metrics_exporter", p.cfg.MetricsExporter)
		}
		// Exactly one fallback; use it.
		p.metricsExporter = candidates[0]
	}
	p.started = true
	p.agent.Start()
	go p.run()
	p.logger.Debug("Started datadogprocessor", zap.Stringer("metrics_exporter", p.cfg.MetricsExporter))
	return nil
}
// Shutdown implements the component.Component interface. It does nothing
// unless the processor was started; otherwise it stops the agent and blocks
// until the run loop has exited. It always returns nil.
func (p *datadogProcessor) Shutdown(context.Context) error {
	if p.started {
		p.started = false
		p.agent.Stop()
		// Ask the run loop to stop...
		p.exit <- struct{}{}
		// ...and block until it closes the channel on its way out.
		<-p.exit
	}
	return nil
}
// Capabilities implements the consumer interface. It reports that this
// processor mutates the data it receives.
func (p *datadogProcessor) Capabilities() consumer.Capabilities {
	var caps consumer.Capabilities
	// Traces get a resource attribute indicating that stats have already been
	// computed for them, which means the incoming data is modified.
	caps.MutatesData = true
	return caps
}
// ConsumeTraces implements consumer.Traces. The traces are handed to the agent
// for ingestion and then passed on to the next consumer, whose error (if any)
// is returned.
func (p *datadogProcessor) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
	spanCount := traces.SpanCount()
	p.logger.Debug("Received traces.", zap.Int("spans", spanCount))
	p.agent.Ingest(ctx, traces)
	err := p.nextConsumer.ConsumeTraces(ctx, traces)
	return err
}
// run is the processor's background loop. It waits for stats payloads that the
// agent publishes after ingesting traces, translates each non-empty payload to
// metrics and sends them to the configured metrics exporter. Export failures
// are logged and do not stop the loop. The loop ends when a value arrives on
// the exit channel, which is closed on return.
func (p *datadogProcessor) run() {
	defer close(p.exit)
	for {
		select {
		case <-p.exit:
			return
		case payload := <-p.in:
			if len(payload.Stats) == 0 {
				// Nothing to export for this payload.
				continue
			}
			md := p.translator.StatsPayloadToMetrics(payload)
			p.logger.Debug("Exporting APM Stats metrics.", zap.Int("count", md.MetricCount()))
			err := p.metricsExporter.ConsumeMetrics(context.TODO(), md)
			if err != nil {
				p.logger.Error("Error exporting metrics.", zap.Error(err))
			}
		}
	}
}