diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go
index 45bcfdba4a..08d27554f6 100644
--- a/apis/config/v1beta1/configuration_types.go
+++ b/apis/config/v1beta1/configuration_types.go
@@ -230,7 +230,17 @@ type Integrations struct {
 	// - "kubeflow.org/pytorchjob"
 	// - "kubeflow.org/tfjob"
 	// - "kubeflow.org/xgboostjob"
+	// - "pod"
 	Frameworks []string `json:"frameworks,omitempty"`
+	// PodOptions defines kueue controller behaviour for pod objects
+	PodOptions *PodIntegrationOptions `json:"podOptions,omitempty"`
+}
+
+type PodIntegrationOptions struct {
+	// NamespaceSelector can be used to omit some namespaces from pod reconciliation
+	NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`
+	// PodSelector can be used to choose what pods to reconcile
+	PodSelector *metav1.LabelSelector `json:"podSelector,omitempty"`
 }
 
 type QueueVisibility struct {
diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go
index b1cf6a0b15..d873af7e4d 100644
--- a/apis/config/v1beta1/defaults.go
+++ b/apis/config/v1beta1/defaults.go
@@ -129,4 +129,26 @@ func SetDefaults_Configuration(cfg *Configuration) {
 			MaxCount: DefaultClusterQueuesMaxCount,
 		}
 	}
+
+	if cfg.Integrations.PodOptions == nil {
+		cfg.Integrations.PodOptions = &PodIntegrationOptions{}
+	}
+
+	if cfg.Integrations.PodOptions.NamespaceSelector == nil {
+		matchExpressionsValues := []string{"kube-system", *cfg.Namespace}
+
+		cfg.Integrations.PodOptions.NamespaceSelector = &metav1.LabelSelector{
+			MatchExpressions: []metav1.LabelSelectorRequirement{
+				{
+					Key:      "kubernetes.io/metadata.name",
+					Operator: metav1.LabelSelectorOpNotIn,
+					Values:   matchExpressionsValues,
+				},
+			},
+		}
+	}
+
+	if cfg.Integrations.PodOptions.PodSelector == nil {
+		cfg.Integrations.PodOptions.PodSelector = &metav1.LabelSelector{}
+	}
 }
diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go
index b2ef277ea0..933293ae91 100644
--- a/apis/config/v1beta1/defaults_test.go
+++ b/apis/config/v1beta1/defaults_test.go
@@ -54,6 +54,18 @@ func TestSetDefaults_Configuration(t *testing.T) {
 	}
 	defaultIntegrations := &Integrations{
 		Frameworks: []string{job.FrameworkName},
+		PodOptions: &PodIntegrationOptions{
+			NamespaceSelector: &metav1.LabelSelector{
+				MatchExpressions: []metav1.LabelSelectorRequirement{
+					{
+						Key:      "kubernetes.io/metadata.name",
+						Operator: metav1.LabelSelectorOpNotIn,
+						Values:   []string{"kube-system", "kueue-system"},
+					},
+				},
+			},
+			PodSelector: &metav1.LabelSelector{},
+		},
 	}
 	defaultQueueVisibility := &QueueVisibility{
 		UpdateIntervalSeconds: DefaultQueueVisibilityUpdateIntervalSeconds,
@@ -61,6 +73,23 @@ func TestSetDefaults_Configuration(t *testing.T) {
 			MaxCount: 10,
 		},
 	}
+
+	overwriteNamespaceIntegrations := &Integrations{
+		Frameworks: []string{job.FrameworkName},
+		PodOptions: &PodIntegrationOptions{
+			NamespaceSelector: &metav1.LabelSelector{
+				MatchExpressions: []metav1.LabelSelectorRequirement{
+					{
+						Key:      "kubernetes.io/metadata.name",
+						Operator: metav1.LabelSelectorOpNotIn,
+						Values:   []string{"kube-system", overwriteNamespace},
+					},
+				},
+			},
+			PodSelector: &metav1.LabelSelector{},
+		},
+	}
+
 	podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
 	podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute}
 
@@ -217,7 +246,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
 					WebhookSecretName: ptr.To(DefaultWebhookSecretName),
 				},
 				ClientConnection: defaultClientConnection,
-				Integrations:     defaultIntegrations,
+				Integrations:     overwriteNamespaceIntegrations,
 				QueueVisibility:  defaultQueueVisibility,
 			},
 		},
@@ -235,7 +264,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
 					Enable: ptr.To(false),
 				},
 				ClientConnection: defaultClientConnection,
-				Integrations:     defaultIntegrations,
+				Integrations:     overwriteNamespaceIntegrations,
 				QueueVisibility:  defaultQueueVisibility,
 			},
 		},
@@ -260,7 +289,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
 					QPS:   ptr.To[float32](123.0),
 					Burst: ptr.To[int32](456),
 				},
-				Integrations:    defaultIntegrations,
+				Integrations:    overwriteNamespaceIntegrations,
 				QueueVisibility: defaultQueueVisibility,
 			},
 		},
@@ -279,7 +308,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
 					Enable: ptr.To(false),
 				},
 				ClientConnection: defaultClientConnection,
-				Integrations:     defaultIntegrations,
+				Integrations:     overwriteNamespaceIntegrations,
 				QueueVisibility:  defaultQueueVisibility,
 			},
 		},
@@ -377,6 +406,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
 				ClientConnection: defaultClientConnection,
 				Integrations: &Integrations{
 					Frameworks: []string{"a", "b"},
+					PodOptions: defaultIntegrations.PodOptions,
 				},
 				QueueVisibility: defaultQueueVisibility,
 			},
diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go
index 62b9bcde31..123e912615 100644
--- a/apis/config/v1beta1/zz_generated.deepcopy.go
+++ b/apis/config/v1beta1/zz_generated.deepcopy.go
@@ -236,6 +236,11 @@ func (in *Integrations) DeepCopyInto(out *Integrations) {
 		*out = make([]string, len(*in))
 		copy(*out, *in)
 	}
+	if in.PodOptions != nil {
+		in, out := &in.PodOptions, &out.PodOptions
+		*out = new(PodIntegrationOptions)
+		(*in).DeepCopyInto(*out)
+	}
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Integrations.
@@ -278,6 +283,31 @@ func (in *InternalCertManagement) DeepCopy() *InternalCertManagement {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PodIntegrationOptions) DeepCopyInto(out *PodIntegrationOptions) {
+	*out = *in
+	if in.NamespaceSelector != nil {
+		in, out := &in.NamespaceSelector, &out.NamespaceSelector
+		*out = new(v1.LabelSelector)
+		(*in).DeepCopyInto(*out)
+	}
+	if in.PodSelector != nil {
+		in, out := &in.PodSelector, &out.PodSelector
+		*out = new(v1.LabelSelector)
+		(*in).DeepCopyInto(*out)
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodIntegrationOptions.
+func (in *PodIntegrationOptions) DeepCopy() *PodIntegrationOptions {
+	if in == nil {
+		return nil
+	}
+	out := new(PodIntegrationOptions)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *QueueVisibility) DeepCopyInto(out *QueueVisibility) { *out = *in diff --git a/charts/kueue/templates/rbac/role.yaml b/charts/kueue/templates/rbac/role.yaml index 14d9316bf0..821f280595 100644 --- a/charts/kueue/templates/rbac/role.yaml +++ b/charts/kueue/templates/rbac/role.yaml @@ -28,6 +28,24 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - pods + verbs: + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - "" + resources: + - pods/status + verbs: + - get + - patch - apiGroups: - "" resources: diff --git a/charts/kueue/templates/webhook/webhook.yaml b/charts/kueue/templates/webhook/webhook.yaml index 99d4bebf76..03202c6230 100644 --- a/charts/kueue/templates/webhook/webhook.yaml +++ b/charts/kueue/templates/webhook/webhook.yaml @@ -102,6 +102,32 @@ webhooks: resources: - mpijobs sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: '{{ include "kueue.fullname" . }}-webhook-service' + namespace: '{{ .Release.Namespace }}' + path: /mutate--v1-pod + failurePolicy: Fail + name: mpod.kb.io + namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: NotIn + values: + - kube-system + - '{{ .Release.Namespace }}' + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - CREATE + resources: + - pods + sideEffects: None --- apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration @@ -272,3 +298,30 @@ webhooks: resources: - rayjobs sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: '{{ include "kueue.fullname" . }}-webhook-service' + namespace: '{{ .Release.Namespace }}' + path: /validate--v1-pod + failurePolicy: Fail + name: vpod.kb.io + namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: NotIn + values: + - kube-system + - '{{ .Release.Namespace }}' + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - pods + sideEffects: None diff --git a/charts/kueue/values.yaml b/charts/kueue/values.yaml index 5ae7484a2e..8c330c3faf 100644 --- a/charts/kueue/values.yaml +++ b/charts/kueue/values.yaml @@ -91,6 +91,13 @@ managerConfig: - "kubeflow.org/pytorchjob" - "kubeflow.org/tfjob" - "kubeflow.org/xgboostjob" + # - "pod" + # podOptions: + # namespaceSelector: + # matchExpressions: + # - key: kubernetes.io/metadata.name + # operator: NotIn + # values: [ kube-system, kueue-system ] # ports definition for metricsService and webhookService. 
metricsService: ports: diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 35dfe020ba..d7fcb428ad 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -18,6 +18,7 @@ package main import ( "context" + "errors" "flag" "fmt" "os" @@ -66,8 +67,9 @@ import ( ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") + errPodIntegration = errors.New("pod integration only supported in Kubernetes 1.27 or newer") ) func init() { @@ -239,6 +241,21 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag log.Error(err, "Unable to create controller") return err } + if name == "pod" { + v := serverVersionFetcher.GetServerVersion() + if v.String() == "" || v.LessThan(kubeversion.KubeVersion1_27) { + setupLog.Error(errPodIntegration, + "Failed to configure reconcilers", + "kubernetesVersion", v) + os.Exit(1) + } + + opts = append( + opts, + jobframework.WithPodNamespaceSelector(cfg.Integrations.PodOptions.NamespaceSelector), + jobframework.WithPodSelector(cfg.Integrations.PodOptions.PodSelector), + ) + } if err = cb.SetupWebhook(mgr, opts...); err != nil { log.Error(err, "Unable to create webhook") return err @@ -299,6 +316,11 @@ func setupServerVersionFetcher(mgr ctrl.Manager, kubeConfig *rest.Config) *kubev os.Exit(1) } + if err := serverVersionFetcher.FetchServerVersion(); err != nil { + setupLog.Error(err, "failed to fetch kubernetes server version") + os.Exit(1) + } + return serverVersionFetcher } diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go index 4aff834924..86acb13183 100644 --- a/cmd/kueue/main_test.go +++ b/cmd/kueue/main_test.go @@ -98,6 +98,18 @@ integrations: // referencing job.FrameworkName ensures the link of job package // therefore the batch/framework should be registered Frameworks: []string{job.FrameworkName}, + PodOptions: &config.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, }, QueueVisibility: &config.QueueVisibility{ UpdateIntervalSeconds: config.DefaultQueueVisibilityUpdateIntervalSeconds, @@ -110,7 +122,7 @@ integrations: { name: "bad integrations config", configFile: badIntegrationsConfig, - wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/paddlejob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"kubeflow.org/xgboostjob\", \"ray.io/rayjob\""), + wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/paddlejob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"kubeflow.org/xgboostjob\", \"pod\", \"ray.io/rayjob\""), }, } diff --git a/config/components/manager/controller_manager_config.yaml b/config/components/manager/controller_manager_config.yaml index ae1cb45cf5..ba507e459d 100644 --- a/config/components/manager/controller_manager_config.yaml +++ b/config/components/manager/controller_manager_config.yaml @@ -38,3 +38,10 @@ integrations: - "kubeflow.org/pytorchjob" - "kubeflow.org/tfjob" - "kubeflow.org/xgboostjob" +# - "pod" +# podOptions: +# namespaceSelector: +# 
matchExpressions: +# - key: kubernetes.io/metadata.name +# operator: NotIn +# values: [ kube-system, kueue-system ] diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 9a6936ab66..3cc25087d2 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -29,6 +29,24 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - pods + verbs: + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - pods/status + verbs: + - get + - patch - apiGroups: - "" resources: diff --git a/config/components/webhook/kustomization.yaml b/config/components/webhook/kustomization.yaml index 9cf26134e4..1c17012696 100644 --- a/config/components/webhook/kustomization.yaml +++ b/config/components/webhook/kustomization.yaml @@ -4,3 +4,33 @@ resources: configurations: - kustomizeconfig.yaml + +patches: +- patch: |- + apiVersion: admissionregistration.k8s.io/v1 + kind: MutatingWebhookConfiguration + metadata: + name: mutating-webhook-configuration + webhooks: + - name: mpod.kb.io + namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: NotIn + values: + - kube-system + - kueue-system +- patch: |- + apiVersion: admissionregistration.k8s.io/v1 + kind: ValidatingWebhookConfiguration + metadata: + name: validating-webhook-configuration + webhooks: + - name: vpod.kb.io + namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: NotIn + values: + - kube-system + - kueue-system diff --git a/config/components/webhook/manifests.yaml b/config/components/webhook/manifests.yaml index 75eb1c5d4a..5a430da538 100644 --- a/config/components/webhook/manifests.yaml +++ b/config/components/webhook/manifests.yaml @@ -137,6 +137,25 @@ webhooks: resources: - mpijobs sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /mutate--v1-pod + failurePolicy: Fail + name: mpod.kb.io + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - CREATE + resources: + - pods + sideEffects: None - admissionReviewVersions: - v1 clientConfig: @@ -360,6 +379,26 @@ webhooks: resources: - mpijobs sideEffects: None +- admissionReviewVersions: + - v1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate--v1-pod + failurePolicy: Fail + name: vpod.kb.io + rules: + - apiGroups: + - "" + apiVersions: + - v1 + operations: + - CREATE + - UPDATE + resources: + - pods + sideEffects: None - admissionReviewVersions: - v1 clientConfig: diff --git a/pkg/config/config.go b/pkg/config/config.go index 8e4f0b1bb1..3a7122f96d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -162,7 +162,7 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co return options, cfg, err } } - if err = validate(&cfg).ToAggregate(); err != nil { + if err := validate(&cfg).ToAggregate(); err != nil { return options, cfg, err } addTo(&options, &cfg) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index dcc5686079..ec69adb40a 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -235,6 +235,27 @@ queueVisibility: `), os.FileMode(0600)); err != nil { t.Fatal(err) } + podIntegrationOptionsConfig := filepath.Join(tmpDir, "podIntegrationOptions.yaml") + if err := os.WriteFile(podIntegrationOptionsConfig, []byte(` +apiVersion: config.kueue.x-k8s.io/v1beta1 +kind: Configuration +integrations: + frameworks: + - pod + podOptions: + 
namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: NotIn + values: [ kube-system, kueue-system, prohibited-namespace ] + podSelector: + matchExpressions: + - key: kueue-job + operator: In + values: [ "true", "True", "yes" ] +`), os.FileMode(0600)); err != nil { + t.Fatal(err) + } defaultControlOptions := ctrl.Options{ HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, @@ -276,6 +297,18 @@ queueVisibility: defaultIntegrations := &configapi.Integrations{ Frameworks: []string{job.FrameworkName}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, } defaultQueueVisibility := &configapi.QueueVisibility{ @@ -337,8 +370,22 @@ queueVisibility: ManageJobsWithoutQueueName: false, InternalCertManagement: enableDefaultInternalCertManagement, ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, - QueueVisibility: defaultQueueVisibility, + Integrations: &configapi.Integrations{ + Frameworks: []string{job.FrameworkName}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-tenant-a"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, + }, + QueueVisibility: defaultQueueVisibility, }, wantOptions: defaultControlOptions, }, @@ -556,6 +603,18 @@ queueVisibility: // referencing job.FrameworkName ensures the link of job package // therefore the batch/framework should be registered Frameworks: []string{job.FrameworkName}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, }, QueueVisibility: defaultQueueVisibility, }, @@ -603,6 +662,57 @@ queueVisibility: }, }, }, + { + name: "pod integration options config", + configFile: podIntegrationOptionsConfig, + wantConfiguration: configapi.Configuration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: configapi.GroupVersion.String(), + Kind: "Configuration", + }, + Namespace: ptr.To(configapi.DefaultNamespace), + ManageJobsWithoutQueueName: false, + InternalCertManagement: enableDefaultInternalCertManagement, + ClientConnection: defaultClientConnection, + QueueVisibility: defaultQueueVisibility, + Integrations: &configapi.Integrations{ + Frameworks: []string{ + "pod", + }, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system", "prohibited-namespace"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kueue-job", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"true", "True", "yes"}, + }, + }, + }, + }, + }, + }, + wantOptions: ctrl.Options{ + HealthProbeBindAddress: configapi.DefaultHealthProbeBindAddress, 
+ Metrics: metricsserver.Options{ + BindAddress: configapi.DefaultMetricsBindAddress, + }, + WebhookServer: &webhook.DefaultServer{ + Options: webhook.Options{ + Port: configapi.DefaultWebhookPort, + }, + }, + }, + }, } for _, tc := range testcases { @@ -686,6 +796,16 @@ func TestEncode(t *testing.T) { "manageJobsWithoutQueueName": false, "integrations": map[string]any{ "frameworks": []any{"batch/job"}, + "podOptions": map[string]any{ + "namespaceSelector": map[string]any{ + "matchExpressions": []any{map[string]any{ + "key": "kubernetes.io/metadata.name", + "operator": "NotIn", + "values": []any{"kube-system", "kueue-system"}, + }}, + }, + "podSelector": map[string]any{}, + }, }, "queueVisibility": map[string]any{ "updateIntervalSeconds": int64(configapi.DefaultQueueVisibilityUpdateIntervalSeconds), diff --git a/pkg/config/validation.go b/pkg/config/validation.go index 1204cceb9b..21e3e3386d 100644 --- a/pkg/config/validation.go +++ b/pkg/config/validation.go @@ -1,9 +1,30 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package config import ( "fmt" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/validation" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/utils/strings/slices" configapi "sigs.k8s.io/kueue/apis/config/v1beta1" ) @@ -13,7 +34,25 @@ const ( queueVisibilityClusterQueuesUpdateIntervalSeconds = 1 ) -func validate(cfg *configapi.Configuration) field.ErrorList { +var ( + integrationsPath = field.NewPath("integrations") + integrationsFrameworksPath = integrationsPath.Child("frameworks") + podOptionsPath = integrationsPath.Child("podOptions") + namespaceSelectorPath = podOptionsPath.Child("namespaceSelector") +) + +func validate(c *configapi.Configuration) field.ErrorList { + var allErrs field.ErrorList + + allErrs = append(allErrs, validateQueueVisibility(c)...) + + // Validate PodNamespaceSelector for the pod framework + allErrs = append(allErrs, validateIntegrations(c)...) + + return allErrs +} + +func validateQueueVisibility(cfg *configapi.Configuration) field.ErrorList { var allErrs field.ErrorList if cfg.QueueVisibility != nil { queueVisibilityPath := field.NewPath("queueVisibility") @@ -29,3 +68,56 @@ func validate(cfg *configapi.Configuration) field.ErrorList { } return allErrs } + +func validateIntegrations(c *configapi.Configuration) field.ErrorList { + var allErrs field.ErrorList + + if c.Integrations == nil { + return field.ErrorList{field.Required(integrationsPath, "cannot be empty")} + } + + if c.Integrations.Frameworks == nil { + return field.ErrorList{field.Required(integrationsFrameworksPath, "cannot be empty")} + } + + allErrs = append(allErrs, validatePodIntegrationOptions(c)...) 
+ + return allErrs +} + +func validatePodIntegrationOptions(c *configapi.Configuration) field.ErrorList { + var allErrs field.ErrorList + + if !slices.Contains(c.Integrations.Frameworks, "pod") { + return allErrs + } + + if c.Integrations.PodOptions == nil { + return field.ErrorList{field.Required(podOptionsPath, "cannot be empty when pod integration is enabled")} + } + if c.Integrations.PodOptions.NamespaceSelector == nil { + return field.ErrorList{field.Required(namespaceSelectorPath, "a namespace selector is required")} + } + + prohibitedNamespaces := []labels.Set{{corev1.LabelMetadataName: "kube-system"}} + + if c.Namespace != nil && *c.Namespace != "" { + prohibitedNamespaces = append(prohibitedNamespaces, labels.Set{corev1.LabelMetadataName: *c.Namespace}) + } + + allErrs = append(allErrs, validation.ValidateLabelSelector(c.Integrations.PodOptions.NamespaceSelector, validation.LabelSelectorValidationOptions{}, namespaceSelectorPath)...) + + selector, err := metav1.LabelSelectorAsSelector(c.Integrations.PodOptions.NamespaceSelector) + if err != nil { + return allErrs + } + + for _, pn := range prohibitedNamespaces { + if selector.Matches(pn) { + allErrs = append(allErrs, field.Invalid(namespaceSelectorPath, c.Integrations.PodOptions.NamespaceSelector, + fmt.Sprintf("should not match the %q namespace", pn[corev1.LabelMetadataName]))) + } + } + + return allErrs +} diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go index afd6eb2227..a7abd25a4c 100644 --- a/pkg/config/validation_test.go +++ b/pkg/config/validation_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + package config import ( @@ -6,36 +22,64 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/utils/ptr" configapi "sigs.k8s.io/kueue/apis/config/v1beta1" ) func TestValidate(t *testing.T) { - testcases := []struct { - name string + defaultQueueVisibility := &configapi.QueueVisibility{ + UpdateIntervalSeconds: configapi.DefaultQueueVisibilityUpdateIntervalSeconds, + ClusterQueues: &configapi.ClusterQueueVisibility{ + MaxCount: configapi.DefaultClusterQueuesMaxCount, + }, + } + + defaultPodIntegrationOptions := &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + } + + defaultIntegrations := &configapi.Integrations{ + Frameworks: []string{"batch/job"}, + PodOptions: defaultPodIntegrationOptions, + } + + testCases := map[string]struct { cfg *configapi.Configuration wantErr field.ErrorList }{ - - { - name: "empty", - cfg: &configapi.Configuration{}, - wantErr: nil, + "empty": { + cfg: &configapi.Configuration{}, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "integrations", + }, + }, }, - { - name: "invalid queue visibility UpdateIntervalSeconds", + "invalid queue visibility UpdateIntervalSeconds": { cfg: &configapi.Configuration{ QueueVisibility: &configapi.QueueVisibility{ UpdateIntervalSeconds: 0, }, + Integrations: defaultIntegrations, }, wantErr: field.ErrorList{ field.Invalid(field.NewPath("queueVisibility").Child("updateIntervalSeconds"), 0, fmt.Sprintf("greater than or equal to %d", queueVisibilityClusterQueuesUpdateIntervalSeconds)), }, }, - { - name: "invalid queue visibility cluster queue max count", + "invalid queue visibility cluster queue max count": { cfg: &configapi.Configuration{ QueueVisibility: &configapi.QueueVisibility{ ClusterQueues: &configapi.ClusterQueueVisibility{ @@ -43,15 +87,137 @@ func TestValidate(t *testing.T) { }, UpdateIntervalSeconds: 1, }, + Integrations: defaultIntegrations, }, wantErr: field.ErrorList{ field.Invalid(field.NewPath("queueVisibility").Child("clusterQueues").Child("maxCount"), 4001, fmt.Sprintf("must be less than %d", queueVisibilityClusterQueuesMaxValue)), }, }, + "nil PodIntegrationOptions": { + cfg: &configapi.Configuration{ + QueueVisibility: defaultQueueVisibility, + Integrations: &configapi.Integrations{ + Frameworks: []string{"pod"}, + PodOptions: nil, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "integrations.podOptions", + }, + }, + }, + "nil PodIntegrationOptions.NamespaceSelector": { + cfg: &configapi.Configuration{ + QueueVisibility: defaultQueueVisibility, + Integrations: &configapi.Integrations{ + Frameworks: []string{"pod"}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: nil, + }, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "integrations.podOptions.namespaceSelector", + }, + }, + }, + "emptyLabelSelector": { + cfg: &configapi.Configuration{ + Namespace: ptr.To("kueue-system"), + QueueVisibility: defaultQueueVisibility, + Integrations: &configapi.Integrations{ + Frameworks: []string{"pod"}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: 
&metav1.LabelSelector{}, + }, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "integrations.podOptions.namespaceSelector", + }, + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "integrations.podOptions.namespaceSelector", + }, + }, + }, + "prohibited namespace in MatchLabels": { + cfg: &configapi.Configuration{ + QueueVisibility: defaultQueueVisibility, + Integrations: &configapi.Integrations{ + Frameworks: []string{"pod"}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "kubernetes.io/metadata.name": "kube-system", + }, + }, + }, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "integrations.podOptions.namespaceSelector", + }, + }, + }, + "prohibited namespace in MatchExpressions with operator In": { + cfg: &configapi.Configuration{ + QueueVisibility: defaultQueueVisibility, + Integrations: &configapi.Integrations{ + Frameworks: []string{"pod"}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"kube-system"}, + }, + }, + }, + }, + }, + }, + wantErr: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeInvalid, + Field: "integrations.podOptions.namespaceSelector", + }, + }, + }, + "prohibited namespace in MatchExpressions with operator NotIn": { + cfg: &configapi.Configuration{ + QueueVisibility: defaultQueueVisibility, + Integrations: &configapi.Integrations{ + Frameworks: []string{"pod"}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + }, + }, + }, + wantErr: nil, + }, } - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - if diff := cmp.Diff(tc.wantErr, validate(tc.cfg), cmpopts.IgnoreFields(field.Error{}, "BadValue")); diff != "" { + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + if diff := cmp.Diff(tc.wantErr, validate(tc.cfg), cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail")); diff != "" { t.Errorf("Unexpected returned error (-want,+got):\n%s", diff) } }) diff --git a/pkg/controller/jobframework/interface.go b/pkg/controller/jobframework/interface.go index 21ce46de8e..533f7baac4 100644 --- a/pkg/controller/jobframework/interface.go +++ b/pkg/controller/jobframework/interface.go @@ -63,7 +63,19 @@ type JobWithCustomStop interface { // Stop implements a custom stop procedure. // The function should be idempotent: not do any API calls if the job is already stopped. // Returns whether the Job stopped with this call or an error - Stop(ctx context.Context, c client.Client, podSetsInfo []PodSetInfo) (bool, error) + Stop(ctx context.Context, c client.Client, podSetsInfo []PodSetInfo, eventMsg string) (bool, error) +} + +// JobWithFinalize interface should be implemented by generic jobs, +// when custom finalization logic is needed for a job, after it's finished. 
+type JobWithFinalize interface { + Finalize(ctx context.Context, c client.Client) error +} + +// JobWithSkip interface should be implemented by generic jobs, +// when reconciliation should be skipped depending on the job's state +type JobWithSkip interface { + Skip() bool } type JobWithPriorityClass interface { diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index 0476616cb3..bb6f4a44de 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -69,6 +69,8 @@ type Options struct { ManageJobsWithoutQueueName bool WaitForPodsReady bool KubeServerVersion *kubeversion.ServerVersionFetcher + PodNamespaceSelector *metav1.LabelSelector + PodSelector *metav1.LabelSelector } // Option configures the reconciler. @@ -97,6 +99,22 @@ func WithKubeServerVersion(v *kubeversion.ServerVersionFetcher) Option { } } +// WithPodNamespaceSelector adds rules to reconcile pods only in particular +// namespaces. +func WithPodNamespaceSelector(s *metav1.LabelSelector) Option { + return func(o *Options) { + o.PodNamespaceSelector = s + } +} + +// WithPodSelector adds rules to reconcile pods only with particular +// labels. +func WithPodSelector(s *metav1.LabelSelector) Option { + return func(o *Options) { + o.PodSelector = s + } +} + var DefaultOptions = Options{} func NewReconciler( @@ -123,6 +141,12 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques err := r.client.Get(ctx, req.NamespacedName, object) + if jws, implements := job.(JobWithSkip); implements { + if jws.Skip() { + return ctrl.Result{}, nil + } + } + if apierrors.IsNotFound(err) || !object.GetDeletionTimestamp().IsZero() { workloads := kueue.WorkloadList{} if err := r.client.List(ctx, &workloads, client.InNamespace(req.Namespace), @@ -221,6 +245,10 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques if err != nil { log.Error(err, "Updating workload status") } + // Execute job finalization logic + if err := r.finalizeJob(ctx, job); err != nil { + return ctrl.Result{}, err + } return ctrl.Result{}, nil } @@ -399,8 +427,15 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o // than one workload... 
w = &workloads.Items[0] } - if err := r.stopJob(ctx, job, object, w, "No matching Workload"); err != nil { - return nil, fmt.Errorf("stopping job with no matching workload: %w", err) + + if _, finished := job.Finished(); finished { + if err := r.finalizeJob(ctx, job); err != nil { + return nil, fmt.Errorf("finalizing job with no matching workload: %w", err) + } + } else { + if err := r.stopJob(ctx, job, object, w, "No matching Workload"); err != nil { + return nil, fmt.Errorf("stopping job with no matching workload: %w", err) + } } } @@ -500,7 +535,7 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object clie info := getPodSetsInfoFromWorkload(wl) if jws, implements := job.(JobWithCustomStop); implements { - stoppedNow, err := jws.Stop(ctx, r.client, info) + stoppedNow, err := jws.Stop(ctx, r.client, info, eventMsg) if stoppedNow { r.record.Eventf(object, corev1.EventTypeNormal, "Stopped", eventMsg) } @@ -523,6 +558,16 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object clie return nil } +func (r *JobReconciler) finalizeJob(ctx context.Context, job GenericJob) error { + if jwf, implements := job.(JobWithFinalize); implements { + if err := jwf.Finalize(ctx, r.client); err != nil { + return err + } + } + + return nil +} + func (r *JobReconciler) removeFinalizer(ctx context.Context, wl *kueue.Workload) error { if controllerutil.RemoveFinalizer(wl, kueue.ResourceInUseFinalizerName) { return r.client.Update(ctx, wl) diff --git a/pkg/controller/jobframework/reconciler_test.go b/pkg/controller/jobframework/reconciler_test.go index bcdf9ab455..a188b7edcc 100644 --- a/pkg/controller/jobframework/reconciler_test.go +++ b/pkg/controller/jobframework/reconciler_test.go @@ -57,7 +57,7 @@ func TestIsParentJobManaged(t *testing.T) { UID(parentJobName). Obj(), job: testingjob.MakeJob(childJobName, jobNamespace). - OwnerReference(parentJobName, batchv1.SchemeGroupVersion.WithKind("Job")). + OwnerReference(parentJobName, batchv1.SchemeGroupVersion.WithKind("CronJob")). 
Obj(), wantErr: ErrUnknownWorkloadOwner, }, diff --git a/pkg/controller/jobs/job/job_controller.go b/pkg/controller/jobs/job/job_controller.go index 7708338637..88f3b6f04f 100644 --- a/pkg/controller/jobs/job/job_controller.go +++ b/pkg/controller/jobs/job/job_controller.go @@ -57,10 +57,11 @@ const ( func init() { utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ - SetupIndexes: SetupIndexes, - NewReconciler: NewReconciler, - SetupWebhook: SetupWebhook, - JobType: &batchv1.Job{}, + SetupIndexes: SetupIndexes, + NewReconciler: NewReconciler, + SetupWebhook: SetupWebhook, + JobType: &batchv1.Job{}, + IsManagingObjectsOwner: isJob, })) } @@ -80,7 +81,12 @@ var NewReconciler = jobframework.NewGenericReconciler( return &Job{} }, func(c client.Client) handler.EventHandler { return &parentWorkloadHandler{client: c} - }) + }, +) + +func isJob(owner *metav1.OwnerReference) bool { + return owner.Kind == "Job" && owner.APIVersion == gvk.GroupVersion().String() +} type parentWorkloadHandler struct { client client.Client @@ -152,7 +158,7 @@ func (j *Job) Suspend() { j.Spec.Suspend = ptr.To(true) } -func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []jobframework.PodSetInfo) (bool, error) { +func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []jobframework.PodSetInfo, eventMsg string) (bool, error) { stoppedNow := false if !j.IsSuspended() { j.Suspend() diff --git a/pkg/controller/jobs/job/job_controller_test.go b/pkg/controller/jobs/job/job_controller_test.go index 9c9185bb53..a490c3700c 100644 --- a/pkg/controller/jobs/job/job_controller_test.go +++ b/pkg/controller/jobs/job/job_controller_test.go @@ -529,7 +529,7 @@ func TestReconciler(t *testing.T) { "should get error if workload owner is unknown": { job: *utiltestingjob.MakeJob("job", "ns"). ParentWorkload("non-existing-parent-workload"). - OwnerReference("parent", batchv1.SchemeGroupVersion.WithKind("Job")). + OwnerReference("parent", batchv1.SchemeGroupVersion.WithKind("CronJob")). 
Obj(), wantJob: *utiltestingjob.MakeJob("job", "ns").Obj(), wantErr: jobframework.ErrUnknownWorkloadOwner, diff --git a/pkg/controller/jobs/jobs.go b/pkg/controller/jobs/jobs.go index 23c801ffe8..16a932372a 100644 --- a/pkg/controller/jobs/jobs.go +++ b/pkg/controller/jobs/jobs.go @@ -22,5 +22,6 @@ import ( _ "sigs.k8s.io/kueue/pkg/controller/jobs/jobset" _ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs" _ "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob" + _ "sigs.k8s.io/kueue/pkg/controller/jobs/pod" _ "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" ) diff --git a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go index 83a26545fb..1d71775a10 100644 --- a/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/paddlejob/paddlejob_controller.go @@ -18,8 +18,10 @@ package paddlejob import ( "context" + "strings" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -36,11 +38,12 @@ var ( func init() { utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ - SetupIndexes: SetupIndexes, - NewReconciler: NewReconciler, - SetupWebhook: SetupPaddleJobWebhook, - JobType: &kftraining.PaddleJob{}, - AddToScheme: kftraining.AddToScheme, + SetupIndexes: SetupIndexes, + NewReconciler: NewReconciler, + SetupWebhook: SetupPaddleJobWebhook, + JobType: &kftraining.PaddleJob{}, + AddToScheme: kftraining.AddToScheme, + IsManagingObjectsOwner: isPaddleJob, })) } @@ -58,6 +61,10 @@ var NewReconciler = jobframework.NewGenericReconciler(func() jobframework.Generi return &kubeflowjob.KubeflowJob{KFJobControl: (*JobControl)(&kftraining.PaddleJob{})} }, nil) +func isPaddleJob(owner *metav1.OwnerReference) bool { + return owner.Kind == "PaddleJob" && strings.HasPrefix(owner.APIVersion, "kubeflow.org") +} + type JobControl kftraining.PaddleJob var _ kubeflowjob.KFJobControl = (*JobControl)(nil) diff --git a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go index 65d4fbda76..efdb0e45c8 100644 --- a/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/pytorchjob/pytorchjob_controller.go @@ -18,8 +18,10 @@ package pytorchjob import ( "context" + "strings" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -36,11 +38,12 @@ var ( func init() { utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ - SetupIndexes: SetupIndexes, - NewReconciler: NewReconciler, - SetupWebhook: SetupPyTorchJobWebhook, - JobType: &kftraining.PyTorchJob{}, - AddToScheme: kftraining.AddToScheme, + SetupIndexes: SetupIndexes, + NewReconciler: NewReconciler, + SetupWebhook: SetupPyTorchJobWebhook, + JobType: &kftraining.PyTorchJob{}, + AddToScheme: kftraining.AddToScheme, + IsManagingObjectsOwner: isPyTorchJob, })) } @@ -58,6 +61,10 @@ var NewReconciler = jobframework.NewGenericReconciler(func() jobframework.Generi return &kubeflowjob.KubeflowJob{KFJobControl: 
(*JobControl)(&kftraining.PyTorchJob{})} }, nil) +func isPyTorchJob(owner *metav1.OwnerReference) bool { + return owner.Kind == "PyTorchJob" && strings.HasPrefix(owner.APIVersion, "kubeflow.org") +} + type JobControl kftraining.PyTorchJob var _ kubeflowjob.KFJobControl = (*JobControl)(nil) diff --git a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go index 898bcee4b2..07fa3a8cab 100644 --- a/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/tfjob/tfjob_controller.go @@ -18,8 +18,10 @@ package tfjob import ( "context" + "strings" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -36,11 +38,12 @@ var ( func init() { utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ - SetupIndexes: SetupIndexes, - NewReconciler: NewReconciler, - SetupWebhook: SetupTFJobWebhook, - JobType: &kftraining.TFJob{}, - AddToScheme: kftraining.AddToScheme, + SetupIndexes: SetupIndexes, + NewReconciler: NewReconciler, + SetupWebhook: SetupTFJobWebhook, + JobType: &kftraining.TFJob{}, + AddToScheme: kftraining.AddToScheme, + IsManagingObjectsOwner: isTFJob, })) } @@ -58,6 +61,10 @@ var NewReconciler = jobframework.NewGenericReconciler(func() jobframework.Generi return &kubeflowjob.KubeflowJob{KFJobControl: (*JobControl)(&kftraining.TFJob{})} }, nil) +func isTFJob(owner *metav1.OwnerReference) bool { + return owner.Kind == "TFJob" && strings.HasPrefix(owner.APIVersion, "kubeflow.org") +} + type JobControl kftraining.TFJob var _ kubeflowjob.KFJobControl = (*JobControl)(nil) diff --git a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go index 1e57f7ccef..b5392abf50 100644 --- a/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go +++ b/pkg/controller/jobs/kubeflow/jobs/xgboostjob/xgboostjob_controller.go @@ -18,8 +18,10 @@ package xgboostjob import ( "context" + "strings" kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -36,11 +38,12 @@ var ( func init() { utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ - SetupIndexes: SetupIndexes, - NewReconciler: NewReconciler, - SetupWebhook: SetupXGBoostJobWebhook, - JobType: &kftraining.XGBoostJob{}, - AddToScheme: kftraining.AddToScheme, + SetupIndexes: SetupIndexes, + NewReconciler: NewReconciler, + SetupWebhook: SetupXGBoostJobWebhook, + JobType: &kftraining.XGBoostJob{}, + AddToScheme: kftraining.AddToScheme, + IsManagingObjectsOwner: isXGBoostJob, })) } @@ -58,6 +61,10 @@ var NewReconciler = jobframework.NewGenericReconciler(func() jobframework.Generi return &kubeflowjob.KubeflowJob{KFJobControl: (*JobControl)(&kftraining.XGBoostJob{})} }, nil) +func isXGBoostJob(owner *metav1.OwnerReference) bool { + return owner.Kind == "XGBoostJob" && strings.HasPrefix(owner.APIVersion, "kubeflow.org") +} + type JobControl kftraining.XGBoostJob var _ kubeflowjob.KFJobControl = (*JobControl)(nil) diff --git a/pkg/controller/jobs/pod/pod_controller.go 
b/pkg/controller/jobs/pod/pod_controller.go new file mode 100644 index 0000000000..c27521ce2d --- /dev/null +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -0,0 +1,246 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "fmt" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/util/maps" +) + +const ( + SchedulingGateName = "kueue.x-k8s.io/admission" + FrameworkName = "pod" + gateNotFound = -1 + ConditionTypeTerminationTarget = "TerminationTarget" +) + +var ( + gvk = corev1.SchemeGroupVersion.WithKind("Pod") +) + +func init() { + utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ + SetupIndexes: SetupIndexes, + NewReconciler: NewReconciler, + SetupWebhook: SetupWebhook, + JobType: &corev1.Pod{}, + })) +} + +// +kubebuilder:rbac:groups=scheduling.k8s.io,resources=priorityclasses,verbs=list;get;watch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch +// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;update;patch;delete +// +kubebuilder:rbac:groups="",resources=pods/status,verbs=get;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads/finalizers,verbs=update +// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch + +var NewReconciler = jobframework.NewGenericReconciler( + func() jobframework.GenericJob { + return &Pod{} + }, nil) + +type Pod corev1.Pod + +var _ jobframework.GenericJob = (*Pod)(nil) +var _ jobframework.JobWithCustomStop = (*Pod)(nil) +var _ jobframework.JobWithFinalize = (*Pod)(nil) + +func fromObject(o runtime.Object) *Pod { + return (*Pod)(o.(*corev1.Pod)) +} + +// Object returns the job instance. +func (p *Pod) Object() client.Object { + return (*corev1.Pod)(p) +} + +func (p *Pod) gateIndex() int { + for i := range p.Spec.SchedulingGates { + if p.Spec.SchedulingGates[i].Name == SchedulingGateName { + return i + } + } + return gateNotFound +} + +// IsSuspended returns whether the job is suspended or not. +func (p *Pod) IsSuspended() bool { + return p.gateIndex() != gateNotFound +} + +// Suspend will suspend the job. +func (p *Pod) Suspend() { + // Not implemented because this is not called when JobWithCustomStop is implemented. 
+} + +// RunWithPodSetsInfo will inject the node affinity and podSet counts extracting from workload to job and unsuspend it. +func (p *Pod) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) error { + if len(podSetsInfo) != 1 { + return fmt.Errorf("%w: expecting 1 got %d", jobframework.ErrInvalidPodsetInfo, len(podSetsInfo)) + } + idx := p.gateIndex() + if idx != gateNotFound { + p.Spec.SchedulingGates = append(p.Spec.SchedulingGates[:idx], p.Spec.SchedulingGates[idx+1:]...) + } + + p.Spec.NodeSelector = maps.MergeKeepFirst(podSetsInfo[0].NodeSelector, p.Spec.NodeSelector) + + return nil + +} + +// RestorePodSetsInfo will restore the original node affinity and podSet counts of the job. +func (p *Pod) RestorePodSetsInfo(nodeSelectors []jobframework.PodSetInfo) bool { + // Not implemented since Pods cannot be updated, they can only be terminated. + return false +} + +// Finished means whether the job is completed/failed or not, +// condition represents the workload finished condition. +func (p *Pod) Finished() (metav1.Condition, bool) { + ph := p.Status.Phase + condition := metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: "JobFinished", + Message: "Job finished successfully", + } + if ph == corev1.PodFailed { + condition.Message = "Job failed" + } + + return condition, ph == corev1.PodSucceeded || ph == corev1.PodFailed +} + +// PodSets will build workload podSets corresponding to the job. +func (p *Pod) PodSets() []kueue.PodSet { + return []kueue.PodSet{ + { + Name: kueue.DefaultPodSetName, + Count: 1, + Template: corev1.PodTemplateSpec{ + Spec: *p.Spec.DeepCopy(), + }, + }, + } +} + +// IsActive returns true if there are any running pods. +func (p *Pod) IsActive() bool { + return p.Status.Phase == corev1.PodRunning +} + +// PodsReady instructs whether job derived pods are all ready now. +func (p *Pod) PodsReady() bool { + for i := range p.Status.Conditions { + c := &p.Status.Conditions[i] + if c.Type == corev1.PodReady { + return c.Status == corev1.ConditionTrue + } + } + return false +} + +// GVK returns GVK (Group Version Kind) for the job. 
+func (p *Pod) GVK() schema.GroupVersionKind { + return gvk +} + +func (p *Pod) Stop(ctx context.Context, c client.Client, _ []jobframework.PodSetInfo, eventMsg string) (bool, error) { + // The podset info is not relevant here, since this should mark the pod's end of life + pCopy := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: p.UID, + Name: p.Name, + Namespace: p.Namespace, + }, + TypeMeta: p.TypeMeta, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: ConditionTypeTerminationTarget, + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Time{ + Time: time.Now(), + }, + Reason: "StoppedByKueue", + Message: eventMsg, + }, + }, + }, + } + err := c.Status().Patch(ctx, pCopy, client.Apply, client.FieldOwner(constants.KueueName)) + if err == nil { + err = c.Delete(ctx, p.Object()) + } + if err == nil || apierrors.IsNotFound(err) { + return true, nil + } + return false, err +} + +func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { + return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk) +} + +func (p *Pod) Finalize(ctx context.Context, c client.Client) error { + if controllerutil.RemoveFinalizer(p.Object(), PodFinalizer) { + if err := c.Update(ctx, p.Object()); err != nil { + return err + } + } + + return nil +} + +func (p *Pod) Skip() bool { + // Skip pod reconciliation, if managed label is not set + if v, ok := p.GetLabels()[ManagedLabelKey]; !ok || v != ManagedLabelValue { + return true + } + + return false +} + +func IsPodOwnerManagedByKueue(p *Pod) bool { + if owner := metav1.GetControllerOf(p); owner != nil { + return jobframework.IsOwnerManagedByKueue(owner) || (owner.Kind == "RayCluster" && strings.HasPrefix(owner.APIVersion, "ray.io/v1alpha1")) + } + return false +} + +func GetWorkloadNameForPod(podName string) string { + return jobframework.GetWorkloadNameForOwnerWithGVK(podName, gvk) +} diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go new file mode 100644 index 0000000000..2b8090413d --- /dev/null +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -0,0 +1,507 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package pod + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + _ "sigs.k8s.io/kueue/pkg/controller/jobs/job" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" +) + +func TestPodsReady(t *testing.T) { + testCases := map[string]struct { + pod *corev1.Pod + want bool + }{ + "pod is ready": { + pod: testingpod.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + StatusConditions( + corev1.PodCondition{ + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + ). + Obj(), + want: true, + }, + "pod is not ready": { + pod: testingpod.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + StatusConditions(). + Obj(), + want: false, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + pod := fromObject(tc.pod) + got := pod.PodsReady() + if tc.want != got { + t.Errorf("Unexpected response (want: %v, got: %v)", tc.want, got) + } + }) + } +} + +func TestRunWithPodSetsInfo(t *testing.T) { + testCases := map[string]struct { + pod *corev1.Pod + runInfo, restoreInfo []jobframework.PodSetInfo + wantPod *corev1.Pod + wantErr error + }{ + "pod set info > 1": { + pod: testingpod.MakePod("test-pod", "test-namespace").Obj(), + runInfo: make([]jobframework.PodSetInfo, 2), + wantErr: jobframework.ErrInvalidPodsetInfo, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + pod := fromObject(tc.pod) + + gotErr := pod.RunWithPodSetsInfo(tc.runInfo) + + if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { + t.Errorf("error mismatch (-want +got):\n%s", diff) + } + if tc.wantErr == nil { + if diff := cmp.Diff(tc.wantPod.Spec, tc.pod.Spec); diff != "" { + t.Errorf("pod spec mismatch (-want +got):\n%s", diff) + } + } + }) + } +} + +var ( + podCmpOpts = []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(corev1.Pod{}, "TypeMeta", "ObjectMeta.ResourceVersion", + "ObjectMeta.DeletionTimestamp"), + cmpopts.IgnoreFields(corev1.PodCondition{}, "LastTransitionTime"), + } + defaultWorkloadCmpOpts = []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.SortSlices(func(a, b kueue.Workload) bool { + return a.Name < b.Name + }), + cmpopts.SortSlices(func(a, b metav1.Condition) bool { + return a.Type < b.Type + }), + cmpopts.IgnoreFields( + kueue.Workload{}, "TypeMeta", "ObjectMeta.OwnerReferences", + "ObjectMeta.Name", "ObjectMeta.ResourceVersion", + ), + cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), + } +) + +func TestReconciler(t *testing.T) { + basePodWrapper := testingpod.MakePod("pod", "ns"). + UID("test-uid"). + Queue("user-queue"). + Request(corev1.ResourceCPU, "1"). 
+ Image("", nil) + + testCases := map[string]struct { + initObjects []client.Object + pod corev1.Pod + wantPod *corev1.Pod + workloads []kueue.Workload + wantWorkloads []kueue.Workload + wantErr error + workloadCmpOpts []cmp.Option + }{ + "scheduling gate is removed and node selector is added if workload is admitted": { + initObjects: []client.Object{ + utiltesting.MakeResourceFlavor("unit-test-flavor").Label("kubernetes.io/arch", "arm64").Obj(), + }, + pod: *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Obj(), + wantPod: basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + NodeSelector("kubernetes.io/arch", "arm64"). + KueueFinalizer(). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + Obj(), + ). + Admitted(true). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + Obj(), + ). + Admitted(true). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "non-matching admitted workload is deleted and pod is finalized": { + pod: *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + Obj(), + wantPod: nil, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 2).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()). + Admitted(true). + Obj(), + }, + wantErr: jobframework.ErrNoMatchingWorkloads, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "the workload is created when queue name is set": { + pod: *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Queue("test-queue"). + Obj(), + wantPod: basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Queue("test-queue"). + Obj(), + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets( + *utiltesting.MakePodSet(kueue.DefaultPodSetName, 1). + Request(corev1.ResourceCPU, "1"). + SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}). + Obj(), + ). + Queue("test-queue"). + Priority(0). + Labels(map[string]string{ + controllerconsts.JobUIDLabel: "test-uid", + }). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "the pod reconciliation is skipped when 'kueue.x-k8s.io/managed' label is not set": { + pod: *basePodWrapper. + Clone(). + Obj(), + wantPod: basePodWrapper. + Clone(). + Obj(), + wantWorkloads: []kueue.Workload{}, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "pod is stopped when workload is evicted": { + pod: *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). 
+ Queue("test-queue"). + Obj(), + wantPod: basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Queue("test-queue"). + StatusConditions(corev1.PodCondition{ + Type: "TerminationTarget", + Status: corev1.ConditionTrue, + Reason: "StoppedByKueue", + Message: "Preempted to accommodate a higher priority Workload", + }). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + Queue("test-queue"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Reason: "Preempted", + Message: "Preempted to accommodate a higher priority Workload", + }). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + Queue("test-queue"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Reason: "Preempted", + Message: "Preempted to accommodate a higher priority Workload", + }). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "pod is finalized when it's succeeded": { + pod: *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + StatusPhase(corev1.PodSucceeded). + Obj(), + wantPod: basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + StatusPhase(corev1.PodSucceeded). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()). + Admitted(true). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()). + Admitted(true). + Condition(metav1.Condition{ + Type: "Finished", + Status: "True", + Reason: "JobFinished", + Message: "Job finished successfully", + }). + Obj(), + }, + workloadCmpOpts: append( + defaultWorkloadCmpOpts, + // This is required because SSA doesn't work properly for the fake k8s client. + // Reconciler appends "Finished" condition using SSA. Fake client will just + // replace all the older conditions. + // See: https://github.com/kubernetes-sigs/controller-runtime/issues/2341 + cmpopts.IgnoreSliceElements(func(c metav1.Condition) bool { + return c.Type == "Admitted" + }), + ), + }, + "pod without scheduling gate is terminated if workload is not admitted": { + pod: *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + Obj(), + wantPod: basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + StatusConditions(corev1.PodCondition{ + Type: "TerminationTarget", + Status: corev1.ConditionTrue, + Reason: "StoppedByKueue", + Message: "Not admitted by cluster queue", + }). 
+ Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "finalizer is removed for finished pod without matching workload": { + pod: *basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + StatusPhase(corev1.PodSucceeded). + Obj(), + wantPod: basePodWrapper. + Clone(). + Label("kueue.x-k8s.io/managed", "true"). + StatusPhase(corev1.PodSucceeded). + Obj(), + workloads: []kueue.Workload{}, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + Labels(map[string]string{"kueue.x-k8s.io/job-uid": "test-uid"}). + Queue("user-queue"). + Priority(0). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + clientBuilder := utiltesting.NewClientBuilder() + if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil { + t.Fatalf("Could not setup indexes: %v", err) + } + kcBuilder := clientBuilder.WithObjects(append(tc.initObjects, &tc.pod)...) + + for i := range tc.workloads { + kcBuilder = kcBuilder.WithStatusSubresource(&tc.workloads[i]) + } + + kClient := kcBuilder.Build() + for i := range tc.workloads { + if err := ctrl.SetControllerReference(&tc.pod, &tc.workloads[i], kClient.Scheme()); err != nil { + t.Fatalf("Could not setup owner reference in Workloads: %v", err) + } + if err := kClient.Create(ctx, &tc.workloads[i]); err != nil { + t.Fatalf("Could not create workload: %v", err) + } + } + recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"}) + reconciler := NewReconciler(kClient, recorder) + + podKey := client.ObjectKeyFromObject(&tc.pod) + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: podKey, + }) + if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Reconcile returned error (-want,+got):\n%s", diff) + } + + var gotPod corev1.Pod + if err := kClient.Get(ctx, podKey, &gotPod); err != nil { + if tc.wantPod != nil || !errors.IsNotFound(err) { + t.Fatalf("Could not get Pod after reconcile: %v", err) + } + } + if tc.wantPod != nil { + if diff := cmp.Diff(*tc.wantPod, gotPod, podCmpOpts...); diff != "" && tc.wantPod != nil { + t.Errorf("Pod after reconcile (-want,+got):\n%s", diff) + } + } + var gotWorkloads kueue.WorkloadList + if err := kClient.List(ctx, &gotWorkloads); err != nil { + t.Fatalf("Could not get Workloads after reconcile: %v", err) + } + if diff := cmp.Diff(tc.wantWorkloads, gotWorkloads.Items, tc.workloadCmpOpts...); diff != "" { + t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff) + } + }) + } +} + +func TestIsPodOwnerManagedByQueue(t *testing.T) { + testCases := map[string]struct { + ownerReference metav1.OwnerReference + wantRes bool + }{ + "batch/v1/Job": { + ownerReference: metav1.OwnerReference{ + APIVersion: "batch/v1", + 
Controller: ptr.To(true), + Kind: "Job", + }, + wantRes: true, + }, + "apps/v1/ReplicaSet": { + ownerReference: metav1.OwnerReference{ + APIVersion: "apps/v1", + Controller: ptr.To(true), + Kind: "ReplicaSet", + }, + wantRes: false, + }, + "ray.io/v1alpha1/RayCluster": { + ownerReference: metav1.OwnerReference{ + APIVersion: "ray.io/v1alpha1", + Controller: ptr.To(true), + Kind: "RayCluster", + }, + wantRes: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + pod := testingpod.MakePod("pod", "ns"). + UID("test-uid"). + Request(corev1.ResourceCPU, "1"). + Image("", nil). + Obj() + + pod.OwnerReferences = append(pod.OwnerReferences, tc.ownerReference) + + if tc.wantRes != IsPodOwnerManagedByKueue(fromObject(pod)) { + t.Errorf("Unexpected 'IsPodOwnerManagedByKueue' result\n want: %t\n got: %t)", + tc.wantRes, IsPodOwnerManagedByKueue(fromObject(pod))) + } + }) + } +} + +func TestGetWorkloadNameForPod(t *testing.T) { + wantWlName := "pod-unit-test-7bb47" + wlName := GetWorkloadNameForPod("unit-test") + + if wantWlName != wlName { + t.Errorf("Expected different workload name\n want: %s\n got: %s", wantWlName, wlName) + } +} diff --git a/pkg/controller/jobs/pod/pod_webhook.go b/pkg/controller/jobs/pod/pod_webhook.go new file mode 100644 index 0000000000..ee3656a02a --- /dev/null +++ b/pkg/controller/jobs/pod/pod_webhook.go @@ -0,0 +1,195 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/webhook" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + "sigs.k8s.io/kueue/pkg/controller/jobframework" +) + +const ( + ManagedLabelKey = "kueue.x-k8s.io/managed" + ManagedLabelValue = "true" + PodFinalizer = ManagedLabelKey +) + +var ( + labelsPath = field.NewPath("metadata", "labels") + managedLabelPath = labelsPath.Key(ManagedLabelKey) +) + +type PodWebhook struct { + client client.Client + manageJobsWithoutQueueName bool + namespaceSelector *metav1.LabelSelector + podSelector *metav1.LabelSelector +} + +// SetupWebhook configures the webhook for pods. +func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error { + options := jobframework.DefaultOptions + for _, opt := range opts { + opt(&options) + } + wh := &PodWebhook{ + client: mgr.GetClient(), + manageJobsWithoutQueueName: options.ManageJobsWithoutQueueName, + namespaceSelector: options.PodNamespaceSelector, + podSelector: options.PodSelector, + } + return ctrl.NewWebhookManagedBy(mgr). + For(&corev1.Pod{}). + WithDefaulter(wh). + WithValidator(wh). 
+ Complete() +} + +// +kubebuilder:webhook:path=/mutate--v1-pod,mutating=true,failurePolicy=fail,sideEffects=None,groups="",resources=pods,verbs=create,versions=v1,name=mpod.kb.io,admissionReviewVersions=v1 +// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch + +var _ webhook.CustomDefaulter = &PodWebhook{} + +func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { + pod := fromObject(obj) + log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(pod)) + log.V(5).Info("Applying defaults") + + if IsPodOwnerManagedByKueue(pod) { + log.V(5).Info("Pod owner is managed by kueue, skipping") + return nil + } + + // Check for pod label selector match + podSelector, err := metav1.LabelSelectorAsSelector(w.podSelector) + if err != nil { + return fmt.Errorf("failed to parse pod selector: %w", err) + } + if !podSelector.Matches(labels.Set(pod.GetLabels())) { + return nil + } + + // Get pod namespace and check for namespace label selector match + ns := corev1.Namespace{} + err = w.client.Get(ctx, client.ObjectKey{Name: pod.GetNamespace()}, &ns) + if err != nil { + return fmt.Errorf("failed to run mutating webhook on pod %s, error while getting namespace: %w", + pod.GetName(), + err, + ) + } + log.V(5).Info("Found pod namespace", "Namespace.Name", ns.GetName()) + nsSelector, err := metav1.LabelSelectorAsSelector(w.namespaceSelector) + if err != nil { + return fmt.Errorf("failed to parse namespace selector: %w", err) + } + if !nsSelector.Matches(labels.Set(ns.GetLabels())) { + return nil + } + + if jobframework.QueueName(pod) != "" || w.manageJobsWithoutQueueName { + controllerutil.AddFinalizer(pod.Object(), PodFinalizer) + + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + pod.Labels[ManagedLabelKey] = ManagedLabelValue + + if pod.gateIndex() == gateNotFound { + log.V(5).Info("Adding gate") + pod.Spec.SchedulingGates = append(pod.Spec.SchedulingGates, corev1.PodSchedulingGate{Name: SchedulingGateName}) + } + } + + return nil +} + +// +kubebuilder:webhook:path=/validate--v1-pod,mutating=false,failurePolicy=fail,sideEffects=None,groups="",resources=pods,verbs=create;update,versions=v1,name=vpod.kb.io,admissionReviewVersions=v1 + +var _ webhook.CustomValidator = &PodWebhook{} + +func (w *PodWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { + var warnings admission.Warnings + + pod := fromObject(obj) + log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(pod)) + log.V(5).Info("Validating create") + allErrs := jobframework.ValidateCreateForQueueName(pod) + + allErrs = append(allErrs, validateManagedLabel(pod)...) + + if warn := warningForPodManagedLabel(pod); warn != "" { + warnings = append(warnings, warn) + } + + return warnings, allErrs.ToAggregate() +} + +func (w *PodWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + var warnings admission.Warnings + + oldPod := fromObject(oldObj) + newPod := fromObject(newObj) + log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(newPod)) + log.V(5).Info("Validating update") + allErrs := jobframework.ValidateUpdateForQueueName(oldPod, newPod) + + allErrs = append(allErrs, validateManagedLabel(newPod)...) 
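+	// As in ValidateCreate, the check below warns when the 'kueue.x-k8s.io/managed' label is set on a pod whose owner is already managed by kueue.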
+ + if warn := warningForPodManagedLabel(newPod); warn != "" { + warnings = append(warnings, warn) + } + + return warnings, allErrs.ToAggregate() +} + +func (w *PodWebhook) ValidateDelete(context.Context, runtime.Object) (admission.Warnings, error) { + return nil, nil +} + +func validateManagedLabel(pod *Pod) field.ErrorList { + var allErrs field.ErrorList + + if managedLabel, ok := pod.GetLabels()[ManagedLabelKey]; ok && managedLabel != ManagedLabelValue { + return append(allErrs, field.Forbidden(managedLabelPath, fmt.Sprintf("managed label value can only be '%s'", ManagedLabelValue))) + } + + return allErrs +} + +// warningForPodManagedLabel returns a warning message if the pod has a managed label, and it's parent is managed by kueue +func warningForPodManagedLabel(p *Pod) string { + if managedLabel := p.GetLabels()[ManagedLabelKey]; managedLabel == ManagedLabelValue && IsPodOwnerManagedByKueue(p) { + return fmt.Sprintf("pod owner is managed by kueue, label '%s=%s' might lead to unexpected behaviour", + ManagedLabelKey, ManagedLabelValue) + } + + return "" +} diff --git a/pkg/controller/jobs/pod/pod_webhook_test.go b/pkg/controller/jobs/pod/pod_webhook_test.go new file mode 100644 index 0000000000..4cb26d58c9 --- /dev/null +++ b/pkg/controller/jobs/pod/pod_webhook_test.go @@ -0,0 +1,341 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + rayjobapi "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + _ "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs" + _ "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" +) + +func TestDefault(t *testing.T) { + defaultNamespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ns", + Labels: map[string]string{ + "kubernetes.io/metadata.name": "test-ns", + }, + }, + } + + defaultNamespaceSelector := &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system"}, + }, + }, + } + + testCases := map[string]struct { + initObjects []client.Object + pod *corev1.Pod + manageJobsWithoutQueueName bool + namespaceSelector *metav1.LabelSelector + podSelector *metav1.LabelSelector + want *corev1.Pod + wantError error + }{ + "pod with queue nil ns selector": { + initObjects: []client.Object{defaultNamespace}, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + Obj(), + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). 
+ Obj(), + }, + "pod with queue matching ns selector": { + initObjects: []client.Object{defaultNamespace}, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + Obj(), + namespaceSelector: defaultNamespaceSelector, + podSelector: &metav1.LabelSelector{}, + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + Label("kueue.x-k8s.io/managed", "true"). + KueueSchedulingGate(). + KueueFinalizer(). + Obj(), + }, + "pod without queue matching ns selector manage jobs without queue name": { + initObjects: []client.Object{defaultNamespace}, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Obj(), + manageJobsWithoutQueueName: true, + namespaceSelector: defaultNamespaceSelector, + podSelector: &metav1.LabelSelector{}, + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Label("kueue.x-k8s.io/managed", "true"). + KueueSchedulingGate(). + KueueFinalizer(). + Obj(), + }, + "pod with owner managed by kueue (Job)": { + initObjects: []client.Object{defaultNamespace}, + podSelector: &metav1.LabelSelector{}, + namespaceSelector: defaultNamespaceSelector, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). + Obj(), + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). + Obj(), + }, + "pod with owner managed by kueue (RayCluster)": { + initObjects: []client.Object{defaultNamespace}, + podSelector: &metav1.LabelSelector{}, + namespaceSelector: defaultNamespaceSelector, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference("parent-ray-cluster", rayjobapi.GroupVersion.WithKind("RayCluster")). + Obj(), + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference("parent-ray-cluster", rayjobapi.GroupVersion.WithKind("RayCluster")). + Obj(), + }, + "pod with owner managed by kueue (MPIJob)": { + initObjects: []client.Object{defaultNamespace}, + podSelector: &metav1.LabelSelector{}, + namespaceSelector: defaultNamespaceSelector, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-mpi-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v2beta1", Kind: "MPIJob"}, + ). + Obj(), + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-mpi-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v2beta1", Kind: "MPIJob"}, + ). + Obj(), + }, + "pod with owner managed by kueue (PyTorchJob)": { + initObjects: []client.Object{defaultNamespace}, + podSelector: &metav1.LabelSelector{}, + namespaceSelector: defaultNamespaceSelector, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-pytorch-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "PyTorchJob"}, + ). + Obj(), + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-pytorch-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "PyTorchJob"}, + ). 
+ Obj(), + }, + "pod with owner managed by kueue (TFJob)": { + initObjects: []client.Object{defaultNamespace}, + podSelector: &metav1.LabelSelector{}, + namespaceSelector: defaultNamespaceSelector, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-tf-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "TFJob"}, + ). + Obj(), + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-tf-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "TFJob"}, + ). + Obj(), + }, + "pod with owner managed by kueue (XGBoostJob)": { + initObjects: []client.Object{defaultNamespace}, + podSelector: &metav1.LabelSelector{}, + namespaceSelector: defaultNamespaceSelector, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-xgboost-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "XGBoostJob"}, + ). + Obj(), + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-xgboost-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "XGBoostJob"}, + ). + Obj(), + }, + "pod with owner managed by kueue (PaddleJob)": { + initObjects: []client.Object{defaultNamespace}, + podSelector: &metav1.LabelSelector{}, + namespaceSelector: defaultNamespaceSelector, + pod: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-paddle-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "PaddleJob"}, + ). + Obj(), + want: testingpod.MakePod("test-pod", defaultNamespace.Name). + Queue("test-queue"). + OwnerReference( + "parent-paddle-job", + schema.GroupVersionKind{Group: "kubeflow.org", Version: "v1", Kind: "PaddleJob"}, + ). + Obj(), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + builder := utiltesting.NewClientBuilder() + builder = builder.WithObjects(tc.initObjects...) + cli := builder.Build() + + w := &PodWebhook{ + client: cli, + manageJobsWithoutQueueName: tc.manageJobsWithoutQueueName, + namespaceSelector: tc.namespaceSelector, + podSelector: tc.podSelector, + } + + ctx, _ := utiltesting.ContextWithLog(t) + + if err := w.Default(ctx, tc.pod); err != nil { + t.Errorf("failed to set defaults for v1/pod: %s", err) + } + if diff := cmp.Diff(tc.want, tc.pod); len(diff) != 0 { + t.Errorf("Default() mismatch (-want,+got):\n%s", diff) + } + }) + } +} + +func TestValidateCreate(t *testing.T) { + testCases := map[string]struct { + pod *corev1.Pod + wantWarns admission.Warnings + }{ + "pod owner is managed by kueue": { + pod: testingpod.MakePod("test-pod", "test-ns"). + Label("kueue.x-k8s.io/managed", "true"). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). 
+				Obj(),
+			wantWarns: admission.Warnings{
+				"pod owner is managed by kueue, label 'kueue.x-k8s.io/managed=true' might lead to unexpected behaviour",
+			},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			builder := utiltesting.NewClientBuilder()
+			cli := builder.Build()
+
+			w := &PodWebhook{
+				client: cli,
+			}
+
+			ctx, _ := utiltesting.ContextWithLog(t)
+
+			warns, err := w.ValidateCreate(ctx, tc.pod)
+			if err != nil {
+				t.Errorf("failed to validate create for v1/pod: %s", err)
+			}
+			if diff := cmp.Diff(tc.wantWarns, warns); diff != "" {
+				t.Errorf("Expected different list of warnings (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
+func TestValidateUpdate(t *testing.T) {
+	testCases := map[string]struct {
+		oldPod    *corev1.Pod
+		newPod    *corev1.Pod
+		wantWarns admission.Warnings
+	}{
+		"pods owner is managed by kueue, managed label is set for both pods": {
+			oldPod: testingpod.MakePod("test-pod", "test-ns").
+				Label("kueue.x-k8s.io/managed", "true").
+				OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")).
+				Obj(),
+			newPod: testingpod.MakePod("test-pod", "test-ns").
+				Label("kueue.x-k8s.io/managed", "true").
+				OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")).
+				Obj(),
+			wantWarns: admission.Warnings{
+				"pod owner is managed by kueue, label 'kueue.x-k8s.io/managed=true' might lead to unexpected behaviour",
+			},
+		},
+		"pod owner is managed by kueue, managed label is set for new pod": {
+			oldPod: testingpod.MakePod("test-pod", "test-ns").
+				OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")).
+				Obj(),
+			newPod: testingpod.MakePod("test-pod", "test-ns").
+				Label("kueue.x-k8s.io/managed", "true").
+				OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")).
+				Obj(),
+			wantWarns: admission.Warnings{
+				"pod owner is managed by kueue, label 'kueue.x-k8s.io/managed=true' might lead to unexpected behaviour",
+			},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			builder := utiltesting.NewClientBuilder()
+			cli := builder.Build()
+
+			w := &PodWebhook{
+				client: cli,
+			}
+
+			ctx, _ := utiltesting.ContextWithLog(t)
+
+			warns, err := w.ValidateUpdate(ctx, tc.oldPod, tc.newPod)
+			if err != nil {
+				t.Errorf("failed to validate update for v1/pod: %s", err)
+			}
+			if diff := cmp.Diff(tc.wantWarns, warns); diff != "" {
+				t.Errorf("Expected different list of warnings (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index 9b14f9ccdc..9fa26f50f9 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -258,6 +258,11 @@ func (p *PodSetWrapper) NodeSelector(kv map[string]string) *PodSetWrapper {
 	return p
 }
 
+func (p *PodSetWrapper) SchedulingGates(sg ...corev1.PodSchedulingGate) *PodSetWrapper {
+	p.Template.Spec.SchedulingGates = sg
+	return p
+}
+
 // AdmissionWrapper wraps an Admission
 type AdmissionWrapper struct{ kueue.Admission }
 
diff --git a/pkg/util/testingjobs/jobset/wrappers.go b/pkg/util/testingjobs/jobset/wrappers.go
index 427cea7afd..d6678711df 100644
--- a/pkg/util/testingjobs/jobset/wrappers.go
+++ b/pkg/util/testingjobs/jobset/wrappers.go
@@ -121,7 +121,7 @@ func (j *JobSetWrapper) WorkloadPriorityClass(wpc string) *JobSetWrapper {
 	return j
 }
 
-// PriorityClass updates JobSet priorityclass.
+// JobsStatus updates JobSet status.
func (j *JobSetWrapper) JobsStatus(statuses ...jobsetapi.ReplicatedJobStatus) *JobSetWrapper { j.Status.ReplicatedJobsStatus = statuses return j diff --git a/pkg/util/testingjobs/pod/wrappers.go b/pkg/util/testingjobs/pod/wrappers.go new file mode 100644 index 0000000000..ddfa191159 --- /dev/null +++ b/pkg/util/testingjobs/pod/wrappers.go @@ -0,0 +1,168 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + + "sigs.k8s.io/kueue/pkg/controller/constants" +) + +// PodWrapper wraps a Pod. +type PodWrapper struct { + corev1.Pod +} + +// MakePod creates a wrapper for a pod with a single container. +func MakePod(name, ns string) *PodWrapper { + return &PodWrapper{corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + Annotations: make(map[string]string, 1), + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "c", + Image: "pause", + Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, + }, + }, + SchedulingGates: make([]corev1.PodSchedulingGate, 0), + }, + }} +} + +// Obj returns the inner Pod. +func (p *PodWrapper) Obj() *corev1.Pod { + return &p.Pod +} + +// Clone returns deep copy of the Pod. +func (p *PodWrapper) Clone() *PodWrapper { + return &PodWrapper{Pod: *p.DeepCopy()} +} + +// Queue updates the queue name of the Pod +func (p *PodWrapper) Queue(queue string) *PodWrapper { + if p.Labels == nil { + p.Labels = make(map[string]string) + } + p.Labels[constants.QueueLabel] = queue + return p +} + +// Label sets the label of the Pod +func (p *PodWrapper) Label(k, v string) *PodWrapper { + if p.Labels == nil { + p.Labels = make(map[string]string) + } + p.Labels[k] = v + return p +} + +func (p *PodWrapper) Annotation(key, content string) *PodWrapper { + p.Annotations[key] = content + return p +} + +// ParentWorkload sets the parent-workload annotation +func (p *PodWrapper) ParentWorkload(parentWorkload string) *PodWrapper { + p.Annotations[constants.ParentWorkloadAnnotation] = parentWorkload + return p +} + +// KueueSchedulingGate adds kueue scheduling gate to the Pod +func (p *PodWrapper) KueueSchedulingGate() *PodWrapper { + if p.Spec.SchedulingGates == nil { + p.Spec.SchedulingGates = make([]corev1.PodSchedulingGate, 0) + } + p.Spec.SchedulingGates = append(p.Spec.SchedulingGates, corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}) + return p +} + +// KueueFinalizer adds kueue finalizer to the Pod +func (p *PodWrapper) KueueFinalizer() *PodWrapper { + if p.ObjectMeta.Finalizers == nil { + p.ObjectMeta.Finalizers = make([]string, 0) + } + p.ObjectMeta.Finalizers = append(p.ObjectMeta.Finalizers, "kueue.x-k8s.io/managed") + return p +} + +// NodeSelector adds a node selector to the Pod. 
+func (p *PodWrapper) NodeSelector(k, v string) *PodWrapper {
+	if p.Spec.NodeSelector == nil {
+		p.Spec.NodeSelector = make(map[string]string, 1)
+	}
+
+	p.Spec.NodeSelector[k] = v
+	return p
+}
+
+// Request adds a resource request to the default container.
+func (p *PodWrapper) Request(r corev1.ResourceName, v string) *PodWrapper {
+	p.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v)
+	return p
+}
+
+func (p *PodWrapper) Image(image string, args []string) *PodWrapper {
+	p.Spec.Containers[0].Image = image
+	p.Spec.Containers[0].Args = args
+	return p
+}
+
+// OwnerReference adds an owner reference to the Pod.
+func (p *PodWrapper) OwnerReference(ownerName string, ownerGVK schema.GroupVersionKind) *PodWrapper {
+	p.ObjectMeta.OwnerReferences = append(
+		p.ObjectMeta.OwnerReferences,
+		metav1.OwnerReference{
+			APIVersion: ownerGVK.GroupVersion().String(),
+			Kind:       ownerGVK.Kind,
+			Name:       ownerName,
+			UID:        types.UID(ownerName),
+			Controller: ptr.To(true),
+		},
+	)
+
+	return p
+}
+
+// UID updates the uid of the Pod.
+func (p *PodWrapper) UID(uid string) *PodWrapper {
+	p.ObjectMeta.UID = types.UID(uid)
+	return p
+}
+
+// StatusConditions updates status conditions of the Pod.
+func (p *PodWrapper) StatusConditions(conditions ...corev1.PodCondition) *PodWrapper {
+	p.Pod.Status.Conditions = conditions
+	return p
+}
+
+// StatusPhase updates status phase of the Pod.
+func (p *PodWrapper) StatusPhase(ph corev1.PodPhase) *PodWrapper {
+	p.Pod.Status.Phase = ph
+	return p
+}
diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go
new file mode 100644
index 0000000000..7ae26e5fcd
--- /dev/null
+++ b/test/integration/controller/jobs/pod/pod_controller_test.go
@@ -0,0 +1,460 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package pod + +import ( + "fmt" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + podcontroller "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/util/testing" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" + "sigs.k8s.io/kueue/pkg/workload" + "sigs.k8s.io/kueue/test/integration/framework" + "sigs.k8s.io/kueue/test/util" +) + +const ( + podName = "test-pod" + instanceKey = "cloud.provider.com/instance" +) + +var ( + wlConditionCmpOpts = []cmp.Option{ + cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "Reason", "Message"), + } +) + +var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + ginkgo.When("manageJobsWithoutQueueName is disabled", func() { + var defaultFlavor = testing.MakeResourceFlavor("default").Label("kubernetes.io/arch", "arm64").Obj() + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerSetup( + jobframework.WithManageJobsWithoutQueueName(false), + jobframework.WithKubeServerVersion(serverVersionFetcher), + jobframework.WithPodSelector(&metav1.LabelSelector{}), + jobframework.WithPodNamespaceSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }), + )) + gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).To(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, defaultFlavor, true) + fwk.Teardown() + }) + + var ( + ns *corev1.Namespace + lookupKey types.NamespacedName + wlLookupKey types.NamespacedName + ) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-namespace-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + wlLookupKey = types.NamespacedName{Name: podcontroller.GetWorkloadNameForPod(podName), Namespace: ns.Name} + lookupKey = types.NamespacedName{Name: podName, Namespace: ns.Name} + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + + ginkgo.It("Should reconcile the single pod with the queue name", func() { + pod := testingpod.MakePod(podName, ns.Name).Queue("test-queue").Obj() + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdPod.Spec.SchedulingGates).To( + gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + "Pod should have scheduling gate", + ) + + gomega.Expect(createdPod.Labels).To( + gomega.HaveKeyWithValue("kueue.x-k8s.io/managed", "true"), + "Pod should have the label", + ) + + gomega.Expect(createdPod.Finalizers).To(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod should have finalizer") + + ginkgo.By("checking that workload is created for pod with the queue name") + createdWorkload := 
&kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdWorkload.Spec.PodSets).To(gomega.HaveLen(1)) + + gomega.Expect(createdWorkload.Spec.QueueName).To(gomega.Equal("test-queue"), + "The Workload should have .spec.queueName set") + + ginkgo.By("checking the pod is unsuspended when workload is assigned") + + clusterQueue := testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "1").Obj(), + ).Obj() + admission := testing.MakeAdmission(clusterQueue.Name). + Assignment(corev1.ResourceCPU, "default", "1"). + AssignmentPodCount(createdWorkload.Spec.PodSets[0].Count). + Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + + gomega.Eventually(func() []corev1.PodSchedulingGate { + if err := k8sClient.Get(ctx, lookupKey, createdPod); err != nil { + return nil + } + return createdPod.Spec.SchedulingGates + }, util.Timeout, util.Interval).Should(gomega.BeEmpty()) + gomega.Eventually(func(g gomega.Gomega) bool { + ok, err := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name)) + g.Expect(err).NotTo(gomega.HaveOccurred()) + return ok + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Expect(createdPod.Spec.NodeSelector).Should(gomega.HaveLen(1)) + gomega.Expect(createdPod.Spec.NodeSelector["kubernetes.io/arch"]).Should(gomega.Equal("arm64")) + + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + gomega.Expect(createdWorkload.Status.Conditions).To(gomega.BeComparableTo( + []metav1.Condition{ + {Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionTrue}, + {Type: kueue.WorkloadAdmitted, Status: metav1.ConditionTrue}, + }, + wlConditionCmpOpts..., + )) + + ginkgo.By("checking the workload is finished and the pod finalizer is removed when pod is succeeded") + createdPod.Status.Phase = corev1.PodSucceeded + gomega.Expect(k8sClient.Status().Update(ctx, createdPod)).Should(gomega.Succeed()) + gomega.Eventually(func() []metav1.Condition { + err := k8sClient.Get(ctx, wlLookupKey, createdWorkload) + if err != nil { + return nil + } + return createdWorkload.Status.Conditions + }, util.Timeout, util.Interval).Should(gomega.ContainElement( + gomega.BeComparableTo( + metav1.Condition{Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue}, + wlConditionCmpOpts..., + ), + ), "Expected 'Finished' workload condition") + + gomega.Eventually(func(g gomega.Gomega) []string { + g.Expect(k8sClient.Get(ctx, lookupKey, createdPod)).To(gomega.Succeed()) + return createdPod.Finalizers + }, util.Timeout, util.Interval).ShouldNot(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod shouldn't have finalizer set") + }) + + ginkgo.It("Should stop the single pod with the queue name if workload is evicted", func() { + ginkgo.By("Creating a pod with queue name") + pod := testingpod.MakePod(podName, ns.Name).Queue("test-queue").Obj() + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("checking that workload is created for pod with the 
queue name") + createdWorkload := &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdWorkload.Spec.PodSets).To(gomega.HaveLen(1)) + + gomega.Expect(createdWorkload.Spec.QueueName).To(gomega.Equal("test-queue"), "The Workload should have .spec.queueName set") + + ginkgo.By("checking that pod is unsuspended when workload is admitted") + clusterQueue := testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "1").Obj(), + ).Obj() + admission := testing.MakeAdmission(clusterQueue.Name). + Assignment(corev1.ResourceCPU, "default", "1"). + AssignmentPodCount(createdWorkload.Spec.PodSets[0].Count). + Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + + gomega.Eventually(func() []corev1.PodSchedulingGate { + if err := k8sClient.Get(ctx, lookupKey, createdPod); err != nil { + return createdPod.Spec.SchedulingGates + } + return createdPod.Spec.SchedulingGates + }, util.Timeout, util.Interval).ShouldNot( + gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + ) + gomega.Eventually(func(g gomega.Gomega) bool { + ok, err := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name)) + g.Expect(err).NotTo(gomega.HaveOccurred()) + return ok + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Expect(len(createdPod.Spec.NodeSelector)).Should(gomega.Equal(1)) + gomega.Expect(createdPod.Spec.NodeSelector["kubernetes.io/arch"]).Should(gomega.Equal("arm64")) + + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + gomega.Expect(createdWorkload.Status.Conditions).Should(gomega.BeComparableTo( + []metav1.Condition{ + {Type: kueue.WorkloadQuotaReserved, Status: metav1.ConditionTrue}, + {Type: kueue.WorkloadAdmitted, Status: metav1.ConditionTrue}, + }, + wlConditionCmpOpts..., + )) + + ginkgo.By("checking that pod is stopped when workload is evicted") + + gomega.Expect( + workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, + kueue.WorkloadEvictedByPreemption, "By test", "evict"), + ).Should(gomega.Succeed()) + util.FinishEvictionForWorkloads(ctx, k8sClient, createdWorkload) + + gomega.Eventually(func(g gomega.Gomega) bool { + g.Expect(k8sClient.Get(ctx, lookupKey, createdPod)).To(gomega.Succeed()) + return createdPod.DeletionTimestamp.IsZero() + }, util.Timeout, util.Interval).Should(gomega.BeFalse(), "Expected pod to be deleted") + + gomega.Expect(createdPod.Status.Conditions).Should(gomega.ContainElement( + gomega.BeComparableTo( + corev1.PodCondition{ + Type: "TerminationTarget", + Status: "True", + Reason: "StoppedByKueue", + Message: "By test", + }, + cmpopts.IgnoreFields(corev1.PodCondition{}, "LastTransitionTime"), + ), + )) + + }) + + ginkgo.When("Pod owner is managed by Kueue", func() { + var pod *corev1.Pod + ginkgo.BeforeEach(func() { + pod = testingpod.MakePod(podName, ns.Name). + Queue("test-queue"). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). 
+ Obj() + }) + + ginkgo.It("Should skip the pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdPod.Spec.SchedulingGates).NotTo( + gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + "Pod shouldn't have scheduling gate", + ) + + gomega.Expect(createdPod.Labels).NotTo( + gomega.HaveKeyWithValue("kueue.x-k8s.io/managed", "true"), + "Pod shouldn't have the label", + ) + + gomega.Expect(createdPod.Finalizers).NotTo(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod shouldn't have finalizer") + + ginkgo.By(fmt.Sprintf("checking that workload '%s' is not created", wlLookupKey)) + createdWorkload := &kueue.Workload{} + + gomega.Expect(k8sClient.Get(ctx, wlLookupKey, createdWorkload)).To(testing.BeNotFoundError()) + }) + }) + }) +}) + +var _ = ginkgo.Describe("Pod controller interacting with scheduler", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + var ( + ns *corev1.Namespace + spotUntaintedFlavor *kueue.ResourceFlavor + clusterQueue *kueue.ClusterQueue + localQueue *kueue.LocalQueue + ) + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerAndSchedulerSetup( + jobframework.WithManageJobsWithoutQueueName(false), + jobframework.WithPodSelector(&metav1.LabelSelector{}), + jobframework.WithPodNamespaceSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }), + )) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-namespace-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + + spotUntaintedFlavor = testing.MakeResourceFlavor("spot-untainted").Label(instanceKey, "spot-untainted").Obj() + gomega.Expect(k8sClient.Create(ctx, spotUntaintedFlavor)).Should(gomega.Succeed()) + + clusterQueue = testing.MakeClusterQueue("dev-clusterqueue"). + ResourceGroup( + *testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Obj(), + ).Obj() + gomega.Expect(k8sClient.Create(ctx, clusterQueue)).Should(gomega.Succeed()) + }) + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, spotUntaintedFlavor, true) + }) + + ginkgo.It("Should schedule pods as they fit in their ClusterQueue", func() { + ginkgo.By("creating localQueue") + localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) + + ginkgo.By("checking if dev pod starts") + pod := testingpod.MakePod("dev-pod", ns.Name).Queue(localQueue.Name). + Request(corev1.ResourceCPU, "2"). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + createdPod := &corev1.Pod{} + gomega.Eventually(func(g gomega.Gomega) []corev1.PodSchedulingGate { + g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, createdPod)). + To(gomega.Succeed()) + return createdPod.Spec.SchedulingGates + }, util.Timeout, util.Interval).Should(gomega.BeEmpty()) + gomega.Expect(createdPod.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name)) + util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0) + util.ExpectAdmittedActiveWorkloadsMetric(clusterQueue, 1) + }) + + ginkgo.When("The workload's admission is removed", func() { + ginkgo.It("Should not restore the original node selectors", func() { + localQueue := testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj() + pod := testingpod.MakePod("dev-pod", ns.Name).Queue(localQueue.Name). + Request(corev1.ResourceCPU, "2"). + Obj() + lookupKey := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} + createdPod := &corev1.Pod{} + + ginkgo.By("creating a pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + }) + + ginkgo.By("checking if pod is suspended", func() { + gomega.Eventually(func(g gomega.Gomega) []corev1.PodSchedulingGate { + g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, createdPod)). + To(gomega.Succeed()) + return createdPod.Spec.SchedulingGates + }, util.Timeout, util.Interval).Should( + gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + ) + }) + + // backup the node selector + originalNodeSelector := createdPod.Spec.NodeSelector + + ginkgo.By("creating a localQueue", func() { + gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) + }) + + ginkgo.By("checking if pod is unsuspended", func() { + gomega.Eventually(func() []corev1.PodSchedulingGate { + gomega.Expect(k8sClient.Get(ctx, lookupKey, createdPod)).Should(gomega.Succeed()) + return createdPod.Spec.SchedulingGates + }, util.Timeout, util.Interval).Should(gomega.BeEmpty()) + }) + + ginkgo.By("checking if the node selector is updated", func() { + gomega.Eventually(func() map[string]string { + gomega.Expect(k8sClient.Get(ctx, lookupKey, createdPod)).Should(gomega.Succeed()) + return createdPod.Spec.NodeSelector + }, util.Timeout, util.Interval).ShouldNot(gomega.Equal(originalNodeSelector)) + }) + updatedNodeSelector := createdPod.Spec.NodeSelector + + ginkgo.By("deleting the localQueue to prevent readmission", func() { + gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).Should(gomega.Succeed()) + }) + + ginkgo.By("clearing the workload's admission to stop the job", func() { + wl := &kueue.Workload{} + wlKey := types.NamespacedName{Name: podcontroller.GetWorkloadNameForPod(pod.Name), Namespace: pod.Namespace} + gomega.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed()) + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, wl, nil)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, wl) + }) + + ginkgo.By("checking if the node selectors are not restored", func() { + gomega.Eventually(func() map[string]string { + gomega.Expect(k8sClient.Get(ctx, lookupKey, createdPod)).Should(gomega.Succeed()) + return createdPod.Spec.NodeSelector + }, util.Timeout, util.Interval).Should(gomega.Equal(updatedNodeSelector)) + }) + }) + }) +}) diff --git 
a/test/integration/controller/jobs/pod/pod_webhook_test.go b/test/integration/controller/jobs/pod/pod_webhook_test.go new file mode 100644 index 0000000000..bb907ef455 --- /dev/null +++ b/test/integration/controller/jobs/pod/pod_webhook_test.go @@ -0,0 +1,275 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pod + +import ( + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/discovery" + + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/util/kubeversion" + testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" + "sigs.k8s.io/kueue/test/integration/framework" + "sigs.k8s.io/kueue/test/util" +) + +var _ = ginkgo.Describe("Pod Webhook", func() { + var ns *corev1.Namespace + + ginkgo.When("with manageJobsWithoutQueueName disabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + + discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + serverVersionFetcher = kubeversion.NewServerVersionFetcher(discoveryClient) + err = serverVersionFetcher.FetchServerVersion() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ctx, k8sClient = fwk.RunManager(cfg, managerSetup( + jobframework.WithManageJobsWithoutQueueName(false), + jobframework.WithKubeServerVersion(serverVersionFetcher), + jobframework.WithPodSelector(&metav1.LabelSelector{}), + jobframework.WithPodNamespaceSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }), + )) + }) + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.When("The queue-name label is set", func() { + var ( + pod *corev1.Pod + lookupKey types.NamespacedName + ) + + ginkgo.BeforeEach(func() { + pod = testingpod.MakePod("pod-with-queue-name", ns.Name).Queue("user-queue").Obj() + lookupKey = types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} + }) + + ginkgo.It("Should inject scheduling gate, 'managed' label and finalizer into created pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdPod.Spec.SchedulingGates).To( + 
gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + "Pod should have scheduling gate", + ) + + gomega.Expect(createdPod.Labels).To( + gomega.HaveKeyWithValue("kueue.x-k8s.io/managed", "true"), + "Pod should have the label", + ) + + gomega.Expect(createdPod.Finalizers).To(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod should have finalizer set") + }) + + ginkgo.It("Should skip a Pod created in the forbidden 'kube-system' namespace", func() { + pod.Namespace = "kube-system" + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: pod.Name, Namespace: "kube-system"} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdPod.Spec.SchedulingGates).NotTo( + gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + "Pod shouldn't have scheduling gate", + ) + + gomega.Expect(createdPod.Labels).NotTo( + gomega.HaveKeyWithValue("kueue.x-k8s.io/managed", "true"), + "Pod shouldn't have the label", + ) + + gomega.Expect(createdPod.Finalizers).NotTo(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod shouldn't have finalizer set") + }) + }) + + ginkgo.When("The queue-name label is not set", func() { + var ( + pod *corev1.Pod + lookupKey types.NamespacedName + ) + + ginkgo.BeforeEach(func() { + pod = testingpod.MakePod("pod-with-queue-name", ns.Name).Obj() + lookupKey = types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} + }) + + ginkgo.It("Should not inject scheduling gate, 'managed' label and finalizer into created pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdPod.Spec.SchedulingGates).NotTo( + gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + "Pod shouldn't have scheduling gate", + ) + + gomega.Expect(createdPod.Labels).NotTo( + gomega.HaveKeyWithValue("kueue.x-k8s.io/managed", "true"), + "Pod shouldn't have the label", + ) + + gomega.Expect(createdPod.Finalizers).NotTo(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod shouldn't have finalizer set") + }) + }) + }) + + ginkgo.When("with manageJobsWithoutQueueName enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + + discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + serverVersionFetcher = kubeversion.NewServerVersionFetcher(discoveryClient) + err = serverVersionFetcher.FetchServerVersion() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ctx, k8sClient = fwk.RunManager(cfg, managerSetup( + jobframework.WithManageJobsWithoutQueueName(true), + jobframework.WithKubeServerVersion(serverVersionFetcher), + jobframework.WithPodSelector(&metav1.LabelSelector{}), + jobframework.WithPodNamespaceSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }), + )) + }) + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + 
ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.When("The queue-name label is not set", func() { + var pod *corev1.Pod + + ginkgo.BeforeEach(func() { + pod = testingpod.MakePod("pod-integration", ns.Name).Obj() + }) + + ginkgo.It("Should inject scheduling gate, 'managed' label and finalizer into created pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdPod.Spec.SchedulingGates).To( + gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + "Pod should have scheduling gate", + ) + + gomega.Expect(createdPod.Labels).To( + gomega.HaveKeyWithValue("kueue.x-k8s.io/managed", "true"), + "Pod should have the label", + ) + + gomega.Expect(createdPod.Finalizers).To(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod should have finalizer set") + }) + + ginkgo.It("Should skip a Pod created in the forbidden 'kube-system' namespace", func() { + pod.Namespace = "kube-system" + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: pod.Name, Namespace: "kube-system"} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdPod.Spec.SchedulingGates).NotTo( + gomega.ContainElement(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}), + "Pod shouldn't have scheduling gate", + ) + + gomega.Expect(createdPod.Labels).NotTo( + gomega.HaveKeyWithValue("kueue.x-k8s.io/managed", "true"), + "Pod shouldn't have the label", + ) + + gomega.Expect(createdPod.Finalizers).NotTo(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod shouldn't have finalizer set") + }) + }) + }) +}) diff --git a/test/integration/controller/jobs/pod/suite_test.go b/test/integration/controller/jobs/pod/suite_test.go new file mode 100644 index 0000000000..d4d1f0c82f --- /dev/null +++ b/test/integration/controller/jobs/pod/suite_test.go @@ -0,0 +1,128 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package pod + +import ( + "context" + "path/filepath" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + + config "sigs.k8s.io/kueue/apis/config/v1beta1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/controller/core/indexer" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/controller/jobs/job" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/scheduler" + "sigs.k8s.io/kueue/pkg/util/kubeversion" + "sigs.k8s.io/kueue/pkg/webhooks" + "sigs.k8s.io/kueue/test/integration/framework" + //+kubebuilder:scaffold:imports +) + +var ( + cfg *rest.Config + k8sClient client.Client + serverVersionFetcher *kubeversion.ServerVersionFetcher + ctx context.Context + fwk *framework.Framework + crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases") + webhookPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "webhook") +) + +func TestAPIs(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + + ginkgo.RunSpecs(t, + "Pod Controller Suite", + ) +} + +func managerSetup(opts ...jobframework.Option) framework.ManagerSetup { + return func(mgr manager.Manager, ctx context.Context) { + err := pod.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + podReconciler := pod.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), + opts...) + err = podReconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // Job reconciler is enabled for "pod parent managed by queue" tests + jobReconciler := job.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), + opts...) + err = jobReconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) + + err = pod.SetupWebhook(mgr, opts...) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + failedWebhook, err := webhooks.Setup(mgr) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) + } +} + +func managerAndSchedulerSetup(opts ...jobframework.Option) framework.ManagerSetup { + return func(mgr manager.Manager, ctx context.Context) { + err := indexer.Setup(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = pod.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + reconciler := pod.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), + opts...) + err = reconciler.SetupWithManager(mgr) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + cCache := cache.New(mgr.GetClient()) + queues := queue.NewManager(mgr.GetClient(), cCache) + + failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) + + err = pod.SetupWebhook(mgr, opts...) 
+ gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + failedWebhook, err := webhooks.Setup(mgr) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) + + sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName)) + err = sched.Start(ctx) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } +}