Merge pull request #77553 from s-urbaniak/fix-76956
pkg/util/workqueue/prometheus: fix double registration
k8s-ci-robot committed Jul 23, 2019
2 parents 08f9f2b + 4532cfd commit f101466
Showing 6 changed files with 113 additions and 351 deletions.
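
Before this change, the provider in pkg/util/workqueue/prometheus built and registered a fresh collector for every named workqueue and only logged registration failures via klog, so a second queue with the same name tripped the Prometheus registry's duplicate-registration check. The diff below switches to package-level *Vec collectors that are registered exactly once, with each queue receiving its own child series via WithLabelValues. A minimal sketch of that register-once pattern (identifiers such as queueDepth and the queue name are illustrative, not taken from the diff):

// Sketch of the pattern adopted by this commit; only the structure mirrors the diff.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var queueDepth = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Subsystem: "workqueue",
		Name:      "depth",
		Help:      "Current depth of workqueue",
	},
	[]string{"name"},
)

func init() {
	// The vector is registered exactly once; the per-queue children below
	// never call Register again.
	prometheus.MustRegister(queueDepth)
}

func main() {
	first := queueDepth.WithLabelValues("example")
	second := queueDepth.WithLabelValues("example") // same child series, no AlreadyRegisteredError
	first.Inc()
	second.Inc()
	fmt.Println(`both increments land on workqueue_depth{name="example"}`)
}

Because WithLabelValues on an already-registered vector never touches the registry, repeated queue names become harmless.
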
1 change: 0 additions & 1 deletion pkg/util/workqueue/prometheus/BUILD
@@ -12,7 +12,6 @@ go_library(
     deps = [
         "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
-        "//vendor/k8s.io/klog:go_default_library",
     ],
 )

257 changes: 91 additions & 166 deletions pkg/util/workqueue/prometheus/prometheus.go
@@ -18,7 +18,6 @@ package prometheus
 
 import (
 	"k8s.io/client-go/util/workqueue"
-	"k8s.io/klog"
 
 	"github.com/prometheus/client_golang/prometheus"
 )
@@ -38,194 +37,120 @@ const (
 	RetriesKey = "retries_total"
 )
 
+var (
+	depth = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      DepthKey,
+			Help:      "Current depth of workqueue",
+		},
+		[]string{"name"},
+	)
+
+	adds = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      AddsKey,
+			Help:      "Total number of adds handled by workqueue",
+		},
+		[]string{"name"},
+	)
+
+	latency = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      QueueLatencyKey,
+			Help:      "How long in seconds an item stays in workqueue before being requested.",
+			Buckets:   prometheus.ExponentialBuckets(10e-9, 10, 10),
+		},
+		[]string{"name"},
+	)
+
+	workDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      WorkDurationKey,
+			Help:      "How long in seconds processing an item from workqueue takes.",
+			Buckets:   prometheus.ExponentialBuckets(10e-9, 10, 10),
+		},
+		[]string{"name"},
+	)
+
+	unfinished = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      UnfinishedWorkKey,
+			Help: "How many seconds of work has done that " +
+				"is in progress and hasn't been observed by work_duration. Large " +
+				"values indicate stuck threads. One can deduce the number of stuck " +
+				"threads by observing the rate at which this increases.",
+		},
+		[]string{"name"},
+	)
+
+	longestRunningProcessor = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      LongestRunningProcessorKey,
+			Help: "How many seconds has the longest running " +
+				"processor for workqueue been running.",
+		},
+		[]string{"name"},
+	)
+
+	retries = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Subsystem: WorkQueueSubsystem,
+			Name:      RetriesKey,
+			Help:      "Total number of retries handled by workqueue",
+		},
+		[]string{"name"},
+	)
+)
+
+func registerMetrics() {
+	prometheus.MustRegister(
+		depth,
+		adds,
+		latency,
+		workDuration,
+		unfinished,
+		longestRunningProcessor,
+		retries,
+	)
+}
+
 func init() {
+	registerMetrics()
 	workqueue.SetProvider(prometheusMetricsProvider{})
 }
 
 type prometheusMetricsProvider struct{}
 
 func (prometheusMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
-	depth := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        DepthKey,
-		Help:        "Current depth of workqueue",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(depth); err != nil {
-		klog.Errorf("failed to register depth metric %v: %v", name, err)
-	}
-	return depth
+	return depth.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
-	adds := prometheus.NewCounter(prometheus.CounterOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        AddsKey,
-		Help:        "Total number of adds handled by workqueue",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(adds); err != nil {
-		klog.Errorf("failed to register adds metric %v: %v", name, err)
-	}
-	return adds
+	return adds.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
-	latency := prometheus.NewHistogram(prometheus.HistogramOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        QueueLatencyKey,
-		Help:        "How long in seconds an item stays in workqueue before being requested.",
-		ConstLabels: prometheus.Labels{"name": name},
-		Buckets:     prometheus.ExponentialBuckets(10e-9, 10, 10),
-	})
-	if err := prometheus.Register(latency); err != nil {
-		klog.Errorf("failed to register latency metric %v: %v", name, err)
-	}
-	return latency
+	return latency.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
-	workDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        WorkDurationKey,
-		Help:        "How long in seconds processing an item from workqueue takes.",
-		ConstLabels: prometheus.Labels{"name": name},
-		Buckets:     prometheus.ExponentialBuckets(10e-9, 10, 10),
-	})
-	if err := prometheus.Register(workDuration); err != nil {
-		klog.Errorf("failed to register workDuration metric %v: %v", name, err)
-	}
-	return workDuration
+	return workDuration.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
-	unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem: WorkQueueSubsystem,
-		Name:      UnfinishedWorkKey,
-		Help: "How many seconds of work has done that " +
-			"is in progress and hasn't been observed by work_duration. Large " +
-			"values indicate stuck threads. One can deduce the number of stuck " +
-			"threads by observing the rate at which this increases.",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(unfinished); err != nil {
-		klog.Errorf("failed to register unfinished metric %v: %v", name, err)
-	}
-	return unfinished
+	return unfinished.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
-	longestRunningProcessor := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem: WorkQueueSubsystem,
-		Name:      LongestRunningProcessorKey,
-		Help: "How many seconds has the longest running " +
-			"processor for workqueue been running.",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(longestRunningProcessor); err != nil {
-		klog.Errorf("failed to register unfinished metric %v: %v", name, err)
-	}
-	return longestRunningProcessor
+	return longestRunningProcessor.WithLabelValues(name)
 }
 
 func (prometheusMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
-	retries := prometheus.NewCounter(prometheus.CounterOpts{
-		Subsystem:   WorkQueueSubsystem,
-		Name:        RetriesKey,
-		Help:        "Total number of retries handled by workqueue",
-		ConstLabels: prometheus.Labels{"name": name},
-	})
-	if err := prometheus.Register(retries); err != nil {
-		klog.Errorf("failed to register retries metric %v: %v", name, err)
-	}
-	return retries
-}
-
-// TODO(danielqsj): Remove the following metrics, they are deprecated
-func (prometheusMetricsProvider) NewDeprecatedDepthMetric(name string) workqueue.GaugeMetric {
-	depth := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem: name,
-		Name:      "depth",
-		Help:      "(Deprecated) Current depth of workqueue: " + name,
-	})
-	if err := prometheus.Register(depth); err != nil {
-		klog.Errorf("failed to register depth metric %v: %v", name, err)
-	}
-	return depth
-}
-
-func (prometheusMetricsProvider) NewDeprecatedAddsMetric(name string) workqueue.CounterMetric {
-	adds := prometheus.NewCounter(prometheus.CounterOpts{
-		Subsystem: name,
-		Name:      "adds",
-		Help:      "(Deprecated) Total number of adds handled by workqueue: " + name,
-	})
-	if err := prometheus.Register(adds); err != nil {
-		klog.Errorf("failed to register adds metric %v: %v", name, err)
-	}
-	return adds
-}
-
-func (prometheusMetricsProvider) NewDeprecatedLatencyMetric(name string) workqueue.SummaryMetric {
-	latency := prometheus.NewSummary(prometheus.SummaryOpts{
-		Subsystem: name,
-		Name:      "queue_latency",
-		Help:      "(Deprecated) How long an item stays in workqueue" + name + " before being requested.",
-	})
-	if err := prometheus.Register(latency); err != nil {
-		klog.Errorf("failed to register latency metric %v: %v", name, err)
-	}
-	return latency
-}
-
-func (prometheusMetricsProvider) NewDeprecatedWorkDurationMetric(name string) workqueue.SummaryMetric {
-	workDuration := prometheus.NewSummary(prometheus.SummaryOpts{
-		Subsystem: name,
-		Name:      "work_duration",
-		Help:      "(Deprecated) How long processing an item from workqueue" + name + " takes.",
-	})
-	if err := prometheus.Register(workDuration); err != nil {
-		klog.Errorf("failed to register work_duration metric %v: %v", name, err)
-	}
-	return workDuration
-}
-
-func (prometheusMetricsProvider) NewDeprecatedUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
-	unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem: name,
-		Name:      "unfinished_work_seconds",
-		Help: "(Deprecated) How many seconds of work " + name + " has done that " +
-			"is in progress and hasn't been observed by work_duration. Large " +
-			"values indicate stuck threads. One can deduce the number of stuck " +
-			"threads by observing the rate at which this increases.",
-	})
-	if err := prometheus.Register(unfinished); err != nil {
-		klog.Errorf("failed to register unfinished_work_seconds metric %v: %v", name, err)
-	}
-	return unfinished
-}
-
-func (prometheusMetricsProvider) NewDeprecatedLongestRunningProcessorMicrosecondsMetric(name string) workqueue.SettableGaugeMetric {
-	unfinished := prometheus.NewGauge(prometheus.GaugeOpts{
-		Subsystem: name,
-		Name:      "longest_running_processor_microseconds",
-		Help: "(Deprecated) How many microseconds has the longest running " +
-			"processor for " + name + " been running.",
-	})
-	if err := prometheus.Register(unfinished); err != nil {
-		klog.Errorf("failed to register longest_running_processor_microseconds metric %v: %v", name, err)
-	}
-	return unfinished
-}
-
-func (prometheusMetricsProvider) NewDeprecatedRetriesMetric(name string) workqueue.CounterMetric {
-	retries := prometheus.NewCounter(prometheus.CounterOpts{
-		Subsystem: name,
-		Name:      "retries",
-		Help:      "(Deprecated) Total number of retries handled by workqueue: " + name,
-	})
-	if err := prometheus.Register(retries); err != nil {
-		klog.Errorf("failed to register retries metric %v: %v", name, err)
-	}
-	return retries
+	return retries.WithLabelValues(name)
 }
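
With the provider above, a controller that creates two workqueues under the same name simply shares one set of labelled series instead of attempting a second registration. A hedged usage sketch (the in-tree import path and the queue name are assumptions, not taken from the commit):

package main

import (
	"k8s.io/client-go/util/workqueue"

	// Imported for its init() side effect: registerMetrics() plus
	// workqueue.SetProvider(prometheusMetricsProvider{}).
	_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus"
)

func main() {
	q1 := workqueue.NewNamed("example-controller")
	q2 := workqueue.NewNamed("example-controller") // previously logged a registration error; now reuses the same series
	q1.Add("item-a")
	q2.Add("item-b")
	q1.ShutDown()
	q2.ShutDown()
}
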
17 changes: 7 additions & 10 deletions staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go
@@ -44,13 +44,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface {
 
 func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
 	ret := &delayingType{
-		Interface:         NewNamed(name),
-		clock:             clock,
-		heartbeat:         clock.NewTicker(maxWait),
-		stopCh:            make(chan struct{}),
-		waitingForAddCh:   make(chan *waitFor, 1000),
-		metrics:           newRetryMetrics(name),
-		deprecatedMetrics: newDeprecatedRetryMetrics(name),
+		Interface:       NewNamed(name),
+		clock:           clock,
+		heartbeat:       clock.NewTicker(maxWait),
+		stopCh:          make(chan struct{}),
+		waitingForAddCh: make(chan *waitFor, 1000),
+		metrics:         newRetryMetrics(name),
 	}
 
 	go ret.waitingLoop()
Expand All @@ -77,8 +76,7 @@ type delayingType struct {
waitingForAddCh chan *waitFor

// metrics counts the number of retries
metrics retryMetrics
deprecatedMetrics retryMetrics
metrics retryMetrics
}

// waitFor holds the data to add and the time it should be added
@@ -154,7 +152,6 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
 	}
 
 	q.metrics.retry()
-	q.deprecatedMetrics.retry()
 
 	// immediately add things with no delay
 	if duration <= 0 {
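
After this hunk the delaying queue feeds retries into the single labelled counter only; the parallel deprecated per-queue counter is gone. A small sketch of the call path, again with an assumed import path and a made-up queue name:

package main

import (
	"time"

	"k8s.io/client-go/util/workqueue"

	// Assumed import path; installs the Prometheus metrics provider in init().
	_ "k8s.io/kubernetes/pkg/util/workqueue/prometheus"
)

func main() {
	q := workqueue.NewNamedDelayingQueue("example-delaying")
	// Each AddAfter call runs q.metrics.retry(), incrementing
	// workqueue_retries_total{name="example-delaying"} once.
	q.AddAfter("reconcile", 5*time.Second)
	q.ShutDown()
}
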