diff --git a/cluster-autoscaler/cluster_autoscaler.go b/cluster-autoscaler/cluster_autoscaler.go index 5012cdb058..0ffcaeff5f 100644 --- a/cluster-autoscaler/cluster_autoscaler.go +++ b/cluster-autoscaler/cluster_autoscaler.go @@ -22,23 +22,22 @@ import ( "time" "k8s.io/contrib/cluster-autoscaler/config" + "k8s.io/contrib/cluster-autoscaler/utils/gce" kube_client "k8s.io/kubernetes/pkg/client/unversioned" "github.com/golang/glog" ) var ( - migConfig config.MigConfigFlag - kubernetes = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default") + migConfigFlag config.MigConfigFlag + kubernetes = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default") ) func main() { - flag.Var(&migConfig, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+ + flag.Var(&migConfigFlag, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+ "Can be used multiple times. Format: ::") flag.Parse() - glog.Infof("MIG: %s\n", migConfig.String()) - url, err := url.Parse(*kubernetes) if err != nil { glog.Fatalf("Failed to parse Kuberentes url: %v", err) @@ -52,6 +51,12 @@ func main() { unscheduledPodLister := NewUnscheduledPodLister(kubeClient) nodeLister := NewNodeLister(kubeClient) + migConfigs := make([]*config.MigConfig, 0, len(migConfigFlag)) + gceManager, err := gce.CreateGceManager(migConfigs) + if err != nil { + glog.Fatalf("Failed to create GCE Manager %v", err) + } + for { select { case <-time.After(time.Minute): @@ -80,7 +85,10 @@ func main() { continue } - // TODO: Checking if all nodes are present. + if err := CheckMigsAndNodes(nodes, gceManager); err != nil { + glog.Warningf("Cluster is not ready for autoscaling: %v", err) + continue + } // Checks if scheduler tried to schedule the pods after thew newest node was added. newestNode := GetNewestNode(nodes) diff --git a/cluster-autoscaler/config/migconfig.go b/cluster-autoscaler/config/migconfig.go index 887b8dfa77..b6813a0ed3 100644 --- a/cluster-autoscaler/config/migconfig.go +++ b/cluster-autoscaler/config/migconfig.go @@ -56,9 +56,15 @@ type MigConfig struct { Name string } +// Url builds GCE url for the MIG. +func (migconfig *MigConfig) Url() string { + return gceurl.GenerateMigUrl(migconfig.Project, migconfig.Zone, migconfig.Name) +} + // MigConfigFlag is an array of MIG configuration details. Working as a multi-value flag. type MigConfigFlag []MigConfig +// String returns string representation of the MIG. func (migconfigflag *MigConfigFlag) String() string { configs := make([]string, len(*migconfigflag)) for _, migconfig := range *migconfigflag { diff --git a/cluster-autoscaler/utils.go b/cluster-autoscaler/utils.go index 1d9f966e74..2a8402fb88 100644 --- a/cluster-autoscaler/utils.go +++ b/cluster-autoscaler/utils.go @@ -17,8 +17,12 @@ limitations under the License. package main import ( + "fmt" "time" + "k8s.io/contrib/cluster-autoscaler/config" + "k8s.io/contrib/cluster-autoscaler/utils/gce" + kube_api "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/cache" kube_client "k8s.io/kubernetes/pkg/client/unversioned" @@ -58,16 +62,16 @@ type ReadyNodeLister struct { } // List returns ready nodes. -func (readyNodeLister *ReadyNodeLister) List() ([]kube_api.Node, error) { +func (readyNodeLister *ReadyNodeLister) List() ([]*kube_api.Node, error) { nodes, err := readyNodeLister.nodeLister.List() if err != nil { - return []kube_api.Node{}, err + return []*kube_api.Node{}, err } - readyNodes := make([]kube_api.Node, 0, len(nodes.Items)) - for _, node := range nodes.Items { + readyNodes := make([]*kube_api.Node, 0, len(nodes.Items)) + for i, node := range nodes.Items { for _, condition := range node.Status.Conditions { if condition.Type == kube_api.NodeReady && condition.Status == kube_api.ConditionTrue { - readyNodes = append(readyNodes, node) + readyNodes = append(readyNodes, &nodes.Items[i]) break } } @@ -87,11 +91,11 @@ func NewNodeLister(kubeClient *kube_client.Client) *ReadyNodeLister { } // GetNewestNode returns the newest node from the given list. -func GetNewestNode(nodes []kube_api.Node) *kube_api.Node { +func GetNewestNode(nodes []*kube_api.Node) *kube_api.Node { var result *kube_api.Node - for i, node := range nodes { + for _, node := range nodes { if result == nil || node.CreationTimestamp.After(result.CreationTimestamp.Time) { - result = &(nodes[i]) + result = node } } return result @@ -105,3 +109,35 @@ func GetOldestFailedSchedulingTrail(pods []*kube_api.Pod) *time.Time { now := time.Now() return &now } + +// CheckMigsAndNodes checks if all migs have all required nodes. +func CheckMigsAndNodes(nodes []*kube_api.Node, gceManager *gce.GceManager) error { + migCount := make(map[string]int) + migs := make(map[string]*config.MigConfig) + for _, node := range nodes { + instanceConfig, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID) + if err != nil { + return err + } + + migConfig, err := gceManager.GetMigForInstance(instanceConfig) + if err != nil { + return err + } + url := migConfig.Url() + count, _ := migCount[url] + migCount[url] = count + 1 + migs[url] = migConfig + } + for url, mig := range migs { + size, err := gceManager.GetMigSize(mig) + if err != nil { + return err + } + count := migCount[url] + if size != int64(count) { + return fmt.Errorf("wrong number of nodes for mig: %s expected: %d actual: %d", url, size, count) + } + } + return nil +} diff --git a/cluster-autoscaler/utils/gce/gce.go b/cluster-autoscaler/utils/gce/gce.go index 9629e18826..3b63edc2bc 100644 --- a/cluster-autoscaler/utils/gce/gce.go +++ b/cluster-autoscaler/utils/gce/gce.go @@ -36,7 +36,8 @@ const ( operationPollInterval = 100 * time.Millisecond ) -type gceManager struct { +// GceManager is handles gce communication and data caching. +type GceManager struct { migs []*config.MigConfig service *gce.Service migCache map[config.InstanceConfig]*config.MigConfig @@ -44,7 +45,7 @@ type gceManager struct { } // CreateGceManager constructs gceManager object. -func CreateGceManager(migs []*config.MigConfig) (*gceManager, error) { +func CreateGceManager(migs []*config.MigConfig) (*GceManager, error) { // Create Google Compute Engine service. client := oauth2.NewClient(oauth2.NoContext, google.ComputeTokenSource("")) gceService, err := gce.New(client) @@ -52,7 +53,7 @@ func CreateGceManager(migs []*config.MigConfig) (*gceManager, error) { return nil, err } - manager := &gceManager{ + manager := &GceManager{ migs: migs, service: gceService, migCache: map[config.InstanceConfig]*config.MigConfig{}, @@ -63,7 +64,8 @@ func CreateGceManager(migs []*config.MigConfig) (*gceManager, error) { return manager, nil } -func (m *gceManager) GetMigSize(migConf *config.MigConfig) (int64, error) { +// GetMigSize gets MIG size. +func (m *GceManager) GetMigSize(migConf *config.MigConfig) (int64, error) { mig, err := m.service.InstanceGroupManagers.Get(migConf.Project, migConf.Zone, migConf.Name).Do() if err != nil { return -1, err @@ -71,7 +73,8 @@ func (m *gceManager) GetMigSize(migConf *config.MigConfig) (int64, error) { return mig.TargetSize, nil } -func (m *gceManager) SetMigSize(migConf *config.MigConfig, size int64) error { +// SetMigSize sets MIG size. +func (m *GceManager) SetMigSize(migConf *config.MigConfig, size int64) error { op, err := m.service.InstanceGroupManagers.Resize(migConf.Project, migConf.Zone, migConf.Name, size).Do() if err != nil { return err @@ -82,7 +85,7 @@ func (m *gceManager) SetMigSize(migConf *config.MigConfig, size int64) error { return nil } -func (m *gceManager) waitForOp(operation *gce.Operation, project string) error { +func (m *GceManager) waitForOp(operation *gce.Operation, project string) error { for start := time.Now(); time.Since(start) < operationWaitTimeout; time.Sleep(operationPollInterval) { if op, err := m.service.ZoneOperations.Get(project, operation.Zone, operation.Name).Do(); err == nil { if op.Status == "DONE" { @@ -95,8 +98,8 @@ func (m *gceManager) waitForOp(operation *gce.Operation, project string) error { return fmt.Errorf("Timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink) } -// All instances must be controlled by the same MIG. -func (m *gceManager) DeleteInstances(instances []*config.InstanceConfig) error { +// DeleteInstances deletes the given instances. All instances must be controlled by the same MIG. +func (m *GceManager) DeleteInstances(instances []*config.InstanceConfig) error { if len(instances) == 0 { return nil } @@ -131,7 +134,8 @@ func (m *gceManager) DeleteInstances(instances []*config.InstanceConfig) error { return nil } -func (m *gceManager) GetMigForInstance(instance *config.InstanceConfig) (*config.MigConfig, error) { +// GetMigForInstance returns MigConfig of the given Instance +func (m *GceManager) GetMigForInstance(instance *config.InstanceConfig) (*config.MigConfig, error) { m.cacheMutex.Lock() defer m.cacheMutex.Unlock() if mig, found := m.migCache[*instance]; found { @@ -146,7 +150,7 @@ func (m *gceManager) GetMigForInstance(instance *config.InstanceConfig) (*config return nil, fmt.Errorf("Instance %+v does not belong to any known MIG", *instance) } -func (m *gceManager) regenerateCacheIgnoreError() { +func (m *GceManager) regenerateCacheIgnoreError() { m.cacheMutex.Lock() defer m.cacheMutex.Unlock() if err := m.regenerateCache(); err != nil { @@ -154,7 +158,7 @@ func (m *gceManager) regenerateCacheIgnoreError() { } } -func (m *gceManager) regenerateCache() error { +func (m *GceManager) regenerateCache() error { newMigCache := map[config.InstanceConfig]*config.MigConfig{} for _, mig := range m.migs { diff --git a/cluster-autoscaler/utils/gce_url/gce_url.go b/cluster-autoscaler/utils/gce_url/gce_url.go index 17da62a537..e921c113af 100644 --- a/cluster-autoscaler/utils/gce_url/gce_url.go +++ b/cluster-autoscaler/utils/gce_url/gce_url.go @@ -44,6 +44,11 @@ func GenerateInstanceUrl(project, zone, name string) string { return fmt.Sprintf(instanceUrlTemplate, project, zone, name) } +// GenerateMigUrl generates url for instance. +func GenerateMigUrl(project, zone, name string) string { + return fmt.Sprintf(migUrlTemplate, project, zone, name) +} + func parseGceUrl(url, expectedResource string) (project string, zone string, name string, err error) { errMsg := fmt.Errorf("Wrong url: expected format https://content.googleapis.com/compute/v1/projects//zones//%s/, got %s", expectedResource, url) if !strings.HasPrefix(url, gcePrefix) {