/
exporter.go
88 lines (77 loc) · 2.73 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package clickhouseprofileexporter
import (
"context"
"fmt"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/metrico/otel-collector/exporter/clickhouseprofileexporter/ch"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
)
// errorCodeSuccess is the "error_code" attribute value recorded for a batch
// insert that completed without an error.
const errorCodeSuccess = ""

// errorCodeError is the "error_code" attribute value recorded for a batch
// insert that returned an error.
const errorCodeError = "1"
// clickhouseProfileExporter exports profile batches (carried as plog.Logs)
// to ClickHouse through a clickhouseAccess implementation.
type clickhouseProfileExporter struct {
	cfg    *Config                  // exporter configuration; its Dsn is parsed at construction time
	set    *exporter.CreateSettings // collector-provided settings the exporter was created with
	logger *zap.Logger              // taken from set.Logger
	meter  metric.Meter             // obtained from set.MeterProvider for typeStr
	ch     clickhouseAccess         // storage backend used by send and Shutdown
}
// clickhouseAccess abstracts the ClickHouse storage backend used by the
// exporter.
type clickhouseAccess interface {
	// InsertBatch writes a batch of profiles to the ClickHouse server.
	// The returned int is logged by the exporter as the inserted batch size.
	InsertBatch(profiles plog.Logs) (int, error)

	// Shutdown closes the ClickHouse connection.
	Shutdown() error
}
// TODO: batch like this https://github.com/open-telemetry/opentelemetry-collector/issues/8122

// newClickhouseProfileExporter builds a profile exporter from the given
// settings and configuration.
//
// It parses cfg.Dsn, opens the native columnar ClickHouse access and
// initialises the exporter metrics on the meter obtained from
// set.MeterProvider. On any failure it returns a nil exporter and a wrapped
// error; if the ClickHouse access was already opened when the failure
// happened, it is shut down first so that it does not leak.
//
// ctx is currently unused.
func newClickhouseProfileExporter(ctx context.Context, set *exporter.CreateSettings, cfg *Config) (*clickhouseProfileExporter, error) {
	exp := &clickhouseProfileExporter{
		cfg:    cfg,
		set:    set,
		logger: set.Logger,
		meter:  set.MeterProvider.Meter(typeStr),
	}

	opts, err := clickhouse.ParseDSN(cfg.Dsn)
	if err != nil {
		return nil, fmt.Errorf("failed to parse clickhouse dsn: %w", err)
	}

	// Named "access" rather than "ch" so the imported ch package is not shadowed.
	access, err := ch.NewClickhouseAccessNativeColumnar(opts, exp.logger)
	if err != nil {
		return nil, fmt.Errorf("failed to init native ch storage: %w", err)
	}
	exp.ch = access

	if err := initMetrics(exp.meter); err != nil {
		exp.logger.Error(fmt.Sprintf("failed to init metrics: %s", err.Error()))
		// The exporter is not handed back on failure, so nobody else would
		// ever call Shutdown on the access that was just opened.
		if cerr := exp.ch.Shutdown(); cerr != nil {
			exp.logger.Error(fmt.Sprintf("failed to shutdown clickhouse access: %s", cerr.Error()))
		}
		return nil, fmt.Errorf("failed to init metrics: %w", err)
	}

	return exp, nil
}
// send inserts the given profile batch into ClickHouse through exp.ch and
// records how long the insert took, in milliseconds, on
// otelcolExporterClickhouseProfileBatchInsertDurationMillis. The measurement
// carries an "error_code" attribute of errorCodeError when the insert failed
// and errorCodeSuccess otherwise.
//
// It returns the error from InsertBatch unchanged, or nil on success.
func (exp *clickhouseProfileExporter) send(ctx context.Context, logs plog.Logs) error {
	// Keep the time.Time value (not a UnixMilli integer) so that time.Since
	// uses the monotonic clock reading and the measured duration is immune
	// to wall-clock adjustments.
	start := time.Now()
	sz, err := exp.ch.InsertBatch(logs)

	code := errorCodeSuccess
	if err != nil {
		code = errorCodeError
	}
	otelcolExporterClickhouseProfileBatchInsertDurationMillis.Record(
		ctx,
		time.Since(start).Milliseconds(),
		metric.WithAttributeSet(*newOtelcolAttrSetBatch(code)),
	)

	if err != nil {
		exp.logger.Error(fmt.Sprintf("failed to insert batch: [%s]", err.Error()))
		return err
	}
	exp.logger.Info("inserted batch", zap.Int("size", sz))
	return nil
}
// newOtelcolAttrSetBatch returns a pointer to an attribute set holding a
// single string attribute, "error_code", set to errorCode.
func newOtelcolAttrSetBatch(errorCode string) *attribute.Set {
	attrs := attribute.NewSet(attribute.String("error_code", errorCode))
	return &attrs
}
// Shutdown stops the exporter by shutting down its ClickHouse access.
// Any error from the access is returned wrapped; ctx is not used.
func (exp *clickhouseProfileExporter) Shutdown(ctx context.Context) error {
	err := exp.ch.Shutdown()
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to shutdown: %w", err)
}