From a5519e8b1845930b6e378760eb84e9a328e42e6f Mon Sep 17 00:00:00 2001
From: Yusuke Kuoka
Date: Mon, 2 Mar 2020 20:33:16 +0900
Subject: [PATCH] feat: alb-ingress-controller integration

The algorithm is enhanced to add the
`alpha.service-controller.kubernetes.io/exclude-balancer: true` label before
deregistering the node, so that node-detacher does not race with
alb-ingress-controller trying to reconcile the target back into the target
group.
---
 README.md     | 13 +++++++++++
 aws.go        | 64 ++++++++++++++++++++++++++++++++++++++++++++++-----
 controller.go |  5 ++--
 detacher.go   | 33 +++++++++++++++++++++++---
 labeler.go    | 24 ++++++++++++++-----
 5 files changed, 122 insertions(+), 17 deletions(-)

diff --git a/README.md b/README.md
index a29bcf6..5c88d95 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,19 @@ With this application in place, the overall node shutdown process with Cluster A
 - ELB(s) still gradually stop directing the traffic to the nodes. The backend Kubernetes service and pods will start to receive less and less traffic.
 - ELB(s) stop directing traffic as the EC2 instances are detached. Application processes running inside pods can safely terminate.
 
+## Algorithm
+
+`node-detacher` runs the following steps in a control loop:
+
+- On every `Node` resource change:
+- Does the node still exist?
+  - No -> The node is already terminated. There is nothing to do, whether or not it was properly detached from its LBs. Exit this loop.
+- Is the node unschedulable?
+  - No -> The node is not scheduled for termination. Exit this loop.
+- Does the node have the condition `NodeDetatching` set to `True`?
+  - Yes -> The node is already scheduled for detachment/deregistration. All we need to do is wait for the node to be deregistered from the LBs in time. Exit this loop.
+- Deregister the node from all of its target groups, CLBs and ASGs.
+
 ## Recommended Usage
 
 - Run [`aws-asg-roller`](https://github.com/deitch/aws-asg-roller) along with `node-detacher` in order to avoid potential downtime due to ELB not reacting to node termination fast enough
diff --git a/aws.go b/aws.go
index 50b6edc..86fd785 100644
--- a/aws.go
+++ b/aws.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/aws/aws-sdk-go/service/elb"
 	"github.com/aws/aws-sdk-go/service/elb/elbiface"
+	"strconv"
 
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/awserr"
@@ -82,9 +83,9 @@ func getIdToASGs(svc autoscalingiface.AutoScalingAPI, ids []string) (map[string]
 	return idToASGs, nil
 }
 
-func getIdToTGs(svc elbv2iface.ELBV2API, ids []string) (map[string][]string, error) {
+func getIdToTGs(svc elbv2iface.ELBV2API, ids []string) (map[string][]string, map[string]map[string][]elbv2.TargetDescription, error) {
 	if len(ids) == 0 {
-		return nil, nil
+		return nil, nil, nil
 	}
 
 	tgInput := &elbv2.DescribeTargetGroupsInput{
@@ -98,17 +99,19 @@ func getIdToTGs(svc elbv2iface.ELBV2API, ids []string) (map[string][]string, err
 		return !lastPage
 	})
 	if err != nil {
-		return nil, fmt.Errorf("Unable to get description for node %v: %v", ids, err)
+		return nil, nil, fmt.Errorf("Unable to get description for node %v: %v", ids, err)
 	}
 
 	idToTGs := map[string][]string{}
 
+	idToTDs := map[string]map[string][]elbv2.TargetDescription{}
+
 	for _, tg := range tgs {
 		output, err := svc.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{
 			TargetGroupArn: tg.TargetGroupArn,
 		})
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 
 		for _, desc := range output.TargetHealthDescriptions {
@@ -118,11 +121,23 @@ func getIdToTGs(svc elbv2iface.ELBV2API, ids []string) (map[string][]string, err
 				idToTGs[id] = []string{}
 			}
 
-			idToTGs[id] = append(idToTGs[id], *tg.TargetGroupArn)
+			arn := *tg.TargetGroupArn
+
+			idToTGs[id] = append(idToTGs[id], arn)
+
+			if _, ok := idToTDs[id]; !ok {
+				idToTDs[id] = map[string][]elbv2.TargetDescription{}
+			}
+
+			if _, ok := idToTDs[id][arn]; !ok {
+				idToTDs[id][arn] = []elbv2.TargetDescription{}
+			}
+
+			idToTDs[id][arn] = append(idToTDs[id][arn], *desc.Target)
 		}
 	}
 
-	return idToTGs, nil
+	return idToTGs, idToTDs, nil
 }
 
 func detachInstancesFromASGs(svc autoscalingiface.AutoScalingAPI, asgName string, instanceIDs []string) error {
@@ -190,6 +205,43 @@ func deregisterInstancesFromCLBs(svc elbiface.ELBAPI, lbName string, instanceIDs
 	return nil
 }
 
+func deregisterInstanceFromTG(svc elbv2iface.ELBV2API, tgName string, instanceID string, port string) error {
+	descs := []*elbv2.TargetDescription{}
+
+	portNum, err := strconv.Atoi(port)
+	if err != nil {
+		return fmt.Errorf("invalid port %q: %w", port, err)
+	}
+
+	descs = append(descs, &elbv2.TargetDescription{
+		Id:   aws.String(instanceID),
+		Port: aws.Int64(int64(portNum)),
+	})
+
+	input := &elbv2.DeregisterTargetsInput{
+		TargetGroupArn: aws.String(tgName),
+		Targets:        descs,
+	}
+
+	// See https://docs.aws.amazon.com/elasticloadbalancing/latest/APIReference/API_DeregisterTargets.html for the API spec
+	if _, err := svc.DeregisterTargets(input); err != nil {
+		if aerr, ok := err.(awserr.Error); ok {
+
+			switch aerr.Code() {
+			case autoscaling.ErrCodeResourceContentionFault:
+				return fmt.Errorf("Could not deregister targets: a resource is in contention, will retry in the next loop")
+			default:
+				return fmt.Errorf("Unknown aws error when deregistering targets: %v", aerr.Error())
+			}
+		} else {
+			// Not an awserr.Error, so there is no AWS error code to inspect;
+			// return the raw error as-is.
+			return fmt.Errorf("Unknown non-aws error when deregistering targets: %v", err.Error())
+		}
+	}
+	return nil
+}
+
 func deregisterInstancesFromTGs(svc elbv2iface.ELBV2API, tgName string, instanceIDs []string) error {
 	descs := []*elbv2.TargetDescription{}
 
diff --git a/controller.go b/controller.go
index f5216c4..54e3057 100644
--- a/controller.go
+++ b/controller.go
@@ -23,6 +23,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"time"
 
 	corev1 "k8s.io/api/core/v1"
 )
@@ -86,7 +87,7 @@ func (r *NodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
 	); err != nil {
 		log.Error(err, "Failed to detach nodes")
 
-		return ctrl.Result{}, err
+		return ctrl.Result{RequeueAfter: 1 * time.Second}, err
 	}
 
 	updated.Status.Conditions = append(updated.Status.Conditions, corev1.NodeCondition{
@@ -105,7 +106,7 @@ func (r *NodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
 }
 
 func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
-	r.Recorder = mgr.GetEventRecorderFor("runner-controller")
+	r.Recorder = mgr.GetEventRecorderFor("node-detacher")
 
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&corev1.Node{}).
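Before the detacher.go changes below, here is a minimal, standalone sketch of the labeling step described in the commit message: mark the node with `alpha.service-controller.kubernetes.io/exclude-balancer: true` so that alb-ingress-controller does not re-register the target while it is being deregistered. It assumes only a controller-runtime `client.Client`; the package, function, and variable names are illustrative and not part of the patch, whose actual implementation is in the detacher.go hunk that follows.

```go
// Sketch (not part of the patch): exclude a node from load balancer
// reconciliation before deregistering it, so alb-ingress-controller does not
// add the target back. Names here are illustrative.
package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func excludeNodeFromLBs(ctx context.Context, c client.Client, nodeName string) error {
	// Fetch the latest copy of the node so the update does not clobber
	// concurrent changes.
	var node corev1.Node
	if err := c.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
		return err
	}
	if node.Labels == nil {
		node.Labels = map[string]string{}
	}
	// Per the commit message, this label keeps alb-ingress-controller from
	// re-registering the instance while node-detacher deregisters it.
	node.Labels["alpha.service-controller.kubernetes.io/exclude-balancer"] = "true"
	return c.Update(ctx, &node)
}
```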
diff --git a/detacher.go b/detacher.go
index b6b943b..cf1602f 100644
--- a/detacher.go
+++ b/detacher.go
@@ -35,7 +35,7 @@ func (n *Nodes) detachNodes(unschedulableNodes []corev1.Node) error {
 		for k, v := range node.Labels {
 			ks := strings.Split(k, "/")
 
-			if len(ks) != 2 || !strings.Contains(k, NodeLabelPrefix) || k == KeyLabeled || v == LabelValueDetached {
+			if len(ks) < 2 || !strings.Contains(k, NodeLabelPrefix) || k == KeyLabeled || v == LabelValueDetached {
 				continue
 			}
 
@@ -51,8 +51,35 @@ func (n *Nodes) detachNodes(unschedulableNodes []corev1.Node) error {
 				labelUpdates[k] = LabelValueDetached
 
 			case "tg":
-				if err := deregisterInstancesFromTGs(n.elbv2Svc, id, []string{instanceId}); err != nil {
-					return err
+				{
+					// Prevent alb-ingress-controller from re-registering the target
+					// (i.e. avoid a race between node-detacher and alb-ingress-controller).
+					var latest corev1.Node
+
+					if err := n.client.Get(context.Background(), types.NamespacedName{Name: node.Name}, &latest); err != nil {
+						return err
+					}
+
+					// See https://github.com/kubernetes-sigs/aws-alb-ingress-controller/blob/27e5d2a7dc8584123e3997a5dd3d80a58fa7bbd7/internal/ingress/annotations/class/main.go#L52
+					latest.Labels["alpha.service-controller.kubernetes.io/exclude-balancer"] = "true"
+
+					if err := n.client.Update(context.Background(), &latest); err != nil {
+						return err
+					}
+
+					// Note that we still de-register the target on our own instead of waiting for
+					// alb-ingress-controller to react to "alpha.service-controller.kubernetes.io/exclude-balancer",
+					// simply so that de-registration starts earlier.
+				}
+
+				if len(ks) == 3 {
+					if err := deregisterInstanceFromTG(n.elbv2Svc, id, instanceId, ks[2]); err != nil {
+						return err
+					}
+				} else {
+					if err := deregisterInstancesFromTGs(n.elbv2Svc, id, []string{instanceId}); err != nil {
+						return err
+					}
 				}
 
 				labelUpdates[k] = LabelValueDetached
diff --git a/labeler.go b/labeler.go
index 732def6..b7d7edf 100644
--- a/labeler.go
+++ b/labeler.go
@@ -80,7 +80,7 @@ func (n *Nodes) labelNodes(nodes []corev1.Node) error {
 		return err
 	}
 
-	instanceToTGs, err := getIdToTGs(n.elbv2Svc, instanceIDs)
+	_, instanceToTDs, err := getIdToTGs(n.elbv2Svc, instanceIDs)
 	if err != nil {
 		return err
 	}
@@ -89,7 +89,7 @@ func (n *Nodes) labelNodes(nodes []corev1.Node) error {
 		instance := nodeToInstance[node.Name]
 		asgs := instanceToASGs[instance]
 		clbs := instanceToCLBs[instance]
-		tgs := instanceToTGs[instance]
+		tds := instanceToTDs[instance]
 
 		var latest corev1.Node
 
@@ -99,16 +99,28 @@ func (n *Nodes) labelNodes(nodes []corev1.Node) error {
 			return err
 		}
 
+		tryset := func(k string) {
+			if _, ok := latest.Labels[k]; !ok {
+				latest.Labels[k] = ""
+			}
+		}
+
 		for _, asg := range asgs {
-			latest.Labels[fmt.Sprintf("asg.%s/%s", NodeLabelPrefix, asg)] = ""
+			tryset(fmt.Sprintf("asg.%s/%s", NodeLabelPrefix, asg))
 		}
 
-		for _, tg := range tgs {
-			latest.Labels[fmt.Sprintf("tg.%s/%s", NodeLabelPrefix, tg)] = ""
+		for arn, descs := range tds {
+			for _, td := range descs {
+				if td.Port == nil {
+					tryset(fmt.Sprintf("tg.%s/%s", NodeLabelPrefix, arn))
+				} else {
+					tryset(fmt.Sprintf("tg.%s/%s/%d", NodeLabelPrefix, arn, *td.Port))
+				}
+			}
 		}
 
 		for _, clb := range clbs {
-			latest.Labels[fmt.Sprintf("clb.%s/%s", NodeLabelPrefix, clb)] = ""
+			tryset(fmt.Sprintf("clb.%s/%s", NodeLabelPrefix, clb))
 		}
 
 		latest.Labels[KeyLabeled] = "true"
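The README's new "Algorithm" section above can also be read as a small piece of decision logic. The following self-contained sketch restates those per-node checks; the condition type string is taken from the README, while the package and function names are illustrative and not the project's actual code, which lives in controller.go and detacher.go.

```go
// Sketch (not part of the patch): the per-node checks from the README's
// "Algorithm" section. The condition type string comes from the README; the
// package and function names are illustrative.
package sketch

import (
	corev1 "k8s.io/api/core/v1"
)

// shouldDetach reports whether node-detacher still has work to do for a node
// that was successfully fetched (a node that no longer exists is already
// terminated, so there is nothing to do for it).
func shouldDetach(node *corev1.Node) bool {
	// Not unschedulable: the node is not scheduled for termination yet.
	if !node.Spec.Unschedulable {
		return false
	}
	// Condition already set: detachment/deregistration is in flight, so just
	// wait for the load balancers to catch up.
	for _, cond := range node.Status.Conditions {
		if cond.Type == "NodeDetatching" && cond.Status == corev1.ConditionTrue {
			return false
		}
	}
	// Otherwise, deregister the node from its target groups, CLBs and ASGs.
	return true
}
```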