Queue metrics #45734
Conversation
😊 Welcome @ra-grover! This is either your first contribution to the Istio istio repo, or it's been a while since your last one. You can learn more about the Istio working groups, code of conduct, and contributing guidelines. Thanks for contributing! Courtesy of your friendly welcome wagon.
Hi @ra-grover. Thanks for your PR. I'm waiting for an istio member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test. Once the patch is verified, the new status will be reflected by the ok-to-test label. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
/ok-to-test
A few minor nits.
WDYT @ramaraochavali?
pkg/queue/instance.go
Outdated
@@ -46,14 +47,16 @@ type Instance interface {

 type queueImpl struct {
 	delay time.Duration
-	tasks []Task
+	tasks []*Task
Do we need a pointer to the Task? Task is already a function, which is basically a pointer.
I was not able to use Task as a key to a map, since map keys have to be comparable. That map is being used to maintain the add times and processing times. That's why I used a pointer to it.
I think this can be simplified by not storing a map at all. Or maybe getting rid of metrics
entirely. Something like this:
diff --git a/pkg/queue/instance.go b/pkg/queue/instance.go
index ea29930966..10854e6824 100644
--- a/pkg/queue/instance.go
+++ b/pkg/queue/instance.go
@@ -47,7 +47,7 @@ type Instance interface {
type queueImpl struct {
delay time.Duration
- tasks []*Task
+ tasks []queueTask
cond *sync.Cond
closing bool
closed chan struct{}
@@ -70,7 +70,7 @@ func NewQueueWithID(errorDelay time.Duration, name string) Instance {
}
return &queueImpl{
delay: errorDelay,
- tasks: make([]*Task, 0),
+ tasks: make([]queueTask, 0),
closing: false,
closed: make(chan struct{}),
closeOnce: &sync.Once{},
@@ -81,11 +81,17 @@ func NewQueueWithID(errorDelay time.Duration, name string) Instance {
}
}
+type queueTask struct {
+ task Task
+ enqueueTime time.Time
+ startTime time.Time
+}
+
func (q *queueImpl) Push(item Task) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.closing {
- q.tasks = append(q.tasks, &item)
+ q.tasks = append(q.tasks, queueTask{task: item, enqueueTime: time.Now()})
if q.metrics != nil {
q.metrics.add(&item)
}
@@ -100,7 +106,7 @@ func (q *queueImpl) Closed() <-chan struct{} {
// get blocks until it can return a task to be processed. If shutdown = true,
// the processing go routine should stop.
-func (q *queueImpl) get() (task *Task, shutdown bool) {
+func (q *queueImpl) get() (task queueTask, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// wait for closing to be set, or a task to be pushed
@@ -110,15 +116,15 @@ func (q *queueImpl) get() (task *Task, shutdown bool) {
if q.closing && len(q.tasks) == 0 {
// We must be shutting down.
- return nil, true
+ return queueTask{}, true
}
task = q.tasks[0]
+ task.startTime = time.Now()
// Slicing will not free the underlying elements of the array, so explicitly clear them out here
- q.tasks[0] = nil
+ q.tasks[0] = queueTask{}
q.tasks = q.tasks[1:]
- if q.metrics != nil {
- q.metrics.get(task)
- }
+
+ q.metrics.depth.RecordInt(int64(len(q.tasks)))
return task, false
}
@@ -131,16 +137,14 @@ func (q *queueImpl) processNextItem() bool {
}
// Run the task.
- if err := (*task)(); err != nil {
+ if err := task.task(); err != nil {
delay := q.delay
log.Infof("Work item handle failed (%v), retry after delay %v", err, delay)
time.AfterFunc(delay, func() {
- q.Push(*task)
+ q.Push(task.task)
})
}
- if q.metrics != nil {
- q.metrics.done(task)
- }
+ q.metrics.workDuration.Record(time.Since(task.startTime).Seconds())
return true
}
With metrics we are basically keeping all the same info twice. This may be higher cost and easy to make mistakes where they fall out of sync. Instead we can just pass around the timings? WDYT?
I've been following along on this a bit. I made a PR for some benchmarks. Basically the existing benchmark doesn't work well anymore with this change because of the way it's set up: it makes a new queue over and over again, which isn't really what we want to test.
for n := 0; n < b.N; n++ {
q := NewQueue(1 * time.Microsecond)
go test -bench=. -count 1 -benchmem
2023-07-06T21:53:46.190758Z info Work item handle failed (fake error), retry after delay 1µs
goos: darwin
goarch: arm64
pkg: istio.io/istio/pkg/queue
BenchmarkQueue-10 766 1366433 ns/op 630649 B/op 18114 allocs/op
BenchmarkMetricsQueue-10 685503 1694 ns/op 560 B/op 19 allocs/op
BenchmarkMetricsQueueDisabled-10 2983182 405.9 ns/op 48 B/op 4 allocs/op
BenchmarkMetricsQueueAdd-10 4464358 267.9 ns/op 106 B/op 3 allocs/op
BenchmarkMetricsQueueInc-10 4663022 257.1 ns/op 74 B/op 2 allocs/op
BenchmarkMetricsQueueRec-10 4460540 268.8 ns/op 106 B/op 3 allocs/op
BenchmarkMetricsQueueSinceInSeconds-10 36536446 32.18 ns/op 0 B/op 0 allocs/op
BenchmarkMetricsQueueGet-10 1609574 753.7 ns/op 320 B/op 11 allocs/op
PASS
ok istio.io/istio/pkg/queue 12.103s
I'm not sure how much value the others have, but these two show the real impact of this change, and I would love to see how much impact taking the mapping out has.
BenchmarkMetricsQueue-10 685503 1694 ns/op 560 B/op 19 allocs/op
BenchmarkMetricsQueueDisabled-10 2983182 405.9 ns/op 48 B/op 4 allocs/op
/test lint
Sure, makes sense.
pkg/queue/instance.go
Outdated
		})
	}
	if q.metrics != nil {
		q.metrics.done(task)
We probably should do this even in the event of an error.
Sure, I can do that.
Sorry if I am missing something, but if there is an error it will be recorded in the same flow, as there is no early return in case of error and time.AfterFunc is non-blocking.
Thanks for suggesting. We are doing something very similar at Expedia, but we have modified the interface to supply the
I tried the proposed change from @howardjohn as-is.
Proposed (ignore Disabled; with this change it is just always on and that code needs removal):
It is better by 12 allocs and ~200ns. Is this what the maintainers want (enabled permanently), or was the code example just an example and the if blocks were removed just to highlight the idea? Keeping just this would allow unchanged behavior from today (though I don't know how much 1000ns matters here).
My diff wasn't meant to be complete, just a step in the direction. I think it can still be optimized to be zero cost if metrics are disabled (as you mentioned).
pkg/queue/instance.go
Outdated
 	// Slicing will not free the underlying elements of the array, so explicitly clear them out here
-	q.tasks[0] = nil
+	q.tasks[0] = queueTask{}
It is still holding the memory allocated for the empty queueTask. The smoother change would be to make use of a queueTask pointer type.
I thought the same when I was writing it. I will convert it to a pointer queue. Thanks for the suggestion!
FWIW this line is still important since it frees the func in the task, which may be a closure over some large variables. But a pointer would be 8 bytes and queueTask is 56.
pkg/queue/metrics.go
Outdated
}

func init() {
	monitoring.MustRegister(depth, latency, workDuration)
We can use monitoring.RegisterIf; this is how we handle other conditionally-enabled metrics. It makes the metrics recording ~free. Then we do not need to handle if metrics != nil everywhere.
Sure, I will check.
LGTM, defer to @howardjohn to take a look at the metric issue
Apologies, I fixed it on my local machine and thought I had pushed. I will resolve conflicts and make the changes you pointed out to make it a pointer queue and
The benchmarks are now comparable for both scenarios after the improvement in the monitoring package (#45341).
Let me know if the flag is still needed.
/retest
Please provide a description of this PR:
This PR adds metrics to the K8s worker queue. The metrics really helped to discover a lock contention issue mentioned in #44985.
The metrics are kept behind a flag so that they can be enabled explicitly, since adding them does impact the benchmarks.