Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue metrics #45734

Merged
merged 16 commits into from
Jul 21, 2023
3 changes: 3 additions & 0 deletions pilot/pkg/features/pilot.go
Expand Up @@ -673,6 +673,9 @@ var (

OptimizedConfigRebuild = env.Register("ENABLE_OPTIMIZED_CONFIG_REBUILD", true,
"If enabled, pilot will only rebuild config for resources that have changed").Get()

EnableControllerQueueMetrics = env.Register("ISTIO_ENABLE_CONTROLLER_QUEUE_METRICS", false,
"If enabled, publishes metrics for queue depth, latency and processing times.").Get()
)

// UnsafeFeaturesEnabled returns true if any unsafe features are enabled.
Expand Down
28 changes: 22 additions & 6 deletions pkg/queue/instance.go
Expand Up @@ -27,6 +27,12 @@ import (
// Task to be performed.
type Task func() error

type queueTask struct {
task Task
enqueueTime time.Time
startTime time.Time
}

// Instance of work tickets processed using a rate-limiting loop
type baseInstance interface {
// Push a task.
Expand All @@ -46,14 +52,15 @@ type Instance interface {

type queueImpl struct {
delay time.Duration
tasks []Task
tasks []*queueTask
cond *sync.Cond
closing bool
closed chan struct{}
closeOnce *sync.Once
// initialSync indicates the queue has initially "synced".
initialSync *atomic.Bool
id string
metrics *queueMetrics
}

// NewQueue instantiates a queue with a processing function
Expand All @@ -64,21 +71,23 @@ func NewQueue(errorDelay time.Duration) Instance {
func NewQueueWithID(errorDelay time.Duration, name string) Instance {
return &queueImpl{
delay: errorDelay,
tasks: make([]Task, 0),
tasks: make([]*queueTask, 0),
closing: false,
closed: make(chan struct{}),
closeOnce: &sync.Once{},
initialSync: atomic.NewBool(false),
cond: sync.NewCond(&sync.Mutex{}),
id: name,
metrics: newQueueMetrics(name),
}
}

func (q *queueImpl) Push(item Task) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.closing {
q.tasks = append(q.tasks, item)
q.tasks = append(q.tasks, &queueTask{task: item, enqueueTime: time.Now()})
q.metrics.depth.RecordInt(int64(len(q.tasks)))
}
q.cond.Signal()
}
Expand All @@ -89,7 +98,7 @@ func (q *queueImpl) Closed() <-chan struct{} {

// get blocks until it can return a task to be processed. If shutdown = true,
// the processing go routine should stop.
func (q *queueImpl) get() (task Task, shutdown bool) {
func (q *queueImpl) get() (task *queueTask, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// wait for closing to be set, or a task to be pushed
Expand All @@ -105,6 +114,11 @@ func (q *queueImpl) get() (task Task, shutdown bool) {
// Slicing will not free the underlying elements of the array, so explicitly clear them out here
q.tasks[0] = nil
q.tasks = q.tasks[1:]

task.startTime = time.Now()
q.metrics.depth.RecordInt(int64(len(q.tasks)))
q.metrics.latency.Record(time.Since(task.enqueueTime).Seconds())

return task, false
}

Expand All @@ -116,13 +130,15 @@ func (q *queueImpl) processNextItem() bool {
}

// Run the task.
if err := task(); err != nil {
if err := task.task(); err != nil {
delay := q.delay
log.Infof("Work item handle failed (%v), retry after delay %v", err, delay)
time.AfterFunc(delay, func() {
q.Push(task)
q.Push(task.task)
})
}
q.metrics.workDuration.Record(time.Since(task.startTime).Seconds())

return true
}

Expand Down
18 changes: 0 additions & 18 deletions pkg/queue/instance_test.go
Expand Up @@ -26,24 +26,6 @@ import (
"istio.io/istio/pkg/test/util/retry"
)

func BenchmarkQueue(b *testing.B) {
for n := 0; n < b.N; n++ {
q := NewQueue(1 * time.Microsecond)
s := make(chan struct{})
go q.Run(s)
wg := sync.WaitGroup{}
wg.Add(1000)
for i := 0; i < 1000; i++ {
q.Push(func() error {
wg.Done()
return nil
})
}
wg.Wait()
close(s)
}
}

func TestOrdering(t *testing.T) {
numValues := 1000

Expand Down
61 changes: 61 additions & 0 deletions pkg/queue/metrics.go
@@ -0,0 +1,61 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
"time"

"k8s.io/utils/clock"

"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pkg/monitoring"
)

var (
queueIDTag = monitoring.CreateLabel("queueID")
enableMetric = monitoring.WithEnabled(func() bool {
return features.EnableControllerQueueMetrics
})
depth = monitoring.NewGauge("pilot_worker_queue_depth", "Depth of the controller queues", enableMetric)

latency = monitoring.NewDistribution("pilot_worker_queue_latency",
"Latency before the item is processed", []float64{.01, .1, .2, .5, 1, 3, 5}, enableMetric)

workDuration = monitoring.NewDistribution("pilot_worker_queue_duration",
"Time taken to process an item", []float64{.01, .1, .2, .5, 1, 3, 5}, enableMetric)
)

type queueMetrics struct {
depth monitoring.Metric
latency monitoring.Metric
workDuration monitoring.Metric
id string
clock clock.WithTicker
}

// Gets the time since the specified start in seconds.
func (m *queueMetrics) sinceInSeconds(start time.Time) float64 {
return m.clock.Since(start).Seconds()
}

func newQueueMetrics(id string) *queueMetrics {
return &queueMetrics{
id: id,
depth: depth.With(queueIDTag.Value(id)),
workDuration: workDuration.With(queueIDTag.Value(id)),
latency: latency.With(queueIDTag.Value(id)),
clock: clock.RealClock{},
}
}
79 changes: 79 additions & 0 deletions pkg/queue/metrics_test.go
@@ -0,0 +1,79 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package queue

import (
"sync"
"testing"
"time"

"istio.io/istio/pilot/pkg/features"
)

func BenchmarkMetricsQueue(b *testing.B) {
features.EnableControllerQueueMetrics = true
q := NewQueue(1 * time.Microsecond)
s := make(chan struct{})
go q.Run(s)
for n := 0; n < b.N; n++ {
wg := sync.WaitGroup{}
wg.Add(1)
q.Push(func() error {
wg.Done()
return nil
})
wg.Wait()

}
close(s)
}

func BenchmarkMetricsQueueDisabled(b *testing.B) {
features.EnableControllerQueueMetrics = false
q := NewQueue(1 * time.Microsecond)
s := make(chan struct{})
go q.Run(s)
for n := 0; n < b.N; n++ {
wg := sync.WaitGroup{}
wg.Add(1)
q.Push(func() error {
wg.Done()
return nil
})
wg.Wait()
}
close(s)
}

func BenchmarkMetricsQueueInc(b *testing.B) {
q := newQueueMetrics("test")
for n := 0; n < b.N; n++ {
q.depth.Increment()
}
}

func BenchmarkMetricsQueueRec(b *testing.B) {
q := newQueueMetrics("test")
for n := 0; n < b.N; n++ {
q.depth.Record(100)
}
}

func BenchmarkMetricsQueueSinceInSeconds(b *testing.B) {
q := newQueueMetrics("test")
dt := time.Now()
for n := 0; n < b.N; n++ {
q.sinceInSeconds(dt)
}
}
10 changes: 10 additions & 0 deletions releasenotes/notes/45734.yaml
@@ -0,0 +1,10 @@
apiVersion: release-notes/v2
kind: feature
area: telemetry

issue:
- 44985

releaseNotes:
- |
**Added** support for K8s controller queue metrics, enabled by setting env variable `ISTIO_ENABLE_CONTROLLER_QUEUE_METRICS` as `true`.