Skip to content

Commit

Permalink
ListOwnedTargets function to return only targets the cluster owns, an…
Browse files Browse the repository at this point in the history
…d a new instance cache
  • Loading branch information
Alex-Waring committed Nov 12, 2023
1 parent ab7de10 commit f5cd1b7
Show file tree
Hide file tree
Showing 2 changed files with 659 additions and 5 deletions.
116 changes: 114 additions & 2 deletions pkg/targetgroupbinding/targets_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,25 @@ package targetgroupbinding

import (
"context"
"net/netip"
"regexp"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
elbv2sdk "github.com/aws/aws-sdk-go/service/elbv2"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/util/cache"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
"sync"
"time"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
)

const (
defaultTargetsCacheTTL = 5 * time.Minute
defaultRegisterTargetsChunkSize = 200
defaultDeregisterTargetsChunkSize = 200

defaultInstanceCacheTTL = 30 * time.Minute
)

// TargetsManager is an abstraction around ELBV2's targets API.
Expand All @@ -27,6 +33,9 @@ type TargetsManager interface {

// List Targets from TargetGroup.
ListTargets(ctx context.Context, tgARN string) ([]TargetInfo, error)

// List Targets from TargetGroup that the cluster controls.
ListOwnedTargets(ctx context.Context, tgARN string, eksInfoResolver networking.EKSInfoResolver) ([]TargetInfo, error)
}

// NewCachedTargetsManager constructs new cachedTargetsManager
Expand All @@ -35,6 +44,8 @@ func NewCachedTargetsManager(elbv2Client services.ELBV2, logger logr.Logger) *ca
elbv2Client: elbv2Client,
targetsCache: cache.NewExpiring(),
targetsCacheTTL: defaultTargetsCacheTTL,
instanceCache: cache.NewExpiring(),
instanceCacheTTL: defaultInstanceCacheTTL,
registerTargetsChunkSize: defaultRegisterTargetsChunkSize,
deregisterTargetsChunkSize: defaultDeregisterTargetsChunkSize,
logger: logger,
Expand All @@ -59,6 +70,13 @@ type cachedTargetsManager struct {
// targetsCacheMutex protects targetsCache
targetsCacheMutex sync.RWMutex

// Cache of instances by instanceID
// Instances are long lived and once in a cluster are very unlikely to change the cluster they are part of
// We can therefore keep a very aggresive cache and avoid excessive API calls
instanceCache *cache.Expiring
instanceCacheTTL time.Duration
instanceCacheMutex sync.RWMutex

// chunk size for registerTargets API call.
registerTargetsChunkSize int
// chunk size for deregisterTargets API call.
Expand All @@ -75,6 +93,14 @@ type targetsCacheItem struct {
targets []TargetInfo
}

// cache entry for instanceCache
type instanceCacheItem struct {
// mutex protects below fields
mutex sync.RWMutex
// If the instance is in the cluster
cluster bool
}

func (m *cachedTargetsManager) RegisterTargets(ctx context.Context, tgARN string, targets []elbv2sdk.TargetDescription) error {
targetsChunks := chunkTargetDescriptions(targets, m.registerTargetsChunkSize)
for _, targetsChunk := range targetsChunks {
Expand Down Expand Up @@ -145,6 +171,92 @@ func (m *cachedTargetsManager) ListTargets(ctx context.Context, tgARN string) ([
return cloneTargetInfoSlice(refreshedTargets), nil
}

// A copy of ListTargets, except ListOwnedTargets requires an EKSInfoResolver so that it can only return targets that
// are owned by the cluster
func (m *cachedTargetsManager) ListOwnedTargets(ctx context.Context, tgARN string, eksInfoResolver networking.EKSInfoResolver) ([]TargetInfo, error) {
m.targetsCacheMutex.Lock()
defer m.targetsCacheMutex.Unlock()

if rawTargetsCacheItem, exists := m.targetsCache.Get(tgARN); exists {
targetsCacheItem := rawTargetsCacheItem.(*targetsCacheItem)
targetsCacheItem.mutex.Lock()
defer targetsCacheItem.mutex.Unlock()
refreshedTargets, err := m.refreshUnhealthyTargets(ctx, tgARN, targetsCacheItem.targets)
if err != nil {
return nil, err
}
targetsCacheItem.targets = refreshedTargets
return cloneTargetInfoSlice(refreshedTargets), nil
}

refreshedTargets, err := m.refreshAllTargets(ctx, tgARN)
if err != nil {
return nil, err
}
targetsCacheItem := &targetsCacheItem{
mutex: sync.RWMutex{},
targets: refreshedTargets,
}
m.targetsCache.Set(tgARN, targetsCacheItem, m.targetsCacheTTL)

var parsedTargets []TargetInfo
clusterCIDRs, err := eksInfoResolver.ListCIDRs(ctx)
if err != nil {
return nil, err
}

var isInstanceID = regexp.MustCompile("i-[a-z0-9]{17}")

m.instanceCacheMutex.Lock()
defer m.instanceCacheMutex.Unlock()

for _, target := range refreshedTargets {
if isInstanceID.MatchString(*target.Target.Id) {
isInstanceInCluster, err := m.isCachedInstanceInCluster(ctx, target, eksInfoResolver)
if err != nil {
return nil, err
}
if isInstanceInCluster {
parsedTargets = append(parsedTargets, target)
}
} else {
ip, err := netip.ParseAddr(*target.Target.Id)
// Ignore the cases where this errors, the target will be a lambda or ALB ARN, neither of which
// the cluster will have ownership of
if err == nil {
if networking.IsIPWithinCIDRs(ip, clusterCIDRs) {
parsedTargets = append(parsedTargets, target)
}
}
}
}

return cloneTargetInfoSlice(parsedTargets), nil
}

// This handles identifying if an instance is in the cluster, using the cache. Whenver it is called
// we have asserted that the target is an instance.
func (m *cachedTargetsManager) isCachedInstanceInCluster(ctx context.Context, instanceTarget TargetInfo, eksInfoResolver networking.EKSInfoResolver) (bool, error) {
instanceID := instanceTarget.Target.Id
if rawInstanceCacheItem, exists := m.instanceCache.Get(*instanceID); exists {
instanceCacheItem := rawInstanceCacheItem.(*instanceCacheItem)
instanceCacheItem.mutex.Lock()
defer instanceCacheItem.mutex.Unlock()
return instanceCacheItem.cluster, nil
}

isInstanceInCluster, err := eksInfoResolver.IsInstanceInCluster(ctx, *instanceID)
if err != nil {
return false, err
}
instanceCacheItem := &instanceCacheItem{
mutex: sync.RWMutex{},
cluster: isInstanceInCluster,
}
m.instanceCache.Set(*instanceID, instanceCacheItem, m.instanceCacheTTL)
return isInstanceInCluster, nil
}

// refreshAllTargets will refresh all targets for targetGroup.
func (m *cachedTargetsManager) refreshAllTargets(ctx context.Context, tgARN string) ([]TargetInfo, error) {
targets, err := m.listTargetsFromAWS(ctx, tgARN, nil)
Expand Down
Loading

0 comments on commit f5cd1b7

Please sign in to comment.