forked from knative/serving
/
autoscaler.go
309 lines (258 loc) · 9.43 KB
/
autoscaler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package autoscaler
import (
"context"
"errors"
"math"
"sync"
"time"
"github.com/knative/pkg/logging"
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
)
const (
// bucketSize is the size of the buckets of stats we create.
bucketSize time.Duration = 2 * time.Second
)
// Stat defines a single measurement at a point in time
type Stat struct {
// The time the data point was collected on the pod.
Time *time.Time
// The unique identity of this pod. Used to count how many pods
// are contributing to the metrics.
PodName string
// Average number of requests currently being handled by this pod.
AverageConcurrentRequests float64
// Part of AverageConcurrentRequests, for requests going through a proxy.
AverageProxiedConcurrentRequests float64
// Number of requests received since last Stat (approximately QPS).
RequestCount int32
// Part of RequestCount, for requests going through a proxy.
ProxiedRequestCount int32
}
// StatMessage wraps a Stat with identifying information so it can be routed
// to the correct receiver.
type StatMessage struct {
Key string
Stat Stat
}
// statsBucket keeps all the stats that fall into a defined bucket.
type statsBucket map[string][]*Stat
// add adds a Stat to the bucket. Stats from the same pod will be
// collapsed.
func (b statsBucket) add(stat *Stat) {
b[stat.PodName] = append(b[stat.PodName], stat)
}
// concurrency calculates the overall concurrency as measured by this
// bucket. All stats that belong to the same pod will be averaged.
// The overall concurrency is the sum of the measured concurrency of all
// pods (including activator metrics).
func (b statsBucket) concurrency() float64 {
var total float64
for _, podStats := range b {
var subtotal float64
for _, stat := range podStats {
// Proxied requests have been counted at the activator. Subtract
// AverageProxiedConcurrentRequests to avoid double counting.
subtotal += stat.AverageConcurrentRequests - stat.AverageProxiedConcurrentRequests
}
total += subtotal / float64(len(podStats))
}
return total
}
// Autoscaler stores current state of an instance of an autoscaler
type Autoscaler struct {
*DynamicConfig
namespace string
revisionService string
endpointsLister corev1listers.EndpointsLister
reporter StatsReporter
// State in panic mode. Carries over multiple Scale calls.
panicTime *time.Time
maxPanicPods int32
// targetMutex guards the elements in the block below.
targetMutex sync.RWMutex
target float64
// statsMutex guards the elements in the block below.
statsMutex sync.Mutex
bucketed map[time.Time]statsBucket
}
// New creates a new instance of autoscaler
func New(
dynamicConfig *DynamicConfig,
namespace string,
revisionService string,
endpointsInformer corev1informers.EndpointsInformer,
target float64,
reporter StatsReporter) (*Autoscaler, error) {
if endpointsInformer == nil {
return nil, errors.New("'endpointsEnformer' must not be nil")
}
return &Autoscaler{
DynamicConfig: dynamicConfig,
namespace: namespace,
revisionService: revisionService,
endpointsLister: endpointsInformer.Lister(),
target: target,
bucketed: make(map[time.Time]statsBucket),
reporter: reporter,
}, nil
}
// Update reconfigures the UniScaler according to the DeciderSpec.
func (a *Autoscaler) Update(spec DeciderSpec) error {
a.targetMutex.Lock()
defer a.targetMutex.Unlock()
a.target = spec.TargetConcurrency
return nil
}
// Record a data point.
func (a *Autoscaler) Record(ctx context.Context, stat Stat) {
if stat.Time == nil {
logger := logging.FromContext(ctx)
logger.Errorf("Missing time from stat: %+v", stat)
return
}
a.statsMutex.Lock()
defer a.statsMutex.Unlock()
bucketKey := stat.Time.Truncate(bucketSize)
bucket, ok := a.bucketed[bucketKey]
if !ok {
bucket = statsBucket{}
a.bucketed[bucketKey] = bucket
}
bucket.add(&stat)
}
// Scale calculates the desired scale based on current statistics given the current time.
func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
logger := logging.FromContext(ctx)
originalReadyPodsCount, err := readyPodsCountOfEndpoints(a.endpointsLister, a.namespace, a.revisionService)
if err != nil {
logger.Errorw("Failed to get Endpoints via K8S Lister", zap.Error(err))
return 0, false
}
// Use 1 if there are zero current pods.
readyPodsCount := math.Max(1, float64(originalReadyPodsCount))
config := a.Current()
observedStableConcurrency, observedPanicConcurrency, lastBucket := a.aggregateData(now, config.StableWindow, config.PanicWindow)
if len(a.bucketed) == 0 {
logger.Debug("No data to scale on.")
return 0, false
}
// Log system totals.
logger.Debugf("Current concurrent clients: %0.3f", lastBucket.concurrency())
target := a.targetConcurrency()
// Desired pod count is observed concurrency of the revision over desired (stable) concurrency per pod.
// The scaling up rate is limited to the MaxScaleUpRate.
desiredStablePodCount := a.podCountLimited(math.Ceil(observedStableConcurrency/target), readyPodsCount)
desiredPanicPodCount := a.podCountLimited(math.Ceil(observedPanicConcurrency/target), readyPodsCount)
a.reporter.ReportStableRequestConcurrency(observedStableConcurrency)
a.reporter.ReportPanicRequestConcurrency(observedPanicConcurrency)
a.reporter.ReportTargetRequestConcurrency(target)
logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds.", observedStableConcurrency, config.StableWindow)
logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds.", observedPanicConcurrency, config.PanicWindow)
isOverPanicThreshold := observedPanicConcurrency/readyPodsCount >= target*2
if a.panicTime == nil && isOverPanicThreshold {
// Begin panicking when we cross the concurrency threshold in the panic window.
logger.Info("PANICKING")
a.panicTime = &now
} else if a.panicTime != nil && !isOverPanicThreshold && a.panicTime.Add(config.StableWindow).Before(now) {
// Stop panicking after the surge has made its way into the stable metric.
logger.Info("Un-panicking.")
a.panicTime = nil
a.maxPanicPods = 0
}
var desiredPodCount int32
if a.panicTime != nil {
logger.Debug("Operating in panic mode.")
a.reporter.ReportPanic(1)
// We do not scale down while in panic mode. Only increases will be applied.
if desiredPanicPodCount > a.maxPanicPods {
logger.Infof("Increasing pods from %v to %v.", originalReadyPodsCount, desiredPanicPodCount)
a.panicTime = &now
a.maxPanicPods = desiredPanicPodCount
}
desiredPodCount = a.maxPanicPods
} else {
logger.Debug("Operating in stable mode.")
a.reporter.ReportPanic(0)
desiredPodCount = desiredStablePodCount
}
a.reporter.ReportDesiredPodCount(int64(desiredPodCount))
return desiredPodCount, true
}
// aggregateData aggregates bucketed stats over the stableWindow and panicWindow
// respectively and returns the observedStableConcurrency, observedPanicConcurrency
// and the last bucket that was aggregated.
func (a *Autoscaler) aggregateData(now time.Time, stableWindow, panicWindow time.Duration) (
stableConcurrency float64, panicConcurrency float64, lastBucket statsBucket) {
a.statsMutex.Lock()
defer a.statsMutex.Unlock()
var (
stableBuckets float64
stableTotal float64
panicBuckets float64
panicTotal float64
lastBucketTime time.Time
)
for bucketTime, bucket := range a.bucketed {
if !bucketTime.Add(panicWindow).Before(now) {
panicBuckets++
panicTotal += bucket.concurrency()
}
if !bucketTime.Add(stableWindow).Before(now) {
stableBuckets++
stableTotal += bucket.concurrency()
} else {
delete(a.bucketed, bucketTime)
}
if bucketTime.After(lastBucketTime) {
lastBucketTime = bucketTime
lastBucket = bucket
}
}
if stableBuckets > 0 {
stableConcurrency = stableTotal / stableBuckets
}
if panicBuckets > 0 {
panicConcurrency = panicTotal / panicBuckets
}
return stableConcurrency, panicConcurrency, lastBucket
}
func (a *Autoscaler) targetConcurrency() float64 {
a.targetMutex.RLock()
defer a.targetMutex.RUnlock()
return a.target
}
func (a *Autoscaler) podCountLimited(desiredPodCount, currentPodCount float64) int32 {
return int32(math.Min(desiredPodCount, a.Current().MaxScaleUpRate*currentPodCount))
}
// readyPodsCountOfEndpoints returns the ready IP count in the K8S Endpoints object returned by
// the given K8S Informer with given namespace and name. This is same as ready Pod count.
func readyPodsCountOfEndpoints(lister corev1listers.EndpointsLister, ns, name string) (int, error) {
readyPods := 0
endpoints, err := lister.Endpoints(ns).Get(name)
if apierrors.IsNotFound(err) {
// Treat not found as zero endpoints, it either hasn't been created
// or it has been torn down.
} else if err != nil {
return 0, err
} else {
for _, es := range endpoints.Subsets {
readyPods += len(es.Addresses)
}
}
return readyPods, nil
}