chore: Add queue depth metrics for provisioning and termination (#1060)
jonathan-innis committed Mar 5, 2024
1 parent 831d3c6 commit 1489b70
Showing 13 changed files with 201 additions and 30 deletions.
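At a glance, the commit applies one pattern in two places: register a queue-depth metric with the controller-runtime registry at package init, then update it from the loop that owns the queue (the eviction queue's Reconcile, and the scheduler's Solve). A condensed, self-contained sketch of that shape follows; the `exampleQueueDepth` gauge and `exampleReconciler` are illustrative placeholders, not code from the commit, whose real gauges are `karpenter_nodes_eviction_queue_depth` and `karpenter_provisioner_scheduling_queue_depth`.

```go
package example

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"k8s.io/client-go/util/workqueue"
	crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// exampleQueueDepth is an illustrative placeholder for the depth gauges added in this commit.
var exampleQueueDepth = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "karpenter",
	Subsystem: "example",
	Name:      "queue_depth",
	Help:      "Number of items currently waiting in the example queue.",
})

func init() {
	// controller-runtime serves this registry on the manager's /metrics endpoint.
	crmetrics.Registry.MustRegister(exampleQueueDepth)
}

type exampleReconciler struct {
	queue workqueue.RateLimitingInterface
}

func (r *exampleReconciler) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
	// Publish the current backlog before popping work, mirroring the eviction.go change below.
	exampleQueueDepth.Set(float64(r.queue.Len()))
	return reconcile.Result{}, nil
}
```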
2 changes: 1 addition & 1 deletion pkg/controllers/leasegarbagecollection/controller.go
@@ -57,7 +57,7 @@ func (c *Controller) Reconcile(ctx context.Context, l *v1.Lease) (reconcile.Resu
err := c.kubeClient.Delete(ctx, l)
if err == nil {
logging.FromContext(ctx).Debug("found and deleted leaked lease")
NodeLeaseDeletedCounter.WithLabelValues().Inc()
NodeLeasesDeletedCounter.WithLabelValues().Inc()
}

return reconcile.Result{}, client.IgnoreNotFound(err)
6 changes: 3 additions & 3 deletions pkg/controllers/leasegarbagecollection/metrics.go
@@ -24,9 +24,9 @@ import (
)

var (
NodeLeaseDeletedCounter = prometheus.NewCounterVec(
NodeLeasesDeletedCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "karpenter",
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "leases_deleted",
Help: "Number of deleted leaked leases.",
@@ -36,5 +36,5 @@ var (
)

func init() {
crmetrics.Registry.MustRegister(NodeLeaseDeletedCounter)
crmetrics.Registry.MustRegister(NodeLeasesDeletedCounter)
}
2 changes: 1 addition & 1 deletion pkg/controllers/leasegarbagecollection/suite_test.go
@@ -99,7 +99,7 @@ var _ = Describe("GarbageCollection", func() {
})
AfterEach(func() {
// Reset the metrics collectors
leasegarbagecollection.NodeLeaseDeletedCounter.Reset()
leasegarbagecollection.NodeLeasesDeletedCounter.Reset()
})
Context("Metrics", func() {
It("should fire the leaseDeletedCounter metric when deleting leases", func() {
12 changes: 6 additions & 6 deletions pkg/controllers/node/termination/metrics.go
@@ -23,19 +23,19 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"
)

func init() {
crmetrics.Registry.MustRegister(TerminationSummary)
}

var (
TerminationSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "karpenter",
Subsystem: "nodes",
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "termination_time_seconds",
Help: "The time taken between a node's deletion request and the removal of its finalizer",
Objectives: metrics.SummaryObjectives(),
},
[]string{metrics.NodePoolLabel},
)
)

func init() {
crmetrics.Registry.MustRegister(TerminationSummary)
}
27 changes: 25 additions & 2 deletions pkg/controllers/node/termination/suite_test.go
@@ -86,17 +86,18 @@ var _ = Describe("Termination", func() {
nodeClaim, node = test.NodeClaimAndNode(v1beta1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1beta1.TerminationFinalizer}}})
node.Labels[v1beta1.NodePoolLabelKey] = test.NodePool().Name
cloudProvider.CreatedNodeClaims[node.Spec.ProviderID] = nodeClaim
queue.Reset()
})

AfterEach(func() {
ExpectCleanedUp(ctx, env.Client)
fakeClock.SetTime(time.Now())
cloudProvider.Reset()
queue.Reset()

// Reset the metrics collectors
metrics.NodesTerminatedCounter.Reset()
termination.TerminationSummary.Reset()
terminator.EvictionQueueDepth.Set(0)
})

Context("Reconciliation", func() {
@@ -294,7 +295,7 @@ var _ = Describe("Termination", func() {
ExpectNotFound(ctx, env.Client, node)
})
It("should fail to evict pods that violate a PDB", func() {
minAvailable := intstr.FromInt(1)
minAvailable := intstr.FromInt32(1)
labelSelector := map[string]string{test.RandomName(): test.RandomName()}
pdb := test.PodDisruptionBudget(test.PDBOptions{
Labels: labelSelector,
@@ -658,6 +659,28 @@ var _ = Describe("Termination", func() {
Expect(ok).To(BeTrue())
Expect(lo.FromPtr(m.GetCounter().Value)).To(BeNumerically("==", 1))
})
It("should update the eviction queueDepth metric when reconciling pods", func() {
minAvailable := intstr.FromInt32(0)
labelSelector := map[string]string{test.RandomName(): test.RandomName()}
pdb := test.PodDisruptionBudget(test.PDBOptions{
Labels: labelSelector,
// Don't let any pod evict
MinAvailable: &minAvailable,
})
ExpectApplied(ctx, env.Client, pdb, node)
pods := test.Pods(5, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{
OwnerReferences: defaultOwnerRefs,
Labels: labelSelector,
}})
ExpectApplied(ctx, env.Client, lo.Map(pods, func(p *v1.Pod, _ int) client.Object { return p })...)

Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(ctx, env.Client, node.Name)
ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node))
ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) // Reconcile the queue so that we set the metric

ExpectMetricGaugeValue("karpenter_nodes_eviction_queue_depth", 5, map[string]string{})
})
})
})

1 change: 1 addition & 0 deletions pkg/controllers/node/termination/terminator/eviction.go
@@ -125,6 +125,7 @@ func (q *Queue) Has(pod *v1.Pod) bool {
}

func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
EvictionQueueDepth.Set(float64(q.RateLimitingInterface.Len()))
// Check if the queue is empty. client-go recommends not using this function to gate the subsequent
// get call, but since we're popping items off the queue synchronously, there should be no synchronization
// issues.
39 changes: 39 additions & 0 deletions pkg/controllers/node/termination/terminator/metrics.go
@@ -0,0 +1,39 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terminator

import (
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

"sigs.k8s.io/karpenter/pkg/metrics"
)

func init() {
crmetrics.Registry.MustRegister(EvictionQueueDepth)
}

var (
EvictionQueueDepth = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "eviction_queue_depth",
Help: "The number of pods currently waiting for a successful eviction in the eviction queue.",
},
)
)
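Because EvictionQueueDepth is a plain Gauge registered with the controller-runtime registry, its value can also be asserted without Karpenter's own `ExpectMetricGaugeValue` helper. A sketch of an alternative assertion style using client_golang's testutil package (not part of this commit; the test name and setup are assumptions):

```go
package terminator_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
)

func TestEvictionQueueDepthGauge(t *testing.T) {
	// Simulate the queue reporting five pending evictions.
	terminator.EvictionQueueDepth.Set(5)

	// ToFloat64 works for single-series collectors such as this plain Gauge.
	if got := testutil.ToFloat64(terminator.EvictionQueueDepth); got != 5 {
		t.Fatalf("expected eviction queue depth 5, got %v", got)
	}
}
```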
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -284,7 +284,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod
if err != nil {
return nil, fmt.Errorf("getting daemon pods, %w", err)
}
return scheduler.NewScheduler(ctx, p.kubeClient, nodeClaimTemplates, nodePoolList.Items, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder), nil
return scheduler.NewScheduler(p.kubeClient, nodeClaimTemplates, nodePoolList.Items, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder), nil
}

func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
40 changes: 31 additions & 9 deletions pkg/controllers/provisioning/scheduling/metrics.go
@@ -24,15 +24,37 @@ import (
)

func init() {
crmetrics.Registry.MustRegister(schedulingSimulationDuration)
crmetrics.Registry.MustRegister(SimulationDurationSeconds, QueueDepth)
}

var schedulingSimulationDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: "provisioner",
Name: "scheduling_simulation_duration_seconds",
Help: "Duration of scheduling simulations used for deprovisioning and provisioning in seconds.",
Buckets: metrics.DurationBuckets(),
},
const (
controllerLabel = "controller"
schedulingIDLabel = "scheduling_id"
)

var (
SimulationDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: "provisioner",
Name: "scheduling_simulation_duration_seconds",
Help: "Duration of scheduling simulations used for deprovisioning and provisioning in seconds.",
Buckets: metrics.DurationBuckets(),
},
[]string{
controllerLabel,
},
)
QueueDepth = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: "provisioner",
Name: "scheduling_queue_depth",
Help: "The number of pods currently waiting to be scheduled.",
},
[]string{
controllerLabel,
schedulingIDLabel,
},
)
)
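The labeled histogram is consumed in scheduler.go below via `defer metrics.Measure(SimulationDurationSeconds.With(...))()`. The Measure helper itself is not part of this diff; a plausible minimal implementation with that shape (an assumption about pkg/metrics, not Karpenter's verbatim code) looks like:

```go
package metrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Measure returns a func that, when invoked, observes the elapsed time (in
// seconds) since Measure was called onto the given histogram/summary series.
// Typical call site: defer Measure(SimulationDurationSeconds.With(labels))()
func Measure(observer prometheus.Observer) func() {
	start := time.Now()
	return func() {
		observer.Observe(time.Since(start).Seconds())
	}
}
```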
19 changes: 15 additions & 4 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -22,12 +22,17 @@ import (
"fmt"
"sort"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/karpenter/pkg/operator/injection"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -38,7 +43,7 @@ import (
"sigs.k8s.io/karpenter/pkg/utils/resources"
)

func NewScheduler(ctx context.Context, kubeClient client.Client, nodeClaimTemplates []*NodeClaimTemplate,
func NewScheduler(kubeClient client.Client, nodeClaimTemplates []*NodeClaimTemplate,
nodePools []v1beta1.NodePool, cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology,
instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*v1.Pod,
recorder events.Recorder) *Scheduler {
@@ -55,7 +60,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodeClaimTempla
}

s := &Scheduler{
ctx: ctx,
id: uuid.NewUUID(),
kubeClient: kubeClient,
nodeClaimTemplates: nodeClaimTemplates,
topology: topology,
Expand All @@ -74,7 +79,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodeClaimTempla
}

type Scheduler struct {
ctx context.Context
id types.UID // Unique UUID attached to this scheduling loop
newNodeClaims []*NodeClaim
existingNodes []*ExistingNode
nodeClaimTemplates []*NodeClaimTemplate
@@ -200,15 +205,21 @@ func (r Results) TruncateInstanceTypes(maxInstanceTypes int) Results {
}

func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) Results {
defer metrics.Measure(schedulingSimulationDuration)()
defer metrics.Measure(SimulationDurationSeconds.With(
prometheus.Labels{controllerLabel: injection.GetControllerName(ctx)},
))()
// We loop trying to schedule unschedulable pods as long as we are making progress. This solves a few
// issues including pods with affinity to another pod in the batch. We could topo-sort to solve this, but it wouldn't
// solve the problem of scheduling pods where a particular order is needed to prevent a max-skew violation. E.g. if we
had 5xA pods and 5xB pods where they have a zonal topology spread, but A can only go in one zone and B in another.
// We need to schedule them alternating, A, B, A, B, .... and this solution also solves that as well.
errors := map[*v1.Pod]error{}
QueueDepth.DeletePartialMatch(prometheus.Labels{controllerLabel: injection.GetControllerName(ctx)}) // Reset the metric for the controller, so we don't keep old ids around
q := NewQueue(pods...)
for {
QueueDepth.With(
prometheus.Labels{controllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)},
).Set(float64(len(q.pods)))
// Try the next pod
pod, ok := q.Pop()
if !ok {
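Each call to Solve mints a fresh scheduling_id, so the queue-depth gauge gains a new labeled series per scheduling loop; the `DeletePartialMatch` call on the controller label is what keeps those series from accumulating forever. A stripped-down, standalone illustration of that pruning pattern (the label values here are hypothetical):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	queueDepth := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "karpenter",
		Subsystem: "provisioner",
		Name:      "scheduling_queue_depth",
		Help:      "The number of pods currently waiting to be scheduled.",
	}, []string{"controller", "scheduling_id"})

	// A previous scheduling loop left a series behind.
	queueDepth.With(prometheus.Labels{"controller": "provisioner", "scheduling_id": "old-loop"}).Set(12)

	// New loop: drop every series for this controller, whatever its scheduling_id,
	// then publish the depth under the new loop's id.
	deleted := queueDepth.DeletePartialMatch(prometheus.Labels{"controller": "provisioner"})
	queueDepth.With(prometheus.Labels{"controller": "provisioner", "scheduling_id": "new-loop"}).Set(7)

	fmt.Printf("pruned %d stale series\n", deleted) // prints: pruned 1 stale series
}
```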
@@ -174,7 +174,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
b.Fatalf("creating topology, %s", err)
}

scheduler := scheduling.NewScheduler(ctx, client, []*scheduling.NodeClaimTemplate{scheduling.NewNodeClaimTemplate(nodePool)},
scheduler := scheduling.NewScheduler(client, []*scheduling.NodeClaimTemplate{scheduling.NewNodeClaimTemplate(nodePool)},
nil, cluster, nil, topology,
map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, nil,
events.NewRecorder(&record.FakeRecorder{}))
