Skip to content

Commit

Permalink
cluster-autoscaler: Refactor AwsManager for less complexity by extrac…
Browse files Browse the repository at this point in the history
…ting types

Accordingly to the discussion made [here](#46 (comment))
  • Loading branch information
mumoshu committed Jun 8, 2017
1 parent 2235046 commit 37b8225
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 221 deletions.
144 changes: 144 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
"errors"
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/golang/glog"
)

// autoScaling is the interface represents a specific aspect of the auto-scaling service provided by AWS SDK for use in CA
type autoScaling interface {
DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error)
DescribeTags(input *autoscaling.DescribeTagsInput) (*autoscaling.DescribeTagsOutput, error)
SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error)
TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error)
}

// autoScalingWrapper provides several utility methods over the auto-scaling service provided by AWS SDK
type autoScalingWrapper struct {
autoScaling
}

func (m autoScalingWrapper) getAutoscalingGroupByName(name string) (*autoscaling.Group, error) {
params := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: []*string{aws.String(name)},
MaxRecords: aws.Int64(1),
}
groups, err := m.DescribeAutoScalingGroups(params)
if err != nil {
glog.V(4).Infof("Failed ASG info request for %s: %v", name, err)
return nil, err
}
if len(groups.AutoScalingGroups) < 1 {
return nil, fmt.Errorf("Unable to get first autoscaling.Group for %s", name)
}
return groups.AutoScalingGroups[0], nil
}

func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) {
glog.V(6).Infof("Starting getAutoscalingGroupsByNames with names=%v", names)

nameRefs := []*string{}
for _, n := range names {
nameRefs = append(nameRefs, aws.String(n))
}
params := &autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: nameRefs,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
}
description, err := m.DescribeAutoScalingGroups(params)
if err != nil {
glog.V(4).Infof("Failed to describe ASGs : %v", err)
return nil, err
}
if len(description.AutoScalingGroups) < 1 {
return nil, errors.New("No ASGs found")
}

asgs := description.AutoScalingGroups
for description.NextToken != nil {
description, err = m.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{
NextToken: description.NextToken,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASGs : %v", err)
return nil, err
}
asgs = append(asgs, description.AutoScalingGroups...)
}

glog.V(6).Infof("Finishing getAutoscalingGroupsByNames asgs=%v", asgs)

return asgs, nil
}

func (m *autoScalingWrapper) getAutoscalingGroupsByTag(key string) ([]*autoscaling.Group, error) {
glog.V(6).Infof("Starting getAutoscalingGroupsByTag with key=%v", key)

tags := []*autoscaling.TagDescription{}

description, err := m.DescribeTags(&autoscaling.DescribeTagsInput{
Filters: []*autoscaling.Filter{
{
Name: aws.String("key"),
Values: []*string{aws.String(key)},
},
},
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASG tags for key %s : %v", key, err)
return nil, err
}
if len(description.Tags) < 1 {
return nil, fmt.Errorf("Unable to find ASGs for tag key %s", key)
}
tags = append(tags, description.Tags...)

for description.NextToken != nil {
description, err = m.DescribeTags(&autoscaling.DescribeTagsInput{
NextToken: description.NextToken,
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
})
if err != nil {
glog.V(4).Infof("Failed to describe ASG tags for key %s: %v", key, err)
return nil, err
}
tags = append(tags, description.Tags...)
}

asgNames := []string{}
for _, t := range tags {
asgName := t.ResourceId
asgNames = append(asgNames, *asgName)
}

asgs, err := m.getAutoscalingGroupsByNames(asgNames)
if err != nil {
return nil, err
}

glog.V(6).Infof("Finishing getAutoscalingGroupsByTag with asgs=%v", asgs)

return asgs, nil
}
108 changes: 108 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aws

import (
"fmt"
"sync"
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
)

type autoScalingGroups struct {
registeredAsgs []*asgInformation
instanceToAsg map[AwsRef]*Asg
cacheMutex sync.Mutex
instancesNotInManagedAsg map[AwsRef]struct{}
service autoScalingWrapper
}

func newAutoScalingGroups(service autoScalingWrapper) *autoScalingGroups {
registry := &autoScalingGroups{
registeredAsgs: make([]*asgInformation, 0),
service: service,
instanceToAsg: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
}

go wait.Forever(func() {
registry.cacheMutex.Lock()
defer registry.cacheMutex.Unlock()
if err := registry.regenerateCache(); err != nil {
glog.Errorf("Error while regenerating Asg cache: %v", err)
}
}, time.Hour)

return registry
}

// Register registers asg in Aws Manager.
func (m *autoScalingGroups) Register(asg *Asg) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()

m.registeredAsgs = append(m.registeredAsgs, &asgInformation{
config: asg,
})
}

// FindForInstance returns AsgConfig of the given Instance
func (m *autoScalingGroups) FindForInstance(instance *AwsRef) (*Asg, error) {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
if config, found := m.instanceToAsg[*instance]; found {
return config, nil
}
if _, found := m.instancesNotInManagedAsg[*instance]; found {
// The instance is already known to not belong to any configured ASG
// Skip regenerateCache so that we won't unnecessarily call DescribeAutoScalingGroups
// See https://github.com/kubernetes/contrib/issues/2541
return nil, nil
}
if err := m.regenerateCache(); err != nil {
return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err)
}
if config, found := m.instanceToAsg[*instance]; found {
return config, nil
}
// instance does not belong to any configured ASG
glog.V(6).Infof("Instance %+v is not in any ASG managed by CA. CA is now memorizing the fact not to unnecessarily call AWS API afterwards trying to find the unexistent managed ASG for the instance", *instance)
m.instancesNotInManagedAsg[*instance] = struct{}{}
return nil, nil
}

func (m *autoScalingGroups) regenerateCache() error {
newCache := make(map[AwsRef]*Asg)

for _, asg := range m.registeredAsgs {
glog.V(4).Infof("Regenerating ASG information for %s", asg.config.Name)

group, err := m.service.getAutoscalingGroupByName(asg.config.Name)
if err != nil {
return err
}
for _, instance := range group.Instances {
ref := AwsRef{Name: *instance.InstanceId}
newCache[ref] = asg.config
}
}

m.instanceToAsg = newCache
return nil
}
60 changes: 26 additions & 34 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,28 @@ func (a *AutoScalingMock) TerminateInstanceInAutoScalingGroup(input *autoscaling
return args.Get(0).(*autoscaling.TerminateInstanceInAutoScalingGroupOutput), nil
}

var testService = autoScalingWrapper{&AutoScalingMock{}}

var testAwsManager = &AwsManager{
asgs: make([]*asgInformation, 0),
service: &AutoScalingMock{},
asgCache: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
asgs: &autoScalingGroups{
registeredAsgs: make([]*asgInformation, 0),
instanceToAsg: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
},
service: testService,
}

func newTestAwsManagerWithService(service autoScaling) *AwsManager {
wrapper := autoScalingWrapper{service}
return &AwsManager{
service: wrapper,
asgs: &autoScalingGroups{
registeredAsgs: make([]*asgInformation, 0),
instanceToAsg: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
service: wrapper,
},
}
}

func testDescribeAutoScalingGroupsOutput(desiredCap int64, instanceIds ...string) *autoscaling.DescribeAutoScalingGroupsOutput {
Expand Down Expand Up @@ -129,12 +146,7 @@ func TestNodeGroupForNode(t *testing.T) {
},
}
service := &AutoScalingMock{}
m := &AwsManager{
asgs: make([]*asgInformation, 0),
service: service,
asgCache: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
}
m := newTestAwsManagerWithService(service)
provider := testProvider(t, m)
err := provider.addNodeGroup("1:5:test-asg")
assert.NoError(t, err)
Expand Down Expand Up @@ -195,12 +207,7 @@ func TestMinSize(t *testing.T) {

func TestTargetSize(t *testing.T) {
service := &AutoScalingMock{}
m := &AwsManager{
asgs: make([]*asgInformation, 0),
service: service,
asgCache: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
}
m := newTestAwsManagerWithService(service)
provider := testProvider(t, m)
err := provider.addNodeGroup("1:5:test-asg")
assert.NoError(t, err)
Expand All @@ -219,12 +226,7 @@ func TestTargetSize(t *testing.T) {

func TestIncreaseSize(t *testing.T) {
service := &AutoScalingMock{}
m := &AwsManager{
asgs: make([]*asgInformation, 0),
service: service,
asgCache: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
}
m := newTestAwsManagerWithService(service)
provider := testProvider(t, m)
err := provider.addNodeGroup("1:5:test-asg")
assert.NoError(t, err)
Expand All @@ -249,12 +251,7 @@ func TestIncreaseSize(t *testing.T) {

func TestBelongs(t *testing.T) {
service := &AutoScalingMock{}
m := &AwsManager{
asgs: make([]*asgInformation, 0),
service: service,
asgCache: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
}
m := newTestAwsManagerWithService(service)
provider := testProvider(t, m)
err := provider.addNodeGroup("1:5:test-asg")
assert.NoError(t, err)
Expand Down Expand Up @@ -288,12 +285,7 @@ func TestBelongs(t *testing.T) {

func TestDeleteNodes(t *testing.T) {
service := &AutoScalingMock{}
m := &AwsManager{
asgs: make([]*asgInformation, 0),
service: service,
asgCache: make(map[AwsRef]*Asg),
instancesNotInManagedAsg: make(map[AwsRef]struct{}),
}
m := newTestAwsManagerWithService(service)

service.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String("test-instance-id"),
Expand Down

0 comments on commit 37b8225

Please sign in to comment.