Skip to content

Commit

Permalink
clean up main instance creation from config
Browse files Browse the repository at this point in the history
  • Loading branch information
dhontecillas committed Feb 12, 2024
1 parent eb8a41d commit 5a781c8
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 66 deletions.
32 changes: 32 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,43 @@ type Config struct {
TraceSampleRate *float64 `json:"trace_sample_rate"`
}

func (c *Config) Validate() error {
return c.Exporters.Validate()
}

func (c *Config) UnsetFieldsToDefaults() {
if c.MetricReportingPeriod == nil {
reportingPeriod := 30
c.MetricReportingPeriod = &reportingPeriod
}
if c.TraceSampleRate == nil {
sampleRate := float64(1.0)
c.TraceSampleRate = &sampleRate
}
}

type Exporters struct {
OTLP []OTLPExporter `json:"otlp"`
Prometheus []PrometheusExporter `json:"prometheus"`
}

func (e *Exporters) Validate() error {
uniqueNames := make(map[string]bool, len(e.OTLP)+len(e.Prometheus))
for idx, ecfg := range e.OTLP {
if uniqueNames[ecfg.Name] {
return fmt.Errorf("OTLP Exporter with duplicate name: %s (at idx %d)", ecfg.Name, idx)
}
uniqueNames[ecfg.Name] = true
}
for idx, ecfg := range e.Prometheus {
if uniqueNames[ecfg.Name] {
return fmt.Errorf("Prometheus with duplicate name: %s (at idx %d)", ecfg.Name, idx)
}
uniqueNames[ecfg.Name] = true
}
return nil
}

type OTLPExporter struct {
Name string `json:"name"`
Host string `json:"host"`
Expand Down
60 changes: 30 additions & 30 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,48 +38,48 @@ var (
registerOnce = new(sync.Once)
)

// Instances create instances for a given configuration.
func Instances(ctx context.Context, cfg *config.Config) (map[string]MetricReader, map[string]SpanExporter, []error) {
m := make(map[string]MetricReader)
s := make(map[string]SpanExporter)
var errList []error

uniqueNames := map[string]bool{}
// Create OTLP (OpenTelemetry Line Protocol) exporters
for idx, ecfg := range cfg.Exporters.OTLP {
if uniqueNames[ecfg.Name] {
err := fmt.Errorf("OTLP Exporter with duplicate name: %s (at idx %d) ignored", ecfg.Name, idx)
errList = append(errList, err)
continue
}
func CreateOTLPExporters(ctx context.Context, otlpConfs []config.OTLPExporter) (map[string]MetricReader, map[string]SpanExporter, error) {
m := make(map[string]MetricReader, len(otlpConfs))
s := make(map[string]SpanExporter, len(otlpConfs))
for idx, ecfg := range otlpConfs {
c, err := otelcollector.Exporter(ctx, ecfg)
if err != nil {
err := fmt.Errorf("OTLP Exporter %s (at idx %d) failed: %s", ecfg.Name, idx, err.Error())
errList = append(errList, err)
continue
return nil, nil, fmt.Errorf("OTLP Exporter %s (at idx %d) failed: %s", ecfg.Name, idx, err.Error())
}
uniqueNames[ecfg.Name] = true
s[ecfg.Name] = c
m[ecfg.Name] = c
}
return m, s, nil
}

// Create Prometheus exporters
for idx, ecfg := range cfg.Exporters.Prometheus {
if uniqueNames[ecfg.Name] {
err := fmt.Errorf("Prometheus Exporter with duplicate name: %s (at idx %d) ignored", ecfg.Name, idx)
errList = append(errList, err)
continue
}
func CreatePrometheusExporters(ctx context.Context, promConfs []config.PrometheusExporter) (map[string]MetricReader, error) {
m := make(map[string]MetricReader, len(promConfs))
for idx, ecfg := range promConfs {
c, err := prometheus.Exporter(ctx, ecfg)
if err != nil {
err := fmt.Errorf("Prometheus Exporter %s (at idx %d) failed: %s", ecfg.Name, idx, err.Error())
errList = append(errList, err)
continue
return nil, fmt.Errorf("Prometheus Exporter %s (at idx %d) failed: %s", ecfg.Name, idx, err.Error())
}
uniqueNames[ecfg.Name] = true
m[ecfg.Name] = c
}
return m, s, errList
return m, nil
}

// Instances create instances for a given configuration.
func Instances(ctx context.Context, cfg *config.Config) (map[string]MetricReader, map[string]SpanExporter, error) {
// Create OTLP (OpenTelemetry Line Protocol) exporters
m, s, err := CreateOTLPExporters(ctx, cfg.Exporters.OTLP)
if err != nil {
return nil, nil, err
}
// Create Prometheus exporters
pm, err := CreatePrometheusExporters(ctx, cfg.Exporters.Prometheus)
if err != nil {
return nil, nil, err
}
for k, v := range pm {
m[k] = v
}
return m, s, nil
}

// SetGlobalExporterInstances sets the provided metric and traces
Expand Down
13 changes: 2 additions & 11 deletions http/server/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,14 @@ func fromContext(ctx context.Context) *tracking {
return nil
}

// SetEndpointPattern allows to set the endpoint attribute once it
// has been matched down the http handling pipeline.
func SetEndpointPattern(ctx context.Context, endpointPattern string) {
if t := fromContext(ctx); t != nil {
t.endpointPattern = endpointPattern
}
}

/*
We have no way of cancelling an already started span
(we would need to match it at the http layer level):
func SkipPath(ctx context) {
if t := fromContext(ctx); t != nil {
t.skipPath = true
}
}
*/

func (t *tracking) Start() {
t.startTime = time.Now()
}
Expand Down
36 changes: 18 additions & 18 deletions otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,34 @@ func Register(ctx context.Context, srvCfg lconfig.ServiceConfig) (func(), error)
// parsed config: sets the global exporter instances, the global propagation method, and
// the global KrakenD otel state, so it can be used from anywhere.
func RegisterWithConfig(ctx context.Context, cfg *config.Config) (func(), error) {
me, te, errs := exporter.Instances(ctx, cfg)
shutdownFn := func() {}
if err := cfg.Validate(); err != nil {
return shutdownFn, err
}
cfg.UnsetFieldsToDefaults()

if len(errs) > 0 {
// we will report a single error each time (even when there
// might be multiple errors in the exporters config).
return shutdownFn, errs[0]
me, te, err := exporter.Instances(ctx, cfg)
if err != nil {
return shutdownFn, err
}
exporter.SetGlobalExporterInstances(me, te)
return RegisterGlobalInstance(ctx, me, te, *cfg.MetricReportingPeriod, *cfg.TraceSampleRate, cfg.ServiceName)
}

// RegisterGlobalInstance creates the instance that will be used to report metrics and traces
func RegisterGlobalInstance(ctx context.Context, me map[string]exporter.MetricReader, te map[string]exporter.SpanExporter,
metricReportingPeriod int, traceSampleRate float64, serviceName string) (func(), error) {

shutdownFn := func() {}
prop := propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)

if cfg.MetricReportingPeriod == nil {
reportingPeriod := 30
cfg.MetricReportingPeriod = &reportingPeriod
}
if cfg.TraceSampleRate == nil {
sampleRate := float64(1.0)
cfg.TraceSampleRate = &sampleRate
}
otel.SetTextMapPropagator(prop)

globalStateCfg := &state.OTELStateConfig{
MetricReportingPeriod: *cfg.MetricReportingPeriod,
TraceSampleRate: *cfg.TraceSampleRate,
MetricReportingPeriod: metricReportingPeriod,
TraceSampleRate: traceSampleRate,
MetricProviders: make([]string, 0, len(me)),
TraceProviders: make([]string, 0, len(te)),
}
Expand All @@ -86,12 +87,11 @@ func RegisterWithConfig(ctx context.Context, cfg *config.Config) (func(), error)
globalStateCfg.TraceProviders = append(globalStateCfg.TraceProviders, k)
}
}
s, err := state.NewWithVersion(cfg.ServiceName, globalStateCfg, lcore.KrakendVersion, me, te)
s, err := state.NewWithVersion(serviceName, globalStateCfg, lcore.KrakendVersion, me, te)
if err != nil {
return shutdownFn, err
}
shutdownFn = func() { s.Shutdown(ctx) }
otel.SetTextMapPropagator(prop)
state.SetGlobalState(s)
return shutdownFn, nil
}
7 changes: 0 additions & 7 deletions state/context.go
Original file line number Diff line number Diff line change
@@ -1,8 +1 @@
package state

const (
// KrakenDContextOTELStrKey is a special key to be used when there
// is no way to obtain the span context from an inner context
// (like when gin has not the fallback option enabled in the engine).
KrakenDContextOTELStrKey string = "KrakendD-Context-OTEL"
)

0 comments on commit 5a781c8

Please sign in to comment.