Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) feature: Drain nodes prior to termination when using RollingUpgrade strategy #259

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion controllers/instancegroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type InstanceGroupReconciler struct {
ConfigMap *corev1.ConfigMap
Namespaces map[string]corev1.Namespace
NamespacesLock *sync.Mutex
DrainGroups *sync.Map
ConfigRetention int
}

Expand Down Expand Up @@ -123,6 +124,7 @@ func (r *InstanceGroupReconciler) Reconcile(ctxt context.Context, req ctrl.Reque

// set/unset finalizer
r.SetFinalizer(instanceGroup)
namespacedName := instanceGroup.NamespacedName()

input := provisioners.ProvisionerInput{
AwsWorker: r.Auth.Aws,
Expand All @@ -139,6 +141,9 @@ func (r *InstanceGroupReconciler) Reconcile(ctxt context.Context, req ctrl.Reque
)
status.SetConfigHash(configHash)

drainGroup, _ := r.DrainGroups.LoadOrStore(namespacedName, &sync.WaitGroup{})
input.DrainGroup = drainGroup.(*sync.WaitGroup)

if !reflect.DeepEqual(*r.ConfigMap, corev1.ConfigMap{}) {
// Configmap exist - apply defaults/boundaries if namespace is not excluded
namespace := instanceGroup.GetNamespace()
Expand All @@ -150,7 +155,7 @@ func (r *InstanceGroupReconciler) Reconcile(ctxt context.Context, req ctrl.Reque
}

if err = defaultConfig.SetDefaults(); err != nil {
r.Log.Error(err, "failed to set configuration defaults", "instancegroup", instanceGroup.NamespacedName())
r.Log.Error(err, "failed to set configuration defaults", "instancegroup", namespacedName)
return ctrl.Result{}, err
}

Expand Down
18 changes: 18 additions & 0 deletions controllers/providers/aws/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.
package aws

import (
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/eks"
Expand Down Expand Up @@ -98,3 +100,19 @@ func IsUsingMixedInstances(group *autoscaling.Group) bool {
}
return false
}

func IsDesiredInService(scalingGroup *autoscaling.Group) bool {
var (
desired = aws.Int64Value(scalingGroup.DesiredCapacity)
inServiceCount int64
)

for _, instance := range scalingGroup.Instances {
lifecycle := aws.StringValue(instance.LifecycleState)
if strings.EqualFold(lifecycle, autoscaling.LifecycleStateInService) {
inServiceCount++
}
}

return desired == inServiceCount
}
88 changes: 0 additions & 88 deletions controllers/providers/kubernetes/rollingupdate.go

This file was deleted.

27 changes: 24 additions & 3 deletions controllers/providers/kubernetes/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
log = ctrl.Log.WithName("kubernetes-provider")
)

type KubernetesClientSet struct {
Kubernetes kubernetes.Interface
KubeDynamic dynamic.Interface
Expand All @@ -63,7 +68,7 @@ func IsDesiredNodesReady(nodes *corev1.NodeList, instanceIds []string, desiredCo
return false, nil
}

readyInstances := GetReadyNodesByInstance(instanceIds, nodes)
readyInstances := GetReadyNodeNamesByInstance(instanceIds, nodes)

// if discovered nodes match provided instance ids, condition is ready
if common.StringSliceEquals(readyInstances, instanceIds) {
Expand All @@ -79,7 +84,7 @@ func IsMinNodesReady(nodes *corev1.NodeList, instanceIds []string, minCount int)
return false, nil
}

readyInstances := GetReadyNodesByInstance(instanceIds, nodes)
readyInstances := GetReadyNodeNamesByInstance(instanceIds, nodes)

// if discovered nodes match provided instance ids, condition is ready
if common.StringSliceContains(readyInstances, instanceIds) {
Expand All @@ -89,7 +94,7 @@ func IsMinNodesReady(nodes *corev1.NodeList, instanceIds []string, minCount int)
return false, nil
}

func GetReadyNodesByInstance(instanceIds []string, nodes *corev1.NodeList) []string {
func GetReadyNodeNamesByInstance(instanceIds []string, nodes *corev1.NodeList) []string {
readyInstances := make([]string, 0)
for _, id := range instanceIds {
for _, node := range nodes.Items {
Expand All @@ -101,6 +106,22 @@ func GetReadyNodesByInstance(instanceIds []string, nodes *corev1.NodeList) []str
return readyInstances
}

func GetNodesByInstance(instanceIds []string, nodes *corev1.NodeList) *corev1.NodeList {
nodeList := &corev1.NodeList{
ListMeta: metav1.ListMeta{},
Items: []corev1.Node{},
}

for _, id := range instanceIds {
for _, node := range nodes.Items {
if common.GetLastElementBy(node.Spec.ProviderID, "/") == id {
nodeList.Items = append(nodeList.Items, node)
}
}
}
return nodeList
}

func IsNodeReady(n corev1.Node) bool {
for _, condition := range n.Status.Conditions {
if condition.Type == corev1.NodeReady && condition.Status == corev1.ConditionTrue {
Expand Down
6 changes: 3 additions & 3 deletions controllers/provisioners/eks/eks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ import (
"fmt"
"sync"

corev1 "k8s.io/api/core/v1"

"github.com/go-logr/logr"

"github.com/keikoproj/instance-manager/api/v1alpha1"
awsprovider "github.com/keikoproj/instance-manager/controllers/providers/aws"
kubeprovider "github.com/keikoproj/instance-manager/controllers/providers/kubernetes"
"github.com/keikoproj/instance-manager/controllers/provisioners"
corev1 "k8s.io/api/core/v1"
)

const (
Expand Down Expand Up @@ -65,6 +63,7 @@ func New(p provisioners.ProvisionerInput) *EksInstanceGroupContext {

ctx := &EksInstanceGroupContext{
InstanceGroup: instanceGroup,
DrainGroup: p.DrainGroup,
KubernetesClient: p.Kubernetes,
AwsWorker: p.AwsWorker,
Log: p.Log.WithName("eks"),
Expand All @@ -81,6 +80,7 @@ func New(p provisioners.ProvisionerInput) *EksInstanceGroupContext {

type EksInstanceGroupContext struct {
sync.Mutex
DrainGroup *sync.WaitGroup
InstanceGroup *v1alpha1.InstanceGroup
KubernetesClient kubeprovider.KubernetesClientSet
AwsWorker awsprovider.AwsWorker
Expand Down
2 changes: 1 addition & 1 deletion controllers/provisioners/eks/eks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func MockAwsCRDStrategy(spec string) v1alpha1.AwsUpgradeStrategy {

func MockAwsRollingUpdateStrategy(maxUnavailable *intstr.IntOrString) v1alpha1.AwsUpgradeStrategy {
return v1alpha1.AwsUpgradeStrategy{
Type: kubeprovider.RollingUpdateStrategyName,
Type: RollingUpdateStrategyName,
RollingUpdateType: &v1alpha1.RollingUpdateStrategy{
MaxUnavailable: maxUnavailable,
},
Expand Down
Loading