Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Implement spans exporting for ClickHouse storage in Jaeger V2 #4941

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 18 additions & 2 deletions cmd/jaeger/internal/exporters/storageexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type storageExporter struct {
config *Config
logger *zap.Logger
spanWriter spanstore.Writer
clickhouse bool
// Separate traces exporting function for ClickHouse storage.
// This is temporary until we have v2 storage API.
chExportTraces func(ctx context.Context, td ptrace.Traces) error
}

func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter {
Expand All @@ -36,8 +41,15 @@ func (exp *storageExporter) start(_ context.Context, host component.Host) error
return fmt.Errorf("cannot find storage factory: %w", err)
}

if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
return fmt.Errorf("cannot create span writer: %w", err)
switch t := f.(type) {
case *ch.Factory:
haanhvu marked this conversation as resolved.
Show resolved Hide resolved
exp.clickhouse = true
exp.chExportTraces = t.ExportSpans
default:
exp.clickhouse = false
if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
return fmt.Errorf("cannot create span writer: %w", err)
}
}

return nil
Expand All @@ -49,6 +61,10 @@ func (*storageExporter) close(_ context.Context) error {
}

func (exp *storageExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
if exp.clickhouse {
return exp.chExportTraces(ctx, td)
}

batches, err := otlp2jaeger.ProtoFromTraces(td)
if err != nil {
return fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/jaeger/internal/exporters/storageexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func createTracesExporter(ctx context.Context, set exporter.CreateSettings, conf
// Disable Timeout/RetryOnFailure and SendingQueue
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(configretry.BackOffConfig{Enabled: false}),
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}),
// Enable queue settings for Clickhouse only
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: ex.clickhouse}),
exporterhelper.WithStart(ex.start),
exporterhelper.WithShutdown(ex.close),
)
Expand Down
7 changes: 2 additions & 5 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
badgerCfg "github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
grpcCfg "github.com/jaegertracing/jaeger/plugin/storage/grpc"
)

Expand All @@ -22,16 +23,12 @@ type Config struct {
Opensearch map[string]esCfg.Configuration `mapstructure:"opensearch"`
Elasticsearch map[string]esCfg.Configuration `mapstructure:"elasticsearch"`
Cassandra map[string]cassandra.Options `mapstructure:"cassandra"`
ClickHouse map[string]clickhouse.Config `mapstructure:"clickhouse"`
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
}

type MemoryStorage struct {
Name string `mapstructure:"name"`
memoryCfg.Configuration
}

func (cfg *Config) Validate() error {
emptyCfg := createDefaultConfig().(*Config)
if reflect.DeepEqual(*cfg, *emptyCfg) {
Expand Down
39 changes: 27 additions & 12 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
Expand Down Expand Up @@ -71,24 +72,31 @@
}

type starter[Config any, Factory storage.Factory] struct {
ext *storageExt
storageKind string
cfg map[string]Config
builder func(Config, metrics.Factory, *zap.Logger) (Factory, error)
ext *storageExt
storageKind string
cfg map[string]Config
builder func(Config, metrics.Factory, *zap.Logger) (Factory, error)
clickhouseBuilder func(context.Context, Config, *zap.Logger) Factory
}

func (s *starter[Config, Factory]) build(_ context.Context, _ component.Host) error {
func (s *starter[Config, Factory]) build(ctx context.Context, _ component.Host) error {

Check warning on line 82 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L82

Added line #L82 was not covered by tests
for name, cfg := range s.cfg {
if _, ok := s.ext.factories[name]; ok {
return fmt.Errorf("duplicate %s storage name %s", s.storageKind, name)
}
factory, err := s.builder(
cfg,
metrics.NullFactory,
s.ext.logger.With(zap.String("storage_name", name)),
)
if err != nil {
return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err)
var factory Factory
if s.clickhouseBuilder != nil {
factory = s.clickhouseBuilder(ctx, cfg, s.ext.logger.With(zap.String("storage_name", name)))
} else {
var err error
factory, err = s.builder(
cfg,
metrics.NullFactory,
s.ext.logger.With(zap.String("storage_name", name)),
)
if err != nil {
return fmt.Errorf("failed to initialize %s storage %s: %w", s.storageKind, name, err)

Check warning on line 98 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L87-L98

Added lines #L87 - L98 were not covered by tests
}
}
s.ext.factories[name] = factory
}
Expand Down Expand Up @@ -139,6 +147,12 @@
cfg: s.config.Cassandra,
builder: cassandra.NewFactoryWithConfig,
}
clickhouseStarter := &starter[clickhouse.Config, *clickhouse.Factory]{
ext: s,
storageKind: "clickhouse",
cfg: s.config.ClickHouse,
clickhouseBuilder: clickhouse.NewFactory,

Check warning on line 154 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L150-L154

Added lines #L150 - L154 were not covered by tests
}

builders := []func(ctx context.Context, host component.Host) error{
memStarter.build,
Expand All @@ -147,6 +161,7 @@
esStarter.build,
osStarter.build,
cassandraStarter.build,
clickhouseStarter.build,

Check warning on line 164 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L164

Added line #L164 was not covered by tests
// TODO add support for other backends
}
for _, builder := range builders {
Expand Down
53 changes: 53 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
# health_check:
# pprof:
# endpoint: 0.0.0.0:1777
# zpages:
# endpoint: 0.0.0.0:55679

jaeger_query:
trace_storage: ch_store
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000
clickhouse:
ch_store:
endpoint: tcp://127.0.0.1:9000?dial_timeout=10s&compress=lz4
spans_table_name: jaeger_spans

receivers:
otlp:
protocols:
grpc:
endpoint: 127.0.0.1:4317
http:
endpoint: 127.0.0.1:4318

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

zipkin:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: ch_store
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ require (
)

require (
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.15.0
github.com/IBM/sarama v1.43.2 // indirect
github.com/andybalholm/brotli v1.0.6 // indirect
github.com/aws/aws-sdk-go v1.53.11 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
Expand All @@ -97,12 +100,14 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
Expand Down Expand Up @@ -156,6 +161,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.102.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.3 // indirect
github.com/paulmach/orb v0.10.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
Expand All @@ -170,8 +176,10 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.24.4 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
Expand All @@ -195,7 +203,7 @@ require (
go.opentelemetry.io/collector/exporter/debugexporter v0.102.1
go.opentelemetry.io/collector/extension/auth v0.102.1 // indirect
go.opentelemetry.io/collector/featuregate v1.9.0 // indirect
go.opentelemetry.io/collector/semconv v0.102.1 // indirect
go.opentelemetry.io/collector/semconv v0.102.1
go.opentelemetry.io/collector/service v0.102.1 // indirect
go.opentelemetry.io/contrib/config v0.7.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.27.0 // indirect
Expand Down