Skip to content

Commit

Permalink
chore: limit queue depth to 1 (#797)
Browse files Browse the repository at this point in the history
  • Loading branch information
njtran committed Nov 17, 2023
1 parent 91f7d0b commit 2066417
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/controllers/disruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
return reconcile.Result{}, fmt.Errorf("removing taint from nodes, %w", err)
}

// Check if the queue is processing an item. If it is, retry again later.
// TODO this should be removed when disruption budgets are added in.
if !c.queue.IsEmpty() {
return reconcile.Result{RequeueAfter: time.Second}, nil
}

// Attempt different disruption methods. We'll only let one method perform an action
for _, m := range c.methods {
c.recordRun(fmt.Sprintf("%T", m))
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/disruption/orchestration/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,9 @@ func (q *Queue) Reset() {
q.RateLimitingInterface = &controllertest.Queue{Interface: workqueue.New()}
q.providerIDToCommand = map[string]*Command{}
}

// IsEmpty reports whether the queue is currently tracking no commands, i.e.
// whether providerIDToCommand has zero entries. The map is read while holding
// the queue's read lock.
func (q *Queue) IsEmpty() bool {
	q.mu.RLock()
	tracked := len(q.providerIDToCommand)
	q.mu.RUnlock()
	return tracked == 0
}
138 changes: 138 additions & 0 deletions pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,144 @@ var _ = AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
})

// TODO remove this when Budgets are added in
//
// Queue Limits checks that a second disruption reconcile does not grow the
// orchestration queue while an earlier command is still sitting in it.
var _ = Describe("Queue Limits", func() {
	var (
		nodePool              *v1beta1.NodePool
		nodeClaim, nodeClaim2 *v1beta1.NodeClaim
		node, node2           *v1.Node
	)

	BeforeEach(func() {
		// Instance type both candidate nodes are labelled with; its single
		// on-demand offering is marked unavailable.
		currentInstance := fake.NewInstanceType(fake.InstanceTypeOptions{
			Name: "current-on-demand",
			Offerings: []cloudprovider.Offering{
				{CapacityType: v1beta1.CapacityTypeOnDemand, Zone: "test-zone-1a", Price: 1.5, Available: false},
			},
		})
		// Cheaper spot instance type with available offerings in three zones.
		replacementInstance := fake.NewInstanceType(fake.InstanceTypeOptions{
			Name: "spot-replacement",
			Offerings: []cloudprovider.Offering{
				{CapacityType: v1beta1.CapacityTypeSpot, Zone: "test-zone-1a", Price: 1.0, Available: true},
				{CapacityType: v1beta1.CapacityTypeSpot, Zone: "test-zone-1b", Price: 0.2, Available: true},
				{CapacityType: v1beta1.CapacityTypeSpot, Zone: "test-zone-1c", Price: 0.4, Available: true},
			},
		})

		nodePool = test.NodePool()

		// newCandidate builds one NodeClaim/Node pair labelled for
		// currentInstance and nodePool. Every call gets its own label map and
		// a freshly generated provider ID.
		newCandidate := func() (*v1beta1.NodeClaim, *v1.Node) {
			return test.NodeClaimAndNode(v1beta1.NodeClaim{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						v1.LabelInstanceTypeStable:   currentInstance.Name,
						v1beta1.CapacityTypeLabelKey: currentInstance.Offerings[0].CapacityType,
						v1.LabelTopologyZone:         currentInstance.Offerings[0].Zone,
						v1beta1.NodePoolLabelKey:     nodePool.Name,
					},
				},
				Status: v1beta1.NodeClaimStatus{
					ProviderID: test.RandomProviderID(),
					Allocatable: map[v1.ResourceName]resource.Quantity{
						v1.ResourceCPU:  resource.MustParse("3"),
						v1.ResourcePods: resource.MustParse("100"),
					},
				},
			})
		}
		nodeClaim, node = newCandidate()
		nodeClaim2, node2 = newCandidate()

		cloudProvider.InstanceTypes = []*cloudprovider.InstanceType{
			currentInstance,
			replacementInstance,
		}

		// Mark both node claims as drifted so that both are disruption candidates.
		nodeClaim.StatusConditions().MarkTrue(v1beta1.Drifted)
		nodeClaim2.StatusConditions().MarkTrue(v1beta1.Drifted)
	})

	It("should be able to disrupt two nodes with replace, but only ever be disrupting one at a time", func() {
		// Create the ReplicaSet first and read it back so its UID can be used
		// in the pods' owner reference.
		rs := test.ReplicaSet()
		ExpectApplied(ctx, env.Client, rs)
		Expect(env.Client.Get(ctx, client.ObjectKeyFromObject(rs), rs)).To(Succeed())

		rsOwner := metav1.OwnerReference{
			APIVersion:         "apps/v1",
			Kind:               "ReplicaSet",
			Name:               rs.Name,
			UID:                rs.UID,
			Controller:         ptr.Bool(true),
			BlockOwnerDeletion: ptr.Bool(true),
		}
		pods := test.Pods(2, test.PodOptions{
			ObjectMeta: metav1.ObjectMeta{
				Labels:          map[string]string{"app": "test"},
				OwnerReferences: []metav1.OwnerReference{rsOwner},
			},
			ResourceRequirements: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceCPU: resource.MustParse("2"),
				},
			},
		})

		ExpectApplied(ctx, env.Client, rs, pods[0], pods[1], nodeClaim, nodeClaim2, node, node2, nodePool)

		// Bind one pod to each node so that neither node is empty.
		ExpectManualBinding(ctx, env.Client, pods[0], node)
		ExpectManualBinding(ctx, env.Client, pods[1], node2)

		// Inform cluster state about the nodes and node claims.
		ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node, node2}, []*v1beta1.NodeClaim{nodeClaim, nodeClaim2})

		// First disruption reconcile: one node should be added to the queue.
		ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
		Expect(queue.Len()).To(BeNumerically("==", 1))

		// Process the item, but still expect it to be in the queue, since its
		// replacements aren't created.
		ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})
		for _, n := range []*v1.Node{node, node2} {
			ExpectNodeExists(ctx, env.Client, n.Name)
		}
		Expect(queue.Len()).To(BeNumerically("==", 1))

		// Second disruption reconcile: try to add another node to the queue.
		ExpectReconcileSucceeded(ctx, disruptionController, types.NamespacedName{})
		for _, n := range []*v1.Node{node, node2} {
			ExpectNodeExists(ctx, env.Client, n.Name)
		}
		// The queue length must not have increased.
		Expect(queue.Len()).To(BeNumerically("==", 1))
	})
})

var _ = Describe("Disruption Taints", func() {
var nodePool *v1beta1.NodePool
var nodeClaim *v1beta1.NodeClaim
Expand Down

0 comments on commit 2066417

Please sign in to comment.