From 69088e24b80c0fc27110b12c27462ad0c8ba17e3 Mon Sep 17 00:00:00 2001 From: ampsingram Date: Tue, 5 Mar 2019 13:49:41 -0500 Subject: [PATCH] Allow custom AWS region overrides #1707 Replicated changes from kubernetes "Add AWS Custom Endpoint capability #70588" into cluster-autoscaler: - Modified aws_manager and aws_manager_test similar to kubernetes aws and aws_test. --- .../cloudprovider/aws/aws_manager.go | 201 ++++++++++++- .../cloudprovider/aws/aws_manager_test.go | 274 ++++++++++++++++++ 2 files changed, 465 insertions(+), 10 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 4b9f98c5312c..4d9c4e835fd9 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -22,10 +22,13 @@ import ( "fmt" "io" "math/rand" + "os" "strings" "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/endpoints" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/golang/glog" @@ -36,7 +39,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" - provider_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" ) @@ -68,24 +70,188 @@ type asgTemplate struct { Tags []*autoscaling.TagDescription } +// CloudConfig wraps the settings for the AWS cloud provider. +type CloudConfig struct { + Global struct { + // TODO: Is there any use for this? We can get it from the instance metadata service + // Maybe if we're not running on AWS, e.g. bootstrap; for now it is not very useful + Zone string + + // The AWS VPC flag enables the possibility to run the master components + // on a different aws account, on a different cloud provider or on-premises. 
+ // If the flag is set, the KubernetesClusterTag must also be provided + VPC string + // SubnetID enables using a specific subnet to use for ELBs + SubnetID string + // RouteTableID enables using a specific RouteTable + RouteTableID string + + // RoleARN is the IAM role to assume when interacting with AWS APIs. + RoleARN string + + // KubernetesClusterTag is the legacy cluster id we'll use to identify our cluster resources + KubernetesClusterTag string + // KubernetesClusterID is the cluster id we'll use to identify our cluster resources + KubernetesClusterID string + + //The aws provider creates an inbound rule per load balancer on the node security + //group. However, this can run into the AWS security group rule limit of 50 if + //many LoadBalancers are created. + // + //This flag disables the automatic ingress creation. It requires that the user + //has set up a rule that allows inbound traffic on kubelet ports from the + //local VPC subnet (so load balancers can access it). E.g. 10.82.0.0/16 30000-32000. + DisableSecurityGroupIngress bool + + //AWS has a hard limit of 500 security groups. For large clusters creating a security group for each ELB + //can cause the max number of security groups to be reached. If this is set instead of creating a new + //Security group for each ELB this security group will be used instead. + ElbSecurityGroup string + + //During the instantiation of a new AWS cloud provider, the detected region + //is validated against a known set of regions. + // + //In a non-standard, AWS-like environment (e.g. Eucalyptus), this check may + //be undesirable. Setting this to true will disable the check and provide + //a warning that the check was skipped. Please note that this is an + //experimental feature and work-in-progress for the moment. If you find + //yourself in a non-AWS cloud and open an issue, please indicate that in the + //issue body. 
+ DisableStrictZoneCheck bool + } + // [ServiceOverride "1"] + // Service = s3 + // Region = region1 + // URL = https://s3.foo.bar + // SigningRegion = signing_region + // SigningMethod = signing_method + // + // [ServiceOverride "2"] + // Service = ec2 + // Region = region2 + // URL = https://ec2.foo.bar + // SigningRegion = signing_region + // SigningMethod = signing_method + ServiceOverride map[string]*struct { + Service string + Region string + URL string + SigningRegion string + SigningMethod string + SigningName string + } +} + +func validateOverrides(cfg *CloudConfig) error { + if len(cfg.ServiceOverride) == 0 { + return nil + } + set := make(map[string]bool) + for onum, ovrd := range cfg.ServiceOverride { + // Note: gcfg does not space trim, so we have to when comparing to empty string "" + name := strings.TrimSpace(ovrd.Service) + if name == "" { + return fmt.Errorf("service name is missing [Service is \"\"] in override %s", onum) + } + // ensure the map service name is space trimmed + ovrd.Service = name + + region := strings.TrimSpace(ovrd.Region) + if region == "" { + return fmt.Errorf("service region is missing [Region is \"\"] in override %s", onum) + } + // ensure the map region is space trimmed + ovrd.Region = region + + url := strings.TrimSpace(ovrd.URL) + if url == "" { + return fmt.Errorf("url is missing [URL is \"\"] in override %s", onum) + } + signingRegion := strings.TrimSpace(ovrd.SigningRegion) + if signingRegion == "" { + return fmt.Errorf("signingRegion is missing [SigningRegion is \"\"] in override %s", onum) + } + signature := name + "_" + region + if set[signature] { + return fmt.Errorf("duplicate entry found for service override [%s] (%s in %s)", onum, name, region) + } + set[signature] = true + } + return nil +} + +func getResolver(cfg *CloudConfig) endpoints.ResolverFunc { + defaultResolver := endpoints.DefaultResolver() + defaultResolverFn := func(service, region string, + optFns ...func(*endpoints.Options)) 
(endpoints.ResolvedEndpoint, error) { + return defaultResolver.EndpointFor(service, region, optFns...) + } + if len(cfg.ServiceOverride) == 0 { + return defaultResolverFn + } + + return func(service, region string, + optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) { + for _, override := range cfg.ServiceOverride { + if override.Service == service && override.Region == region { + return endpoints.ResolvedEndpoint{ + URL: override.URL, + SigningRegion: override.SigningRegion, + SigningMethod: override.SigningMethod, + SigningName: override.SigningName, + }, nil + } + } + return defaultResolver.EndpointFor(service, region, optFns...) + } +} + +type awsSDKProvider struct { + cfg *CloudConfig +} + +func newAWSSDKProvider(cfg *CloudConfig) *awsSDKProvider { + return &awsSDKProvider{ + cfg: cfg, + } +} + +// getRegion deduces the current AWS Region. +func getRegion(cfg ...*aws.Config) string { + region, present := os.LookupEnv("AWS_REGION") + if !present { + svc := ec2metadata.New(session.New(), cfg...) 
+ if r, err := svc.Region(); err == nil { + region = r + } + } + return region +} + // createAwsManagerInternal allows for a customer autoScalingWrapper to be passed in by tests func createAWSManagerInternal( configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, service *autoScalingWrapper, ) (*AwsManager, error) { - if configReader != nil { - var cfg provider_aws.CloudConfig - if err := gcfg.ReadInto(&cfg, configReader); err != nil { - glog.Errorf("Couldn't read config: %v", err) - return nil, err - } + + cfg, err := readAWSCloudConfig(configReader) + if err != nil { + glog.Errorf("Couldn't read config: %v", err) + return nil, err + } + + if err = validateOverrides(cfg); err != nil { + glog.Errorf("Unable to validate custom endpoint overrides: %v", err) + return nil, err } if service == nil { - service = &autoScalingWrapper{ - autoscaling.New(session.New()), - } + awsSdkProvider := newAWSSDKProvider(cfg) + sess := session.New(aws.NewConfig().WithRegion(getRegion()). + WithEndpointResolver(getResolver(awsSdkProvider.cfg))) + + service = &autoScalingWrapper{autoscaling.New(sess)} } cache, err := newASGCache(*service) @@ -116,6 +282,21 @@ func createAWSManagerInternal( return manager, nil } +// readAWSCloudConfig reads an instance of AWSCloudConfig from config reader. +func readAWSCloudConfig(config io.Reader) (*CloudConfig, error) { + var cfg CloudConfig + var err error + + if config != nil { + err = gcfg.ReadInto(&cfg, config) + if err != nil { + return nil, err + } + } + + return &cfg, nil +} + // CreateAwsManager constructs awsManager object. 
func CreateAwsManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*AwsManager, error) { return createAWSManagerInternal(configReader, discoveryOpts, nil) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index aa11a62661b8..c52e35a124a7 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -18,6 +18,7 @@ package aws import ( "fmt" + "io" "strings" "testing" @@ -28,6 +29,7 @@ import ( "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + provider_aws "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" ) @@ -244,3 +246,275 @@ func TestFetchAutoAsgs(t *testing.T) { assert.NoError(t, err) assert.Empty(t, m.asgCache.get()) } + +type ServiceDescriptor struct { + name string + region string + signingRegion, signingMethod string + signingName string +} + +func TestOverridesActiveConfig(t *testing.T) { + tests := []struct { + name string + + reader io.Reader + aws provider_aws.Services + + expectError bool + active bool + servicesOverridden []ServiceDescriptor + }{ + { + "No overrides", + strings.NewReader(` + [global] + `), + nil, + false, false, + []ServiceDescriptor{}, + }, + { + "Missing Service Name", + strings.NewReader(` + [global] + [ServiceOverride "1"] + Region=sregion + URL=https://s3.foo.bar + SigningRegion=sregion + SigningMethod = sign + `), + nil, + true, false, + []ServiceDescriptor{}, + }, + { + "Missing Service Region", + strings.NewReader(` + [global] + [ServiceOverride "1"] + Service=s3 + URL=https://s3.foo.bar + SigningRegion=sregion + SigningMethod = sign + `), + nil, + true, false, + []ServiceDescriptor{}, + }, + { + "Missing URL", + strings.NewReader(` + [global] + [ServiceOverride "1"] + Service="s3" + Region=sregion + SigningRegion=sregion + 
SigningMethod = sign + `), + nil, + true, false, + []ServiceDescriptor{}, + }, + { + "Missing Signing Region", + strings.NewReader(` + [global] + [ServiceOverride "1"] + Service=s3 + Region=sregion + URL=https://s3.foo.bar + SigningMethod = sign + `), + nil, + true, false, + []ServiceDescriptor{}, + }, + { + "Active Overrides", + strings.NewReader(` + [Global] + [ServiceOverride "1"] + Service = "s3 " + Region = sregion + URL = https://s3.foo.bar + SigningRegion = sregion + SigningMethod = v4 + `), + nil, + false, true, + []ServiceDescriptor{{name: "s3", region: "sregion", signingRegion: "sregion", signingMethod: "v4"}}, + }, + { + "Multiple Overridden Services", + strings.NewReader(` + [Global] + vpc = vpc-abc1234567 + [ServiceOverride "1"] + Service=s3 + Region=sregion1 + URL=https://s3.foo.bar + SigningRegion=sregion1 + SigningMethod = v4 + [ServiceOverride "2"] + Service=ec2 + Region=sregion2 + URL=https://ec2.foo.bar + SigningRegion=sregion2 + SigningMethod = v4 + `), + nil, + false, true, + []ServiceDescriptor{{name: "s3", region: "sregion1", signingRegion: "sregion1", signingMethod: "v4"}, + {name: "ec2", region: "sregion2", signingRegion: "sregion2", signingMethod: "v4"}}, + }, + { + "Duplicate Services", + strings.NewReader(` + [Global] + vpc = vpc-abc1234567 + [ServiceOverride "1"] + Service=s3 + Region=sregion1 + URL=https://s3.foo.bar + SigningRegion=sregion + SigningMethod = sign + [ServiceOverride "2"] + Service=s3 + Region=sregion1 + URL=https://s3.foo.bar + SigningRegion=sregion + SigningMethod = sign + `), + nil, + true, false, + []ServiceDescriptor{}, + }, + { + "Multiple Overridden Services in Multiple regions", + strings.NewReader(` + [global] + [ServiceOverride "1"] + Service=s3 + Region=region1 + URL=https://s3.foo.bar + SigningRegion=sregion1 + [ServiceOverride "2"] + Service=ec2 + Region=region2 + URL=https://ec2.foo.bar + SigningRegion=sregion + SigningMethod = v4 + `), + nil, + false, true, + []ServiceDescriptor{{name: "s3", region: 
"region1", signingRegion: "sregion1", signingMethod: ""}, + {name: "ec2", region: "region2", signingRegion: "sregion", signingMethod: "v4"}}, + }, + { + "Multiple regions, Same Service", + strings.NewReader(` + [global] + [ServiceOverride "1"] + Service=s3 + Region=region1 + URL=https://s3.foo.bar + SigningRegion=sregion1 + SigningMethod = v3 + [ServiceOverride "2"] + Service=s3 + Region=region2 + URL=https://s3.foo.bar + SigningRegion=sregion1 + SigningMethod = v4 + SigningName = "name" + `), + nil, + false, true, + []ServiceDescriptor{{name: "s3", region: "region1", signingRegion: "sregion1", signingMethod: "v3"}, + {name: "s3", region: "region2", signingRegion: "sregion1", signingMethod: "v4", signingName: "name"}}, + }, + } + + for _, test := range tests { + t.Logf("Running test case %s", test.name) + cfg, err := readAWSCloudConfig(test.reader) + if err == nil { + err = validateOverrides(cfg) + } + if test.expectError { + if err == nil { + t.Errorf("Should error for case %s (cfg=%v)", test.name, cfg) + } + } else { + if err != nil { + t.Errorf("Should succeed for case: %s, got %v", test.name, err) + } + + if len(cfg.ServiceOverride) != len(test.servicesOverridden) { + t.Errorf("Expected %d overridden services, received %d for case %s", + len(test.servicesOverridden), len(cfg.ServiceOverride), test.name) + } else { + for _, sd := range test.servicesOverridden { + var found *struct { + Service string + Region string + URL string + SigningRegion string + SigningMethod string + SigningName string + } + for _, v := range cfg.ServiceOverride { + if v.Service == sd.name && v.Region == sd.region { + found = v + break + } + } + if found == nil { + t.Errorf("Missing override for service %s in case %s", + sd.name, test.name) + } else { + if found.SigningRegion != sd.signingRegion { + t.Errorf("Expected signing region '%s', received '%s' for case %s", + sd.signingRegion, found.SigningRegion, test.name) + } + if found.SigningMethod != sd.signingMethod { + t.Errorf("Expected 
signing method '%s', received '%s' for case %s", + sd.signingMethod, found.SigningMethod, test.name) + } + targetName := fmt.Sprintf("https://%s.foo.bar", sd.name) + if found.URL != targetName { + t.Errorf("Expected Endpoint '%s', received '%s' for case %s", + targetName, found.URL, test.name) + } + if found.SigningName != sd.signingName { + t.Errorf("Expected signing name '%s', received '%s' for case %s", + sd.signingName, found.SigningName, test.name) + } + + fn := getResolver(cfg) + ep1, e := fn(sd.name, sd.region) + if e != nil { + t.Errorf("Expected a valid endpoint for %s in case %s", + sd.name, test.name) + } else { + targetName := fmt.Sprintf("https://%s.foo.bar", sd.name) + if ep1.URL != targetName { + t.Errorf("Expected endpoint url: %s, received %s in case %s", + targetName, ep1.URL, test.name) + } + if ep1.SigningRegion != sd.signingRegion { + t.Errorf("Expected signing region '%s', received '%s' in case %s", + sd.signingRegion, ep1.SigningRegion, test.name) + } + if ep1.SigningMethod != sd.signingMethod { + t.Errorf("Expected signing method '%s', received '%s' in case %s", + sd.signingMethod, ep1.SigningMethod, test.name) + } + } + } + } + } + } + } +}