feat: alb-ingress-controller integration
The algorithm is enhanced to add the `alpha.service-controller.kubernetes.io/exclude-balancer: true` label before de-registering the node, so that node-detacher does not race with alb-ingress-controller, which would otherwise reconcile the target by adding it back.
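
In sketch form, the new ordering is label-first, deregister-second. This is a minimal illustration, not the committed code (the real change is in `detacher.go` below); the `deregister` helper is hypothetical:

```go
package main

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// excludeThenDeregister sketches the ordering introduced by this commit:
// label first, then deregister, so the two controllers do not race on the
// target. The deregister helper is a hypothetical stand-in.
func excludeThenDeregister(c client.Client, node *corev1.Node, deregister func(*corev1.Node) error) error {
	// 1. Tell LB controllers to stop reconciling this node as a target.
	node.Labels["alpha.service-controller.kubernetes.io/exclude-balancer"] = "true"
	if err := c.Update(context.Background(), node); err != nil {
		return err
	}
	// 2. Only then deregister the node from its target groups.
	return deregister(node)
}
```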
mumoshu committed Mar 2, 2020
1 parent 4f851c8 commit a5519e8
Showing 5 changed files with 122 additions and 17 deletions.
13 changes: 13 additions & 0 deletions README.md
@@ -29,6 +29,19 @@ With this application in place, the overall node shutdown process with Cluster A
- ELB(s) still gradually stop directing traffic to the nodes. The backend Kubernetes services and pods start to receive less and less traffic.
- ELB(s) stop directing traffic once the EC2 instances are detached. Application processes running inside pods can then terminate safely.

## Algorithm

`node-detacher` runs the following steps in a control loop:

- On `Node` resource change:
  - Does the node exist?
    - No -> The node is already terminated, so there is nothing to do, whether or not it was properly detached from LBs. Exit this loop.
  - Is the node unschedulable?
    - No -> The node is not scheduled for termination. Exit this loop.
  - Does the node have the condition `NodeDetatching` set to `True`?
    - Yes -> The node is already scheduled for detachment/deregistration. All we need to do is wait for the node to be deregistered from the LBs in time. Exit this loop.
  - Deregister the node from all of its target groups, CLBs, and ASGs (sketched in Go below).
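
For illustration, the decision flow could be sketched in Go roughly as follows. `deregisterFromAll` is a hypothetical stand-in for the real TG/CLB/ASG deregistration code in `aws.go` and `detacher.go`; the condition name is taken verbatim from this commit:

```go
package main

import (
	corev1 "k8s.io/api/core/v1"
)

// reconcileNode is a minimal sketch of the control loop above,
// not the actual implementation.
func reconcileNode(node *corev1.Node, deregisterFromAll func(*corev1.Node) error) error {
	if node == nil {
		// The node is already terminated; nothing left to detach.
		return nil
	}
	if !node.Spec.Unschedulable {
		// The node is not scheduled for termination.
		return nil
	}
	for _, cond := range node.Status.Conditions {
		if string(cond.Type) == "NodeDetatching" && cond.Status == corev1.ConditionTrue {
			// Detachment is already in progress; wait for LB deregistration.
			return nil
		}
	}
	// Deregister the node from all of its target groups, CLBs, and ASGs.
	return deregisterFromAll(node)
}
```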

## Recommended Usage

- Run [`aws-asg-roller`](https://github.com/deitch/aws-asg-roller) along with `node-detacher` in order to avoid potential downtime due to ELB not reacting to node termination fast enough
64 changes: 58 additions & 6 deletions aws.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/aws/aws-sdk-go/service/elb/elbiface"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -82,9 +83,9 @@ func getIdToASGs(svc autoscalingiface.AutoScalingAPI, ids []string) (map[string]
return idToASGs, nil
}

func getIdToTGs(svc elbv2iface.ELBV2API, ids []string) (map[string][]string, error) {
func getIdToTGs(svc elbv2iface.ELBV2API, ids []string) (map[string][]string, map[string]map[string][]elbv2.TargetDescription, error) {
if len(ids) == 0 {
return nil, nil
return nil, nil, nil
}

tgInput := &elbv2.DescribeTargetGroupsInput{
@@ -98,17 +99,19 @@ func getIdToTGs(svc elbv2iface.ELBV2API, ids []string) (map[string][]string, err
return !lastPage
})
if err != nil {
return nil, fmt.Errorf("Unable to get description for node %v: %v", ids, err)
return nil, nil, fmt.Errorf("Unable to get description for node %v: %v", ids, err)
}

idToTGs := map[string][]string{}

idToTDs := map[string]map[string][]elbv2.TargetDescription{}

for _, tg := range tgs {
output, err := svc.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{
TargetGroupArn: tg.TargetGroupArn,
})
if err != nil {
return nil, err
return nil, nil, err
}

for _, desc := range output.TargetHealthDescriptions {
@@ -118,11 +121,23 @@ func getIdToTGs(svc elbv2iface.ELBV2API, ids []string) (map[string][]string, err
idToTGs[id] = []string{}
}

idToTGs[id] = append(idToTGs[id], *tg.TargetGroupArn)
arn := *tg.TargetGroupArn

idToTGs[id] = append(idToTGs[id], arn)

if _, ok := idToTDs[id]; !ok {
idToTDs[id] = map[string][]elbv2.TargetDescription{}
}

if _, ok := idToTDs[id][arn]; !ok {
idToTDs[id][arn] = []elbv2.TargetDescription{}
}

idToTDs[id][arn] = append(idToTDs[id][arn], *desc.Target)
}
}

return idToTGs, nil
return idToTGs, idToTDs, nil
}
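
For illustration, a caller in the same package might consume the two return values like this. The session setup and the instance ID are placeholders, not part of this diff:

```go
package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/elbv2"
)

// printTargets is an illustrative caller of the new getIdToTGs signature.
func printTargets() {
	// Placeholder session and instance ID; assumes default AWS credentials.
	sess := session.Must(session.NewSession())
	svc := elbv2.New(sess)

	idToTGs, idToTDs, err := getIdToTGs(svc, []string{"i-0123456789abcdef0"})
	if err != nil {
		log.Fatal(err)
	}
	for id, arns := range idToTGs {
		for _, arn := range arns {
			// idToTDs[id][arn] holds the TargetDescriptions (ID and port)
			// that can later be passed back to DeregisterTargets.
			fmt.Printf("%s -> %s: %v\n", id, arn, idToTDs[id][arn])
		}
	}
}
```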

func detachInstancesFromASGs(svc autoscalingiface.AutoScalingAPI, asgName string, instanceIDs []string) error {
@@ -190,6 +205,43 @@ func deregisterInstancesFromCLBs(svc elbiface.ELBAPI, lbName string, instanceIDs
return nil
}

func deregisterInstanceFromTG(svc elbv2iface.ELBV2API, tgName string, instanceID string, port string) error {
descs := []*elbv2.TargetDescription{}

portNum, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("invalid port %q: %w", port, err)
}

descs = append(descs, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(int64(portNum)),
})

input := &elbv2.DeregisterTargetsInput{
TargetGroupArn: aws.String(tgName),
Targets: descs,
}

// See https://docs.aws.amazon.com/elasticloadbalancing/latest/APIReference/API_DeregisterTargets.html for the API spec
if _, err := svc.DeregisterTargets(input); err != nil {
if aerr, ok := err.(awserr.Error); ok {

switch aerr.Code() {
case elbv2.ErrCodeTargetGroupNotFoundException, elbv2.ErrCodeInvalidTargetException:
return fmt.Errorf("Could not deregister targets from target group %q: %v", tgName, aerr.Error())
default:
return fmt.Errorf("Unknown aws error when deregistering targets: %v", aerr.Error())
}
} else {
// err is not an awserr.Error, so there is no Code or Message to
// extract; report the raw error instead.
return fmt.Errorf("Unknown non-aws error when deregistering targets: %v", err.Error())
}
}
return nil
}
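
A hedged usage example of the new function; the ARN, instance ID, and port below are placeholders, not values from this diff:

```go
package main

import (
	"log"

	"github.com/aws/aws-sdk-go/service/elbv2/elbv2iface"
)

// deregisterExample shows one call with placeholder values.
func deregisterExample(svc elbv2iface.ELBV2API) {
	err := deregisterInstanceFromTG(
		svc,
		"arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/my-tg/0123456789abcdef",
		"i-0123456789abcdef0",
		"32080", // the node port under which the target was registered
	)
	if err != nil {
		// The controller requeues on error, so failures are retried.
		log.Printf("deregistering target failed: %v", err)
	}
}
```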

func deregisterInstancesFromTGs(svc elbv2iface.ELBV2API, tgName string, instanceIDs []string) error {
descs := []*elbv2.TargetDescription{}

5 changes: 3 additions & 2 deletions controller.go
@@ -23,6 +23,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"

corev1 "k8s.io/api/core/v1"
)
@@ -86,7 +87,7 @@ func (r *NodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
); err != nil {
log.Error(err, "Failed to detach nodes")

return ctrl.Result{}, err
return ctrl.Result{RequeueAfter: 1 * time.Second}, err
}

updated.Status.Conditions = append(updated.Status.Conditions, corev1.NodeCondition{
@@ -105,7 +106,7 @@ func (r *NodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
}

func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.Recorder = mgr.GetEventRecorderFor("runner-controller")
r.Recorder = mgr.GetEventRecorderFor("node-detacher")

return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Node{}).
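
One note on the `RequeueAfter` change above: controller-runtime requeues with rate-limited backoff whenever the returned error is non-nil and, in that case, ignores `RequeueAfter`; the fixed delay only takes effect on an error-free return. A sketch of the two patterns (not part of this diff):

```go
package main

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

// requeuePatterns sketches the two ways a reconciler asks for a retry.
func requeuePatterns(err error) (ctrl.Result, error) {
	if err != nil {
		// Non-nil error: requeued with exponential backoff;
		// any RequeueAfter set here is ignored by controller-runtime.
		return ctrl.Result{}, err
	}
	// Nil error: RequeueAfter schedules the next reconcile at a fixed delay.
	return ctrl.Result{RequeueAfter: 1 * time.Second}, nil
}
```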
33 changes: 30 additions & 3 deletions detacher.go
@@ -35,7 +35,7 @@ func (n *Nodes) detachNodes(unschedulableNodes []corev1.Node) error {

for k, v := range node.Labels {
ks := strings.Split(k, "/")
if len(ks) != 2 || !strings.Contains(k, NodeLabelPrefix) || k == KeyLabeled || v == LabelValueDetached {
if len(ks) < 2 || !strings.Contains(k, NodeLabelPrefix) || k == KeyLabeled || v == LabelValueDetached {
continue
}

@@ -51,8 +51,35 @@ func (n *Nodes) detachNodes(unschedulableNodes []corev1.Node) error {

labelUpdates[k] = LabelValueDetached
case "tg":
if err := deregisterInstancesFromTGs(n.elbv2Svc, id, []string{instanceId}); err != nil {
return err
{
// Prevents alb-ingress-controller from re-registering the target
// (i.e. avoids a race between node-detacher and the alb-ingress-controller)
var latest corev1.Node

if err := n.client.Get(context.Background(), types.NamespacedName{Name: node.Name}, &latest); err != nil {
return err
}

// See https://github.com/kubernetes-sigs/aws-alb-ingress-controller/blob/27e5d2a7dc8584123e3997a5dd3d80a58fa7bbd7/internal/ingress/annotations/class/main.go#L52
latest.Labels["alpha.service-controller.kubernetes.io/exclude-balancer"] = "true"

if err := n.client.Update(context.Background(), &latest); err != nil {
return err
}

// Note that we still de-register the target on our own, rather than waiting for the
// alb-ingress-controller to do it for us in response to "alpha.service-controller.kubernetes.io/exclude-balancer",
// so that de-registration starts earlier.
}

if len(ks) == 3 {
if err := deregisterInstanceFromTG(n.elbv2Svc, id, instanceId, ks[2]); err != nil {
return err
}
} else {
if err := deregisterInstancesFromTGs(n.elbv2Svc, id, []string{instanceId}); err != nil {
return err
}
}

labelUpdates[k] = LabelValueDetached
24 changes: 18 additions & 6 deletions labeler.go
@@ -80,7 +80,7 @@ func (n *Nodes) labelNodes(nodes []corev1.Node) error {
return err
}

instanceToTGs, err := getIdToTGs(n.elbv2Svc, instanceIDs)
_, instanceToTDs, err := getIdToTGs(n.elbv2Svc, instanceIDs)
if err != nil {
return err
}
@@ -89,7 +89,7 @@ func (n *Nodes) labelNodes(nodes []corev1.Node) error {
instance := nodeToInstance[node.Name]
asgs := instanceToASGs[instance]
clbs := instanceToCLBs[instance]
tgs := instanceToTGs[instance]
tds := instanceToTDs[instance]

var latest corev1.Node

@@ -99,16 +99,28 @@ func (n *Nodes) labelNodes(nodes []corev1.Node) error {
return err
}

tryset := func(k string) {
if _, ok := latest.Labels[k]; !ok {
latest.Labels[k] = ""
}
}

for _, asg := range asgs {
latest.Labels[fmt.Sprintf("asg.%s/%s", NodeLabelPrefix, asg)] = ""
tryset(fmt.Sprintf("asg.%s/%s", NodeLabelPrefix, asg))
}

for _, tg := range tgs {
latest.Labels[fmt.Sprintf("tg.%s/%s", NodeLabelPrefix, tg)] = ""
for arn, descs := range tds {
for _, td := range descs {
if td.Port == nil {
tryset(fmt.Sprintf("tg.%s/%s", NodeLabelPrefix, arn))
} else {
tryset(fmt.Sprintf("tg.%s/%s/%d", NodeLabelPrefix, arn, *td.Port))
}
}
}

for _, clb := range clbs {
latest.Labels[fmt.Sprintf("clb.%s/%s", NodeLabelPrefix, clb)] = ""
tryset(fmt.Sprintf("clb.%s/%s", NodeLabelPrefix, clb))
}

latest.Labels[KeyLabeled] = "true"
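
To make the resulting label scheme concrete, a node in one ASG, one CLB, and one target group registered on port 32080 would carry labels shaped roughly like this. The `node-detacher.variant.run` prefix and all names here are hypothetical (`NodeLabelPrefix` and `KeyLabeled` are defined elsewhere in the repo), and real identifiers must also fit Kubernetes label key/value limits:

```go
package main

// Hypothetical label set produced by labelNodes; keys and values are
// illustrative only.
var exampleLabels = map[string]string{
	"asg.node-detacher.variant.run/my-asg":     "",
	"clb.node-detacher.variant.run/my-clb":     "",
	"tg.node-detacher.variant.run/my-tg/32080": "", // port-suffixed form, used when td.Port != nil
	"node-detacher.variant.run/labeled":        "true",
}
```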
