generated from knative-extensions/sample-controller
-
Notifications
You must be signed in to change notification settings - Fork 77
/
request.go
157 lines (133 loc) · 4.77 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/types"
)
// ReqEvent represents either an incoming or closed request.
type ReqEvent struct {
// Time is the time the request event happened.
Time time.Time
// Type is the type of the request event.
Type ReqEventType
// Key is the revision the event is associated with.
// +optional
Key types.NamespacedName
}
// ReqEventType denotes the type (incoming/closed) of a ReqEvent.
type ReqEventType int
const (
// ReqIn represents an incoming request
ReqIn ReqEventType = iota
// ReqOut represents a finished request
ReqOut
// ProxiedIn represents an incoming request through a proxy.
ProxiedIn
// ProxiedOut represents a finished proxied request.
ProxiedOut
)
// NewRequestStats builds a RequestStats instance, started at the given time.
func NewRequestStats(startedAt time.Time) *RequestStats {
return &RequestStats{lastChange: startedAt}
}
// RequestStats collects statistics about requests as they flow in and out of the system.
type RequestStats struct {
mux sync.Mutex
// State variables that track the current state. Not reset after reporting.
concurrency, proxiedConcurrency float64
lastChange time.Time
// Reporting variables that track state over the current window. Reset after
// reporting.
requestCount, proxiedCount float64
computedConcurrency, computedProxiedConcurrency float64
secondsInUse float64
}
// RequestStatsReport are the metrics reported from the the request stats collector
// at a given time.
type RequestStatsReport struct {
// AverageConcurrency is the average concurrency over the reporting timeframe.
// This is calculated via the utilization at a given concurrency. For example:
// 2 requests each taking 500ms over a 1s reporting window generate an average
// concurrency of 1.
AverageConcurrency float64
// AverageProxiedConcurrency is the average concurrency of all proxied requests.
// The same calculation as above applies.
AverageProxiedConcurrency float64
// RequestCount is the number of requests that arrived in the current reporting
// timeframe.
RequestCount float64
// ProxiedRequestCount is the number of proxied requests that arrived in the current
// reporting timeframe.
ProxiedRequestCount float64
}
// compute updates the internal state since the last computed change.
//
// Note: Due to the async nature in which compute can be called, for
// example via HandleEvent and Report, the individual timestamps are not
// guaranteed to be monotonic. We ignore negative changes as they are likely
// benign and are rounding errors at most if the proposed pattern is used.
func (s *RequestStats) compute(now time.Time) {
if durationSinceChange := now.Sub(s.lastChange); durationSinceChange > 0 {
durationSecs := durationSinceChange.Seconds()
s.secondsInUse += durationSecs
s.computedConcurrency += s.concurrency * durationSecs
s.computedProxiedConcurrency += s.proxiedConcurrency * durationSecs
s.lastChange = now
}
}
// HandleEvent handles an incoming or outgoing request event and updates
// the state accordingly.
func (s *RequestStats) HandleEvent(event ReqEvent) {
s.mux.Lock()
defer s.mux.Unlock()
s.compute(event.Time)
switch event.Type {
case ProxiedIn:
s.proxiedConcurrency++
s.proxiedCount++
fallthrough
case ReqIn:
s.requestCount++
s.concurrency++
case ProxiedOut:
s.proxiedConcurrency--
fallthrough
case ReqOut:
s.concurrency--
}
}
// Report returns a RequestStatsReport relative to the given time. The state
// will be reset for another reporting cycle afterwards.
func (s *RequestStats) Report(now time.Time) RequestStatsReport {
s.mux.Lock()
defer s.mux.Unlock()
s.compute(now)
defer s.reset()
report := RequestStatsReport{
RequestCount: s.requestCount,
ProxiedRequestCount: s.proxiedCount,
}
if s.secondsInUse > 0 {
report.AverageConcurrency = s.computedConcurrency / s.secondsInUse
report.AverageProxiedConcurrency = s.computedProxiedConcurrency / s.secondsInUse
}
return report
}
// reset resets the state so a new reporting cycle can start.
func (s *RequestStats) reset() {
s.computedConcurrency, s.computedProxiedConcurrency = 0, 0
s.requestCount, s.proxiedCount = 0, 0
s.secondsInUse = 0
}