diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go b/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go index d7f0e977e4f51..541ee26011d2b 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go @@ -904,12 +904,28 @@ func (s *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*e // Implements EC2.DescribeSecurityGroups func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) { - // Security groups are not paged - response, err := s.ec2.DescribeSecurityGroups(request) - if err != nil { - return nil, fmt.Errorf("error listing AWS security groups: %q", err) + // Security groups are paged + results := []*ec2.SecurityGroup{} + var nextToken *string + requestTime := time.Now() + for { + response, err := s.ec2.DescribeSecurityGroups(request) + if err != nil { + recordAWSMetric("describe_security_groups", 0, err) + return nil, fmt.Errorf("error listing AWS security groups: %q", err) + } + + results = append(results, response.SecurityGroups...) 
+ + nextToken = response.NextToken + if aws.StringValue(nextToken) == "" { + break + } + request.NextToken = nextToken } - return response.SecurityGroups, nil + timeTaken := time.Since(requestTime).Seconds() + recordAWSMetric("describe_security_groups", timeTaken, nil) + return results, nil } func (s *awsSdkEC2) AttachVolume(request *ec2.AttachVolumeInput) (*ec2.VolumeAttachment, error) { @@ -1034,12 +1050,27 @@ func (s *awsSdkEC2) CreateTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOut } func (s *awsSdkEC2) DescribeRouteTables(request *ec2.DescribeRouteTablesInput) ([]*ec2.RouteTable, error) { - // Not paged - response, err := s.ec2.DescribeRouteTables(request) - if err != nil { - return nil, fmt.Errorf("error listing AWS route tables: %q", err) + results := []*ec2.RouteTable{} + var nextToken *string + requestTime := time.Now() + for { + response, err := s.ec2.DescribeRouteTables(request) + if err != nil { + recordAWSMetric("describe_route_tables", 0, err) + return nil, fmt.Errorf("error listing AWS route tables: %q", err) + } + + results = append(results, response.RouteTables...) + + nextToken = response.NextToken + if aws.StringValue(nextToken) == "" { + break + } + request.NextToken = nextToken } - return response.RouteTables, nil + timeTaken := time.Since(requestTime).Seconds() + recordAWSMetric("describe_route_tables", timeTaken, nil) + return results, nil } func (s *awsSdkEC2) CreateRoute(request *ec2.CreateRouteInput) (*ec2.CreateRouteOutput, error) { @@ -1573,13 +1604,32 @@ func (c *Cloud) GetCandidateZonesForDynamicVolume() (sets.String, error) { // TODO: Caching / expose v1.Nodes to the cloud provider? // TODO: We could also query for subnets, I think - filters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")} + // Note: It is more efficient to call the EC2 API twice with different tag + // filters than to call it once with a tag filter that results in a logical + // OR. 
For really large clusters the logical OR will result in EC2 API rate + // limiting. + instances := []*ec2.Instance{} - instances, err := c.describeInstances(filters) + baseFilters := []*ec2.Filter{newEc2Filter("instance-state-name", "running")} + + filters := c.tagging.addFilters(baseFilters) + di, err := c.describeInstances(filters) if err != nil { return nil, err } + instances = append(instances, di...) + + if c.tagging.usesLegacyTags { + filters = c.tagging.addLegacyFilters(baseFilters) + di, err = c.describeInstances(filters) + if err != nil { + return nil, err + } + + instances = append(instances, di...) + } + if len(instances) == 0 { return nil, fmt.Errorf("no instances returned") } @@ -3022,17 +3072,16 @@ func (c *Cloud) ensureSecurityGroup(name string, description string, additionalT for { attempt++ - request := &ec2.DescribeSecurityGroupsInput{} - filters := []*ec2.Filter{ - newEc2Filter("group-name", name), - newEc2Filter("vpc-id", c.vpcID), - } // Note that we do _not_ add our tag filters; group-name + vpc-id is the EC2 primary key. // However, we do check that it matches our tags. // If it doesn't have any tags, we tag it; this is how we recover if we failed to tag before. // If it has a different cluster's tags, that is an error. // This shouldn't happen because name is expected to be globally unique (UUID derived) - request.Filters = filters + request := &ec2.DescribeSecurityGroupsInput{} + request.Filters = []*ec2.Filter{ + newEc2Filter("group-name", name), + newEc2Filter("vpc-id", c.vpcID), + } securityGroups, err := c.ec2.DescribeSecurityGroups(request) if err != nil { @@ -3108,8 +3157,7 @@ func findTag(tags []*ec2.Tag, key string) (string, bool) { // However, in future this will likely be treated as an error. 
func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) { request := &ec2.DescribeSubnetsInput{} - filters := []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)} - request.Filters = c.tagging.addFilters(filters) + request.Filters = []*ec2.Filter{newEc2Filter("vpc-id", c.vpcID)} subnets, err := c.ec2.DescribeSubnets(request) if err != nil { @@ -3131,8 +3179,7 @@ func (c *Cloud) findSubnets() ([]*ec2.Subnet, error) { klog.Warningf("No tagged subnets found; will fall-back to the current subnet only. This is likely to be an error in a future version of k8s.") request = &ec2.DescribeSubnetsInput{} - filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)} - request.Filters = filters + request.Filters = []*ec2.Filter{newEc2Filter("subnet-id", c.selfAWSInstance.subnetID)} subnets, err = c.ec2.DescribeSubnets(request) if err != nil { @@ -3888,7 +3935,6 @@ func findSecurityGroupForInstance(instance *ec2.Instance, taggedSecurityGroups m // Return all the security groups that are tagged as being part of our cluster func (c *Cloud) getTaggedSecurityGroups() (map[string]*ec2.SecurityGroup, error) { request := &ec2.DescribeSecurityGroupsInput{} - request.Filters = c.tagging.addFilters(nil) groups, err := c.ec2.DescribeSecurityGroups(request) if err != nil { return nil, fmt.Errorf("error querying security groups: %q", err) @@ -3937,10 +3983,9 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer var actualGroups []*ec2.SecurityGroup { describeRequest := &ec2.DescribeSecurityGroupsInput{} - filters := []*ec2.Filter{ + describeRequest.Filters = []*ec2.Filter{ newEc2Filter("ip-permission.group-id", loadBalancerSecurityGroupID), } - describeRequest.Filters = c.tagging.addFilters(filters) response, err := c.ec2.DescribeSecurityGroups(describeRequest) if err != nil { return fmt.Errorf("error querying security groups for ELB: %q", err) @@ -4098,10 +4143,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin { // 
Server side filter describeRequest := &ec2.DescribeSecurityGroupsInput{} - filters := []*ec2.Filter{ + describeRequest.Filters = []*ec2.Filter{ newEc2Filter("ip-permission.protocol", "tcp"), } - describeRequest.Filters = c.tagging.addFilters(filters) response, err := c.ec2.DescribeSecurityGroups(describeRequest) if err != nil { return fmt.Errorf("Error querying security groups for NLB: %q", err) @@ -4229,10 +4273,9 @@ func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName strin var loadBalancerSGs = aws.StringValueSlice(lb.SecurityGroups) describeRequest := &ec2.DescribeSecurityGroupsInput{} - filters := []*ec2.Filter{ + describeRequest.Filters = []*ec2.Filter{ newEc2Filter("group-id", loadBalancerSGs...), } - describeRequest.Filters = c.tagging.addFilters(filters) response, err := c.ec2.DescribeSecurityGroups(describeRequest) if err != nil { return fmt.Errorf("error querying security groups for ELB: %q", err) @@ -4444,7 +4487,6 @@ func (c *Cloud) getInstancesByNodeNames(nodeNames []string, states ...string) ([ // TODO: Move to instanceCache func (c *Cloud) describeInstances(filters []*ec2.Filter) ([]*ec2.Instance, error) { - filters = c.tagging.addFilters(filters) request := &ec2.DescribeInstancesInput{ Filters: filters, } diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go b/staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go index cddf970359b66..3d6b50840d748 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/aws_loadbalancer.go @@ -935,10 +935,10 @@ func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, in { // Server side filter describeRequest := &ec2.DescribeSecurityGroupsInput{} - filters := []*ec2.Filter{ + describeRequest.Filters = []*ec2.Filter{ newEc2Filter("ip-permission.protocol", "tcp"), + newEc2Filter("vpc-id", c.vpcID), } - describeRequest.Filters = c.tagging.addFilters(filters) 
response, err := c.ec2.DescribeSecurityGroups(describeRequest) if err != nil { return fmt.Errorf("Error querying security groups for NLB: %q", err) diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/aws_routes.go b/staging/src/k8s.io/legacy-cloud-providers/aws/aws_routes.go index 2827596dce49b..36e6696f2ca19 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/aws_routes.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/aws_routes.go @@ -42,7 +42,7 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) { tables = response } else { - request := &ec2.DescribeRouteTablesInput{Filters: c.tagging.addFilters(nil)} + request := &ec2.DescribeRouteTablesInput{} response, err := c.ec2.DescribeRouteTables(request) if err != nil { return nil, err diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/tags.go b/staging/src/k8s.io/legacy-cloud-providers/aws/tags.go index de6cad543e35a..7e6ad916400a5 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/tags.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/tags.go @@ -247,9 +247,32 @@ func (t *awsTagging) addFilters(filters []*ec2.Filter) []*ec2.Filter { } return filters } - // For 1.6, we always recognize the legacy tag, for the 1.5 -> 1.6 upgrade - // There are no "or" filters by key, so we look for both the legacy and new key, and then we have to post-filter - f := newEc2Filter("tag-key", TagNameKubernetesClusterLegacy, t.clusterTagKey()) + + f := newEc2Filter("tag-key", t.clusterTagKey()) + + // We can't pass a zero-length Filters to AWS (it's an error) + // So if we end up with no filters; we need to return nil + filters = append(filters, f) + return filters +} + +// Add additional filters, to match on our tags. 
This uses the tag for legacy +// 1.5 -> 1.6 clusters and exists for backwards compatibility +// +// This lets us run multiple k8s clusters in a single EC2 AZ +func (t *awsTagging) addLegacyFilters(filters []*ec2.Filter) []*ec2.Filter { + // if no clusterID is configured - no filtering by special tag names + // should be applied to revert to legacy behaviour. + if len(t.ClusterID) == 0 { + if len(filters) == 0 { + // We can't pass a zero-length Filters to AWS (it's an error) + // So if we end up with no filters; just return nil + return nil + } + return filters + } + + f := newEc2Filter(fmt.Sprintf("tag:%s", TagNameKubernetesClusterLegacy), t.ClusterID) // We can't pass a zero-length Filters to AWS (it's an error) // So if we end up with no filters; we need to return nil