fix: Check node readiness before force terminating (#1099)
jonathan-innis committed Mar 14, 2024
1 parent 2f605fe commit c6b8b81
Showing 7 changed files with 158 additions and 30 deletions.
8 changes: 4 additions & 4 deletions pkg/controllers/disruption/suite_test.go
@@ -319,7 +319,7 @@ var _ = Describe("Simulate Scheduling", func() {
})
Expect(new).To(BeTrue())
// which needs to be deployed
ExpectNodeClaimDeployed(ctx, env.Client, cluster, cloudProvider, nc)
ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, nc)
nodeClaimNames[nc.Name] = struct{}{}

ExpectTriggerVerifyAction(&wg)
@@ -334,7 +334,7 @@ var _ = Describe("Simulate Scheduling", func() {
return !ok
})
Expect(new).To(BeTrue())
ExpectNodeClaimDeployed(ctx, env.Client, cluster, cloudProvider, nc)
ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, nc)
nodeClaimNames[nc.Name] = struct{}{}

ExpectTriggerVerifyAction(&wg)
@@ -349,7 +349,7 @@ var _ = Describe("Simulate Scheduling", func() {
return !ok
})
Expect(new).To(BeTrue())
ExpectNodeClaimDeployed(ctx, env.Client, cluster, cloudProvider, nc)
ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, nc)
nodeClaimNames[nc.Name] = struct{}{}

// Try one more time, but fail since the budgets only allow 3 disruptions.
@@ -1976,7 +1976,7 @@ func ExpectMakeNewNodeClaimsReady(ctx context.Context, c client.Client, wg *sync
if existingNodeClaimNames.Has(nc.Name) {
continue
}
nc, n := ExpectNodeClaimDeployed(ctx, c, cluster, cloudProvider, nc)
nc, n := ExpectNodeClaimDeployedAndStateUpdated(ctx, c, cluster, cloudProvider, nc)
ExpectMakeNodeClaimsInitialized(ctx, c, nc)
ExpectMakeNodesInitialized(ctx, c, n)

17 changes: 12 additions & 5 deletions pkg/controllers/node/termination/controller.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
operatorcontroller "sigs.k8s.io/karpenter/pkg/operator/controller"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
)

@@ -88,12 +89,18 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
// If the underlying nodeclaim no longer exists.
if _, err := c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
// If the underlying NodeClaim no longer exists, we want to delete the Node to avoid trying to gracefully
// drain nodes that are no longer alive. We check the Node's Ready condition because, even when the
// CloudProvider says the instance is gone, a Ready condition of "true" tells us the kubelet process is
// still running
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, v1.NodeReady).Status != v1.ConditionTrue {
if _, err := c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
}
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
}
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
}
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
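The readiness gate added above calls nodeutils.GetCondition(node, v1.NodeReady), a helper from pkg/utils/node that the diff imports but does not show. A minimal sketch of what such a helper might look like (an illustrative assumption, not necessarily the actual implementation):

package node

import (
	v1 "k8s.io/api/core/v1"
)

// GetCondition returns the Node condition of the requested type, or a zero-value
// NodeCondition if the Node does not report that type. With this shape, a Node that
// never reported a Ready condition reads as "not ConditionTrue", so the force
// termination path above still applies to it.
func GetCondition(n *v1.Node, match v1.NodeConditionType) v1.NodeCondition {
	for _, condition := range n.Status.Conditions {
		if condition.Type == match {
			return condition
		}
	}
	return v1.NodeCondition{}
}

Under that assumption, the finalizer is only removed once the CloudProvider reports the instance gone and the Node has stopped reporting Ready.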
34 changes: 34 additions & 0 deletions pkg/controllers/node/termination/suite_test.go
@@ -539,6 +539,9 @@ var _ = Describe("Termination", func() {
pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
ExpectApplied(ctx, env.Client, node, pods[0], pods[1])

// Make Node NotReady since it's automatically marked as Ready on first deploy
ExpectMakeNodesNotReady(ctx, env.Client, node)

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
@@ -565,6 +568,37 @@
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectNotFound(ctx, env.Client, node)
})
It("should not delete nodes with no underlying instance if the node is still Ready", func() {
pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
ExpectApplied(ctx, env.Client, node, pods[0], pods[1])

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{})

// Expect the pods to be evicted
EventuallyExpectTerminating(ctx, env.Client, pods[0], pods[1])

// Expect node to exist and be draining, but not deleted
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectNodeWithNodeClaimDraining(env.Client, node.Name)

// After this, the node still has one pod that is evicting.
ExpectDeleted(ctx, env.Client, pods[1])

// Remove the node from created nodeclaims so that the cloud provider returns DNE
cloudProvider.CreatedNodeClaims = map[string]*v1beta1.NodeClaim{}

// Reconcile to try to delete the node, but expect it to stick around because the node's Ready
// condition still blocks removal
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectNodeExists(ctx, env.Client, node.Name)
})
It("should wait for pods to terminate", func() {
pod := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}})
fakeClock.SetTime(time.Now()) // make our fake clock match the pod creation time
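These tests (and the garbage collection tests further down) rely on an ExpectMakeNodesNotReady helper that is not part of this diff. A hedged sketch, assuming it simply overwrites each Node's Ready condition with False through the status subresource (the real helper may patch conditions more carefully):

package expectations

import (
	"context"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// ExpectMakeNodesNotReady marks each Node's Ready condition as False so that the
// termination and garbage collection controllers treat it as no longer alive.
// Simplified sketch: it replaces the whole conditions list rather than updating
// the Ready entry in place.
func ExpectMakeNodesNotReady(ctx context.Context, c client.Client, nodes ...*v1.Node) {
	GinkgoHelper()
	for _, node := range nodes {
		node.Status.Conditions = []v1.NodeCondition{{
			Type:               v1.NodeReady,
			Status:             v1.ConditionFalse,
			Reason:             "NodeNotReady",
			LastTransitionTime: metav1.Now(),
		}}
		Expect(c.Status().Update(ctx, node)).To(Succeed())
	}
}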
21 changes: 19 additions & 2 deletions pkg/controllers/nodeclaim/garbagecollection/controller.go
@@ -23,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
@@ -31,6 +32,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/metrics"
@@ -70,15 +74,28 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
cloudProviderProviderIDs := sets.New[string](lo.Map(cloudProviderNodeClaims, func(nc *v1beta1.NodeClaim, _ int) string {
return nc.Status.ProviderID
})...)
// Only consider NodeClaims that are Registered since we don't want to fully rely on the CloudProvider
// API to trigger deletion of the Node. Instead, we'll wait for our registration timeout to clean up
// NodeClaims that never registered
nodeClaims := lo.Filter(lo.ToSlicePtr(nodeClaimList.Items), func(n *v1beta1.NodeClaim, _ int) bool {
return n.StatusConditions().GetCondition(v1beta1.Launched).IsTrue() &&
return n.StatusConditions().GetCondition(v1beta1.Registered).IsTrue() &&
n.DeletionTimestamp.IsZero() &&
c.clock.Since(n.StatusConditions().GetCondition(v1beta1.Launched).LastTransitionTime.Inner.Time) > time.Second*10 &&
!cloudProviderProviderIDs.Has(n.Status.ProviderID)
})

errs := make([]error, len(nodeClaims))
workqueue.ParallelizeUntil(ctx, 20, len(nodeClaims), func(i int) {
node, err := nodeclaimutil.NodeForNodeClaim(ctx, c.kubeClient, nodeClaims[i])
// Ignore these errors: a registered NodeClaim should only hit a NotFound Node when the Node was
// deleted out from under us, and a duplicate Node is an invalid state
if nodeclaimutil.IgnoreDuplicateNodeError(nodeclaimutil.IgnoreNodeNotFoundError(err)) != nil {
errs[i] = err
}
// We check the Node's Ready condition because, even when the CloudProvider says the instance is gone,
// a Ready condition of "true" tells us the kubelet process is still running
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if node != nil && nodeutils.GetCondition(node, v1.NodeReady).Status == v1.ConditionTrue {
return
}
if err := c.kubeClient.Delete(ctx, nodeClaims[i]); err != nil {
errs[i] = client.IgnoreNotFound(err)
return
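Distilled, the garbage collector now removes a NodeClaim only when it has Registered, the CloudProvider no longer reports its instance, and any linked Node is not reporting Ready. A standalone sketch of that decision, using hypothetical names and omitting the ten-second Launched-transition grace period the real controller also applies:

package garbagecollection

import (
	v1 "k8s.io/api/core/v1"
)

// shouldGarbageCollect condenses the filter and readiness check above into a single
// predicate. A nil node means no Node object was found for the NodeClaim.
func shouldGarbageCollect(registered bool, instanceExists bool, node *v1.Node) bool {
	if !registered || instanceExists {
		return false
	}
	// Even though the CloudProvider says the instance is gone, a Node that still
	// reports Ready=True implies a running kubelet, so hold off on deletion.
	if node != nil {
		for _, cond := range node.Status.Conditions {
			if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
				return false
			}
		}
	}
	return true
}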
76 changes: 67 additions & 9 deletions pkg/controllers/nodeclaim/garbagecollection/suite_test.go
@@ -90,7 +90,7 @@ var _ = Describe("GarbageCollection", func() {
BeforeEach(func() {
nodePool = test.NodePool()
})
It("should delete the NodeClaim when the Node never appears and the instance is gone", func() {
It("should delete the NodeClaim when the Node is there in a NotReady state and the instance is gone", func() {
nodeClaim := test.NodeClaim(v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
@@ -99,21 +99,49 @@
},
})
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectReconcileSucceeded(ctx, nodeClaimController, client.ObjectKeyFromObject(nodeClaim))
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

nodeClaim, node, err := ExpectNodeClaimDeployed(ctx, env.Client, cloudProvider, nodeClaim)
Expect(err).ToNot(HaveOccurred())

// Mark the node as NotReady after the launch
ExpectMakeNodesNotReady(ctx, env.Client, node)

// Step forward to move past the cache eventual consistency timeout
fakeClock.SetTime(time.Now().Add(time.Second * 20))

// Delete the nodeClaim from the cloudprovider
Expect(cloudProvider.Delete(ctx, nodeClaim)).To(Succeed())

// Expect the NodeClaim to be removed now that the Instance is gone
// Expect the NodeClaim to be removed since the instance is gone and the Node is NotReady
ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should delete many NodeClaims when the Node never appears and the instance is gone", func() {
It("shouldn't delete the NodeClaim when the Node is there in a Ready state and the instance is gone", func() {
nodeClaim := test.NodeClaim(v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1beta1.NodePoolLabelKey: nodePool.Name,
},
},
})
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)

nodeClaim, _, err := ExpectNodeClaimDeployed(ctx, env.Client, cloudProvider, nodeClaim)
Expect(err).ToNot(HaveOccurred())

// Step forward to move past the cache eventual consistency timeout
fakeClock.SetTime(time.Now().Add(time.Second * 20))

// Delete the nodeClaim from the cloudprovider
Expect(cloudProvider.Delete(ctx, nodeClaim)).To(Succeed())

// Expect the NodeClaim to not be removed since a Node still exists with a Ready "true" condition
ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectExists(ctx, env.Client, nodeClaim)
})
It("should delete many NodeClaims when the Nodes are there in a NotReady state and the instances are gone", func() {
var nodeClaims []*v1beta1.NodeClaim
for i := 0; i < 100; i++ {
nodeClaims = append(nodeClaims, test.NodeClaim(v1beta1.NodeClaim{
@@ -128,8 +156,13 @@
workqueue.ParallelizeUntil(ctx, len(nodeClaims), len(nodeClaims), func(i int) {
defer GinkgoRecover()
ExpectApplied(ctx, env.Client, nodeClaims[i])
ExpectReconcileSucceeded(ctx, nodeClaimController, client.ObjectKeyFromObject(nodeClaims[i]))
nodeClaims[i] = ExpectExists(ctx, env.Client, nodeClaims[i])
var node *v1.Node
var err error
nodeClaims[i], node, err = ExpectNodeClaimDeployed(ctx, env.Client, cloudProvider, nodeClaims[i])
Expect(err).ToNot(HaveOccurred())

// Mark the node as NotReady after the launch
ExpectMakeNodesNotReady(ctx, env.Client, node)
})

// Step forward to move past the cache eventual consistency timeout
@@ -150,6 +183,29 @@
})
ExpectNotFound(ctx, env.Client, lo.Map(nodeClaims, func(n *v1beta1.NodeClaim, _ int) client.Object { return n })...)
})
It("shouldn't delete the NodeClaim when the Node isn't there and the instance is gone", func() {
nodeClaim := test.NodeClaim(v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1beta1.NodePoolLabelKey: nodePool.Name,
},
},
})
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
nodeClaim, err := ExpectNodeClaimDeployedNoNode(ctx, env.Client, cloudProvider, nodeClaim)
Expect(err).ToNot(HaveOccurred())

// Step forward to move past the cache eventual consistency timeout
fakeClock.SetTime(time.Now().Add(time.Second * 20))

// Delete the nodeClaim from the cloudprovider
Expect(cloudProvider.Delete(ctx, nodeClaim)).To(Succeed())

// Expect the NodeClaim to not be removed since the NodeClaim isn't registered
ExpectReconcileSucceeded(ctx, garbageCollectionController, client.ObjectKey{})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectExists(ctx, env.Client, nodeClaim)
})
It("shouldn't delete the NodeClaim when the Node isn't there but the instance is there", func() {
nodeClaim := test.NodeClaim(v1beta1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
@@ -159,8 +215,10 @@
},
})
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectReconcileSucceeded(ctx, nodeClaimController, client.ObjectKeyFromObject(nodeClaim))
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
nodeClaim, node, err := ExpectNodeClaimDeployed(ctx, env.Client, cloudProvider, nodeClaim)
Expect(err).ToNot(HaveOccurred())

Expect(env.Client.Delete(ctx, node)).To(Succeed())

// Step forward to move past the cache eventual consistency timeout
fakeClock.SetTime(time.Now().Add(time.Second * 20))
7 changes: 4 additions & 3 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -2387,10 +2387,11 @@ var _ = Context("Scheduling", func() {
})
ExpectApplied(ctx, env.Client, nc)
if i == elem {
nc, node = ExpectNodeClaimDeployed(ctx, env.Client, cluster, cloudProvider, nc)
nc, node = ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, nc)
} else {
var err error
nc, err = ExpectNodeClaimDeployedNoNode(ctx, env.Client, cluster, cloudProvider, nc)
nc, err = ExpectNodeClaimDeployedNoNode(ctx, env.Client, cloudProvider, nc)
cluster.UpdateNodeClaim(nc)
Expect(err).ToNot(HaveOccurred())
}
nodeClaims = append(nodeClaims, nc)
@@ -2481,7 +2482,7 @@ var _ = Context("Scheduling", func() {
},
})
ExpectApplied(ctx, env.Client, nc)
nc, n := ExpectNodeClaimDeployed(ctx, env.Client, cluster, cloudProvider, nc)
nc, n := ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, nc)
nodeClaims = append(nodeClaims, nc)
nodes = append(nodes, n)
}
25 changes: 18 additions & 7 deletions pkg/test/expectations/expectations.go
@@ -275,7 +275,7 @@ func ExpectProvisionedNoBinding(ctx context.Context, c client.Client, cluster *s
}
nodeClaim := &v1beta1.NodeClaim{}
Expect(c.Get(ctx, types.NamespacedName{Name: nodeClaimName}, nodeClaim)).To(Succeed())
nodeClaim, node := ExpectNodeClaimDeployed(ctx, c, cluster, cloudProvider, nodeClaim)
nodeClaim, node := ExpectNodeClaimDeployedAndStateUpdated(ctx, c, cluster, cloudProvider, nodeClaim)
if nodeClaim != nil && node != nil {
for _, pod := range m.Pods {
bindings[pod] = &Binding{
@@ -298,8 +298,9 @@
return bindings
}

func ExpectNodeClaimDeployedNoNode(ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, nc *v1beta1.NodeClaim) (*v1beta1.NodeClaim, error) {
func ExpectNodeClaimDeployedNoNode(ctx context.Context, c client.Client, cloudProvider cloudprovider.CloudProvider, nc *v1beta1.NodeClaim) (*v1beta1.NodeClaim, error) {
GinkgoHelper()

resolved, err := cloudProvider.Create(ctx, nc)
// TODO @joinnis: Check this error rather than swallowing it. This is swallowed right now due to how we are doing some testing in the cloudprovider
if err != nil {
@@ -311,24 +312,34 @@ func ExpectNodeClaimDeployedNoNode(ctx context.Context, c client.Client, cluster
nc = lifecycle.PopulateNodeClaimDetails(nc, resolved)
nc.StatusConditions().MarkTrue(v1beta1.Launched)
ExpectApplied(ctx, c, nc)
cluster.UpdateNodeClaim(nc)
return nc, nil
}

func ExpectNodeClaimDeployed(ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, nc *v1beta1.NodeClaim) (*v1beta1.NodeClaim, *v1.Node) {
func ExpectNodeClaimDeployed(ctx context.Context, c client.Client, cloudProvider cloudprovider.CloudProvider, nc *v1beta1.NodeClaim) (*v1beta1.NodeClaim, *v1.Node, error) {
GinkgoHelper()
nc, err := ExpectNodeClaimDeployedNoNode(ctx, c, cluster, cloudProvider, nc)

nc, err := ExpectNodeClaimDeployedNoNode(ctx, c, cloudProvider, nc)
if err != nil {
return nc, nil
return nc, nil, err
}
nc.StatusConditions().MarkTrue(v1beta1.Registered)

// Mock the nodeclaim launch and node joining at the apiserver
node := test.NodeClaimLinkedNode(nc)
node.Labels = lo.Assign(node.Labels, map[string]string{v1beta1.NodeRegisteredLabelKey: "true"})
ExpectApplied(ctx, c, nc, node)
Expect(cluster.UpdateNode(ctx, node)).To(Succeed())
return nc, node, nil
}

func ExpectNodeClaimDeployedAndStateUpdated(ctx context.Context, c client.Client, cluster *state.Cluster, cloudProvider cloudprovider.CloudProvider, nc *v1beta1.NodeClaim) (*v1beta1.NodeClaim, *v1.Node) {
GinkgoHelper()

nc, node, err := ExpectNodeClaimDeployed(ctx, c, cloudProvider, nc)
cluster.UpdateNodeClaim(nc)
if err != nil {
return nc, nil
}
Expect(cluster.UpdateNode(ctx, node)).To(Succeed())
return nc, node
}

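This split drives most of the test churn above: ExpectNodeClaimDeployed now launches and registers the NodeClaim against the API server and CloudProvider and returns any error instead of asserting on it, while ExpectNodeClaimDeployedAndStateUpdated additionally records the NodeClaim and Node in the in-memory state.Cluster. A usage sketch, assuming the suite-level ctx, env, cluster, and cloudProvider variables and the variadic test.NodeClaim constructor used throughout these suites:

It("illustrates the two deployment helpers (sketch, not a test from this commit)", func() {
	ncA := test.NodeClaim()
	ncB := test.NodeClaim()
	ExpectApplied(ctx, env.Client, ncA, ncB)

	// Narrow helper: launch and register only; the caller handles the error and
	// decides whether the in-memory cluster state should learn about the claim.
	ncA, nodeA, err := ExpectNodeClaimDeployed(ctx, env.Client, cloudProvider, ncA)
	Expect(err).ToNot(HaveOccurred())
	Expect(nodeA.Name).ToNot(BeEmpty())
	Expect(ncA.StatusConditions().GetCondition(v1beta1.Registered).IsTrue()).To(BeTrue())

	// Wrapper: same flow, but also pushes the NodeClaim and Node into the
	// state.Cluster, which the disruption and scheduling suites depend on.
	ncB, nodeB := ExpectNodeClaimDeployedAndStateUpdated(ctx, env.Client, cluster, cloudProvider, ncB)
	Expect(nodeB.Labels).To(HaveKeyWithValue(v1beta1.NodeRegisteredLabelKey, "true"))
})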
