Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/util/workqueue/prometheus: fix double registration #77553

Merged
merged 2 commits into from Jul 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/util/workqueue/prometheus/BUILD
Expand Up @@ -12,7 +12,6 @@ go_library(
deps = [
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

Expand Down
257 changes: 91 additions & 166 deletions pkg/util/workqueue/prometheus/prometheus.go
Expand Up @@ -18,7 +18,6 @@ package prometheus

import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -38,194 +37,120 @@ const (
RetriesKey = "retries_total"
)

var (
depth = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: DepthKey,
Help: "Current depth of workqueue",
},
[]string{"name"},
)

adds = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: AddsKey,
Help: "Total number of adds handled by workqueue",
},
[]string{"name"},
)

latency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
},
[]string{"name"},
)

workDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
},
[]string{"name"},
)

unfinished = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: UnfinishedWorkKey,
Help: "How many seconds of work has done that " +
"is in progress and hasn't been observed by work_duration. Large " +
"values indicate stuck threads. One can deduce the number of stuck " +
"threads by observing the rate at which this increases.",
},
[]string{"name"},
)

longestRunningProcessor = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: LongestRunningProcessorKey,
Help: "How many seconds has the longest running " +
"processor for workqueue been running.",
},
[]string{"name"},
)

retries = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: RetriesKey,
Help: "Total number of retries handled by workqueue",
},
[]string{"name"},
)
)

func registerMetrics() {
prometheus.MustRegister(
depth,
adds,
latency,
workDuration,
unfinished,
longestRunningProcessor,
retries,
)
}

func init() {
registerMetrics()
workqueue.SetProvider(prometheusMetricsProvider{})
}

type prometheusMetricsProvider struct{}

func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
depth := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: DepthKey,
Help: "Current depth of workqueue",
ConstLabels: prometheus.Labels{"name": name},
})
if err := prometheus.Register(depth); err != nil {
klog.Errorf("failed to register depth metric %v: %v", name, err)
}
return depth
return depth.WithLabelValues(name)
}

func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
adds := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: AddsKey,
Help: "Total number of adds handled by workqueue",
ConstLabels: prometheus.Labels{"name": name},
})
if err := prometheus.Register(adds); err != nil {
klog.Errorf("failed to register adds metric %v: %v", name, err)
}
return adds
return adds.WithLabelValues(name)
}

func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
latency := prometheus.NewHistogram(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested.",
ConstLabels: prometheus.Labels{"name": name},
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
})
if err := prometheus.Register(latency); err != nil {
klog.Errorf("failed to register latency metric %v: %v", name, err)
}
return latency
return latency.WithLabelValues(name)
}

func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
workDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.",
ConstLabels: prometheus.Labels{"name": name},
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 10),
})
if err := prometheus.Register(workDuration); err != nil {
klog.Errorf("failed to register workDuration metric %v: %v", name, err)
}
return workDuration
return workDuration.WithLabelValues(name)
}

func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: UnfinishedWorkKey,
Help: "How many seconds of work has done that " +
"is in progress and hasn't been observed by work_duration. Large " +
"values indicate stuck threads. One can deduce the number of stuck " +
"threads by observing the rate at which this increases.",
ConstLabels: prometheus.Labels{"name": name},
})
if err := prometheus.Register(unfinished); err != nil {
klog.Errorf("failed to register unfinished metric %v: %v", name, err)
}
return unfinished
return unfinished.WithLabelValues(name)
}

func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
longestRunningProcessor := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: LongestRunningProcessorKey,
Help: "How many seconds has the longest running " +
"processor for workqueue been running.",
ConstLabels: prometheus.Labels{"name": name},
})
if err := prometheus.Register(longestRunningProcessor); err != nil {
klog.Errorf("failed to register unfinished metric %v: %v", name, err)
}
return longestRunningProcessor
return longestRunningProcessor.WithLabelValues(name)
}

func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: RetriesKey,
Help: "Total number of retries handled by workqueue",
ConstLabels: prometheus.Labels{"name": name},
})
if err := prometheus.Register(retries); err != nil {
klog.Errorf("failed to register retries metric %v: %v", name, err)
}
return retries
}

// TODO(danielqsj): Remove the following metrics, they are deprecated
func (prometheusMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric {
depth := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "depth",
Help: "(Deprecated) Current depth of workqueue: " + name,
})
if err := prometheus.Register(depth); err != nil {
klog.Errorf("failed to register depth metric %v: %v", name, err)
}
return depth
}

func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric {
adds := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name,
Name: "adds",
Help: "(Deprecated) Total number of adds handled by workqueue: " + name,
})
if err := prometheus.Register(adds); err != nil {
klog.Errorf("failed to register adds metric %v: %v", name, err)
}
return adds
}

func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric {
latency := prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: name,
Name: "queue_latency",
Help: "(Deprecated) How long an item stays in workqueue" + name + " before being requested.",
})
if err := prometheus.Register(latency); err != nil {
klog.Errorf("failed to register latency metric %v: %v", name, err)
}
return latency
}

func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric {
workDuration := prometheus.NewSummary(prometheus.SummaryOpts{
Subsystem: name,
Name: "work_duration",
Help: "(Deprecated) How long processing an item from workqueue" + name + " takes.",
})
if err := prometheus.Register(workDuration); err != nil {
klog.Errorf("failed to register work_duration metric %v: %v", name, err)
}
return workDuration
}

func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "unfinished_work_seconds",
Help: "(Deprecated) How many seconds of work " + name + " has done that " +
"is in progress and hasn't been observed by work_duration. Large " +
"values indicate stuck threads. One can deduce the number of stuck " +
"threads by observing the rate at which this increases.",
})
if err := prometheus.Register(unfinished); err != nil {
klog.Errorf("failed to register unfinished_work_seconds metric %v: %v", name, err)
}
return unfinished
}

func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
Subsystem: name,
Name: "longest_running_processor_microseconds",
Help: "(Deprecated) How many microseconds has the longest running " +
"processor for " + name + " been running.",
})
if err := prometheus.Register(unfinished); err != nil {
klog.Errorf("failed to register longest_running_processor_microseconds metric %v: %v", name, err)
}
return unfinished
}

func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric {
retries := prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: name,
Name: "retries",
Help: "(Deprecated) Total number of retries handled by workqueue: " + name,
})
if err := prometheus.Register(retries); err != nil {
klog.Errorf("failed to register retries metric %v: %v", name, err)
}
return retries
return retries.WithLabelValues(name)
}
17 changes: 7 additions & 10 deletions staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go
Expand Up @@ -43,13 +43,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface {

func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
deprecatedMetrics: newDeprecatedRetryMetrics(name),
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}

go ret.waitingLoop()
Expand All @@ -74,8 +73,7 @@ type delayingType struct {
waitingForAddCh chan *waitFor

// metrics counts the number of retries
metrics retryMetrics
deprecatedMetrics retryMetrics
metrics retryMetrics
}

// waitFor holds the data to add and the time it should be added
Expand Down Expand Up @@ -148,7 +146,6 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
}

q.metrics.retry()
q.deprecatedMetrics.retry()

// immediately add things with no delay
if duration <= 0 {
Expand Down