Start autoscaler in panic mode. (#4795)
* Start autoscaler in panic mode.

When the autoscaler restarts it has no memory of the previous stats and immediately scales the deployment down. Under my load-test conditions,
the autoscaler goes from 7 pods to 1, then to 2, then to 6, before finally reaching 7 again after a few cycles.
This is clearly not a desired property, so this change fixes that behavior by
starting the Autoscaler in panic mode, with the panic pod count equal to the current pod count,
whenever there is more than one serving pod right now.

If the deployment is scaled to 0, there is no reason to change the logic,
and if it is scaled to 1, we won't scale below 1 for the next stable window anyway, so there is no need to panic either.
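
The gist of the change as a standalone sketch (readyCount and startupPanicState are illustrative stand-ins, not the real API; the actual change to New() in pkg/autoscaler/autoscaler.go appears in the diff below):

package main

import (
	"fmt"
	"time"
)

// startupPanicState mirrors the new constructor logic: if more than one pod is
// currently serving, start in panic mode pinned at the current pod count so the
// deployment is not scaled down before metric history accumulates. readyCount
// stands in for the Endpoints-based pod counter used by the real autoscaler.
func startupPanicState(readyCount func() (int, error)) (panicTime *time.Time, maxPanicPods int32, err error) {
	cur, err := readyCount()
	if err != nil {
		return nil, 0, fmt.Errorf("initial pod count failed: %v", err)
	}
	if cur > 1 {
		now := time.Now()
		panicTime = &now // panic mode on: hold at least the current scale
	}
	// At 0 or 1 pods normal behavior is fine: we never scale below 1 within
	// the stable window anyway, so panicTime stays nil.
	return panicTime, int32(cur), nil
}

func main() {
	pt, max, _ := startupPanicState(func() (int, error) { return 7, nil })
	fmt.Printf("panic=%v maxPanicPods=%d\n", pt != nil, max) // panic=true maxPanicPods=7
}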

/assign @mattmoor @markusthoemmes

This is the GA scope of #2930

* unit tests

* updates to the imports

* merge
vagababov authored and knative-prow-robot committed Jul 19, 2019
1 parent fa74ee7 commit b52b020
Showing 6 changed files with 174 additions and 27 deletions.
4 changes: 2 additions & 2 deletions Gopkg.lock


21 changes: 19 additions & 2 deletions cmd/autoscaler/main_test.go
@@ -20,6 +20,7 @@ import (
"strings"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
fakeK8s "k8s.io/client-go/kubernetes/fake"
@@ -32,6 +33,11 @@ const (
testRevision = "test-Revision"
)

var (
kubeClient = fakeK8s.NewSimpleClientset()
kubeInformer = kubeinformers.NewSharedInformerFactory(kubeClient, 0)
)

func TestUniscalerFactoryFailures(t *testing.T) {
tests := []struct {
name string
@@ -100,7 +106,20 @@ func TestUniscalerFactoryFailures(t *testing.T) {
}
}

func endpoints(ns, n string) {
ep := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: n,
},
Subsets: []corev1.EndpointSubset{{}},
}
kubeClient.CoreV1().Endpoints(ns).Create(ep)
kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Add(ep)
}

func TestUniScalerFactoryFunc(t *testing.T) {
endpoints(testNamespace, "magic-services-offered")
uniScalerFactory := getTestUniScalerFactory()
for _, srv := range []string{"some", ""} {
decider := &autoscaler.Decider{
@@ -125,8 +144,6 @@ func TestUniScalerFactoryFunc(t *testing.T) {
}

func getTestUniScalerFactory() func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) {
kubeClient := fakeK8s.NewSimpleClientset()
kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
return uniScalerFactoryFunc(kubeInformer.Core().V1().Endpoints(), &testMetricClient{})
}

24 changes: 22 additions & 2 deletions pkg/autoscaler/autoscaler.go
@@ -27,6 +27,7 @@ import (
"go.uber.org/zap"

"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"
"knative.dev/serving/pkg/resources"

apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -66,8 +67,24 @@ func New(
return nil, errors.New("stats reporter must not be nil")
}

// A new instance of autoscaler is created without panic mode.
reporter.ReportPanic(0)
// We always start in panic mode if the deployment is scaled to more than 1 pod.
// If the scale is 0 or 1, normal Autoscaler behavior is fine.
// When the Autoscaler restarts we lose metric history, which causes us to
// momentarily scale down, and that is not a desired behavior.
// Thus, we keep at least the current scale until we
// accumulate enough data to make informed decisions.
curC, err := podCounter.ReadyCount()
if err != nil {
return nil, fmt.Errorf("initial pod count failed: %v", err)
}
var pt *time.Time
if curC > 1 {
pt = ptr.Time(time.Now())
// A new instance of autoscaler is created in panic mode.
reporter.ReportPanic(1)
} else {
reporter.ReportPanic(0)
}

return &Autoscaler{
namespace: namespace,
@@ -76,6 +93,9 @@
podCounter: podCounter,
deciderSpec: deciderSpec,
reporter: reporter,

panicTime: pt,
maxPanicPods: int32(curC),
}, nil
}

113 changes: 92 additions & 21 deletions pkg/autoscaler/autoscaler_test.go
@@ -66,53 +66,53 @@ func TestAutoscalerNoDataNoAutoscale(t *testing.T) {
err: errors.New("no metrics"),
}

a := newTestAutoscaler(10, 100, metrics)
a := newTestAutoscaler(t, 10, 100, metrics)
a.expectScale(t, time.Now(), 0, 0, false)
}

func expectedEBC(tc, tbc, rc, np float64) int32 {
return int32(math.Floor(tc/targetUtilization*np - tbc - rc))
}
func TestAutoscalerNoDataAtZeroNoAutoscale(t *testing.T) {
a := newTestAutoscaler(10, 100, &testMetricClient{})
a := newTestAutoscaler(t, 10, 100, &testMetricClient{})
// We always presume at least 1 pod, even if counter says 0.
a.expectScale(t, time.Now(), 0, expectedEBC(10, 100, 0, 1), true)
}

func TestAutoscalerNoDataAtZeroNoAutoscaleWithExplicitEPs(t *testing.T) {
a := newTestAutoscaler(10, 100, &testMetricClient{})
a := newTestAutoscaler(t, 10, 100, &testMetricClient{})
endpoints(1)
a.expectScale(t, time.Now(), 0, expectedEBC(10, 100, 0, 1), true)
}

func TestAutoscalerStableModeUnlimitedTBC(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 21.0}
a := newTestAutoscaler(181, -1, metrics)
a := newTestAutoscaler(t, 181, -1, metrics)
a.expectScale(t, time.Now(), 1, -1, true)
}

func TestAutoscaler0TBC(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 50.0}
a := newTestAutoscaler(10, 0, metrics)
a := newTestAutoscaler(t, 10, 0, metrics)
a.expectScale(t, time.Now(), 5, 0, true)
}

func TestAutoscalerStableModeNoChange(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 50.0}
a := newTestAutoscaler(10, 100, metrics)
a := newTestAutoscaler(t, 10, 100, metrics)
a.expectScale(t, time.Now(), 5, expectedEBC(10, 100, 50, 1), true)
}

func TestAutoscalerStableModeNoChangeAlreadyScaled(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 50.0}
a := newTestAutoscaler(10, 100, metrics)
a := newTestAutoscaler(t, 10, 100, metrics)
endpoints(5)
a.expectScale(t, time.Now(), 5, expectedEBC(10, 100, 50, 5), true)
}

func TestAutoscalerStableModeIncrease(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 50.0}
a := newTestAutoscaler(10, 101, metrics)
a := newTestAutoscaler(t, 10, 101, metrics)
a.expectScale(t, time.Now(), 5, expectedEBC(10, 101, 50, 1), true)

metrics.stableConcurrency = 100
@@ -121,7 +121,7 @@ func TestAutoscalerStableModeIncrease(t *testing.T) {

func TestAutoscalerStableModeDecrease(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 100.0}
a := newTestAutoscaler(10, 98, metrics)
a := newTestAutoscaler(t, 10, 98, metrics)
endpoints(8)
a.expectScale(t, time.Now(), 10, expectedEBC(10, 98, 100, 8), true)

@@ -131,7 +131,7 @@ func TestAutoscalerStableModeDecrease(t *testing.T) {

func TestAutoscalerStableModeNoTrafficScaleToZero(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 1}
a := newTestAutoscaler(10, 75, metrics)
a := newTestAutoscaler(t, 10, 75, metrics)
a.expectScale(t, time.Now(), 1, expectedEBC(10, 75, 1, 1), true)

metrics.stableConcurrency = 0.0
@@ -140,7 +140,7 @@ func TestAutoscalerStableModeNoTrafficScaleToZero(t *testing.T) {

func TestAutoscalerPanicModeDoublePodCount(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 50, panicConcurrency: 100}
a := newTestAutoscaler(10, 84, metrics)
a := newTestAutoscaler(t, 10, 84, metrics)

// PanicConcurrency takes precedence.
a.expectScale(t, time.Now(), 10, expectedEBC(10, 84, 50, 1), true)
@@ -151,7 +151,7 @@ func TestAutoscalerPanicModeDoublePodCount(t *testing.T) {
// At 1296 QPS traffic stabilizes.
func TestAutoscalerPanicModeExponentialTrackAndStablize(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 6, panicConcurrency: 6}
a := newTestAutoscaler(1, 101, metrics)
a := newTestAutoscaler(t, 1, 101, metrics)
a.expectScale(t, time.Now(), 6, expectedEBC(1, 101, 6, 1), true)

endpoints(6)
@@ -171,7 +171,7 @@ func TestAutoscalerPanicModeExponentialTrackAndStablize(t *testing.T) {

func TestAutoscalerPanicThenUnPanicScaleDown(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 100, panicConcurrency: 100}
a := newTestAutoscaler(10, 93, metrics)
a := newTestAutoscaler(t, 10, 93, metrics)
a.expectScale(t, time.Now(), 10, expectedEBC(10, 93, 100, 1), true)
endpoints(10)

@@ -190,7 +190,7 @@ func TestAutoscalerPanicThenUnPanicScaleDown(t *testing.T) {

func TestAutoscalerRateLimitScaleUp(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 1000}
a := newTestAutoscaler(10, 61, metrics)
a := newTestAutoscaler(t, 10, 61, metrics)

// Need 100 pods but only scale x10
a.expectScale(t, time.Now(), 10, expectedEBC(10, 61, 1000, 1), true)
@@ -200,26 +200,30 @@ func TestAutoscalerRateLimitScaleUp(t *testing.T) {
a.expectScale(t, time.Now(), 100, expectedEBC(10, 61, 1000, 10), true)
}

func eraseEndpoints() {
ep, _ := kubeClient.CoreV1().Endpoints(testNamespace).Get(testService, metav1.GetOptions{})
kubeClient.CoreV1().Endpoints(testNamespace).Delete(testService, nil)
kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Delete(ep)
}

func TestAutoscalerUseOnePodAsMinimumIfEndpointsNotFound(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 1000}
a := newTestAutoscaler(10, 81, metrics)
a := newTestAutoscaler(t, 10, 81, metrics)

endpoints(0)
// 2*10 would be the rate-limited scale if we could get the actual pod count.
// 1*10 is the rate-limited scale since no ready pods are reported by the K8s API.
a.expectScale(t, time.Now(), 10, expectedEBC(10, 81, 1000, 0), true)

ep, _ := kubeClient.CoreV1().Endpoints(testNamespace).Get(testService, metav1.GetOptions{})
kubeClient.CoreV1().Endpoints(testNamespace).Delete(testService, nil)
kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Delete(ep)
eraseEndpoints()
// 2*10 would be the rate-limited scale if we could get the actual pod count.
// 1*10 is the rate-limited scale since no Endpoints object is available from the K8s API.
a.expectScale(t, time.Now(), 10, expectedEBC(10, 81, 1000, 0), true)
}

func TestAutoscalerUpdateTarget(t *testing.T) {
metrics := &testMetricClient{stableConcurrency: 100}
a := newTestAutoscaler(10, 77, metrics)
a := newTestAutoscaler(t, 10, 77, metrics)
a.expectScale(t, time.Now(), 10, expectedEBC(10, 77, 100, 1), true)

endpoints(10)
@@ -277,7 +281,8 @@ func (r *mockReporter) ReportExcessBurstCapacity(v float64) error {
return nil
}

func newTestAutoscaler(targetConcurrency, targetBurstCapacity float64, metrics MetricClient) *Autoscaler {
func newTestAutoscaler(t *testing.T, targetConcurrency, targetBurstCapacity float64, metrics MetricClient) *Autoscaler {
t.Helper()
deciderSpec := DeciderSpec{
TargetConcurrency: targetConcurrency,
TotalConcurrency: targetConcurrency / targetUtilization, // For UTs presume 75% utilization
@@ -289,7 +294,12 @@ func newTestAutoscaler(targetConcurrency, targetBurstCapacity float64, metrics M
}

podCounter := resources.NewScopedEndpointsCounter(kubeInformer.Core().V1().Endpoints().Lister(), testNamespace, deciderSpec.ServiceName)
a, _ := New(testNamespace, testRevision, metrics, podCounter, deciderSpec, &mockReporter{})
// This ensures that we have an Endpoints object to start the autoscaler.
endpoints(0)
a, err := New(testNamespace, testRevision, metrics, podCounter, deciderSpec, &mockReporter{})
if err != nil {
t.Fatalf("Error creating test autoscaler: %v", err)
}
endpoints(1)
return a
}
@@ -337,3 +347,64 @@ func endpoints(count int) {
kubeClient.CoreV1().Endpoints(testNamespace).Create(ep)
kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Add(ep)
}

func TestStartInPanicMode(t *testing.T) {
metrics := &testMetricClient{}
deciderSpec := DeciderSpec{
TargetConcurrency: 100,
TotalConcurrency: 120,
TargetBurstCapacity: 11,
PanicThreshold: 220,
MaxScaleUpRate: 10.0,
StableWindow: stableWindow,
ServiceName: testService,
}

podCounter := resources.NewScopedEndpointsCounter(kubeInformer.Core().V1().Endpoints().Lister(), testNamespace, deciderSpec.ServiceName)
for i := 0; i < 2; i++ {
endpoints(i)
a, err := New(testNamespace, testRevision, metrics, podCounter, deciderSpec, &mockReporter{})
if err != nil {
t.Fatalf("Error creating test autoscaler: %v", err)
}
if a.panicTime != nil {
t.Errorf("Create at scale %d had panic mode on", i)
}
if got, want := int(a.maxPanicPods), i; got != want {
t.Errorf("MaxPanicPods = %d, want: %d", got, want)
}
}

// Now start with 2 and make sure we're in panic mode.
endpoints(2)
a, err := New(testNamespace, testRevision, metrics, podCounter, deciderSpec, &mockReporter{})
if err != nil {
t.Fatalf("Error creating test autoscaler: %v", err)
}
if a.panicTime == nil {
t.Error("Create at scale 2 had panic mode off")
}
if got, want := int(a.maxPanicPods), 2; got != want {
t.Errorf("MaxPanicPods = %d, want: %d", got, want)
}
}

func TestNewFail(t *testing.T) {
eraseEndpoints()
metrics := &testMetricClient{}
deciderSpec := DeciderSpec{
TargetConcurrency: 100,
TotalConcurrency: 120,
TargetBurstCapacity: 11,
PanicThreshold: 220,
MaxScaleUpRate: 10.0,
StableWindow: stableWindow,
ServiceName: testService,
}

podCounter := resources.NewScopedEndpointsCounter(kubeInformer.Core().V1().Endpoints().Lister(), testNamespace, deciderSpec.ServiceName)
_, err := New(testNamespace, testRevision, metrics, podCounter, deciderSpec, &mockReporter{})
if err == nil {
t.Error("No endpoints, but still succeeded creating the Autoscaler")
}
}
25 changes: 25 additions & 0 deletions vendor/knative.dev/pkg/kmeta/ownerrefable_accessor.go
@@ -0,0 +1,25 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kmeta

// OwnerRefableAccessor is a combination of the OwnerRefable interface and the Accessor interface,
// which indicates that it has 1) sufficient information to produce a metav1.OwnerReference to an object,
// and 2) the collection of interfaces from metav1.TypeMeta, runtime.Object, and metav1.Object that Kubernetes API types
// registered with runtime.Scheme must support.
type OwnerRefableAccessor interface {
OwnerRefable
Accessor
}
