Skip to content

Commit

Permalink
Queue metrics (#45734)
Browse files Browse the repository at this point in the history
* Added metrics to worker queue

* Enable metrics only behind a flag

* Metrics behind a flag

* Disabling a test log statement

* Use .with while initialising

* Review comments and lint fix

* Tests fix, some lint

* Lint fix

* Lint fix

* Add releaseNotes

* add benchmarks

* Changed approach to maintain timestamps with objects

* Lint fix

* Review comments: turned to pointer queue

* Added monitoring.RegisterIf

* Optional registration of metrics, new way

---------

Co-authored-by: Timothy Ehlers <tehlers@expedia.com>
  • Loading branch information
ra-grover and tehlers320 committed Jul 21, 2023
1 parent 4f1b0f6 commit f371c65
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 24 deletions.
3 changes: 3 additions & 0 deletions pilot/pkg/features/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,9 @@ var (

OptimizedConfigRebuild = env.Register("ENABLE_OPTIMIZED_CONFIG_REBUILD", true,
"If enabled, pilot will only rebuild config for resources that have changed").Get()

EnableControllerQueueMetrics = env.Register("ISTIO_ENABLE_CONTROLLER_QUEUE_METRICS", false,
"If enabled, publishes metrics for queue depth, latency and processing times.").Get()
)

// UnsafeFeaturesEnabled returns true if any unsafe features are enabled.
Expand Down
28 changes: 22 additions & 6 deletions pkg/queue/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
// Task to be performed.
type Task func() error

type queueTask struct {
task Task
enqueueTime time.Time
startTime time.Time
}

// Instance of work tickets processed using a rate-limiting loop
type baseInstance interface {
// Push a task.
Expand All @@ -46,14 +52,15 @@ type Instance interface {

type queueImpl struct {
delay time.Duration
tasks []Task
tasks []*queueTask
cond *sync.Cond
closing bool
closed chan struct{}
closeOnce *sync.Once
// initialSync indicates the queue has initially "synced".
initialSync *atomic.Bool
id string
metrics *queueMetrics
}

// NewQueue instantiates a queue with a processing function
Expand All @@ -64,21 +71,23 @@ func NewQueue(errorDelay time.Duration) Instance {
func NewQueueWithID(errorDelay time.Duration, name string) Instance {
return &queueImpl{
delay: errorDelay,
tasks: make([]Task, 0),
tasks: make([]*queueTask, 0),
closing: false,
closed: make(chan struct{}),
closeOnce: &sync.Once{},
initialSync: atomic.NewBool(false),
cond: sync.NewCond(&sync.Mutex{}),
id: name,
metrics: newQueueMetrics(name),
}
}

func (q *queueImpl) Push(item Task) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.closing {
q.tasks = append(q.tasks, item)
q.tasks = append(q.tasks, &queueTask{task: item, enqueueTime: time.Now()})
q.metrics.depth.RecordInt(int64(len(q.tasks)))
}
q.cond.Signal()
}
Expand All @@ -89,7 +98,7 @@ func (q *queueImpl) Closed() <-chan struct{} {

// get blocks until it can return a task to be processed. If shutdown = true,
// the processing go routine should stop.
func (q *queueImpl) get() (task Task, shutdown bool) {
func (q *queueImpl) get() (task *queueTask, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// wait for closing to be set, or a task to be pushed
Expand All @@ -105,6 +114,11 @@ func (q *queueImpl) get() (task Task, shutdown bool) {
// Slicing will not free the underlying elements of the array, so explicitly clear them out here
q.tasks[0] = nil
q.tasks = q.tasks[1:]

task.startTime = time.Now()
q.metrics.depth.RecordInt(int64(len(q.tasks)))
q.metrics.latency.Record(time.Since(task.enqueueTime).Seconds())

return task, false
}

Expand All @@ -116,13 +130,15 @@ func (q *queueImpl) processNextItem() bool {
}

// Run the task.
if err := task(); err != nil {
if err := task.task(); err != nil {
delay := q.delay
log.Infof("Work item handle failed (%v), retry after delay %v", err, delay)
time.AfterFunc(delay, func() {
q.Push(task)
q.Push(task.task)
})
}
q.metrics.workDuration.Record(time.Since(task.startTime).Seconds())

return true
}

Expand Down
18 changes: 0 additions & 18 deletions pkg/queue/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,6 @@ import (
"istio.io/istio/pkg/test/util/retry"
)

func BenchmarkQueue(b *testing.B) {
for n := 0; n < b.N; n++ {
q := NewQueue(1 * time.Microsecond)
s := make(chan struct{})
go q.Run(s)
wg := sync.WaitGroup{}
wg.Add(1000)
for i := 0; i < 1000; i++ {
q.Push(func() error {
wg.Done()
return nil
})
}
wg.Wait()
close(s)
}
}

func TestOrdering(t *testing.T) {
numValues := 1000

Expand Down
61 changes: 61 additions & 0 deletions pkg/queue/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
"time"

"k8s.io/utils/clock"

"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pkg/monitoring"
)

var (
queueIDTag = monitoring.CreateLabel("queueID")
enableMetric = monitoring.WithEnabled(func() bool {
return features.EnableControllerQueueMetrics
})
depth = monitoring.NewGauge("pilot_worker_queue_depth", "Depth of the controller queues", enableMetric)

latency = monitoring.NewDistribution("pilot_worker_queue_latency",
"Latency before the item is processed", []float64{.01, .1, .2, .5, 1, 3, 5}, enableMetric)

workDuration = monitoring.NewDistribution("pilot_worker_queue_duration",
"Time taken to process an item", []float64{.01, .1, .2, .5, 1, 3, 5}, enableMetric)
)

type queueMetrics struct {
depth monitoring.Metric
latency monitoring.Metric
workDuration monitoring.Metric
id string
clock clock.WithTicker
}

// Gets the time since the specified start in seconds.
func (m *queueMetrics) sinceInSeconds(start time.Time) float64 {
return m.clock.Since(start).Seconds()
}

func newQueueMetrics(id string) *queueMetrics {
return &queueMetrics{
id: id,
depth: depth.With(queueIDTag.Value(id)),
workDuration: workDuration.With(queueIDTag.Value(id)),
latency: latency.With(queueIDTag.Value(id)),
clock: clock.RealClock{},
}
}
79 changes: 79 additions & 0 deletions pkg/queue/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue

import (
"sync"
"testing"
"time"

"istio.io/istio/pilot/pkg/features"
)

func BenchmarkMetricsQueue(b *testing.B) {
features.EnableControllerQueueMetrics = true
q := NewQueue(1 * time.Microsecond)
s := make(chan struct{})
go q.Run(s)
for n := 0; n < b.N; n++ {
wg := sync.WaitGroup{}
wg.Add(1)
q.Push(func() error {
wg.Done()
return nil
})
wg.Wait()

}
close(s)
}

func BenchmarkMetricsQueueDisabled(b *testing.B) {
features.EnableControllerQueueMetrics = false
q := NewQueue(1 * time.Microsecond)
s := make(chan struct{})
go q.Run(s)
for n := 0; n < b.N; n++ {
wg := sync.WaitGroup{}
wg.Add(1)
q.Push(func() error {
wg.Done()
return nil
})
wg.Wait()
}
close(s)
}

func BenchmarkMetricsQueueInc(b *testing.B) {
q := newQueueMetrics("test")
for n := 0; n < b.N; n++ {
q.depth.Increment()
}
}

func BenchmarkMetricsQueueRec(b *testing.B) {
q := newQueueMetrics("test")
for n := 0; n < b.N; n++ {
q.depth.Record(100)
}
}

func BenchmarkMetricsQueueSinceInSeconds(b *testing.B) {
q := newQueueMetrics("test")
dt := time.Now()
for n := 0; n < b.N; n++ {
q.sinceInSeconds(dt)
}
}
10 changes: 10 additions & 0 deletions releasenotes/notes/45734.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: release-notes/v2
kind: feature
area: telemetry

issue:
- 44985

releaseNotes:
- |
**Added** support for K8s controller queue metrics, enabled by setting env variable `ISTIO_ENABLE_CONTROLLER_QUEUE_METRICS` as `true`.

0 comments on commit f371c65

Please sign in to comment.