
Add request processing HPA into the queue after processing is finished. #72373

Merged (1 commit, Jan 4, 2019)
24 changes: 14 additions & 10 deletions pkg/controller/podautoscaler/horizontal.go
@@ -207,14 +207,18 @@ func (a *HorizontalController) processNextWorkItem() bool {
}
defer a.queue.Done(key)

err := a.reconcileKey(key.(string))
if err == nil {
// don't "forget" here because we want to only process a given HPA once per resync interval
return true
deleted, err := a.reconcileKey(key.(string))
Contributor:

you should still have a comment as to why we're not forgetting the key, since this works significantly differently from normal controllers.

if err != nil {
utilruntime.HandleError(err)
}
// Add request processing HPA after resync interval just in case last resync didn't insert
// request into the queue. Request is not inserted into queue by resync if previous one wasn't processed yet.
// This happens quite often because requests from previous resync are removed from the queue at the same moment
// as next resync inserts new requests.
if !deleted {
@liggitt (Member), Dec 28, 2018:

Am I reading this correctly that it always requeues items that still exist, even in non-error cases?

Contributor (author) reply:

yes, see comment below

a.queue.AddRateLimited(key)
}

a.queue.AddRateLimited(key)
utilruntime.HandleError(err)
return true
}
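
As the Contributor comment above notes, this worker loop intentionally deviates from the usual workqueue pattern: it never calls queue.Forget, and it re-queues every HPA that still exists even when reconciliation succeeded, so each HPA is reprocessed once per resync interval. For contrast, a minimal sketch of the conventional worker loop follows; the names are illustrative and not part of this PR:

// Illustrative sketch of a conventional controller worker loop (not code from
// this PR): Forget the key on success so its rate-limit history resets, and
// AddRateLimited only on error.
package example

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

type genericController struct {
	queue       workqueue.RateLimitingInterface
	syncHandler func(key string) error
}

func (c *genericController) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.syncHandler(key.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("syncing %q failed: %v", key, err))
		c.queue.AddRateLimited(key) // retry with backoff
		return true
	}
	c.queue.Forget(key) // success: wait for the next watch event or informer resync
	return true
}

In the conventional pattern the controller relies on watch events and informer resyncs to re-enqueue keys; the HPA controller cannot, because (as the comment in the diff explains) a resync does not insert a key whose previous request is still sitting unprocessed in the queue.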

@@ -298,20 +302,20 @@ func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.Hori
return replicas, metric, statuses, timestamp, nil
}

func (a *HorizontalController) reconcileKey(key string) error {
func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
return true, err
}

hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
if errors.IsNotFound(err) {
klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
delete(a.recommendations, key)
return nil
return true, nil
}

return a.reconcileAutoscaler(hpa, key)
return false, a.reconcileAutoscaler(hpa, key)
}

// computeStatusForObjectMetric computes the desired number of replicas for the specified metric of type ObjectMetricSourceType.
55 changes: 41 additions & 14 deletions pkg/controller/podautoscaler/horizontal_test.go
@@ -2176,7 +2176,7 @@ func TestComputedToleranceAlgImplementation(t *testing.T) {
finalPods := int32(math.Ceil(resourcesUsedRatio * float64(startPods)))

// To breach tolerance we will create a utilization ratio difference of tolerance to usageRatioToleranceValue)
tc := testCase{
tc1 := testCase{
minReplicas: 0,
maxReplicas: 1000,
initialReplicas: startPods,
@@ -2209,22 +2209,49 @@ func TestComputedToleranceAlgImplementation(t *testing.T) {
useMetricsAPI: true,
recommendations: []timestampedRecommendation{},
}
tc1.runTest(t)

tc.runTest(t)

// Reuse the data structure above, now testing "unscaling".
// Now, we test that no scaling happens if we are in a very close margin to the tolerance
target = math.Abs(1/(requestedToUsed*(1-defaultTestingTolerance))) + .004
finalCPUPercentTarget = int32(target * 100)
tc.CPUTarget = finalCPUPercentTarget
tc.initialReplicas = startPods
tc.expectedDesiredReplicas = startPods
tc.expectedConditions = statusOkWithOverrides(autoscalingv2.HorizontalPodAutoscalerCondition{
Type: autoscalingv2.AbleToScale,
Status: v1.ConditionTrue,
Reason: "ReadyForNewScale",
})
tc.runTest(t)
tc2 := testCase{
minReplicas: 0,
maxReplicas: 1000,
initialReplicas: startPods,
expectedDesiredReplicas: startPods,
CPUTarget: finalCPUPercentTarget,
reportedLevels: []uint64{
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
totalUsedCPUOfAllPods / 10,
},
reportedCPURequests: []resource.Quantity{
resource.MustParse(fmt.Sprint(perPodRequested+100) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested-100) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested+10) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested-10) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested+2) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested-2) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested+1) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested-1) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested) + "m"),
resource.MustParse(fmt.Sprint(perPodRequested) + "m"),
},
useMetricsAPI: true,
recommendations: []timestampedRecommendation{},
expectedConditions: statusOkWithOverrides(autoscalingv2.HorizontalPodAutoscalerCondition{
Type: autoscalingv2.AbleToScale,
Status: v1.ConditionTrue,
Reason: "ReadyForNewScale",
}),
}
tc2.runTest(t)
}
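
Both test cases above probe the HPA tolerance band: tc1 picks a CPU target just far enough from the observed usage to force a scale, while tc2 nudges the target to within the tolerance so the replica count must stay at startPods. A simplified sketch of the decision being tested follows; this is an assumed, stripped-down version of the replica calculation, which in the real controller also accounts for unready pods and missing metrics:

// Simplified, illustrative tolerance check (not the actual replica calculator).
// With the default 10% tolerance, a usage ratio inside (0.9, 1.1) keeps the
// current replica count, which is the behavior tc2 asserts.
package example

import "math"

func desiredReplicas(currentReplicas int32, usageRatio, tolerance float64) int32 {
	if math.Abs(1.0-usageRatio) <= tolerance {
		return currentReplicas // change too small: stay put
	}
	return int32(math.Ceil(usageRatio * float64(currentReplicas)))
}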

func TestScaleUpRCImmediately(t *testing.T) {
12 changes: 9 additions & 3 deletions pkg/controller/podautoscaler/legacy_horizontal_test.go
@@ -100,6 +100,8 @@ type legacyTestCase struct {
// Last scale time
lastScaleTime *metav1.Time
recommendations []timestampedRecommendation

finished bool
}

// Needs to be called under a lock.
@@ -462,12 +464,14 @@ func (tc *legacyTestCase) verifyResults(t *testing.T) {
func (tc *legacyTestCase) runTest(t *testing.T) {
testClient, testScaleClient := tc.prepareTestClient(t)
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)

eventClient := &fake.Clientset{}
eventClient.AddReactor("*", "events", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()

if tc.finished {
return true, &v1.Event{}, nil
}
obj := action.(core.CreateAction).GetObject().(*v1.Event)
if tc.verifyEvents {
switch obj.Reason {
@@ -514,17 +518,19 @@ func (tc *legacyTestCase) runTest(t *testing.T) {
informerFactory.Start(stop)
go hpaController.Run(stop)

// Wait for HPA to be processed.
<-tc.processed
tc.Lock()
tc.finished = true
if tc.verifyEvents {
tc.Unlock()
// We need to wait for events to be broadcasted (sleep for longer than record.sleepDuration).
time.Sleep(2 * time.Second)
} else {
tc.Unlock()
}
// Wait for HPA to be processed.
<-tc.processed
tc.verifyResults(t)

}
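
The new finished flag, together with moving the wait on tc.processed ahead of it, ensures that once the test starts verifying results, any late event callbacks from the still-running controller become no-ops instead of racing with the assertions. A generic sketch of that guard pattern, using hypothetical names rather than this PR's test types:

// Illustrative guard around a fake-client reactor (hypothetical names, not this
// PR's code): after the test flips finished, later event callbacks are swallowed
// instead of mutating shared test state.
package example

import (
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
)

type testState struct {
	sync.Mutex
	finished bool
	reasons  []string
}

func installEventReactor(c *fake.Clientset, s *testState) {
	c.AddReactor("*", "events", func(action core.Action) (bool, runtime.Object, error) {
		s.Lock()
		defer s.Unlock()
		if s.finished {
			return true, &v1.Event{}, nil // test is already verifying; drop the event
		}
		ev := action.(core.CreateAction).GetObject().(*v1.Event)
		s.reasons = append(s.reasons, ev.Reason)
		return true, ev, nil
	})
}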

func TestLegacyScaleUp(t *testing.T) {