forked from knative/serving
-
Notifications
You must be signed in to change notification settings - Fork 0
/
request_metric.go
228 lines (200 loc) · 6.88 KB
/
request_metric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queue
import (
"context"
"net/http"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
network "knative.dev/networking/pkg"
pkgmetrics "knative.dev/pkg/metrics"
pkghttp "knative.dev/serving/pkg/http"
"knative.dev/serving/pkg/metrics"
)
// defaultLatencyDistribution holds the histogram bucket boundaries, in
// milliseconds, shared by both request-latency views registered in this file.
//
// NOTE: 0 should not be used as boundary. See
// https://github.com/census-ecosystem/opencensus-go-exporter-stackdriver/issues/98
var defaultLatencyDistribution = view.Distribution(
	5, 10, 20, 40, 60, 80, 100, 150, 200, 250, 300, 350, 400, 450, 500, 600,
	700, 800, 900, 1000, 2000, 5000, 10000, 20000, 50000, 100000,
)

// Measures recorded by the handlers in this file. The request_* measures are
// recorded by requestMetricsHandler; the app_* measures and queue_depth are
// recorded by appRequestMetricsHandler.
var (
	requestCountM = stats.Int64(
		"request_count",
		"The number of requests that are routed to queue-proxy",
		stats.UnitDimensionless,
	)
	responseTimeInMsecM = stats.Float64(
		"request_latencies",
		"The response time in millisecond",
		stats.UnitMilliseconds,
	)
	appRequestCountM = stats.Int64(
		"app_request_count",
		"The number of requests that are routed to user-container",
		stats.UnitDimensionless,
	)
	appResponseTimeInMsecM = stats.Float64(
		"app_request_latencies",
		"The response time in millisecond",
		stats.UnitMilliseconds,
	)
	queueDepthM = stats.Int64(
		"queue_depth",
		"The current number of items in the serving and waiting queue, or not reported if unlimited concurrency.",
		stats.UnitDimensionless,
	)
)
// requestMetricsHandler wraps an http.Handler and records the queue-proxy
// request count and latency measures for each non-probe request it serves.
type requestMetricsHandler struct {
	// next is the wrapped handler that actually serves the request.
	next http.Handler
	// statsCtx is the base stats context built by metrics.PodRevisionContext in
	// NewRequestMetricsHandler; response-code and route tags are added on top
	// of it per request when recording.
	statsCtx context.Context
}
// appRequestMetricsHandler wraps an http.Handler and records the
// user-container request count and latency measures, plus a queue-depth sample
// taken from breaker when one is configured.
type appRequestMetricsHandler struct {
	// next is the wrapped handler that actually serves the request.
	next http.Handler
	// statsCtx is the base stats context built by metrics.PodRevisionContext in
	// NewAppRequestMetricsHandler; the response-code tag is added per request.
	statsCtx context.Context
	// breaker is optional; when nil, ServeHTTP skips the queue-depth sample.
	breaker *Breaker
}
// NewRequestMetricsHandler creates an http.Handler that emits request metrics.
//
// It registers a count view and a latency-distribution view over the
// queue-proxy request measures, both keyed by pod, container, response code,
// response code class and route tag. It then wraps next in a handler whose base
// stats context is derived from the given namespace/service/configuration/
// revision/pod with container name "queue-proxy". Any error from view
// registration or context construction is returned unchanged with a nil handler.
func NewRequestMetricsHandler(next http.Handler,
	ns, service, config, rev, pod string) (http.Handler, error) {
	tagKeys := []tag.Key{
		metrics.PodKey,
		metrics.ContainerKey,
		metrics.ResponseCodeKey,
		metrics.ResponseCodeClassKey,
		metrics.RouteTagKey,
	}
	views := []*view.View{{
		Description: "The number of requests that are routed to queue-proxy",
		Measure:     requestCountM,
		Aggregation: view.Count(),
		TagKeys:     tagKeys,
	}, {
		Description: "The response time in millisecond",
		Measure:     responseTimeInMsecM,
		Aggregation: defaultLatencyDistribution,
		TagKeys:     tagKeys,
	}}
	if err := pkgmetrics.RegisterResourceView(views...); err != nil {
		return nil, err
	}

	statsCtx, err := metrics.PodRevisionContext(pod, "queue-proxy", ns, service, config, rev)
	if err != nil {
		return nil, err
	}
	return &requestMetricsHandler{next: next, statsCtx: statsCtx}, nil
}
// ServeHTTP forwards the request to the wrapped handler through a response
// recorder. Unless the request is a network probe, it then records one request
// and its latency (whole milliseconds), tagged with the recorded response code
// and the request's route tag. If the wrapped handler panics, the request is
// recorded with status 500 and the panic is re-raised.
func (h *requestMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	recorder := pkghttp.NewResponseRecorder(w, http.StatusOK)
	start := time.Now()
	defer func() {
		// Probe traffic is excluded from revision metrics. recover is not called
		// on this path, so a panic during a probe simply keeps propagating.
		if network.IsProbe(r) {
			return
		}

		// recover must be called directly from this deferred function.
		panicVal := recover()
		elapsed := time.Since(start)
		routeTag := GetRouteTagNameFromRequest(r)

		status := recorder.ResponseCode
		if panicVal != nil {
			status = http.StatusInternalServerError
		}
		ctx := metrics.AugmentWithResponseAndRouteTag(h.statsCtx, status, routeTag)
		pkgmetrics.RecordBatch(ctx, requestCountM.M(1),
			responseTimeInMsecM.M(float64(elapsed.Milliseconds())))

		// Re-raise only after the failure has been recorded.
		if panicVal != nil {
			panic(panicVal)
		}
	}()
	h.next.ServeHTTP(recorder, r)
}
// NewAppRequestMetricsHandler creates an http.Handler that emits request metrics.
//
// It registers three views: a count view and a latency-distribution view over
// the user-container request measures, and a last-value view over the queue
// depth measure. All three are keyed by pod, container, response code and
// response code class. It then wraps next (and the optional breaker b) in a
// handler whose base stats context is derived from the given namespace/
// service/configuration/revision/pod with container name "queue-proxy". Any
// error from view registration or context construction is returned unchanged
// with a nil handler.
func NewAppRequestMetricsHandler(next http.Handler, b *Breaker,
	ns, service, config, rev, pod string) (http.Handler, error) {
	tagKeys := []tag.Key{
		metrics.PodKey,
		metrics.ContainerKey,
		metrics.ResponseCodeKey,
		metrics.ResponseCodeClassKey,
	}
	views := []*view.View{{
		Description: "The number of requests that are routed to user-container",
		Measure:     appRequestCountM,
		Aggregation: view.Count(),
		TagKeys:     tagKeys,
	}, {
		Description: "The response time in millisecond",
		Measure:     appResponseTimeInMsecM,
		Aggregation: defaultLatencyDistribution,
		TagKeys:     tagKeys,
	}, {
		Description: "The number of items queued at this queue proxy.",
		Measure:     queueDepthM,
		Aggregation: view.LastValue(),
		TagKeys:     tagKeys,
	}}
	if err := pkgmetrics.RegisterResourceView(views...); err != nil {
		return nil, err
	}

	statsCtx, err := metrics.PodRevisionContext(pod, "queue-proxy", ns, service, config, rev)
	if err != nil {
		return nil, err
	}
	return &appRequestMetricsHandler{
		next:     next,
		statsCtx: statsCtx,
		breaker:  b,
	}, nil
}
// ServeHTTP forwards the request to the wrapped handler through a response
// recorder. Before serving, if a breaker is configured, it records the
// breaker's InFlight() count as the queue depth. Because the probe check
// happens later, this sample is taken for probe requests too. Unless the
// request is a network probe, it then records one request and its latency
// (whole milliseconds), tagged with the recorded response code. If the wrapped
// handler panics, the request is recorded with status 500 and the panic is
// re-raised.
func (h *appRequestMetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	recorder := pkghttp.NewResponseRecorder(w, http.StatusOK)
	start := time.Now()

	if b := h.breaker; b != nil {
		pkgmetrics.Record(h.statsCtx, queueDepthM.M(int64(b.InFlight())))
	}

	defer func() {
		// Probe traffic is excluded from revision metrics. recover is not called
		// on this path, so a panic during a probe simply keeps propagating.
		if network.IsProbe(r) {
			return
		}

		// recover must be called directly from this deferred function.
		panicVal := recover()
		elapsed := time.Since(start)

		status := recorder.ResponseCode
		if panicVal != nil {
			status = http.StatusInternalServerError
		}
		ctx := metrics.AugmentWithResponse(h.statsCtx, status)
		pkgmetrics.RecordBatch(ctx, appRequestCountM.M(1),
			appResponseTimeInMsecM.M(float64(elapsed.Milliseconds())))

		// Re-raise only after the failure has been recorded.
		if panicVal != nil {
			panic(panicVal)
		}
	}()
	h.next.ServeHTTP(recorder, r)
}
// Route tag values reported by GetRouteTagNameFromRequest when the request
// does not carry a usable tag header value of its own.
const (
	// defaultTagName is reported when there is no tag header but the
	// default-route header is present.
	defaultTagName = "DEFAULT"
	// disabledTagName is reported when neither the tag header nor the
	// default-route header is present.
	disabledTagName = "DISABLED"
	// undefinedTagName is reported when a tag header is present but the
	// default-route header is "true".
	undefinedTagName = "UNDEFINED"
)
// GetRouteTagNameFromRequest extracts the value of the tag header from http.Request
// and maps it to the route tag reported in metrics:
//
//   - no tag header and no default-route header: tag-header routing is
//     disabled, so "DISABLED" is returned;
//   - no tag header but a default-route header: "DEFAULT";
//   - a non-empty tag header with default-route header "true" (the request
//     came in via the default route): "UNDEFINED";
//   - otherwise: the tag header value itself.
func GetRouteTagNameFromRequest(r *http.Request) string {
	tagName := r.Header.Get(network.TagHeaderName)
	defaultRoute := r.Header.Get(network.DefaultRouteHeaderName)

	switch {
	case tagName == "" && defaultRoute == "":
		return disabledTagName
	case tagName == "":
		return defaultTagName
	case defaultRoute == "true":
		return undefinedTagName
	default:
		return tagName
	}
}