diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling.go new file mode 100644 index 000000000000..4ebd63e23aec --- /dev/null +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling.go @@ -0,0 +1,144 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "errors" + "fmt" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/golang/glog" +) + +// autoScaling is the interface represents a specific aspect of the auto-scaling service provided by AWS SDK for use in CA +type autoScaling interface { + DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error) + DescribeTags(input *autoscaling.DescribeTagsInput) (*autoscaling.DescribeTagsOutput, error) + SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) + TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) +} + +// autoScalingWrapper provides several utility methods over the auto-scaling service provided by AWS SDK +type autoScalingWrapper struct { + autoScaling +} + +func (m autoScalingWrapper) getAutoscalingGroupByName(name string) (*autoscaling.Group, error) { + params := &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: []*string{aws.String(name)}, + MaxRecords: aws.Int64(1), + } + groups, err := m.DescribeAutoScalingGroups(params) + if err != nil { + glog.V(4).Infof("Failed ASG info request for %s: %v", name, err) + return nil, err + } + if len(groups.AutoScalingGroups) < 1 { + return nil, fmt.Errorf("Unable to get first autoscaling.Group for %s", name) + } + return groups.AutoScalingGroups[0], nil +} + +func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) { + glog.V(6).Infof("Starting getAutoscalingGroupsByNames with names=%v", names) + + nameRefs := []*string{} + for _, n := range names { + nameRefs = append(nameRefs, aws.String(n)) + } + params := &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: nameRefs, + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + } + description, err := m.DescribeAutoScalingGroups(params) + if err != nil { + glog.V(4).Infof("Failed to describe ASGs : %v", err) + return nil, err + } + if len(description.AutoScalingGroups) < 1 { + return nil, errors.New("No ASGs found") + } + + asgs := description.AutoScalingGroups + for description.NextToken != nil { + description, err = m.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{ + NextToken: description.NextToken, + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }) + if err != nil { + glog.V(4).Infof("Failed to describe ASGs : %v", err) + return nil, err + } + asgs = append(asgs, description.AutoScalingGroups...) + } + + glog.V(6).Infof("Finishing getAutoscalingGroupsByNames asgs=%v", asgs) + + return asgs, nil +} + +func (m *autoScalingWrapper) getAutoscalingGroupsByTag(key string) ([]*autoscaling.Group, error) { + glog.V(6).Infof("Starting getAutoscalingGroupsByTag with key=%v", key) + + tags := []*autoscaling.TagDescription{} + + description, err := m.DescribeTags(&autoscaling.DescribeTagsInput{ + Filters: []*autoscaling.Filter{ + { + Name: aws.String("key"), + Values: []*string{aws.String(key)}, + }, + }, + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }) + if err != nil { + glog.V(4).Infof("Failed to describe ASG tags for key %s : %v", key, err) + return nil, err + } + if len(description.Tags) < 1 { + return nil, fmt.Errorf("Unable to find ASGs for tag key %s", key) + } + tags = append(tags, description.Tags...) + + for description.NextToken != nil { + description, err = m.DescribeTags(&autoscaling.DescribeTagsInput{ + NextToken: description.NextToken, + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }) + if err != nil { + glog.V(4).Infof("Failed to describe ASG tags for key %s: %v", key, err) + return nil, err + } + tags = append(tags, description.Tags...) + } + + asgNames := []string{} + for _, t := range tags { + asgName := t.ResourceId + asgNames = append(asgNames, *asgName) + } + + asgs, err := m.getAutoscalingGroupsByNames(asgNames) + if err != nil { + return nil, err + } + + glog.V(6).Infof("Finishing getAutoscalingGroupsByTag with asgs=%v", asgs) + + return asgs, nil +} diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go new file mode 100644 index 000000000000..4623abd740ca --- /dev/null +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -0,0 +1,108 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aws + +import ( + "fmt" + "sync" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" +) + +type autoScalingGroups struct { + registeredAsgs []*asgInformation + instanceToAsg map[AwsRef]*Asg + cacheMutex sync.Mutex + instancesNotInManagedAsg map[AwsRef]struct{} + service autoScalingWrapper +} + +func newAutoScalingGroups(service autoScalingWrapper) *autoScalingGroups { + registry := &autoScalingGroups{ + registeredAsgs: make([]*asgInformation, 0), + service: service, + instanceToAsg: make(map[AwsRef]*Asg), + instancesNotInManagedAsg: make(map[AwsRef]struct{}), + } + + go wait.Forever(func() { + registry.cacheMutex.Lock() + defer registry.cacheMutex.Unlock() + if err := registry.regenerateCache(); err != nil { + glog.Errorf("Error while regenerating Asg cache: %v", err) + } + }, time.Hour) + + return registry +} + +// Register registers asg in Aws Manager. +func (m *autoScalingGroups) Register(asg *Asg) { + m.cacheMutex.Lock() + defer m.cacheMutex.Unlock() + + m.registeredAsgs = append(m.registeredAsgs, &asgInformation{ + config: asg, + }) +} + +// FindForInstance returns AsgConfig of the given Instance +func (m *autoScalingGroups) FindForInstance(instance *AwsRef) (*Asg, error) { + m.cacheMutex.Lock() + defer m.cacheMutex.Unlock() + if config, found := m.instanceToAsg[*instance]; found { + return config, nil + } + if _, found := m.instancesNotInManagedAsg[*instance]; found { + // The instance is already known to not belong to any configured ASG + // Skip regenerateCache so that we won't unnecessarily call DescribeAutoScalingGroups + // See https://github.com/kubernetes/contrib/issues/2541 + return nil, nil + } + if err := m.regenerateCache(); err != nil { + return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err) + } + if config, found := m.instanceToAsg[*instance]; found { + return config, nil + } + // instance does not belong to any configured ASG + glog.V(6).Infof("Instance %+v is not in any ASG managed by CA. CA is now memorizing the fact not to unnecessarily call AWS API afterwards trying to find the unexistent managed ASG for the instance", *instance) + m.instancesNotInManagedAsg[*instance] = struct{}{} + return nil, nil +} + +func (m *autoScalingGroups) regenerateCache() error { + newCache := make(map[AwsRef]*Asg) + + for _, asg := range m.registeredAsgs { + glog.V(4).Infof("Regenerating ASG information for %s", asg.config.Name) + + group, err := m.service.getAutoscalingGroupByName(asg.config.Name) + if err != nil { + return err + } + for _, instance := range group.Instances { + ref := AwsRef{Name: *instance.InstanceId} + newCache[ref] = asg.config + } + } + + m.instanceToAsg = newCache + return nil +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 214d953ea7c7..74077259002d 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -59,11 +59,28 @@ func (a *AutoScalingMock) TerminateInstanceInAutoScalingGroup(input *autoscaling return args.Get(0).(*autoscaling.TerminateInstanceInAutoScalingGroupOutput), nil } +var testService = autoScalingWrapper{&AutoScalingMock{}} + var testAwsManager = &AwsManager{ - asgs: make([]*asgInformation, 0), - service: &AutoScalingMock{}, - asgCache: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), + asgs: &autoScalingGroups{ + registeredAsgs: make([]*asgInformation, 0), + instanceToAsg: make(map[AwsRef]*Asg), + instancesNotInManagedAsg: make(map[AwsRef]struct{}), + }, + service: testService, +} + +func newTestAwsManagerWithService(service autoScaling) *AwsManager { + wrapper := autoScalingWrapper{service} + return &AwsManager{ + service: wrapper, + asgs: &autoScalingGroups{ + registeredAsgs: make([]*asgInformation, 0), + instanceToAsg: make(map[AwsRef]*Asg), + instancesNotInManagedAsg: make(map[AwsRef]struct{}), + service: wrapper, + }, + } } func testDescribeAutoScalingGroupsOutput(desiredCap int64, instanceIds ...string) *autoscaling.DescribeAutoScalingGroupsOutput { @@ -129,12 +146,7 @@ func TestNodeGroupForNode(t *testing.T) { }, } service := &AutoScalingMock{} - m := &AwsManager{ - asgs: make([]*asgInformation, 0), - service: service, - asgCache: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), - } + m := newTestAwsManagerWithService(service) provider := testProvider(t, m) err := provider.addNodeGroup("1:5:test-asg") assert.NoError(t, err) @@ -195,12 +207,7 @@ func TestMinSize(t *testing.T) { func TestTargetSize(t *testing.T) { service := &AutoScalingMock{} - m := &AwsManager{ - asgs: make([]*asgInformation, 0), - service: service, - asgCache: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), - } + m := newTestAwsManagerWithService(service) provider := testProvider(t, m) err := provider.addNodeGroup("1:5:test-asg") assert.NoError(t, err) @@ -219,12 +226,7 @@ func TestTargetSize(t *testing.T) { func TestIncreaseSize(t *testing.T) { service := &AutoScalingMock{} - m := &AwsManager{ - asgs: make([]*asgInformation, 0), - service: service, - asgCache: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), - } + m := newTestAwsManagerWithService(service) provider := testProvider(t, m) err := provider.addNodeGroup("1:5:test-asg") assert.NoError(t, err) @@ -249,12 +251,7 @@ func TestIncreaseSize(t *testing.T) { func TestBelongs(t *testing.T) { service := &AutoScalingMock{} - m := &AwsManager{ - asgs: make([]*asgInformation, 0), - service: service, - asgCache: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), - } + m := newTestAwsManagerWithService(service) provider := testProvider(t, m) err := provider.addNodeGroup("1:5:test-asg") assert.NoError(t, err) @@ -288,12 +285,7 @@ func TestBelongs(t *testing.T) { func TestDeleteNodes(t *testing.T) { service := &AutoScalingMock{} - m := &AwsManager{ - asgs: make([]*asgInformation, 0), - service: service, - asgCache: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), - } + m := newTestAwsManagerWithService(service) service.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ InstanceId: aws.String("test-instance-id"), diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 02f380f3a5ce..2956b02853a8 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -17,10 +17,8 @@ limitations under the License. package aws import ( - "errors" "fmt" "io" - "sync" "time" "gopkg.in/gcfg.v1" @@ -29,7 +27,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/golang/glog" - "k8s.io/apimachinery/pkg/util/wait" provider_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" ) @@ -44,21 +41,10 @@ type asgInformation struct { basename string } -type autoScaling interface { - DescribeAutoScalingGroups(input *autoscaling.DescribeAutoScalingGroupsInput) (*autoscaling.DescribeAutoScalingGroupsOutput, error) - DescribeTags(input *autoscaling.DescribeTagsInput) (*autoscaling.DescribeTagsOutput, error) - SetDesiredCapacity(input *autoscaling.SetDesiredCapacityInput) (*autoscaling.SetDesiredCapacityOutput, error) - TerminateInstanceInAutoScalingGroup(input *autoscaling.TerminateInstanceInAutoScalingGroupInput) (*autoscaling.TerminateInstanceInAutoScalingGroupOutput, error) -} - // AwsManager is handles aws communication and data caching. type AwsManager struct { - asgs []*asgInformation - asgCache map[AwsRef]*Asg - instancesNotInManagedAsg map[AwsRef]struct{} - - service autoScaling - cacheMutex sync.Mutex + service autoScalingWrapper + asgs *autoScalingGroups } // CreateAwsManager constructs awsManager object. @@ -71,33 +57,29 @@ func CreateAwsManager(configReader io.Reader) (*AwsManager, error) { } } - service := autoscaling.New(session.New()) + service := autoScalingWrapper{ + autoscaling.New(session.New()), + } manager := &AwsManager{ - asgs: make([]*asgInformation, 0), - service: service, - asgCache: make(map[AwsRef]*Asg), - instancesNotInManagedAsg: make(map[AwsRef]struct{}), + asgs: newAutoScalingGroups(service), + service: service, } - go wait.Forever(func() { - manager.cacheMutex.Lock() - defer manager.cacheMutex.Unlock() - if err := manager.regenerateCache(); err != nil { - glog.Errorf("Error while regenerating Asg cache: %v", err) - } - }, time.Hour) - return manager, nil } -// RegisterAsg registers asg in Aws Manager. +// Register registers asg in Aws Manager. func (m *AwsManager) RegisterAsg(asg *Asg) { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() + m.asgs.Register(asg) +} + +// GetAsgForInstance returns AsgConfig of the given Instance +func (m *AwsManager) GetAsgForInstance(instance *AwsRef) (*Asg, error) { + return m.asgs.FindForInstance(instance) +} - m.asgs = append(m.asgs, &asgInformation{ - config: asg, - }) +func (m *AwsManager) getAutoscalingGroupsByTag(key string) ([]*autoscaling.Group, error) { + return m.service.getAutoscalingGroupsByTag(key) } // GetAsgSize gets ASG size. @@ -139,12 +121,12 @@ func (m *AwsManager) DeleteInstances(instances []*AwsRef) error { if len(instances) == 0 { return nil } - commonAsg, err := m.GetAsgForInstance(instances[0]) + commonAsg, err := m.asgs.FindForInstance(instances[0]) if err != nil { return err } for _, instance := range instances { - asg, err := m.GetAsgForInstance(instance) + asg, err := m.asgs.FindForInstance(instance) if err != nil { return err } @@ -168,160 +150,10 @@ func (m *AwsManager) DeleteInstances(instances []*AwsRef) error { return nil } -// GetAsgForInstance returns AsgConfig of the given Instance -func (m *AwsManager) GetAsgForInstance(instance *AwsRef) (*Asg, error) { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() - if config, found := m.asgCache[*instance]; found { - return config, nil - } - if _, found := m.instancesNotInManagedAsg[*instance]; found { - // The instance is already known to not belong to any configured ASG - // Skip regenerateCache so that we won't unnecessarily call DescribeAutoScalingGroups - // See https://github.com/kubernetes/contrib/issues/2541 - return nil, nil - } - if err := m.regenerateCache(); err != nil { - return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err) - } - if config, found := m.asgCache[*instance]; found { - return config, nil - } - // instance does not belong to any configured ASG - glog.V(6).Infof("Instance %+v is not in any ASG managed by CA. CA is now memorizing the fact not to unnecessarily call AWS API afterwards trying to find the unexistent managed ASG for the instance", *instance) - m.instancesNotInManagedAsg[*instance] = struct{}{} - return nil, nil -} - -func (m *AwsManager) regenerateCache() error { - newCache := make(map[AwsRef]*Asg) - - for _, asg := range m.asgs { - glog.V(4).Infof("Regenerating ASG information for %s", asg.config.Name) - - group, err := m.getAutoscalingGroup(asg.config.Name) - if err != nil { - return err - } - for _, instance := range group.Instances { - ref := AwsRef{Name: *instance.InstanceId} - newCache[ref] = asg.config - } - } - - m.asgCache = newCache - return nil -} - -func (m *AwsManager) getAutoscalingGroup(name string) (*autoscaling.Group, error) { - params := &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: []*string{aws.String(name)}, - MaxRecords: aws.Int64(1), - } - groups, err := m.service.DescribeAutoScalingGroups(params) - if err != nil { - glog.V(4).Infof("Failed ASG info request for %s: %v", name, err) - return nil, err - } - if len(groups.AutoScalingGroups) < 1 { - return nil, fmt.Errorf("Unable to get first autoscaling.Group for %s", name) - } - return groups.AutoScalingGroups[0], nil -} - -func (m *AwsManager) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) { - glog.V(6).Infof("Starting getAutoscalingGroupsByNames with names=%v", names) - - nameRefs := []*string{} - for _, n := range names { - nameRefs = append(nameRefs, aws.String(n)) - } - params := &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: nameRefs, - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - } - description, err := m.service.DescribeAutoScalingGroups(params) - if err != nil { - glog.V(4).Infof("Failed to describe ASGs : %v", err) - return nil, err - } - if len(description.AutoScalingGroups) < 1 { - return nil, errors.New("No ASGs found") - } - - asgs := description.AutoScalingGroups - for description.NextToken != nil { - description, err = m.service.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{ - NextToken: description.NextToken, - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - }) - if err != nil { - glog.V(4).Infof("Failed to describe ASGs : %v", err) - return nil, err - } - asgs = append(asgs, description.AutoScalingGroups...) - } - - glog.V(6).Infof("Finishing getAutoscalingGroupsByNames asgs=%v", asgs) - - return asgs, nil -} - -func (m *AwsManager) getAutoscalingGroupsByTag(key string) ([]*autoscaling.Group, error) { - glog.V(6).Infof("Starting getAutoscalingGroupsByTag with key=%v", key) - - tags := []*autoscaling.TagDescription{} - - description, err := m.service.DescribeTags(&autoscaling.DescribeTagsInput{ - Filters: []*autoscaling.Filter{ - { - Name: aws.String("key"), - Values: []*string{aws.String(key)}, - }, - }, - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - }) - if err != nil { - glog.V(4).Infof("Failed to describe ASG tags for key %s : %v", key, err) - return nil, err - } - if len(description.Tags) < 1 { - return nil, fmt.Errorf("Unable to find ASGs for tag key %s", key) - } - tags = append(tags, description.Tags...) - - for description.NextToken != nil { - description, err = m.service.DescribeTags(&autoscaling.DescribeTagsInput{ - NextToken: description.NextToken, - MaxRecords: aws.Int64(maxRecordsReturnedByAPI), - }) - if err != nil { - glog.V(4).Infof("Failed to describe ASG tags for key %s: %v", key, err) - return nil, err - } - tags = append(tags, description.Tags...) - } - - asgNames := []string{} - for _, t := range tags { - asgName := t.ResourceId - asgNames = append(asgNames, *asgName) - } - - asgs, err := m.getAutoscalingGroupsByNames(asgNames) - if err != nil { - return nil, err - } - - glog.V(6).Infof("Finishing getAutoscalingGroupsByTag with asgs=%v", asgs) - - return asgs, nil -} - // GetAsgNodes returns Asg nodes. func (m *AwsManager) GetAsgNodes(asg *Asg) ([]string, error) { result := make([]string, 0) - group, err := m.getAutoscalingGroup(asg.Name) + group, err := m.service.getAutoscalingGroupByName(asg.Name) if err != nil { return []string{}, err }