Skip to content

Commit

Permalink
Add pod ambiguous selector check
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Beschetnov committed Nov 4, 2022
1 parent f14ebac commit caddfdd
Show file tree
Hide file tree
Showing 4 changed files with 1,228 additions and 11 deletions.
107 changes: 96 additions & 11 deletions pkg/controller/podautoscaler/horizontal.go
Expand Up @@ -50,6 +50,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/controller/util/selectors"
)

var (
Expand Down Expand Up @@ -103,6 +104,10 @@ type HorizontalController struct {
scaleUpEventsLock sync.RWMutex
scaleDownEvents map[string][]timestampedScaleEvent
scaleDownEventsLock sync.RWMutex

// Storage of HPAs and their selectors.
hpaSelectors *selectors.BiMultimap
hpaSelectorsMux sync.Mutex
}

// NewHorizontalController creates a new HorizontalController.
Expand Down Expand Up @@ -139,6 +144,7 @@ func NewHorizontalController(
scaleUpEventsLock: sync.RWMutex{},
scaleDownEvents: map[string][]timestampedScaleEvent{},
scaleDownEventsLock: sync.RWMutex{},
hpaSelectors: selectors.NewBiMultimap(),
}

hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
Expand Down Expand Up @@ -203,6 +209,15 @@ func (a *HorizontalController) enqueueHPA(obj interface{}) {
// request for the HPA in the queue then a new request is always dropped. Requests spend resync
// interval in queue so HPAs are processed every resync interval.
a.queue.AddRateLimited(key)

// Register HPA in the hpaSelectors map if it's not present yet. Attaching the Nothing selector
// that does not select objects. The actual selector is going to be updated
// when it's available during the autoscaler reconciliation.
a.hpaSelectorsMux.Lock()
defer a.hpaSelectorsMux.Unlock()
if hpaKey := selectors.Parse(key); !a.hpaSelectors.SelectorExists(hpaKey) {
a.hpaSelectors.PutSelector(hpaKey, labels.Nothing())
}
}

func (a *HorizontalController) deleteHPA(obj interface{}) {
Expand All @@ -214,6 +229,11 @@ func (a *HorizontalController) deleteHPA(obj interface{}) {

// TODO: could we leak if we fail to get the key?
a.queue.Forget(key)

// Remove HPA and attached selector.
a.hpaSelectorsMux.Lock()
defer a.hpaSelectorsMux.Unlock()
a.hpaSelectors.DeleteSelector(selectors.Parse(key))
}

func (a *HorizontalController) worker(ctx context.Context) {
Expand Down Expand Up @@ -254,19 +274,10 @@ func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
// all metrics computed.
func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
if scale.Status.Selector == "" {
errMsg := "selector is required"
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
}

selector, err := labels.Parse(scale.Status.Selector)
selector, err := a.validateAndParseSelector(hpa, scale.Status.Selector)
if err != nil {
errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
return 0, "", nil, time.Time{}, err
}

specReplicas := scale.Spec.Replicas
Expand Down Expand Up @@ -305,6 +316,80 @@ func (a *HorizontalController) computeReplicasForMetrics(ctx context.Context, hp
return replicas, metric, statuses, timestamp, nil
}

// hpasControllingPodsUnderSelector returns a list of keys of all HPAs that control a given list of pods.
func (a *HorizontalController) hpasControllingPodsUnderSelector(pods []*v1.Pod) []selectors.Key {
a.hpaSelectorsMux.Lock()
defer a.hpaSelectorsMux.Unlock()

hpas := map[selectors.Key]struct{}{}
for _, p := range pods {
podKey := selectors.Key{Name: p.Name, Namespace: p.Namespace}
a.hpaSelectors.Put(podKey, p.Labels)

selectingHpas, ok := a.hpaSelectors.ReverseSelect(podKey)
if !ok {
continue
}
for _, hpa := range selectingHpas {
hpas[hpa] = struct{}{}
}
}
// Clean up all added pods.
a.hpaSelectors.KeepOnly([]selectors.Key{})

hpaList := []selectors.Key{}
for hpa := range hpas {
hpaList = append(hpaList, hpa)
}
return hpaList
}

// validateAndParseSelector verifies that:
// - selector is not empty;
// - selector format is valid;
// - all pods by current selector are controlled by only one HPA.
// Returns an error if the check has failed or the parsed selector if succeeded.
// In case of an error the ScalingActive is set to false with the corresponding reason.
func (a *HorizontalController) validateAndParseSelector(hpa *autoscalingv2.HorizontalPodAutoscaler, selector string) (labels.Selector, error) {
if selector == "" {
errMsg := "selector is required"
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
return nil, fmt.Errorf(errMsg)
}

parsedSelector, err := labels.Parse(selector)
if err != nil {
errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
return nil, fmt.Errorf(errMsg)
}

hpaKey := selectors.Key{Name: hpa.Name, Namespace: hpa.Namespace}
a.hpaSelectorsMux.Lock()
if a.hpaSelectors.SelectorExists(hpaKey) {
// Update HPA selector only if the HPA was registered in enqueueHPA.
a.hpaSelectors.PutSelector(hpaKey, parsedSelector)
}
a.hpaSelectorsMux.Unlock()

pods, err := a.podLister.Pods(hpa.Namespace).List(parsedSelector)
if err != nil {
return nil, err
}

selectingHpas := a.hpasControllingPodsUnderSelector(pods)
if len(selectingHpas) > 1 {
errMsg := fmt.Sprintf("pods by selector %v are controlled by multiple HPAs: %v", selector, selectingHpas)
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "AmbiguousSelector", errMsg)
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "AmbiguousSelector", errMsg)
return nil, fmt.Errorf(errMsg)
}

return parsedSelector, nil
}

// Computes the desired number of replicas for a specific hpa and metric specification,
// returning the metric status and a proposed condition to be set on the HPA object.
func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
Expand Down
111 changes: 111 additions & 0 deletions pkg/controller/podautoscaler/horizontal_test.go
Expand Up @@ -43,6 +43,7 @@ import (
autoscalingapiv2 "k8s.io/kubernetes/pkg/apis/autoscaling/v2"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
"k8s.io/kubernetes/pkg/controller/util/selectors"
cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2"
emapi "k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
metricsapi "k8s.io/metrics/pkg/apis/metrics/v1beta1"
Expand Down Expand Up @@ -146,6 +147,7 @@ type testCase struct {
testScaleClient *scalefake.FakeScaleClient

recommendations []timestampedRecommendation
hpaSelectors *selectors.BiMultimap
}

// Needs to be called under a lock.
Expand Down Expand Up @@ -741,6 +743,9 @@ func (tc *testCase) setupController(t *testing.T) (*HorizontalController, inform
if tc.recommendations != nil {
hpaController.recommendations["test-namespace/test-hpa"] = tc.recommendations
}
if tc.hpaSelectors != nil {
hpaController.hpaSelectors = tc.hpaSelectors
}

return hpaController, informerFactory
}
Expand Down Expand Up @@ -2387,6 +2392,112 @@ func TestConditionInvalidSelectorUnparsable(t *testing.T) {
tc.runTest(t)
}

func TestConditionNoAmbiguousSelectorWhenNoSelectorOverlapBetweenHPAs(t *testing.T) {
hpaSelectors := selectors.NewBiMultimap()
hpaSelectors.PutSelector(selectors.Key{Name: "test-hpa-2", Namespace: testNamespace}, labels.SelectorFromSet(labels.Set{"cheddar": "cheese"}))

tc := testCase{
minReplicas: 2,
maxReplicas: 6,
specReplicas: 3,
statusReplicas: 3,
expectedDesiredReplicas: 5,
CPUTarget: 30,
reportedLevels: []uint64{300, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsAPI: true,
hpaSelectors: hpaSelectors,
}
tc.runTest(t)
}

func TestConditionAmbiguousSelectorWhenFullSelectorOverlapBetweenHPAs(t *testing.T) {
hpaSelectors := selectors.NewBiMultimap()
hpaSelectors.PutSelector(selectors.Key{Name: "test-hpa-2", Namespace: testNamespace}, labels.SelectorFromSet(labels.Set{"name": podNamePrefix}))

tc := testCase{
minReplicas: 2,
maxReplicas: 6,
specReplicas: 3,
statusReplicas: 3,
expectedDesiredReplicas: 3,
CPUTarget: 30,
reportedLevels: []uint64{300, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsAPI: true,
expectedConditions: []autoscalingv2.HorizontalPodAutoscalerCondition{
{
Type: autoscalingv2.AbleToScale,
Status: v1.ConditionTrue,
Reason: "SucceededGetScale",
},
{
Type: autoscalingv2.ScalingActive,
Status: v1.ConditionFalse,
Reason: "AmbiguousSelector",
},
},
hpaSelectors: hpaSelectors,
}
tc.runTest(t)
}

func TestConditionAmbiguousSelectorWhenPartialSelectorOverlapBetweenHPAs(t *testing.T) {
hpaSelectors := selectors.NewBiMultimap()
hpaSelectors.PutSelector(selectors.Key{Name: "test-hpa-2", Namespace: testNamespace}, labels.SelectorFromSet(labels.Set{"cheddar": "cheese"}))

tc := testCase{
minReplicas: 2,
maxReplicas: 6,
specReplicas: 3,
statusReplicas: 3,
expectedDesiredReplicas: 3,
CPUTarget: 30,
reportedLevels: []uint64{300, 500, 700},
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
useMetricsAPI: true,
expectedConditions: []autoscalingv2.HorizontalPodAutoscalerCondition{
{
Type: autoscalingv2.AbleToScale,
Status: v1.ConditionTrue,
Reason: "SucceededGetScale",
},
{
Type: autoscalingv2.ScalingActive,
Status: v1.ConditionFalse,
Reason: "AmbiguousSelector",
},
},
hpaSelectors: hpaSelectors,
}

testClient, _, _, _, _ := tc.prepareTestClient(t)
tc.testClient = testClient

testClient.PrependReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
tc.Lock()
defer tc.Unlock()

obj := &v1.PodList{}
for i := range tc.reportedCPURequests {
pod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%d", podNamePrefix, i),
Namespace: testNamespace,
Labels: map[string]string{
"name": podNamePrefix, // selected by the original HPA
"cheddar": "cheese", // selected by test-hpa-2
},
},
}
obj.Items = append(obj.Items, pod)
}
return true, obj, nil
})

tc.runTest(t)
}

func TestConditionFailedGetMetrics(t *testing.T) {
targetValue := resource.MustParse("15.0")
averageValue := resource.MustParse("15.0")
Expand Down

0 comments on commit caddfdd

Please sign in to comment.