forked from knative/serving
-
Notifications
You must be signed in to change notification settings - Fork 0
/
concurrency_reporter.go
110 lines (93 loc) · 3.41 KB
/
concurrency_reporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package handler
import (
"time"
"github.com/knative/pkg/system"
"github.com/knative/serving/pkg/autoscaler"
)
// ConcurrencyReporter reports stats based on incoming requests and ticks.
// Its Run method consumes request events from reqChan and ticks from
// reportChan, and publishes per-key stat messages on statChan.
type ConcurrencyReporter struct {
// Copied into the PodName field of every Stat that is reported.
podName string
// Delivers a ReqEvent for every request that arrives (ReqIn) or
// completes (ReqOut).
reqChan chan ReqEvent
// Each value received triggers a report for every key whose in-flight
// request count is non-zero.
reportChan <-chan time.Time
// Receives the StatMessages produced by report.
statChan chan *autoscaler.StatMessage
// Clock injected through NewConcurrencyReporterWithClock.
// NOTE(review): no method visible in this file reads it — confirm usage.
clock system.Clock
}
// NewConcurrencyReporter builds a ConcurrencyReporter backed by the real
// system clock. The reporter reads ReqEvents from reqChan, reacts to ticks
// on reportChan and emits its stats on statChan.
func NewConcurrencyReporter(podName string, reqChan chan ReqEvent, reportChan <-chan time.Time, statChan chan *autoscaler.StatMessage) *ConcurrencyReporter {
	// Delegate to the clock-aware constructor so there is a single place
	// where the struct is populated.
	realClock := system.RealClock{}
	return NewConcurrencyReporterWithClock(podName, reqChan, reportChan, statChan, realClock)
}
// NewConcurrencyReporterWithClock builds a ConcurrencyReporter exactly like
// NewConcurrencyReporter does, except that the caller supplies the clock
// stored on the reporter.
func NewConcurrencyReporterWithClock(podName string, reqChan chan ReqEvent, reportChan <-chan time.Time, statChan chan *autoscaler.StatMessage, clock system.Clock) *ConcurrencyReporter {
	cr := new(ConcurrencyReporter)

	cr.podName = podName
	cr.clock = clock

	// Channels are stored as given; the constructor neither creates nor
	// starts consuming them.
	cr.reqChan = reqChan
	cr.reportChan = reportChan
	cr.statChan = statChan

	return cr
}
// report wraps the given numbers for key into a StatMessage tagged with this
// reporter's pod name and sends it on statChan.
//
// concurrency and requestCount are converted to float64 for the Stat fields
// AverageConcurrentRequests and RequestCount respectively.
func (cr *ConcurrencyReporter) report(key string, concurrency, requestCount int32) {
	msg := &autoscaler.StatMessage{
		Key: key,
		Stat: autoscaler.Stat{
			PodName:                   cr.podName,
			AverageConcurrentRequests: float64(concurrency),
			RequestCount:              float64(requestCount),
		},
	}

	// The send happens on the caller's goroutine: it completes only once
	// statChan accepts the message.
	cr.statChan <- msg
}
// Run is the reporter's event loop. It returns once a receive on stopCh
// succeeds (e.g. because stopCh was closed); until then it handles:
//
//   - request events from reqChan, which update the per-key counters and,
//     for a key that is not currently tracked, trigger an immediate report;
//   - ticks from reportChan, which report every key with a non-zero number
//     of in-flight requests and start a new counting period.
func (cr *ConcurrencyReporter) Run(stopCh <-chan struct{}) {
	// inFlight holds the number of requests currently being served, per key.
	inFlight := make(map[string]int32)
	// arrived holds the number of requests received since the last tick,
	// per key.
	arrived := make(map[string]int32)

	for {
		select {
		case <-stopCh:
			return

		case ev := <-cr.reqChan:
			key := ev.Key
			if ev.EventType == ReqIn {
				arrived[key]++
				// Remember whether the key was tracked before counting
				// this request: an untracked key gets reported right
				// away, with a concurrency of 1.
				_, tracked := inFlight[key]
				inFlight[key]++
				if !tracked {
					cr.report(key, 1, arrived[key])
				}
			} else if ev.EventType == ReqOut {
				inFlight[key]--
			}

		case <-cr.reportChan:
			for key, active := range inFlight {
				if active != 0 {
					cr.report(key, active, arrived[key])
					continue
				}
				// Idle keys are dropped without a report, so the next
				// request for them counts as a first request again.
				delete(inFlight, key)
			}
			// Begin a fresh reporting period.
			arrived = make(map[string]int32)
		}
	}
}