Skip to content

Commit

Permalink
Stop pruning the desired targets by the health api call, this strippe…
Browse files Browse the repository at this point in the history
…d out all new nodes from desired state. Instead, when fetching all of the nodes from the cluster, only grab the "running" nodes and cache those API calls for a few minutes
  • Loading branch information
bigkraig committed Jul 31, 2018
1 parent 54f942d commit 8f0e403
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 25 deletions.
2 changes: 1 addition & 1 deletion internal/alb/tg/targetgroup.go
Expand Up @@ -53,6 +53,7 @@ func NewDesiredTargetGroup(o *NewDesiredTargetGroupOptions) *TargetGroup {

n := fmt.Sprintf("%s-%s", o.SvcName, hex.EncodeToString(hasher.Sum(nil)))
id := fmt.Sprintf("%.12s-%.19s", o.Store.GetConfig().ALBNamePrefix, n)
id = strings.TrimRight(id, "-")

// TODO: Quick fix as we can't have the loadbalancer and target groups share pointers to the same
// tags. Each modify tags individually and can cause bad side-effects.
Expand Down Expand Up @@ -439,7 +440,6 @@ func (t *TargetGroup) registerTargets(additions albelbv2.TargetDescriptions, rOp

if _, err := albelbv2.ELBV2svc.RegisterTargets(in); err != nil {
// Flush the cached health of the TG so that on the next iteration it will get fresh data, these change often
albelbv2.ELBV2svc.CacheDelete(albelbv2.DescribeTargetGroupTargetsForArnCache, *t.CurrentARN())
return err
}

Expand Down
9 changes: 0 additions & 9 deletions internal/alb/tg/targetgroups.go
Expand Up @@ -172,15 +172,6 @@ func NewDesiredTargetGroups(o *NewDesiredTargetGroupsOptions) (TargetGroups, err
// If this target group is already defined, copy the current state to our new TG
if i, _ := o.ExistingTargetGroups.FindById(targetGroup.ID); i >= 0 {
output[i].copyDesiredState(targetGroup)

// If there is a current TG ARN we can use it to purge the desired targets of unready instances
if output[i].CurrentARN() != nil && *tgAnnotations.TargetGroup.TargetType == "instance" {
desired, err := albelbv2.ELBV2svc.DescribeTargetGroupTargetsForArn(output[i].CurrentARN(), output[i].targets.desired)
if err != nil {
return nil, err
}
output[i].targets.desired = desired
}
} else {
output = append(output, targetGroup)
}
Expand Down
36 changes: 36 additions & 0 deletions internal/aws/albec2/ec2.go
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/golang/glog"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/aws/albrgt"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/pkg/util/log"

Expand All @@ -33,6 +34,8 @@ const (

GetSecurityGroupsCacheTTL = time.Minute * 60
GetSubnetsCacheTTL = time.Minute * 60

IsNodeHealthyCacheTTL = time.Minute * 5
)

// EC2svc is a pointer to the awsutil EC2 service
Expand Down Expand Up @@ -887,3 +890,36 @@ func subnetIsUsable(new *ec2.Subnet, existing []*ec2.Subnet) bool {
}
return true
}

// IsNodeHealthy returns true if the node is ready
func (e *EC2) IsNodeHealthy(instanceid string) bool {
cacheName := "ec2.IsNodeHealthy"
item := albcache.Get(cacheName, instanceid)

if item != nil {
return item.Value().(bool)
}

in := &ec2.DescribeInstanceStatusInput{
InstanceIds: []*string{aws.String(instanceid)},
}
o, err := e.DescribeInstanceStatus(in)
if err != nil {
glog.Errorf("Unable to fetch instance health for %s", instanceid)
return false
}

for _, instanceStatus := range o.InstanceStatuses {
if *instanceStatus.InstanceId != instanceid {
continue
}
if *instanceStatus.InstanceState.Code == 16 { // running
albcache.Set(cacheName, instanceid, true, IsNodeHealthyCacheTTL)
return true
}
albcache.Set(cacheName, instanceid, false, IsNodeHealthyCacheTTL)
return false
}

return false
}
2 changes: 1 addition & 1 deletion internal/aws/albelbv2/dummy.go
Expand Up @@ -34,7 +34,7 @@ func (d *Dummy) UpdateTags(arn *string, old util.ELBv2Tags, new util.ELBv2Tags)
func (d *Dummy) RemoveTargetGroup(arn *string) error { return nil }

// DescribeTargetGroupTargetsForArn ...
func (d *Dummy) DescribeTargetGroupTargetsForArn(arn *string, targets ...TargetDescriptions) (TargetDescriptions, error) {
func (d *Dummy) DescribeTargetGroupTargetsForArn(arn *string) (TargetDescriptions, error) {
return nil, nil
}

Expand Down
20 changes: 6 additions & 14 deletions internal/aws/albelbv2/elbv2.go
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/aws/albec2"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/aws/albrgt"

"github.com/kubernetes-sigs/aws-alb-ingress-controller/pkg/util/log"
util "github.com/kubernetes-sigs/aws-alb-ingress-controller/pkg/util/types"
)

Expand All @@ -47,7 +46,7 @@ type ELBV2API interface {
ClusterTargetGroups() (map[string][]*elbv2.TargetGroup, error)
UpdateTags(arn *string, old util.ELBv2Tags, new util.ELBv2Tags) error
RemoveTargetGroup(arn *string) error
DescribeTargetGroupTargetsForArn(arn *string, targets ...TargetDescriptions) (TargetDescriptions, error)
DescribeTargetGroupTargetsForArn(arn *string) (TargetDescriptions, error)
RemoveListener(arn *string) error
DescribeListenersForLoadBalancer(loadBalancerArn *string) ([]*elbv2.Listener, error)
Status() func() error
Expand Down Expand Up @@ -435,9 +434,9 @@ func (e *ELBV2) CacheDelete(cacheName, key string) {
}

// DescribeTargetGroupTargetsForArn looks up target group targets by an ARN.
func (e *ELBV2) DescribeTargetGroupTargetsForArn(arn *string, targets ...TargetDescriptions) (result TargetDescriptions, err error) {
func (e *ELBV2) DescribeTargetGroupTargetsForArn(arn *string) (result TargetDescriptions, err error) {
cacheName := DescribeTargetGroupTargetsForArnCache
key := *arn + "." + log.Prettify(targets)
key := *arn
item := albcache.Get(cacheName, key)

if item != nil {
Expand All @@ -449,24 +448,17 @@ func (e *ELBV2) DescribeTargetGroupTargetsForArn(arn *string, targets ...TargetD
opts := &elbv2.DescribeTargetHealthInput{
TargetGroupArn: arn,
}
for _, target := range targets {
opts.Targets = append(opts.Targets, target...)
}

targetHealth, err = e.DescribeTargetHealth(opts)
if err != nil {
return
}
for _, targetHealthDescription := range targetHealth.TargetHealthDescriptions {
switch aws.StringValue(targetHealthDescription.TargetHealth.State) {
case elbv2.TargetHealthStateEnumDraining:
// We don't need to count this instance
default:
result = append(result, targetHealthDescription.Target)
}
result = append(result, targetHealthDescription.Target)
}
result = result.Sorted()

albcache.Set(cacheName, key, result, time.Minute*5)
albcache.Set(cacheName, key, result, time.Minute*1)
return
}

Expand Down
4 changes: 4 additions & 0 deletions internal/ingress/controller/store/store.go
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/aws/albec2"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/aws/albelbv2"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -623,6 +624,9 @@ func (s *k8sStore) GetTargets(mode *string, namespace string, svc string, port *

if *mode == "instance" {
for _, node := range s.ListNodes() {
if !albec2.EC2svc.IsNodeHealthy(s.GetNodeInstanceId(node)) {
continue
}
result = append(result,
&elbv2.TargetDescription{
Id: aws.String(s.GetNodeInstanceId(node)),
Expand Down
4 changes: 4 additions & 0 deletions internal/ingress/status/status.go
Expand Up @@ -223,6 +223,10 @@ func runUpdate(ing *extensions.Ingress, client clientset.Interface, rc *ingress.
return true, nil
}

if len(current) == 0 {
return nil, nil
}

ingClient := client.ExtensionsV1beta1().Ingresses(ing.Namespace)

currIng, err := ingClient.Get(ing.Name, metav1.GetOptions{})
Expand Down

0 comments on commit 8f0e403

Please sign in to comment.