Added metrics to worker queue
ra-grover committed Jul 20, 2023
1 parent b3ee713 commit aeb7e34
Showing 2 changed files with 123 additions and 7 deletions.
pkg/queue/instance.go (20 changes: 13 additions & 7 deletions)

@@ -46,14 +46,15 @@ type Instance interface {
 
 type queueImpl struct {
 	delay       time.Duration
-	tasks       []Task
+	tasks       []*Task
 	cond        *sync.Cond
 	closing     bool
 	closed      chan struct{}
 	closeOnce   *sync.Once
 	// initialSync indicates the queue has initially "synced".
 	initialSync *atomic.Bool
 	id          string
+	metrics     *queueMetrics
 }
 
 // NewQueue instantiates a queue with a processing function
@@ -64,21 +65,23 @@ func NewQueue(errorDelay time.Duration) Instance {
 func NewQueueWithID(errorDelay time.Duration, name string) Instance {
 	return &queueImpl{
 		delay:       errorDelay,
-		tasks:       make([]Task, 0),
+		tasks:       make([]*Task, 0),
 		closing:     false,
 		closed:      make(chan struct{}),
 		closeOnce:   &sync.Once{},
 		initialSync: atomic.NewBool(false),
 		cond:        sync.NewCond(&sync.Mutex{}),
 		id:          name,
+		metrics:     NewQueueMetrics(name),
 	}
 }
 
 func (q *queueImpl) Push(item Task) {
 	q.cond.L.Lock()
 	defer q.cond.L.Unlock()
 	if !q.closing {
-		q.tasks = append(q.tasks, item)
+		q.tasks = append(q.tasks, &item)
+		q.metrics.add(&item)
 	}
 	q.cond.Signal()
 }
@@ -89,7 +92,7 @@ func (q *queueImpl) Closed() <-chan struct{} {
 
 // get blocks until it can return a task to be processed. If shutdown = true,
 // the processing go routine should stop.
-func (q *queueImpl) get() (task Task, shutdown bool) {
+func (q *queueImpl) get() (task *Task, shutdown bool) {
 	q.cond.L.Lock()
 	defer q.cond.L.Unlock()
 	// wait for closing to be set, or a task to be pushed
@@ -105,6 +108,7 @@ func (q *queueImpl) get() (task Task, shutdown bool) {
 	// Slicing will not free the underlying elements of the array, so explicitly clear them out here
 	q.tasks[0] = nil
 	q.tasks = q.tasks[1:]
+	q.metrics.get(task)
 	return task, false
 }
 
@@ -114,15 +118,17 @@ func (q *queueImpl) processNextItem() bool {
 	if shuttingdown {
 		return false
 	}
-
+	//time.Sleep(time.Millisecond * 500)
+	callback := *task
 	// Run the task.
-	if err := task(); err != nil {
+	if err := callback(); err != nil {
 		delay := q.delay
 		log.Infof("Work item handle failed (%v), retry after delay %v", err, delay)
 		time.AfterFunc(delay, func() {
-			q.Push(task)
+			q.Push(callback)
 		})
 	}
+	q.metrics.done(task)
 	return true
 }
 
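For context on the lifecycle this diff wires up: Push calls metrics.add, the worker's get records how long an item waited, and processNextItem records how long it ran via metrics.done. Below is a minimal usage sketch, assuming the queue package's existing Task type (func() error) and its Run(<-chan struct{}) worker loop, neither of which is shown in this diff:

package main

import (
	"time"

	"istio.io/istio/pkg/queue"
)

func main() {
	// A named queue; the name becomes the queueId label on every metric.
	q := queue.NewQueueWithID(100*time.Millisecond, "demo")

	stop := make(chan struct{})
	go q.Run(stop) // worker loop: get() feeds the latency histogram, done() the duration histogram

	// Push increments pilot_worker_queue_adds and the depth gauge via metrics.add.
	q.Push(func() error {
		return nil // a non-nil error re-Pushes the task after the 100ms retry delay
	})

	time.Sleep(time.Second) // give the worker time to drain before stopping
	close(stop)
}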
pkg/queue/metrics.go (110 changes: 110 additions & 0 deletions)

@@ -0,0 +1,110 @@
+// Copyright Istio Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package queue
+
+import (
+	"istio.io/istio/pkg/monitoring"
+	"k8s.io/utils/clock"
+	"time"
+)
+
+var (
+	queueIdTag = monitoring.MustCreateLabel("queueId")
+
+	depth = monitoring.NewGauge("pilot_worker_queue_depth", "Depth of the controller queues", monitoring.WithLabels(queueIdTag))
+
+	adds = monitoring.NewSum("pilot_worker_queue_adds", "Adds to the controller queues", monitoring.WithLabels(queueIdTag))
+
+	latency = monitoring.NewDistribution(
+		"pilot_worker_queue_latency",
+		"Latency before the item is processed",
+		[]float64{.01, .1, 0.5, 1, 3, 5},
+		monitoring.WithLabels(queueIdTag))
+
+	workDuration = monitoring.NewDistribution("pilot_worker_queue_duration",
+		"Time taken to process an item",
+		[]float64{.01, .1, 0.5, 1, 3, 5},
+		monitoring.WithLabels(queueIdTag))
+)
+
+type queueMetrics struct {
+	depth                monitoring.Metric
+	adds                 monitoring.Metric
+	latency              monitoring.Metric
+	workDuration         monitoring.Metric
+	id                   string
+	addTimes             map[*Task]time.Time
+	processingStartTimes map[*Task]time.Time
+	clock                clock.WithTicker
+	queueDepth           int64
+}
+
+func (m *queueMetrics) add(item *Task) {
+	if m == nil {
+		return
+	}
+	m.queueDepth++
+	//log.Infof("the current length is %d", m.queueDepth)
+	m.adds.With(queueIdTag.Value(m.id)).Increment()
+	m.depth.With(queueIdTag.Value(m.id)).RecordInt(m.queueDepth)
+	if _, exists := m.addTimes[item]; !exists {
+		m.addTimes[item] = m.clock.Now()
+	}
+}
+
+func (m *queueMetrics) get(item *Task) {
+	if m == nil {
+		return
+	}
+	m.queueDepth--
+	m.depth.With(queueIdTag.Value(m.id)).RecordInt(m.queueDepth)
+	m.processingStartTimes[item] = m.clock.Now()
+	if startTime, exists := m.addTimes[item]; exists {
+		m.latency.With(queueIdTag.Value(m.id)).Record(m.sinceInSeconds(startTime))
+		delete(m.addTimes, item)
+	}
+}
+
+func (m *queueMetrics) done(item *Task) {
+	if m == nil {
+		return
+	}
+
+	if startTime, exists := m.processingStartTimes[item]; exists {
+		m.workDuration.With(queueIdTag.Value(m.id)).Record(m.sinceInSeconds(startTime))
+		delete(m.processingStartTimes, item)
+	}
+}
+
+// Gets the time since the specified start in seconds.
+func (m *queueMetrics) sinceInSeconds(start time.Time) float64 {
+	return m.clock.Since(start).Seconds()
+}
+
+func NewQueueMetrics(id string) *queueMetrics {
+	return &queueMetrics{
+		id:                   id,
+		depth:                depth,
+		workDuration:         workDuration,
+		latency:              latency,
+		adds:                 adds,
+		clock:                clock.RealClock{},
+		addTimes:             map[*Task]time.Time{},
+		processingStartTimes: map[*Task]time.Time{},
+	}
+}
+
+func init() {
+	monitoring.MustRegister(adds, depth, latency, workDuration)
+}
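The clock field is declared as clock.WithTicker rather than calling time.Now directly so that tests can substitute a fake clock. A sketch of how the add/get/done path could be driven deterministically with k8s.io/utils/clock/testing — this test is illustrative, not part of the commit:

package queue

import (
	"testing"
	"time"

	clocktesting "k8s.io/utils/clock/testing"
)

func TestQueueMetricsTimings(t *testing.T) {
	m := NewQueueMetrics("test")
	fake := clocktesting.NewFakeClock(time.Now())
	m.clock = fake // swap the real clock for a deterministic one

	var task Task = func() error { return nil }

	m.add(&task)               // stamps addTimes[&task]
	fake.Step(2 * time.Second) // simulated queueing delay
	m.get(&task)               // records 2s in pilot_worker_queue_latency
	fake.Step(time.Second)     // simulated processing time
	m.done(&task)              // records 1s in pilot_worker_queue_duration
}

Because Push takes the Task by value and re-takes its address, each push produces a distinct *Task key, so the maps stay consistent even when the same callback is retried.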
