Skip to content
This repository was archived by the owner on Apr 17, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions cluster-autoscaler/cluster_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@ import (
"time"

"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/utils/gce"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"

"github.com/golang/glog"
)

var (
migConfig config.MigConfigFlag
kubernetes = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default")
migConfigFlag config.MigConfigFlag
kubernetes = flag.String("kubernetes", "", "Kuberentes master location. Leave blank for default")
)

func main() {
flag.Var(&migConfig, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+
flag.Var(&migConfigFlag, "nodes", "sets min,max size and url of a MIG to be controlled by Cluster Autoscaler. "+
"Can be used multiple times. Format: <min>:<max>:<migurl>")
flag.Parse()

glog.Infof("MIG: %s\n", migConfig.String())

url, err := url.Parse(*kubernetes)
if err != nil {
glog.Fatalf("Failed to parse Kuberentes url: %v", err)
Expand All @@ -52,6 +51,12 @@ func main() {
unscheduledPodLister := NewUnscheduledPodLister(kubeClient)
nodeLister := NewNodeLister(kubeClient)

migConfigs := make([]*config.MigConfig, 0, len(migConfigFlag))
gceManager, err := gce.CreateGceManager(migConfigs)
if err != nil {
glog.Fatalf("Failed to create GCE Manager %v", err)
}

for {
select {
case <-time.After(time.Minute):
Expand Down Expand Up @@ -80,7 +85,10 @@ func main() {
continue
}

// TODO: Checking if all nodes are present.
if err := CheckMigsAndNodes(nodes, gceManager); err != nil {
glog.Warningf("Cluster is not ready for autoscaling: %v", err)
continue
}

// Checks if the scheduler tried to schedule the pods after the newest node was added.
newestNode := GetNewestNode(nodes)
Expand Down
6 changes: 6 additions & 0 deletions cluster-autoscaler/config/migconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ type MigConfig struct {
Name string
}

// Url returns the GCE url of the MIG, generated from the project, zone and
// name stored in the config.
func (migconfig *MigConfig) Url() string {
	project, zone, name := migconfig.Project, migconfig.Zone, migconfig.Name
	return gceurl.GenerateMigUrl(project, zone, name)
}

// MigConfigFlag is a list of MIG configuration details. It works as a
// multi-value command-line flag.
type MigConfigFlag []MigConfig

// String returns string representation of the MIG.
func (migconfigflag *MigConfigFlag) String() string {
configs := make([]string, len(*migconfigflag))
for _, migconfig := range *migconfigflag {
Expand Down
52 changes: 44 additions & 8 deletions cluster-autoscaler/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ limitations under the License.
package main

import (
"fmt"
"time"

"k8s.io/contrib/cluster-autoscaler/config"
"k8s.io/contrib/cluster-autoscaler/utils/gce"

kube_api "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kube_client "k8s.io/kubernetes/pkg/client/unversioned"
Expand Down Expand Up @@ -58,16 +62,16 @@ type ReadyNodeLister struct {
}

// List returns ready nodes.
func (readyNodeLister *ReadyNodeLister) List() ([]kube_api.Node, error) {
func (readyNodeLister *ReadyNodeLister) List() ([]*kube_api.Node, error) {
nodes, err := readyNodeLister.nodeLister.List()
if err != nil {
return []kube_api.Node{}, err
return []*kube_api.Node{}, err
}
readyNodes := make([]kube_api.Node, 0, len(nodes.Items))
for _, node := range nodes.Items {
readyNodes := make([]*kube_api.Node, 0, len(nodes.Items))
for i, node := range nodes.Items {
for _, condition := range node.Status.Conditions {
if condition.Type == kube_api.NodeReady && condition.Status == kube_api.ConditionTrue {
readyNodes = append(readyNodes, node)
readyNodes = append(readyNodes, &nodes.Items[i])
break
}
}
Expand All @@ -87,11 +91,11 @@ func NewNodeLister(kubeClient *kube_client.Client) *ReadyNodeLister {
}

// GetNewestNode returns the newest node from the given list.
func GetNewestNode(nodes []kube_api.Node) *kube_api.Node {
func GetNewestNode(nodes []*kube_api.Node) *kube_api.Node {
var result *kube_api.Node
for i, node := range nodes {
for _, node := range nodes {
if result == nil || node.CreationTimestamp.After(result.CreationTimestamp.Time) {
result = &(nodes[i])
result = node
}
}
return result
Expand All @@ -105,3 +109,35 @@ func GetOldestFailedSchedulingTrail(pods []*kube_api.Pod) *time.Time {
now := time.Now()
return &now
}

// CheckMigsAndNodes checks if all migs have all required nodes.
func CheckMigsAndNodes(nodes []*kube_api.Node, gceManager *gce.GceManager) error {
migCount := make(map[string]int)
migs := make(map[string]*config.MigConfig)
for _, node := range nodes {
instanceConfig, err := config.InstanceConfigFromProviderId(node.Spec.ProviderID)
if err != nil {
return err
}

migConfig, err := gceManager.GetMigForInstance(instanceConfig)
if err != nil {
return err
}
url := migConfig.Url()
count, _ := migCount[url]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you don't need the second variable here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

migCount[url] = count + 1
migs[url] = migConfig
}
for url, mig := range migs {
size, err := gceManager.GetMigSize(mig)
if err != nil {
return err
}
count := migCount[url]
if size != int64(count) {
return fmt.Errorf("wrong number of nodes for mig: %s expected: %d actual: %d", url, size, count)
}
}
return nil
}
26 changes: 15 additions & 11 deletions cluster-autoscaler/utils/gce/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,24 @@ const (
operationPollInterval = 100 * time.Millisecond
)

type gceManager struct {
// GceManager is handles gce communication and data caching.
type GceManager struct {
migs []*config.MigConfig
service *gce.Service
migCache map[config.InstanceConfig]*config.MigConfig
cacheMutex sync.Mutex
}

// CreateGceManager constructs a GceManager object.
func CreateGceManager(migs []*config.MigConfig) (*gceManager, error) {
func CreateGceManager(migs []*config.MigConfig) (*GceManager, error) {
// Create Google Compute Engine service.
client := oauth2.NewClient(oauth2.NoContext, google.ComputeTokenSource(""))
gceService, err := gce.New(client)
if err != nil {
return nil, err
}

manager := &gceManager{
manager := &GceManager{
migs: migs,
service: gceService,
migCache: map[config.InstanceConfig]*config.MigConfig{},
Expand All @@ -63,15 +64,17 @@ func CreateGceManager(migs []*config.MigConfig) (*gceManager, error) {
return manager, nil
}

func (m *gceManager) GetMigSize(migConf *config.MigConfig) (int64, error) {
// GetMigSize gets MIG size.
func (m *GceManager) GetMigSize(migConf *config.MigConfig) (int64, error) {
mig, err := m.service.InstanceGroupManagers.Get(migConf.Project, migConf.Zone, migConf.Name).Do()
if err != nil {
return -1, err
}
return mig.TargetSize, nil
}

func (m *gceManager) SetMigSize(migConf *config.MigConfig, size int64) error {
// SetMigSize sets MIG size.
func (m *GceManager) SetMigSize(migConf *config.MigConfig, size int64) error {
op, err := m.service.InstanceGroupManagers.Resize(migConf.Project, migConf.Zone, migConf.Name, size).Do()
if err != nil {
return err
Expand All @@ -82,7 +85,7 @@ func (m *gceManager) SetMigSize(migConf *config.MigConfig, size int64) error {
return nil
}

func (m *gceManager) waitForOp(operation *gce.Operation, project string) error {
func (m *GceManager) waitForOp(operation *gce.Operation, project string) error {
for start := time.Now(); time.Since(start) < operationWaitTimeout; time.Sleep(operationPollInterval) {
if op, err := m.service.ZoneOperations.Get(project, operation.Zone, operation.Name).Do(); err == nil {
if op.Status == "DONE" {
Expand All @@ -95,8 +98,8 @@ func (m *gceManager) waitForOp(operation *gce.Operation, project string) error {
return fmt.Errorf("Timeout while waiting for operation %s on %s to complete.", operation.Name, operation.TargetLink)
}

// All instances must be controlled by the same MIG.
func (m *gceManager) DeleteInstances(instances []*config.InstanceConfig) error {
// DeleteInstances deletes the given instances. All instances must be controlled by the same MIG.
func (m *GceManager) DeleteInstances(instances []*config.InstanceConfig) error {
if len(instances) == 0 {
return nil
}
Expand Down Expand Up @@ -131,7 +134,8 @@ func (m *gceManager) DeleteInstances(instances []*config.InstanceConfig) error {
return nil
}

func (m *gceManager) GetMigForInstance(instance *config.InstanceConfig) (*config.MigConfig, error) {
// GetMigForInstance returns MigConfig of the given Instance
func (m *GceManager) GetMigForInstance(instance *config.InstanceConfig) (*config.MigConfig, error) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
if mig, found := m.migCache[*instance]; found {
Expand All @@ -146,15 +150,15 @@ func (m *gceManager) GetMigForInstance(instance *config.InstanceConfig) (*config
return nil, fmt.Errorf("Instance %+v does not belong to any known MIG", *instance)
}

func (m *gceManager) regenerateCacheIgnoreError() {
func (m *GceManager) regenerateCacheIgnoreError() {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
if err := m.regenerateCache(); err != nil {
glog.Errorf("Error while regenerating Mig cache: %v", err)
}
}

func (m *gceManager) regenerateCache() error {
func (m *GceManager) regenerateCache() error {
newMigCache := map[config.InstanceConfig]*config.MigConfig{}

for _, mig := range m.migs {
Expand Down
5 changes: 5 additions & 0 deletions cluster-autoscaler/utils/gce_url/gce_url.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func GenerateInstanceUrl(project, zone, name string) string {
return fmt.Sprintf(instanceUrlTemplate, project, zone, name)
}

// GenerateMigUrl generates the url of a MIG by filling migUrlTemplate with the
// given project, zone and MIG name.
func GenerateMigUrl(project, zone, name string) string {
	migUrl := fmt.Sprintf(migUrlTemplate, project, zone, name)
	return migUrl
}

func parseGceUrl(url, expectedResource string) (project string, zone string, name string, err error) {
errMsg := fmt.Errorf("Wrong url: expected format https://content.googleapis.com/compute/v1/projects/<project-id>/zones/<zone>/%s/<name>, got %s", expectedResource, url)
if !strings.HasPrefix(url, gcePrefix) {
Expand Down