filter out nodes waiting for CSI driver
When a new node becomes ready, a CSI driver is not going to be running on it
immediately. This can cause the cluster autoscaler to scale up once more, because
the pending pods will only be able to run on that new node once the driver is ready.

The actual check is based on CSIStorageCapacity. By comparing the CSIStorageCapacity
information published for the new node against the information expected for a
template node, we can determine whether the CSI driver has finished starting up on
the node.

The new CSI processor needs information about existing CSIStorageCapacity
objects in the cluster, just like the scheduler predicate. Both can share the
same informer. For that to work, managing the informer factory must be moved up
the call chain so that the setup code for both can use the same factory.
pohly committed Oct 6, 2021
1 parent 1aac77d commit 28c124b
Showing 8 changed files with 227 additions and 26 deletions.
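A minimal, hedged sketch of the shared-informer wiring described in the commit message, assuming only client-go (the fake clientset is used just to keep the sketch runnable and stands in for a real cluster). The important detail is that listers are acquired from the single shared factory before Start() and WaitForCacheSync() are called:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes/fake"
)

func main() {
    client := fake.NewSimpleClientset()

    // One factory, shared by all consumers (scheduler predicate checker,
    // CSI processor, ...).
    factory := informers.NewSharedInformerFactory(client, 0)

    // Listers must be acquired before Start(), otherwise their informers
    // never get registered with the factory.
    capacityLister := factory.Storage().V1beta1().CSIStorageCapacities().Lister()

    stop := make(chan struct{})
    defer close(stop)
    factory.Start(stop)
    for informerType, ok := range factory.WaitForCacheSync(stop) {
        if !ok {
            panic(fmt.Sprintf("failed to sync informer %v", informerType))
        }
    }

    capacities, err := capacityLister.List(labels.Everything())
    if err != nil {
        panic(err)
    }
    fmt.Printf("CSIStorageCapacity objects in cache: %d\n", len(capacities))
}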
7 changes: 4 additions & 3 deletions cluster-autoscaler/core/autoscaler.go
@@ -31,13 +31,15 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
)

// AutoscalerOptions is the whole set of options for configuring an autoscaler
type AutoscalerOptions struct {
config.AutoscalingOptions
KubeClient kube_client.Interface
InformerFactory informers.SharedInformerFactory
EventsKubeClient kube_client.Interface
AutoscalingKubeClients *context.AutoscalingKubeClients
CloudProvider cloudprovider.CloudProvider
@@ -81,14 +83,13 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
// Initialize default options if not provided.
func initializeDefaultOptions(opts *AutoscalerOptions) error {
if opts.Processors == nil {
opts.Processors = ca_processors.DefaultProcessors()
opts.Processors = ca_processors.DefaultProcessors(opts.InformerFactory)
}
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient)
}
if opts.PredicateChecker == nil {
predicateCheckerStopChannel := make(chan struct{})
predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(opts.KubeClient, predicateCheckerStopChannel)
predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(opts.KubeClient, opts.InformerFactory)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_test_common.go
@@ -192,7 +192,7 @@ func NewScaleTestAutoscalingContext(
// Ignoring error here is safe - if a test doesn't specify valid estimatorName,
// it either doesn't need one, or should fail when it turns out to be nil.
estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName)
predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(fakeClient, make(chan struct{}))
predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(fakeClient, nil)
if err != nil {
return context.AutoscalingContext{}, err
}
6 changes: 6 additions & 0 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -744,6 +744,12 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a
// TODO: Remove this call when we handle dynamically provisioned resources.
allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes)
allNodes, readyNodes = taints.FilterOutNodesWithIgnoredTaints(a.nodeTransformation.IgnoredTaints, allNodes, readyNodes)

// Filter out nodes that aren't ready because of a missing CSI driver.
if a.processors.CSIProcessor != nil {
allNodes, readyNodes = a.processors.CSIProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes)
}

return allNodes, readyNodes, nil
}

31 changes: 29 additions & 2 deletions cluster-autoscaler/main.go
@@ -49,6 +49,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/replace"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
"k8s.io/autoscaler/cluster-autoscaler/version"
"k8s.io/client-go/informers"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -317,15 +318,21 @@ func buildAutoscaler() (core.Autoscaler, error) {
autoscalingOptions := createAutoscalingOptions()
kubeClient := createKubeClient(getKubeConfig())
eventsKubeClient := createKubeClient(getKubeConfig())
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)

// The stop channel is never closed at the moment, so it isn't particularly
// useful yet. Perhaps buildAutoscaler can accept a context instead, so that
// the caller can decide about a suitable deadline.
stopChannel := make(chan struct{})

opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: simulator.NewDeltaClusterSnapshot(),
KubeClient: kubeClient,
InformerFactory: informerFactory,
EventsKubeClient: eventsKubeClient,
}

opts.Processors = ca_processors.DefaultProcessors()
opts.Processors = ca_processors.DefaultProcessors(informerFactory)
opts.Processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor()

nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator
@@ -348,7 +355,27 @@ func buildAutoscaler() (core.Autoscaler, error) {
metrics.UpdateMemoryLimitsBytes(autoscalingOptions.MinMemoryTotal, autoscalingOptions.MaxMemoryTotal)

// Create autoscaler.
return core.NewAutoscaler(opts)
autoscaler, err := core.NewAutoscaler(opts)
if err != nil {
return nil, err
}

// this MUST be called after all the informers/listers are acquired via the
// informerFactory....Lister()/informerFactory....Informer() methods
informerFactory.Start(stopChannel)

// Also wait for all informers to be up-to-date, so that the autoscaler
// starts its first loop with a complete view of the objects served by
// the listers acquired above.
synced := informerFactory.WaitForCacheSync(stopChannel)
for k, v := range synced {
if !v {
return nil, fmt.Errorf("failed to sync informer %v", k)
}
}

return autoscaler, nil
}

func run(healthCheck *metrics.HealthCheck) {
159 changes: 159 additions & 0 deletions cluster-autoscaler/processors/csi/csi_processor.go
@@ -0,0 +1,159 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package csi

import (
apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/client-go/informers"
storagev1beta1listers "k8s.io/client-go/listers/storage/v1beta1"
"k8s.io/klog/v2"
)

// CSIProcessor checks whether a node is ready for applications using volumes
// provided by a CSI driver. This is relevant when the autoscaler has been
// configured to check storage capacity.
//
// Without this processor, the following happens:
// - the autoscaler determines that it needs a new node so that volumes
// for a pending pod can be created
// - the new node starts and is ready to run pods, but the CSI driver
// itself hasn't started running on it yet
// - autoscaler checks for pending pods, finds that the pod still
// cannot run and asks for another node
// - the CSI driver starts, creates volumes and the pod runs
// => the extra node is redundant
//
// To determine whether a node will have a CSI driver, a heuristic is used: if
// a template node derived from the node has a CSIStorageCapacity object, then
// the node itself should also have one, otherwise it is not ready.
type CSIProcessor interface {
// FilterOutNodesWithUnreadyResources removes nodes that should have a CSI
// driver, but don't have CSIStorageCapacity information yet.
FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node)

// CleanUp frees resources.
CleanUp()
}

type csiProcessor struct {
csiStorageCapacityLister storagev1beta1listers.CSIStorageCapacityLister
}

// FilterOutNodesWithUnreadyResources removes nodes that should have a CSI
// driver, but don't have CSIStorageCapacity information yet.
func (p csiProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
newAllNodes := make([]*apiv1.Node, 0, len(allNodes))
newReadyNodes := make([]*apiv1.Node, 0, len(readyNodes))
nodesWithUnreadyCSI := make(map[string]*apiv1.Node)
for _, node := range readyNodes {
// TODO: short-circuit this check if the node has been in the
// ready state long enough? The checks below build a template
// node info and list the cached CSIStorageCapacity objects for
// every node in the cluster during each scale-up run, which
// might create a lot of additional load.
klog.V(3).Infof("checking CSIStorageCapacity of node %s", node.Name)
if p.isReady(context, node) {
newReadyNodes = append(newReadyNodes, node)
} else {
nodesWithUnreadyCSI[node.Name] = kubernetes.GetUnreadyNodeCopy(node)
}
}
// Override any node with unready CSI with its "unready" copy
for _, node := range allNodes {
if newNode, found := nodesWithUnreadyCSI[node.Name]; found {
newAllNodes = append(newAllNodes, newNode)
} else {
newAllNodes = append(newAllNodes, node)
}
}
return newAllNodes, newReadyNodes
}

func (p csiProcessor) isReady(context *context.AutoscalingContext, node *apiv1.Node) bool {
cloudProvider := context.CloudProvider
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil || nodeGroup == nil {
// Not a node that is part of a node group? Assume that the normal
// ready state applies and continue.
klog.V(3).Infof("node %s has no node group, skip CSI check (error: %v)", node.Name, err)
return true
}
nodeInfo, err := nodeGroup.TemplateNodeInfo()
if err != nil {
// Again, ignore the node.
klog.V(3).Infof("node %s has no node info, skip CSI check: %v", node.Name, err)
return true
}
templateNode := nodeInfo.Node()
expected := p.numStorageCapacityObjects(templateNode)
if expected == 0 {
// Node cannot be unready because no CSI storage is expected.
klog.V(3).Infof("node %s is not expected to have CSI storage: %v", node.Name)
return true
}
actual := p.numStorageCapacityObjects(node)
if expected <= actual {
klog.V(3).Infof("node %s has enough CSIStorageCapacity objects (expected %d, have %d)",
node.Name, expected, actual)
return true
}

// CSI driver should have published capacity information and
// hasn't done it yet -> treat the node as not ready yet.
klog.V(3).Infof("node %s is expected to have %d CSIStorageCapacity objects, only has %d -> treat it as unready",
node.Name, expected, actual)
return false
}

func (p csiProcessor) numStorageCapacityObjects(node *apiv1.Node) int {
count := 0
capacities, err := p.csiStorageCapacityLister.List(labels.Everything())
if err != nil {
klog.Errorf("listing CSIStorageCapacity objects failed: %v", err)
return 0
}
for _, capacity := range capacities {
// match labels
if capacity.NodeTopology == nil {
continue
}
selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology)
if err != nil {
// Invalid capacity object? Ignore it.
continue
}
if selector.Matches(labels.Set(node.Labels)) {
count++
}
}
return count
}

func (p csiProcessor) CleanUp() {}

// NewDefaultCSIProcessor returns a default instance of CSIProcessor.
func NewDefaultCSIProcessor(informerFactory informers.SharedInformerFactory) CSIProcessor {
return csiProcessor{
csiStorageCapacityLister: informerFactory.Storage().V1beta1().CSIStorageCapacities().Lister(),
}
}
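A hedged, standalone sketch of the counting rule used by numStorageCapacityObjects above; the topology label key "topology.example.csi/node" is purely illustrative. A CSIStorageCapacity object counts toward a node exactly when its NodeTopology selector matches the node's labels:

package main

import (
    "fmt"

    storagev1beta1 "k8s.io/api/storage/v1beta1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/labels"
)

// matchesNode mirrors the per-object check in the processor: nil or invalid
// NodeTopology selectors never match; everything else is matched against the
// node's labels.
func matchesNode(capacity *storagev1beta1.CSIStorageCapacity, nodeLabels map[string]string) bool {
    if capacity.NodeTopology == nil {
        return false
    }
    selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology)
    if err != nil {
        return false
    }
    return selector.Matches(labels.Set(nodeLabels))
}

func main() {
    capacity := &storagev1beta1.CSIStorageCapacity{
        NodeTopology: &metav1.LabelSelector{
            MatchLabels: map[string]string{"topology.example.csi/node": "node-1"},
        },
    }
    fmt.Println(matchesNode(capacity, map[string]string{"topology.example.csi/node": "node-1"})) // true
    fmt.Println(matchesNode(capacity, map[string]string{"topology.example.csi/node": "node-2"})) // false
}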
8 changes: 7 additions & 1 deletion cluster-autoscaler/processors/processors.go
@@ -17,6 +17,7 @@ limitations under the License.
package processors

import (
"k8s.io/autoscaler/cluster-autoscaler/processors/csi"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
@@ -26,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/client-go/informers"
)

// AutoscalingProcessors are a set of customizable processors used for encapsulating
@@ -55,10 +57,12 @@ type AutoscalingProcessors struct {
NodeGroupConfigProcessor nodegroupconfig.NodeGroupConfigProcessor
// CustomResourcesProcessor is interface defining handling custom resources
CustomResourcesProcessor customresources.CustomResourcesProcessor
// CSIProcessor checks for nodes that are not ready because the CSI driver is still starting up.
CSIProcessor csi.CSIProcessor
}

// DefaultProcessors returns default set of processors.
func DefaultProcessors() *AutoscalingProcessors {
func DefaultProcessors(informerFactory informers.SharedInformerFactory) *AutoscalingProcessors {
return &AutoscalingProcessors{
PodListProcessor: pods.NewDefaultPodListProcessor(),
NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor(),
@@ -71,6 +75,7 @@ func DefaultProcessors() *AutoscalingProcessors {
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
CSIProcessor: csi.NewDefaultCSIProcessor(informerFactory),
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
}
}
@@ -88,5 +93,6 @@ func (ap *AutoscalingProcessors) CleanUp() {
ap.NodeInfoProcessor.CleanUp()
ap.NodeGroupConfigProcessor.CleanUp()
ap.CustomResourcesProcessor.CleanUp()
ap.CSIProcessor.CleanUp()
ap.TemplateNodeInfoProvider.CleanUp()
}
38 changes: 20 additions & 18 deletions cluster-autoscaler/simulator/scheduler_based_predicates_checker.go
@@ -42,8 +42,15 @@ type SchedulerBasedPredicateChecker struct {
}

// NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker.
func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{}) (*SchedulerBasedPredicateChecker, error) {
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
// The informer factory is optional. If nil, then this function will create one,
// start it and wait for cache syncing.
func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, informerFactory informers.SharedInformerFactory) (*SchedulerBasedPredicateChecker, error) {
startInformer := false
if informerFactory == nil {
informerFactory = informers.NewSharedInformerFactory(kubeClient, 0)
startInformer = true
}

config, err := scheduler_config.Default()
if err != nil {
return nil, fmt.Errorf("couldn't create scheduler config: %v", err)
@@ -64,26 +71,21 @@ func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, stop <-
return nil, fmt.Errorf("couldn't create scheduler framework; %v", err)
}

if startInformer {
stopChannel := make(chan struct{})
informerFactory.Start(stopChannel)
synced := informerFactory.WaitForCacheSync(stopChannel)
for k, v := range synced {
if !v {
return nil, fmt.Errorf("failed to sync informer %v", k)
}
}
}

checker := &SchedulerBasedPredicateChecker{
framework: framework,
delegatingSharedLister: sharedLister,
}

// this MUST be called after all the informers/listers are acquired via the
// informerFactory....Lister()/informerFactory....Informer() methods
informerFactory.Start(stop)

// Also wait for all informers to be up-to-date. This is necessary for
// objects that were added when creating the fake client. Without
// this wait, those objects won't be visible via the informers when
// the test runs.
synced := informerFactory.WaitForCacheSync(stop)
for k, v := range synced {
if !v {
return nil, fmt.Errorf("failed to sync informer %v", k)
}
}

return checker, nil
}

2 changes: 1 addition & 1 deletion cluster-autoscaler/simulator/test_predicates_checker.go
@@ -23,5 +23,5 @@ import (
// NewTestPredicateChecker builds test version of PredicateChecker.
func NewTestPredicateChecker() (PredicateChecker, error) {
// just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient
return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), make(chan struct{}))
return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), nil)
}
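A hedged sketch of the two calling conventions enabled by the now-optional informer factory, assuming the packages are importable as below: passing nil makes the checker create, start and sync its own factory (the path used by the test helper above), while passing a shared factory leaves the lifecycle to the caller, who must start it only after all listers and informers have been acquired.

package main

import (
    "k8s.io/autoscaler/cluster-autoscaler/simulator"
    "k8s.io/client-go/informers"
    clientsetfake "k8s.io/client-go/kubernetes/fake"
)

func main() {
    client := clientsetfake.NewSimpleClientset()

    // Standalone use: the checker owns its informer factory and syncs it itself.
    if _, err := simulator.NewSchedulerBasedPredicateChecker(client, nil); err != nil {
        panic(err)
    }

    // Shared use: the caller owns the factory and starts it only after every
    // consumer has acquired its listers/informers.
    factory := informers.NewSharedInformerFactory(client, 0)
    if _, err := simulator.NewSchedulerBasedPredicateChecker(client, factory); err != nil {
        panic(err)
    }
    stop := make(chan struct{})
    defer close(stop)
    factory.Start(stop)
    factory.WaitForCacheSync(stop)
}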
