Skip to content

Commit

Permalink
operator: support subnets filters
Browse files Browse the repository at this point in the history
The `cilium-operator` start/resync time and memory consumption are excessive in our largest AWS accounts, due to the `GetInstances` regularly fetching each VPC's interfaces.
We run our clusters in dedicated subnets, so the set of relevant interfaces can be easily reduced.

In one of our AWS accounts, fetching all subnets and interfaces requires about 2GB of memory, and takes about 60s.
Filtering on the cluster's subnets, we're down to 20MB and 2s for a 130-node cluster (or 200MB / 8s for 2300 nodes).

Signed-off-by: Benjamin Pineau <benjamin.pineau@datadoghq.com>
  • Loading branch information
bpineau authored and joestringer committed Apr 9, 2020
1 parent dcbdb1e commit f5425fe
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 11 deletions.
2 changes: 2 additions & 0 deletions Documentation/cmdref/cilium-operator.md
Expand Up @@ -49,6 +49,8 @@ cilium-operator [flags]
--operator-api-serve-addr string Address to serve API requests (default "localhost:9234")
--operator-prometheus-serve-addr string Address to serve Prometheus metrics (default ":6942")
--parallel-alloc-workers int Maximum number of parallel IPAM workers (default 50)
--subnet-ids-filter strings Subnets IDs (separated by commas)
--subnet-tags-filter stringToString Subnets tags in the form of k1=v1,k2=v2 (multiple k/v pairs can also be passed by repeating the CLI flag (default [])
--synchronize-k8s-nodes Synchronize Kubernetes nodes to kvstore and perform CNP GC (default true)
--synchronize-k8s-services Synchronize Kubernetes services to kvstore (default true)
--unmanaged-pod-watcher-interval int Interval to check for unmanaged kube-dns pods (0 to disable) (default 15)
Expand Down
6 changes: 6 additions & 0 deletions operator/flags.go
Expand Up @@ -68,6 +68,12 @@ func init() {
option.ENITags, "ENI tags in the form of k1=v1 (multiple k/v pairs can be passed by repeating the CLI flag)")
option.BindEnv(option.ENITags)

flags.StringToStringVar(&option.Config.IPAMSubnetsTags, option.IPAMSubnetsTags, option.Config.IPAMSubnetsTags,
"Subnets tags in the form of k1=v1,k2=v2 (multiple k/v pairs can also be passed by repeating the CLI flag")
option.BindEnv(option.IPAMSubnetsTags)
flags.StringSliceVar(&option.Config.IPAMSubnetsIDs, option.IPAMSubnetsIDs, option.Config.IPAMSubnetsIDs, "Subnets IDs (separated by commas)")
option.BindEnv(option.IPAMSubnetsIDs)

flags.Int64(option.ENIParallelWorkersDeprecated, defaults.ParallelAllocWorkers, "")
flags.MarkDeprecated(option.ENIParallelWorkersDeprecated, fmt.Sprintf("please use --%s", option.ParallelAllocWorkers))
option.BindEnv(option.ENIParallelWorkersDeprecated)
Expand Down
61 changes: 51 additions & 10 deletions pkg/aws/ec2/ec2.go
Expand Up @@ -31,9 +31,10 @@ import (

// Client represents an EC2 API client
type Client struct {
ec2Client *ec2.Client
limiter *helpers.ApiLimiter
metricsAPI MetricsAPI
ec2Client *ec2.Client
limiter *helpers.ApiLimiter
metricsAPI MetricsAPI
subnetsFilters []ec2.Filter
}

// MetricsAPI represents the metrics maintained by the AWS API client
Expand All @@ -43,14 +44,37 @@ type MetricsAPI interface {
}

// NewClient returns a new EC2 client
func NewClient(ec2Client *ec2.Client, metrics MetricsAPI, rateLimit float64, burst int) *Client {
func NewClient(ec2Client *ec2.Client, metrics MetricsAPI, rateLimit float64, burst int, subnetsFilters []ec2.Filter) *Client {
return &Client{
ec2Client: ec2Client,
metricsAPI: metrics,
limiter: helpers.NewApiLimiter(metrics, rateLimit, burst),
ec2Client: ec2Client,
metricsAPI: metrics,
limiter: helpers.NewApiLimiter(metrics, rateLimit, burst),
subnetsFilters: subnetsFilters,
}
}

// NewSubnetsFilters builds the list of ec2.Filter used to restrict AWS subnets
// listings.
//
// Every entry of tags yields one filter named "tag:<key>" matching the single
// associated value. When ids is non-empty, one additional "subnet-id" filter
// matching any of the given subnet IDs is appended last. With no tags and no
// ids, an empty (non-nil) slice is returned.
//
// Tag filters are emitted in map iteration order, so their relative order is
// not stable between calls.
func NewSubnetsFilters(tags map[string]string, ids []string) []ec2.Filter {
	// Room for one filter per tag, plus the optional subnet-id filter.
	result := make([]ec2.Filter, 0, len(tags)+1)

	for key, value := range tags {
		tagFilter := ec2.Filter{
			Name:   aws.String(fmt.Sprintf("tag:%s", key)),
			Values: []string{value},
		}
		result = append(result, tagFilter)
	}

	if len(ids) == 0 {
		return result
	}

	return append(result, ec2.Filter{
		Name:   aws.String("subnet-id"),
		Values: ids,
	})
}

// deriveStatus returns a status string based on the HTTP response provided by
// the AWS API server. If no specific status is provided, either "OK" or
// "Failed" is returned based on the error variable.
Expand All @@ -67,9 +91,10 @@ func deriveStatus(req *aws.Request, err error) string {
}

// describeNetworkInterfaces lists all ENIs, or only the ENIs in the given
// subnets when subnets filters are configured on the client
func (c *Client) describeNetworkInterfaces(ctx context.Context) ([]ec2.NetworkInterface, error) {
func (c *Client) describeNetworkInterfaces(ctx context.Context, subnets ipamTypes.SubnetMap) ([]ec2.NetworkInterface, error) {
var (
networkInterfaces []ec2.NetworkInterface
interfacesFilters []ec2.Filter
nextToken string
)

Expand All @@ -80,6 +105,18 @@ func (c *Client) describeNetworkInterfaces(ctx context.Context) ([]ec2.NetworkIn
req.NextToken = &nextToken
}

if len(c.subnetsFilters) > 0 {
subnetsIDs := make([]string, 0, len(subnets))
for id := range subnets {
subnetsIDs = append(subnetsIDs, id)
}
interfacesFilters = append(interfacesFilters, ec2.Filter{
Name: aws.String("subnet-id"),
Values: subnetsIDs,
})
req.Filters = interfacesFilters
}

sinceStart := spanstat.Start()
listReq := c.ec2Client.DescribeNetworkInterfacesRequest(req)
response, err := listReq.Send(ctx)
Expand Down Expand Up @@ -176,7 +213,7 @@ func parseENI(iface *ec2.NetworkInterface, vpcs ipamTypes.VirtualNetworkMap, sub
func (c *Client) GetInstances(ctx context.Context, vpcs ipamTypes.VirtualNetworkMap, subnets ipamTypes.SubnetMap) (*ipamTypes.InstanceMap, error) {
instances := ipamTypes.NewInstanceMap()

networkInterfaces, err := c.describeNetworkInterfaces(ctx)
networkInterfaces, err := c.describeNetworkInterfaces(ctx, subnets)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -242,7 +279,11 @@ func (c *Client) describeSubnets(ctx context.Context) ([]ec2.Subnet, error) {
c.limiter.Limit(ctx, "DescribeSubnets")

sinceStart := spanstat.Start()
listReq := c.ec2Client.DescribeSubnetsRequest(&ec2.DescribeSubnetsInput{})
reqInput := &ec2.DescribeSubnetsInput{}
if len(c.subnetsFilters) > 0 {
reqInput.Filters = c.subnetsFilters
}
listReq := c.ec2Client.DescribeSubnetsRequest(reqInput)
result, err := listReq.Send(ctx)
c.metricsAPI.ObserveAPICall("DescribeSubnets", deriveStatus(listReq.Request, err), sinceStart.Seconds())
if err != nil {
Expand Down
116 changes: 116 additions & 0 deletions pkg/aws/ec2/ec2_test.go
@@ -0,0 +1,116 @@
// Copyright 2020 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !privileged_tests

package ec2

import (
"reflect"
"sort"
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
)

// Filters wraps a slice of ec2.Filter so it can be ordered with sort.Sort.
// Elements are ordered by filter name, in descending lexical order.
type Filters []ec2.Filter

// Len reports the number of filters in the slice.
func (f Filters) Len() int {
	return len(f)
}

// Swap exchanges the filters at positions i and j.
func (f Filters) Swap(i, j int) {
	f[j], f[i] = f[i], f[j]
}

// Less reports whether the name of the filter at position i sorts after the
// name of the filter at position j.
func (f Filters) Less(i, j int) bool {
	return strings.Compare(*f[j].Name, *f[i].Name) < 0
}

// TestNewSubnetsFilters checks the filters produced by NewSubnetsFilters for
// empty arguments, subnet IDs only, tags only, and tags combined with IDs.
func TestNewSubnetsFilters(t *testing.T) {
	testCases := []struct {
		name     string
		tags     map[string]string
		ids      []string
		expected []ec2.Filter
	}{
		{
			name:     "empty arguments",
			tags:     map[string]string{},
			ids:      []string{},
			expected: []ec2.Filter{},
		},
		{
			name: "ids only",
			tags: map[string]string{},
			ids:  []string{"a", "b"},
			expected: []ec2.Filter{
				{Name: aws.String("subnet-id"), Values: []string{"a", "b"}},
			},
		},
		{
			name: "tags only",
			tags: map[string]string{"a": "b", "c": "d"},
			ids:  []string{},
			expected: []ec2.Filter{
				{Name: aws.String("tag:a"), Values: []string{"b"}},
				{Name: aws.String("tag:c"), Values: []string{"d"}},
			},
		},
		{
			name: "tags and ids",
			tags: map[string]string{"a": "b"},
			ids:  []string{"c", "d"},
			expected: []ec2.Filter{
				{Name: aws.String("tag:a"), Values: []string{"b"}},
				{Name: aws.String("subnet-id"), Values: []string{"c", "d"}},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			actual := NewSubnetsFilters(tc.tags, tc.ids)

			// Apply the same ordering to both sides so the comparison does
			// not depend on the order in which the filters were produced.
			sort.Sort(Filters(actual))
			sort.Sort(Filters(tc.expected))

			if reflect.DeepEqual(actual, tc.expected) {
				return
			}
			t.Errorf("NewSubnetsFilters() = %v, want %v", actual, tc.expected)
		})
	}
}
3 changes: 2 additions & 1 deletion pkg/ipam/allocator/aws/aws.go
Expand Up @@ -90,7 +90,8 @@ func (*AllocatorAWS) Start(getterUpdater ipam.CiliumNodeGetterUpdater) (*ipam.No
iMetrics = &ipamMetrics.NoOpMetrics{}
}

ec2Client := ec2shim.NewClient(ec2.New(cfg), aMetrics, option.Config.IPAMAPIQPSLimit, option.Config.IPAMAPIBurst)
subnetsFilters := ec2shim.NewSubnetsFilters(option.Config.IPAMSubnetsTags, option.Config.IPAMSubnetsIDs)
ec2Client := ec2shim.NewClient(ec2.New(cfg), aMetrics, option.Config.IPAMAPIQPSLimit, option.Config.IPAMAPIBurst, subnetsFilters)
log.Info("Connected to EC2 service API")
instances := eni.NewInstancesManager(ec2Client, option.Config.ENITags)
nodeManager, err := ipam.NewNodeManager(instances, getterUpdater, iMetrics,
Expand Down
22 changes: 22 additions & 0 deletions pkg/option/config.go
Expand Up @@ -719,6 +719,12 @@ const (
// IPAMAPIBurst is the burst value allowed when accessing external IPAM APIs
IPAMAPIBurst = "limit-ipam-api-burst"

// IPAMSubnetsIDs are optional subnets IDs used to filter subnets and interfaces listing
IPAMSubnetsIDs = "subnet-ids-filter"

// IPAMSubnetsTags are optional tags used to filter subnets, and interfaces within those subnets
IPAMSubnetsTags = "subnet-tags-filter"

// AWSClientBurstDeprecated is the deprecated version of IPAMAPIBurst and will be removed in v1.9
AWSClientBurstDeprecated = "aws-client-burst"

Expand Down Expand Up @@ -1851,6 +1857,12 @@ type DaemonConfig struct {
// IPAMAPIBurst is the burst value allowed when accessing external IPAM APIs
IPAMAPIBurst int

// IPAMSubnetsIDs are optional subnets IDs used to filter subnets and interfaces listing
IPAMSubnetsIDs []string

// IPAMSubnetsTags are optional tags used to filter subnets, and interfaces within those subnets
IPAMSubnetsTags map[string]string

// AWS options

// ENITags are the tags that will be added to every ENI created by the AWS ENI IPAM
Expand Down Expand Up @@ -1932,6 +1944,8 @@ var (
EnableL7Proxy: defaults.EnableL7Proxy,
EndpointStatus: make(map[string]struct{}),
ENITags: make(map[string]string),
IPAMSubnetsIDs: make([]string, 0),
IPAMSubnetsTags: make(map[string]string),
ToFQDNsMaxIPsPerHost: defaults.ToFQDNsMaxIPsPerHost,
KVstorePeriodicSync: defaults.KVstorePeriodicSync,
KVstoreConnectivityTimeout: defaults.KVstoreConnectivityTimeout,
Expand Down Expand Up @@ -2508,6 +2522,14 @@ func (c *DaemonConfig) Populate() {
c.ENITags = m
}

if m := viper.GetStringMapString(IPAMSubnetsTags); len(m) != 0 {
c.IPAMSubnetsTags = m
}

if m := viper.GetStringSlice(IPAMSubnetsIDs); len(m) != 0 {
c.IPAMSubnetsIDs = m
}

if m := viper.GetStringMapString(LogOpt); len(m) != 0 {
c.LogOpt = m
}
Expand Down

0 comments on commit f5425fe

Please sign in to comment.