-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
concurrency_reporter.go
85 lines (72 loc) · 2.43 KB
/
concurrency_reporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package handler
import (
"time"
"github.com/knative/serving/pkg/autoscaler"
)
// Channels bundles the channels that drive the concurrency reporter
// started by NewConcurrencyReporter. Grouping them in one struct keeps
// that function's signature easy to read.
type Channels struct {
// ReqChan carries one ReqEvent for every request that arrives or completes.
ReqChan chan ReqEvent
// ReportChan ticks whenever the accumulated stats should be reported.
ReportChan <-chan time.Time
// StatChan is the channel the resulting StatMessages are sent on.
StatChan chan *autoscaler.StatMessage
}
// NewConcurrencyReporter starts a goroutine that tracks request concurrency
// per key and periodically publishes it.
//
// Every ReqIn event received on channels.ReqChan increments both the
// outstanding-request count and the per-period incoming-request count of the
// event's key; every ReqOut event decrements the outstanding count. On each
// tick of channels.ReportChan one StatMessage is sent on channels.StatChan
// for every key whose outstanding count is non-zero, and the per-period
// incoming counts are then reset. Keys whose outstanding count is zero at a
// tick are dropped from the bookkeeping without being reported.
//
// The send on channels.StatChan happens on the reporter goroutine itself, so
// no request events are processed while that send is blocked.
//
// The goroutine exits when channels.ReqChan is closed. If
// channels.ReportChan is closed, reporting stops but request events are
// still drained, so senders on ReqChan are never stalled by the reporter.
func NewConcurrencyReporter(podName string, channels Channels) {
	go func() {
		// Local copies, so that a closed ReportChan can be disabled below
		// by setting the local variable to nil.
		reqChan, reportChan := channels.ReqChan, channels.ReportChan

		// Number of requests currently in flight, per key.
		outstandingRequestsPerKey := make(map[string]int32)
		// Number of requests that arrived in the current reporting
		// period, per key.
		incomingRequestsPerKey := make(map[string]int32)

		for {
			select {
			case event, ok := <-reqChan:
				if !ok {
					// A closed channel is always ready to receive and
					// yields zero values; stop instead of spinning on
					// meaningless events.
					return
				}
				switch event.EventType {
				case ReqIn:
					incomingRequestsPerKey[event.Key]++
					outstandingRequestsPerKey[event.Key]++
				case ReqOut:
					outstandingRequestsPerKey[event.Key]--
				}
			case now, ok := <-reportChan:
				if !ok {
					// Receiving from a nil channel blocks forever, which
					// removes this case from the select and prevents a
					// busy loop of zero-time reports.
					reportChan = nil
					continue
				}
				for key, concurrency := range outstandingRequestsPerKey {
					if concurrency == 0 {
						// Nothing in flight for this key: forget it
						// rather than report it.
						delete(outstandingRequestsPerKey, key)
						continue
					}
					// Hand the stat to the consumer of StatChan, which is
					// responsible for transmitting it.
					channels.StatChan <- &autoscaler.StatMessage{
						Key: key,
						Stat: autoscaler.Stat{
							Time:                      &now,
							PodName:                   podName,
							AverageConcurrentRequests: float64(concurrency),
							RequestCount:              incomingRequestsPerKey[key],
						},
					}
				}
				// Start a fresh reporting period.
				incomingRequestsPerKey = make(map[string]int32)
			}
		}
	}()
}