-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
jaegerstorage.go
97 lines (83 loc) · 2.7 KB
/
jaegerstorage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0
package datareceivers
import (
"context"
"fmt"
"time"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/receiver"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/receivers/storagereceiver"
)
type jaegerStorageDataReceiver struct {
TelemetrySettings component.TelemetrySettings
TraceStorage string
StorageConfig *jaegerstorage.Config
host *storagetest.StorageHost
receiver receiver.Traces
}
func NewJaegerStorageDataReceiver(
telemetrySettings component.TelemetrySettings,
traceStorage string,
storageConfig *jaegerstorage.Config,
) testbed.DataReceiver {
return &jaegerStorageDataReceiver{
TelemetrySettings: telemetrySettings,
TraceStorage: traceStorage,
StorageConfig: storageConfig,
}
}
func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error {
ctx := context.Background()
extSet := extension.CreateSettings{
ID: jaegerstorage.ID,
TelemetrySettings: dr.TelemetrySettings,
}
extFactory := jaegerstorage.NewFactory()
ext, err := extFactory.CreateExtension(ctx, extSet, dr.StorageConfig)
if err != nil {
return err
}
rcvSet := receiver.CreateSettings{
ID: storagereceiver.ID,
TelemetrySettings: dr.TelemetrySettings,
}
rcvFactory := storagereceiver.NewFactory()
rcvCfg := rcvFactory.CreateDefaultConfig().(*storagereceiver.Config)
rcvCfg.TraceStorage = dr.TraceStorage
rcvCfg.PullInterval = 100 * time.Millisecond
rcv, err := rcvFactory.CreateTracesReceiver(ctx, rcvSet, rcvCfg, tc)
if err != nil {
return err
}
dr.receiver = rcv
dr.host = storagetest.NewStorageHost()
dr.host.WithExtension(jaegerstorage.ID, ext)
err = dr.host.GetExtensions()[jaegerstorage.ID].Start(ctx, dr.host)
if err != nil {
return err
}
return dr.receiver.Start(ctx, dr.host)
}
func (dr *jaegerStorageDataReceiver) Stop() error {
ctx := context.Background()
err := dr.receiver.Shutdown(ctx)
if err != nil {
return err
}
return dr.host.GetExtensions()[jaegerstorage.ID].Shutdown(ctx)
}
func (dr *jaegerStorageDataReceiver) GenConfigYAMLStr() string {
return fmt.Sprintf(`
jaeger_storage_receiver:
trace_storage: %s
`, dr.TraceStorage)
}
func (dr *jaegerStorageDataReceiver) ProtocolName() string {
return "jaeger_storage_receiver"
}