Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(cli): break down observability.startMetrics() #3196

Merged
merged 5 commits into from Aug 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
130 changes: 71 additions & 59 deletions cli/observability_flags.go
Expand Up @@ -79,72 +79,98 @@
}

func (c *observabilityFlags) startMetrics(ctx context.Context) error {
if c.metricsListenAddr != "" {
m := mux.NewRouter()
initPrometheus(m)

if c.enablePProf {
m.HandleFunc("/debug/pprof/", pprof.Index)
m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.HandleFunc("/debug/pprof/trace", pprof.Trace)
m.HandleFunc("/debug/pprof/{cmd}", pprof.Index) // special handling for Gorilla mux, see https://stackoverflow.com/questions/30560859/cant-use-go-tool-pprof-with-an-existing-server/71032595#71032595
}
c.maybeStartListener(ctx)

if err := c.maybeStartMetricsPusher(ctx); err != nil {
return err
}

return c.maybeStartTraceExporter()
}

// Starts observability listener when a listener address is specified.
func (c *observabilityFlags) maybeStartListener(ctx context.Context) {
if c.metricsListenAddr == "" {
return
}

log(ctx).Infof("starting prometheus metrics on %v", c.metricsListenAddr)
m := mux.NewRouter()
initPrometheus(m)

Check warning on line 98 in cli/observability_flags.go

View check run for this annotation

Codecov / codecov/patch

cli/observability_flags.go#L97-L98

Added lines #L97 - L98 were not covered by tests

go http.ListenAndServe(c.metricsListenAddr, m) //nolint:errcheck,gosec
if c.enablePProf {
m.HandleFunc("/debug/pprof/", pprof.Index)
m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.HandleFunc("/debug/pprof/trace", pprof.Trace)
m.HandleFunc("/debug/pprof/{cmd}", pprof.Index) // special handling for Gorilla mux, see https://stackoverflow.com/questions/30560859/cant-use-go-tool-pprof-with-an-existing-server/71032595#71032595

Check warning on line 106 in cli/observability_flags.go

View check run for this annotation

Codecov / codecov/patch

cli/observability_flags.go#L100-L106

Added lines #L100 - L106 were not covered by tests
}

if c.metricsPushAddr != "" {
c.stopPusher = make(chan struct{})
c.pusherWG.Add(1)
log(ctx).Infof("starting prometheus metrics on %v", c.metricsListenAddr)

Check warning on line 109 in cli/observability_flags.go

View check run for this annotation

Codecov / codecov/patch

cli/observability_flags.go#L109

Added line #L109 was not covered by tests

pusher := push.New(c.metricsPushAddr, c.metricsJob)
go http.ListenAndServe(c.metricsListenAddr, m) //nolint:errcheck,gosec

Check warning on line 111 in cli/observability_flags.go

View check run for this annotation

Codecov / codecov/patch

cli/observability_flags.go#L111

Added line #L111 was not covered by tests
}

pusher.Gatherer(prometheus.DefaultGatherer)
func (c *observabilityFlags) maybeStartMetricsPusher(ctx context.Context) error {
if c.metricsPushAddr == "" {
return nil
}

for _, g := range c.metricsGroupings {
const nParts = 2
c.stopPusher = make(chan struct{})
c.pusherWG.Add(1)

parts := strings.SplitN(g, ":", nParts)
if len(parts) != nParts {
return errors.Errorf("grouping must be name:value")
}
pusher := push.New(c.metricsPushAddr, c.metricsJob)

name := parts[0]
val := parts[1]
pusher.Gatherer(prometheus.DefaultGatherer)

pusher.Grouping(name, val)
}
for _, g := range c.metricsGroupings {
const nParts = 2

if c.metricsPushUsername != "" {
pusher.BasicAuth(c.metricsPushUsername, c.metricsPushPassword)
parts := strings.SplitN(g, ":", nParts)
if len(parts) != nParts {
return errors.Errorf("grouping must be name:value")
}

if c.metricsPushFormat != "" {
pusher.Format(metricsPushFormats[c.metricsPushFormat])
}
name := parts[0]
val := parts[1]

log(ctx).Infof("starting prometheus pusher on %v every %v", c.metricsPushAddr, c.metricsPushInterval)
c.pushOnce(ctx, "initial", pusher)
pusher.Grouping(name, val)
}

go c.pushPeriodically(ctx, pusher)
if c.metricsPushUsername != "" {
pusher.BasicAuth(c.metricsPushUsername, c.metricsPushPassword)
}

se, err := c.getSpanExporter()
if err != nil {
return err
if c.metricsPushFormat != "" {
pusher.Format(metricsPushFormats[c.metricsPushFormat])
}

log(ctx).Infof("starting prometheus pusher on %v every %v", c.metricsPushAddr, c.metricsPushInterval)
c.pushOnce(ctx, "initial", pusher)

go c.pushPeriodically(ctx, pusher)

return nil
}

func (c *observabilityFlags) maybeStartTraceExporter() error {
if !c.enableJaeger {
return nil
}

r := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("kopia"),
semconv.ServiceVersionKey.String(repo.BuildVersion),
)
// Create the Jaeger exporter
se, err := jaeger.New(jaeger.WithCollectorEndpoint())
if err != nil {
return errors.Wrap(err, "unable to create Jaeger exporter")
}

Check warning on line 165 in cli/observability_flags.go

View check run for this annotation

Codecov / codecov/patch

cli/observability_flags.go#L164-L165

Added lines #L164 - L165 were not covered by tests

if se != nil {
r := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("kopia"),
semconv.ServiceVersionKey.String(repo.BuildVersion),
)

tp := trace.NewTracerProvider(
trace.WithBatcher(se),
trace.WithResource(r),
Expand All @@ -158,20 +184,6 @@
return nil
}

func (c *observabilityFlags) getSpanExporter() (trace.SpanExporter, error) {
if c.enableJaeger {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint())
if err != nil {
return nil, errors.Wrap(err, "unable to create Jaeger exporter")
}

return exp, nil
}

return nil, nil
}

func (c *observabilityFlags) stopMetrics(ctx context.Context) {
if c.stopPusher != nil {
close(c.stopPusher)
Expand Down