[tests] update integration tests
Anton Stuchinskii committed Oct 4, 2023
1 parent 13a2389 commit fa11294
Showing 15 changed files with 882 additions and 213 deletions.
4 changes: 4 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
@@ -113,6 +113,8 @@ type PodSet struct {
// the keys in the nodeLabels from the ResourceFlavors considered for this
// Workload are used to filter the ResourceFlavors that can be assigned to
// this podSet.
//
// +optional
Template *corev1.PodTemplateSpec `json:"template"`

// count is the number of pods for the spec.
@@ -133,6 +135,8 @@ type PodSet struct {
MinCount *int32 `json:"minCount,omitempty"`

// PodTemplateName is the name of the PodTemplate the Workload is associated with.
//
// +optional
PodTemplateName *string `json:"podTemplateName,omitempty"`
}

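With template now explicitly optional here (and dropped from the CRD's required list in the next file), API consumers must tolerate a nil Template. A minimal sketch of the implied nil-check, not part of this commit:

for i := range wl.Spec.PodSets {
    ps := &wl.Spec.PodSets[i]
    if ps.Template == nil {
        // The pod spec now lives in a separate PodTemplate object,
        // resolved through ps.PodTemplateName instead.
        continue
    }
    // Safe to use ps.Template.Spec from here on.
}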
1 change: 0 additions & 1 deletion config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
@@ -8186,7 +8186,6 @@ spec:
required:
- count
- name
- template
type: object
maxItems: 8
minItems: 1
1 change: 1 addition & 0 deletions pkg/constants/constants.go
@@ -37,4 +37,5 @@ const (

WorkloadPriorityClassSource = "kueue.x-k8s.io/workloadpriorityclass"
PodPriorityClassSource = "scheduling.k8s.io/priorityclass"
WorkloadNameSource = "kueue.x-k8s.io/workload-name"
)
158 changes: 93 additions & 65 deletions pkg/controller/core/workload_controller.go
@@ -19,13 +19,16 @@ package core
import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
nodev1 "k8s.io/api/node/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
@@ -135,17 +138,17 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

for i, ps := range wl.Spec.PodSets {
if wl.Spec.PodSets[i].PodTemplateName != nil {
continue
}
var pt corev1.PodTemplate
if err := r.client.Get(ctx, client.ObjectKey{Name: fmt.Sprintf("%s-%s", wl.Name, ps.Name), Namespace: wl.Namespace}, &pt); err != nil {
continue
}
wl.Spec.PodSets[i].PodTemplateName = &pt.Name
wlCopy := wl.DeepCopy()

if err := r.updatePodTemplateName(ctx, &wl); err != nil {
log.V(2).Error(err, "updatePodTemplateName")
return ctrl.Result{}, err
}

if !equality.Semantic.DeepEqual(wl.Spec.PodSets, wlCopy.Spec.PodSets) {
if err := r.client.Update(ctx, &wl); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
log.V(2).Error(err, "client.Update", "wl", wl)
return ctrl.Result{}, err
}
}

@@ -191,6 +194,27 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

func (r *WorkloadReconciler) updatePodTemplateName(ctx context.Context, wl *kueue.Workload) error {
labelSelector := metav1.LabelSelector{MatchLabels: map[string]string{constants.WorkloadNameSource: wl.Name}}
var pts corev1.PodTemplateList
if err := r.client.List(ctx, &pts, &client.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).AsSelector(), Namespace: wl.Namespace}); err != nil {
return err
}

for i, ps := range wl.Spec.PodSets {
if wl.Spec.PodSets[i].PodTemplateName != nil {
continue
}
for _, pt := range pts.Items {
if strings.Contains(pt.Name, ps.Name) {
wl.Spec.PodSets[i].PodTemplateName = &pt.Name
}
}
}

return nil
}

func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl *kueue.Workload) (bool, error) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) || !workload.HasRetryOrRejectedChecks(wl) {
return false, nil
@@ -289,8 +313,12 @@ func (r *WorkloadReconciler) Create(e event.CreateEvent) bool {
}

pts := r.extractPodTemplates(ctx, *wl)
if len(pts) == 0 {
return false
}

wlCopy := wl.DeepCopy()
r.adjustResources(ctx, wlCopy, pts)

if !workload.HasQuotaReservation(wl) {
if !r.queues.AddOrUpdateWorkload(wlCopy, pts) {
@@ -378,6 +406,8 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {

wlCopy := wl.DeepCopy()

r.adjustResources(ctx, wlCopy, pts)

switch {
case status == finished:
// The workload could have been in the queues if we missed an event.
@@ -500,26 +530,28 @@ func workloadStatus(w *kueue.Workload) string {
// As a result, the pod's Overhead is not always correct. E.g. if we set a non-existent runtime class name in
// `pod.Spec.RuntimeClassName` and also set `pod.Spec.Overhead`, in the real world the pod creation would be
// rejected due to the mismatch with the RuntimeClass. Here, however, we assume that they are correct.
func (h *podTemplateHandler) handlePodOverhead(ctx context.Context, pt *corev1.PodTemplate) {
func (r *WorkloadReconciler) handlePodOverhead(ctx context.Context, pts []corev1.PodTemplate) {
log := ctrl.LoggerFrom(ctx)

podSpec := &pt.Template.Spec
if podSpec.RuntimeClassName != nil && len(podSpec.Overhead) == 0 {
var runtimeClass nodev1.RuntimeClass
if err := h.client.Get(ctx, types.NamespacedName{Name: *podSpec.RuntimeClassName}, &runtimeClass); err != nil {
log.Error(err, "Could not get RuntimeClass")
}
if runtimeClass.Overhead != nil {
podSpec.Overhead = runtimeClass.Overhead.PodFixed
for pi := range pts {
podSpec := &pts[pi].Template.Spec
if podSpec.RuntimeClassName != nil && len(podSpec.Overhead) == 0 {
var runtimeClass nodev1.RuntimeClass
if err := r.client.Get(ctx, types.NamespacedName{Name: *podSpec.RuntimeClassName}, &runtimeClass); err != nil {
log.Error(err, "Could not get RuntimeClass")
}
if runtimeClass.Overhead != nil {
podSpec.Overhead = runtimeClass.Overhead.PodFixed
}
}
}
}

func (h *podTemplateHandler) handlePodLimitRange(ctx context.Context, pt *corev1.PodTemplate) {
func (r *WorkloadReconciler) handlePodLimitRange(ctx context.Context, wl *kueue.Workload, pts []corev1.PodTemplate) {
log := ctrl.LoggerFrom(ctx)
// get the list of limit ranges
var list corev1.LimitRangeList
if err := h.client.List(ctx, &list, &client.ListOptions{Namespace: pt.Namespace}, client.MatchingFields{indexer.LimitRangeHasContainerType: "true"}); err != nil {
if err := r.client.List(ctx, &list, &client.ListOptions{Namespace: wl.Namespace}, client.MatchingFields{indexer.LimitRangeHasContainerType: "true"}); err != nil {
log.Error(err, "Could not list LimitRanges")
return
}
@@ -533,54 +565,51 @@ func (h *podTemplateHandler) handlePodLimitRange(ctx context.Context, pt *corev1
return
}

pod := &pt.Template.Spec
for ci := range pod.InitContainers {
res := &pod.InitContainers[ci].Resources
res.Limits = resource.MergeResourceListKeepFirst(res.Limits, containerLimits.Default)
res.Requests = resource.MergeResourceListKeepFirst(res.Requests, containerLimits.DefaultRequest)
}
for ci := range pod.Containers {
res := &pod.Containers[ci].Resources
res.Limits = resource.MergeResourceListKeepFirst(res.Limits, containerLimits.Default)
res.Requests = resource.MergeResourceListKeepFirst(res.Requests, containerLimits.DefaultRequest)
for pi := range pts {
pod := &pts[pi].Template.Spec
for ci := range pod.InitContainers {
res := &pod.InitContainers[ci].Resources
res.Limits = resource.MergeResourceListKeepFirst(res.Limits, containerLimits.Default)
res.Requests = resource.MergeResourceListKeepFirst(res.Requests, containerLimits.DefaultRequest)
}
for ci := range pod.Containers {
res := &pod.Containers[ci].Resources
res.Limits = resource.MergeResourceListKeepFirst(res.Limits, containerLimits.Default)
res.Requests = resource.MergeResourceListKeepFirst(res.Requests, containerLimits.DefaultRequest)
}
}
}

func (h *podTemplateHandler) handleLimitsToRequests(pt *corev1.PodTemplate) {
pod := &pt.Template.Spec
for ci := range pod.InitContainers {
res := &pod.InitContainers[ci].Resources
res.Requests = resource.MergeResourceListKeepFirst(res.Requests, res.Limits)
}
for ci := range pod.Containers {
res := &pod.Containers[ci].Resources
res.Requests = resource.MergeResourceListKeepFirst(res.Requests, res.Limits)
func (r *WorkloadReconciler) handleLimitsToRequests(pts []corev1.PodTemplate) {
for pi := range pts {
pod := &pts[pi].Template.Spec
for ci := range pod.InitContainers {
res := &pod.InitContainers[ci].Resources
res.Requests = resource.MergeResourceListKeepFirst(res.Requests, res.Limits)
}
for ci := range pod.Containers {
res := &pod.Containers[ci].Resources
res.Requests = resource.MergeResourceListKeepFirst(res.Requests, res.Limits)
}
}
}

func (h *podTemplateHandler) adjustResources(ctx context.Context, obj client.Object) {
pt := obj.(*corev1.PodTemplate)
if pt.Name == "" {
return
}
h.handlePodOverhead(ctx, pt)
h.handlePodLimitRange(ctx, pt)
h.handleLimitsToRequests(pt)
if err := h.client.Update(ctx, pt); err != nil {
return
}
func (r *WorkloadReconciler) adjustResources(ctx context.Context, wl *kueue.Workload, pts []corev1.PodTemplate) {
r.handlePodOverhead(ctx, pts)
r.handlePodLimitRange(ctx, wl, pts)
r.handleLimitsToRequests(pts)
}

func (r *WorkloadReconciler) extractPodTemplates(ctx context.Context, wl kueue.Workload) []corev1.PodTemplate {
podTemplates := make([]corev1.PodTemplate, 0, len(wl.Spec.PodSets))
for _, ps := range wl.Spec.PodSets {
var pt corev1.PodTemplate
if err := r.client.Get(ctx, client.ObjectKey{Name: fmt.Sprintf("%s-%s", wl.Name, ps.Name), Namespace: wl.Namespace}, &pt); err != nil {
continue
}
podTemplates = append(podTemplates, pt)
log := ctrl.LoggerFrom(ctx)
labelSelector := metav1.LabelSelector{MatchLabels: map[string]string{constants.WorkloadNameSource: wl.Name}}
var pts corev1.PodTemplateList
if err := r.client.List(ctx, &pts, &client.ListOptions{LabelSelector: labels.Set(labelSelector.MatchLabels).AsSelector(), Namespace: wl.Namespace}); err != nil {
log.V(2).Error(err, "Unable to find list of podtemplates")
return nil
}
return podTemplates
log.V(2).Info("extractPodTemplates", "pt", pts.Items)
return pts.Items
}

type resourceUpdatesHandler struct {
@@ -620,7 +649,7 @@ func (h *resourceUpdatesHandler) handle(ctx context.Context, obj client.Object,
case *nodev1.RuntimeClass:
log := ctrl.LoggerFrom(ctx).WithValues("runtimeClass", klog.KObj(v))
ctx = ctrl.LoggerInto(ctx, log)
h.queueReconcileForPending(ctx, q, client.MatchingFields{indexer.WorkloadRuntimeClassKey: v.Name})
h.queueReconcileForPending(ctx, q)
default:
panic(v)
}
@@ -634,12 +663,14 @@ func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _
if err != nil {
log.Error(err, "Could not list pending workloads")
}
log.V(4).Info("Updating pending workload requests", "count", len(lst.Items))
log.V(2).Info("Updating pending workload requests", "count", len(lst.Items), "opts", opts)
for _, w := range lst.Items {
wlCopy := w.DeepCopy()
log := log.WithValues("workload", klog.KObj(wlCopy))
log.V(5).Info("Queue reconcile for")
if !h.r.queues.AddOrUpdateWorkload(wlCopy, h.r.extractPodTemplates(ctx, w)) {
pts := h.r.extractPodTemplates(ctx, w)
h.r.adjustResources(ctx, wlCopy, pts)
if !h.r.queues.AddOrUpdateWorkload(wlCopy, pts) {
log.V(2).Info("Queue for workload didn't exist")
}
}
@@ -658,23 +689,20 @@ func (h *podTemplateHandler) Create(ctx context.Context, e event.CreateEvent, q
ctx = ctrl.LoggerInto(ctx, log)
log.V(5).Info("Create event")
h.handle(ctx, e.Object, q)
h.adjustResources(ctx, e.Object)
}

func (h *podTemplateHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
log := ctrl.LoggerFrom(ctx).WithValues("kind", e.ObjectNew.GetObjectKind())
ctx = ctrl.LoggerInto(ctx, log)
log.V(5).Info("Update event")
h.handle(ctx, e.ObjectNew, q)
h.adjustResources(ctx, e.ObjectNew)
}

func (h *podTemplateHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
log := ctrl.LoggerFrom(ctx).WithValues("kind", e.Object.GetObjectKind())
ctx = ctrl.LoggerInto(ctx, log)
log.V(5).Info("Delete event")
h.handle(ctx, e.Object, q)
h.adjustResources(ctx, e.Object)
}

func (h *podTemplateHandler) Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface) {
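The core change in workload_controller.go: instead of Get()-ing one PodTemplate per PodSet under a composed name, the reconciler now List()s every PodTemplate carrying the new workload-name label and matches PodSets by name. A condensed sketch of the equivalent lookup, not part of this commit, using controller-runtime's MatchingLabels option (c and wl are placeholder names):

var pts corev1.PodTemplateList
if err := c.List(ctx, &pts,
    client.InNamespace(wl.Namespace),
    client.MatchingLabels{constants.WorkloadNameSource: wl.Name},
); err != nil {
    return err
}
// Each PodSet with a nil PodTemplateName is then matched against the listed
// templates by name containment, as updatePodTemplateName does above.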
38 changes: 38 additions & 0 deletions pkg/controller/jobframework/podtemplate_names.go
@@ -0,0 +1,38 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package jobframework

import (
"crypto/sha1"
"encoding/hex"
"fmt"
)

func GetPodTemplateName(workloadName, podSetName string) string {
fullName := fmt.Sprintf("%s-%s", workloadName, podSetName)
return limitObjectName(fullName)
}

func limitObjectName(fullName string) string {
if len(fullName) <= maxPrefixLength {
return fullName
}
h := sha1.New()
h.Write([]byte(fullName))
hashBytes := hex.EncodeToString(h.Sum(nil))
return fmt.Sprintf("%s-%s", fullName[:maxPrefixLength], hashBytes[:hashLength])
}
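maxPrefixLength and hashLength are defined elsewhere in the jobframework package and do not appear in this diff. A self-contained sketch of the truncation behavior, with both constants assumed here (values chosen so results stay within the 253-character object-name limit):

package main

import (
    "crypto/sha1"
    "encoding/hex"
    "fmt"
    "strings"
)

// Assumed values for illustration only; the real constants may differ.
const (
    maxPrefixLength = 247
    hashLength      = 5
)

func podTemplateName(workloadName, podSetName string) string {
    fullName := fmt.Sprintf("%s-%s", workloadName, podSetName)
    if len(fullName) <= maxPrefixLength {
        return fullName // short names pass through unchanged
    }
    h := sha1.New()
    h.Write([]byte(fullName))
    hash := hex.EncodeToString(h.Sum(nil))
    // Truncate the prefix and append a short hash so distinct long names
    // stay distinct after truncation.
    return fmt.Sprintf("%s-%s", fullName[:maxPrefixLength], hash[:hashLength])
}

func main() {
    fmt.Println(podTemplateName("sample", "main")) // "sample-main"
    long := podTemplateName(strings.Repeat("a", 300), "main")
    fmt.Println(len(long)) // 247 + 1 + 5 = 253
}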
6 changes: 4 additions & 2 deletions pkg/controller/jobframework/reconciler.go
@@ -731,9 +731,11 @@ func newPodTemplate(ps kueue.PodSet, job GenericJob, object client.Object) *core
return &corev1.PodTemplate{
Template: *ps.Template.DeepCopy(),
ObjectMeta: metav1.ObjectMeta{
//Name: fmt.Sprintf("%s-%s", GetWorkloadNameForOwnerWithGVK(object.GetName(), job.GVK()), ps.Name),
Name: ps.Name,
Name: GetPodTemplateName(object.GetName(), ps.Name),
Namespace: object.GetNamespace(),
Labels: map[string]string{
constants.WorkloadNameSource: GetWorkloadNameForOwnerWithGVK(object.GetName(), job.GVK()),
},
},
}
}
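Tying the two changes together: for a Job named sample in namespace default with a PodSet main, newPodTemplate would now emit roughly the object below (concrete values assumed for illustration). The label is what the workload controller's label-based List selects on; the name is what updatePodTemplateName matches PodSet names against:

&corev1.PodTemplate{
    ObjectMeta: metav1.ObjectMeta{
        Name:      "sample-main", // GetPodTemplateName("sample", "main")
        Namespace: "default",
        Labels: map[string]string{
            // constants.WorkloadNameSource == "kueue.x-k8s.io/workload-name"
            constants.WorkloadNameSource: "job-sample", // assumed GetWorkloadNameForOwnerWithGVK output
        },
    },
    Template: *ps.Template.DeepCopy(),
}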
12 changes: 12 additions & 0 deletions pkg/util/testing/wrappers.go
@@ -617,6 +617,18 @@ func (w *PodTemplateWrapper) Request(r corev1.ResourceName, q string) *PodTempla
return w
}

func (w *PodTemplateWrapper) Limit(r corev1.ResourceName, q string) *PodTemplateWrapper {
res := &w.Template.Spec.Containers[0].Resources
if res.Limits == nil {
res.Limits = corev1.ResourceList{
r: resource.MustParse(q),
}
} else {
res.Limits[r] = resource.MustParse(q)
}
return w
}

func (w *PodTemplateWrapper) Toleration(t corev1.Toleration) *PodTemplateWrapper {
w.Template.Spec.Tolerations = append(w.Template.Spec.Tolerations, t)
return w
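A usage sketch of the new Limit helper chained with the existing Request wrapper; MakePodTemplate and Obj are assumed from the surrounding test utilities and are not shown in this diff:

pt := utiltesting.MakePodTemplate("sample-main", "default").
    Request(corev1.ResourceCPU, "1").
    Limit(corev1.ResourceCPU, "2").
    Obj()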
6 changes: 4 additions & 2 deletions pkg/util/testingjobs/rayjob/wrappers.go
@@ -46,7 +46,8 @@ func MakeJob(name, ns string) *JobWrapper {
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "head-container",
Name: "head-container",
Image: "pause",
},
},
},
@@ -63,7 +64,8 @@
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "worker-container",
Name: "worker-container",
Image: "pause",
},
},
},
