From 31fd8e15e882e42b06ce634fda5c205d9cfe8e79 Mon Sep 17 00:00:00 2001 From: Aleksei Chernevskii Date: Fri, 18 Aug 2023 15:01:20 -0500 Subject: [PATCH] Improve support for pods * Add namespace/pod label filtering for the defaulting pod webhook. Add a PodIntegrationOptions configuration field containing namespace and pod label selectors. * Simplify the RunWithPodSetsInfo method for the pod controller. * Create a new interface, JobWithFinalize. Jobs implementing it support custom finalization logic. * Add a k8s version check. If the pod integration is enabled on k8s server versions < 1.27, the Kueue pod will stop with an error message. * Add the JobWithSkip interface. Jobs that implement this interface can introduce custom reconciliation skip logic. * Add the IsPodOwnerManagedByQueue function. The defaulting webhook will skip a pod if its owner is managed by Kueue. The reconciler will skip such a pod even if the 'managed' label is set. * Add integration tests for the pod controller and webhook. --- apis/config/v1beta1/configuration_types.go | 10 + apis/config/v1beta1/defaults.go | 22 + apis/config/v1beta1/defaults_test.go | 38 +- apis/config/v1beta1/zz_generated.deepcopy.go | 30 ++ cmd/kueue/main.go | 32 +- cmd/kueue/main_test.go | 17 +- .../manager/controller_manager_config.yaml | 6 + pkg/config/config.go | 6 + pkg/config/config_test.go | 50 +- pkg/config/validation.go | 62 +++ pkg/config/validation_test.go | 102 ++++ pkg/controller/jobframework/interface.go | 14 +- pkg/controller/jobframework/reconciler.go | 40 +- .../jobframework/reconciler_test.go | 2 +- pkg/controller/jobs/job/job_controller.go | 19 +- .../jobs/job/job_controller_test.go | 2 +- pkg/controller/jobs/pod/pod_controller.go | 97 ++-- .../jobs/pod/pod_controller_test.go | 488 ++++++++++++++++++ pkg/controller/jobs/pod/pod_webhook.go | 76 ++- pkg/controller/jobs/pod/pod_webhook_test.go | 171 ++++++ pkg/util/testing/wrappers.go | 5 + pkg/util/testingjobs/jobset/wrappers.go | 2 +- pkg/util/testingjobs/pod/wrappers.go | 162 ++++++ .../jobs/pod/pod_controller_test.go | 401 ++++++++++++++ .../controller/jobs/pod/pod_webhook_test.go | 205 ++++++++ .../controller/jobs/pod/suite_test.go | 95 ++++ 26 files changed, 2077 insertions(+), 77 deletions(-) create mode 100644 pkg/config/validation.go create mode 100644 pkg/config/validation_test.go create mode 100644 pkg/controller/jobs/pod/pod_controller_test.go create mode 100644 pkg/controller/jobs/pod/pod_webhook_test.go create mode 100644 pkg/util/testingjobs/pod/wrappers.go create mode 100644 test/integration/controller/jobs/pod/pod_controller_test.go create mode 100644 test/integration/controller/jobs/pod/pod_webhook_test.go create mode 100644 test/integration/controller/jobs/pod/suite_test.go diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index 65b2cccba9..7c22a14df3 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -224,5 +224,15 @@ type Integrations struct { // - "jobset.x-k8s.io/jobset" // - "kubeflow.org/pytorchjob" // - "kubeflow.org/tfjob" + // - "pod" Frameworks []string `json:"frameworks,omitempty"` + // PodOptions defines kueue controller behaviour for pod objects + PodOptions *PodIntegrationOptions `json:"podOptions,omitempty"` +} + +type PodIntegrationOptions struct { + // NamespaceSelector can be used to omit some namespaces from pod reconciliation + NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"` + // PodSelector can be used to choose what pods to reconcile + PodSelector 
*metav1.LabelSelector `json:"podSelector,omitempty"` } diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go index 02336f2dcd..f8d9e38cba 100644 --- a/apis/config/v1beta1/defaults.go +++ b/apis/config/v1beta1/defaults.go @@ -116,4 +116,26 @@ func SetDefaults_Configuration(cfg *Configuration) { if cfg.Integrations.Frameworks == nil { cfg.Integrations.Frameworks = []string{job.FrameworkName} } + + if cfg.Integrations.PodOptions == nil { + cfg.Integrations.PodOptions = &PodIntegrationOptions{} + } + + if cfg.Integrations.PodOptions.NamespaceSelector == nil { + matchExpressionsValues := []string{"kube-system", *cfg.Namespace} + + cfg.Integrations.PodOptions.NamespaceSelector = &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: matchExpressionsValues, + }, + }, + } + } + + if cfg.Integrations.PodOptions.PodSelector == nil { + cfg.Integrations.PodOptions.PodSelector = &metav1.LabelSelector{} + } } diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go index 0c79f73dce..85dcb31e16 100644 --- a/apis/config/v1beta1/defaults_test.go +++ b/apis/config/v1beta1/defaults_test.go @@ -54,7 +54,36 @@ func TestSetDefaults_Configuration(t *testing.T) { } defaultIntegrations := &Integrations{ Frameworks: []string{job.FrameworkName}, + PodOptions: &PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, + } + + overwriteNamespaceIntegrations := &Integrations{ + Frameworks: []string{job.FrameworkName}, + PodOptions: &PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", overwriteNamespace}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, } + podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout} podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute} @@ -206,7 +235,7 @@ func TestSetDefaults_Configuration(t *testing.T) { WebhookSecretName: ptr.To(DefaultWebhookSecretName), }, ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, + Integrations: overwriteNamespaceIntegrations, }, }, "should not default InternalCertManagement": { @@ -223,7 +252,7 @@ func TestSetDefaults_Configuration(t *testing.T) { Enable: ptr.To(false), }, ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, + Integrations: overwriteNamespaceIntegrations, }, }, "should not default values in custom ClientConnection": { @@ -247,7 +276,7 @@ func TestSetDefaults_Configuration(t *testing.T) { QPS: ptr.To[float32](123.0), Burst: ptr.To[int32](456), }, - Integrations: defaultIntegrations, + Integrations: overwriteNamespaceIntegrations, }, }, "should default empty custom ClientConnection": { @@ -265,7 +294,7 @@ func TestSetDefaults_Configuration(t *testing.T) { Enable: ptr.To(false), }, ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, + Integrations: overwriteNamespaceIntegrations, }, }, "defaulting waitForPodsReady.timeout": { @@ -359,6 +388,7 @@ func TestSetDefaults_Configuration(t *testing.T) { 
ClientConnection: defaultClientConnection, Integrations: &Integrations{ Frameworks: []string{"a", "b"}, + PodOptions: defaultIntegrations.PodOptions, }, }, }, diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go index 12bc37a5a6..a0167a5707 100644 --- a/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/apis/config/v1beta1/zz_generated.deepcopy.go @@ -216,6 +216,11 @@ func (in *Integrations) DeepCopyInto(out *Integrations) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.PodOptions != nil { + in, out := &in.PodOptions, &out.PodOptions + *out = new(PodIntegrationOptions) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Integrations. @@ -258,6 +263,31 @@ func (in *InternalCertManagement) DeepCopy() *InternalCertManagement { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodIntegrationOptions) DeepCopyInto(out *PodIntegrationOptions) { + *out = *in + if in.NamespaceSelector != nil { + in, out := &in.NamespaceSelector, &out.NamespaceSelector + *out = new(v1.LabelSelector) + (*in).DeepCopyInto(*out) + } + if in.PodSelector != nil { + in, out := &in.PodSelector, &out.PodSelector + *out = new(v1.LabelSelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodIntegrationOptions. +func (in *PodIntegrationOptions) DeepCopy() *PodIntegrationOptions { + if in == nil { + return nil + } + out := new(PodIntegrationOptions) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WaitForPodsReady) DeepCopyInto(out *WaitForPodsReady) { *out = *in diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 5ef0d3ad9e..68b1cf99d5 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -23,10 +23,6 @@ import ( "fmt" "os" - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) - // to ensure that exec-entrypoint and run can make use of them. - _ "k8s.io/client-go/plugin/pkg/client/auth" - zaplog "go.uber.org/zap" "go.uber.org/zap/zapcore" schedulingv1 "k8s.io/api/scheduling/v1" @@ -37,6 +33,9 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/discovery" clientgoscheme "k8s.io/client-go/kubernetes/scheme" + // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) + // to ensure that exec-entrypoint and run can make use of them. 
+ _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -67,8 +66,9 @@ import ( ) var ( - scheme = runtime.NewScheme() - setupLog = ctrl.Log.WithName("setup") + scheme = runtime.NewScheme() + setupLog = ctrl.Log.WithName("setup") + podIntegrationErr = errors.New("pod integration only supported in Kubernetes 1.27 or newer") ) func init() { @@ -246,6 +246,26 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag log.Error(err, "Unable to create controller") return err } + if name == "pod" { + err := serverVersionFetcher.FetchServerVersion() + if err != nil { + setupLog.Error(err, "failed to fetch kubernetes server version") + os.Exit(1) + } + v := serverVersionFetcher.GetServerVersion() + if v.String() == "" || v.LessThan(kubeversion.KubeVersion1_27) { + setupLog.Error(podIntegrationErr, + "Failed to configure reconcilers", + "kubernetesVersion", v) + os.Exit(1) + } + + opts = append( + opts, + jobframework.WithPodNamespaceSelector(cfg.Integrations.PodOptions.NamespaceSelector), + jobframework.WithPodSelector(cfg.Integrations.PodOptions.PodSelector), + ) + } if err = cb.SetupWebhook(mgr, opts...); err != nil { log.Error(err, "Unable to create webhook") return err diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go index 380aefed7a..3c9309024a 100644 --- a/cmd/kueue/main_test.go +++ b/cmd/kueue/main_test.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "fmt" "os" "path/filepath" "testing" @@ -24,11 +25,9 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/utils/ptr" config "sigs.k8s.io/kueue/apis/config/v1beta1" - "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/controller/jobs/job" ) @@ -99,13 +98,25 @@ integrations: // referencing job.FrameworkName ensures the link of job package // therefore the batch/framework should be registered Frameworks: []string{job.FrameworkName}, + PodOptions: &config.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, }, }, }, { name: "bad integrations config", configFile: badIntegrationsConfig, - wantError: field.NotSupported(field.NewPath("integrations", "frameworks"), "unregistered/jobframework", jobframework.GetIntegrationsList()), + wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"pod\", \"ray.io/rayjob\""), }, } diff --git a/config/components/manager/controller_manager_config.yaml b/config/components/manager/controller_manager_config.yaml index 778bae6eb0..d671a5d7ea 100644 --- a/config/components/manager/controller_manager_config.yaml +++ b/config/components/manager/controller_manager_config.yaml @@ -37,3 +37,9 @@ integrations: - "kubeflow.org/pytorchjob" - "kubeflow.org/tfjob" # - "pod" + podOptions: + namespaceSelector: + matchExpressions: + - key: kubernetes.io/metadata.name + operator: NotIn + values: [ kube-system, kueue-system ] diff --git a/pkg/config/config.go 
b/pkg/config/config.go index c8d9f7b51e..9943732fd6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -162,6 +162,12 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co return options, cfg, err } } + + errs := ValidateConfiguration(cfg) + if ea := errs.ToAggregate(); ea != nil { + return options, cfg, ea + } + addTo(&options, &cfg) return options, cfg, err } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index f9daeb57f8..a837d3c27d 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -260,6 +260,18 @@ integrations: defaultIntegrations := &configapi.Integrations{ Frameworks: []string{job.FrameworkName}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, } testcases := []struct { @@ -311,7 +323,21 @@ integrations: ManageJobsWithoutQueueName: false, InternalCertManagement: enableDefaultInternalCertManagement, ClientConnection: defaultClientConnection, - Integrations: defaultIntegrations, + Integrations: &configapi.Integrations{ + Frameworks: []string{job.FrameworkName}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-tenant-a"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, + }, }, wantOptions: defaultControlOptions, }, @@ -514,6 +540,18 @@ integrations: // referencing job.FrameworkName ensures the link of job package // therefore the batch/framework should be registered Frameworks: []string{job.FrameworkName}, + PodOptions: &configapi.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + PodSelector: &metav1.LabelSelector{}, + }, }, }, wantOptions: ctrl.Options{ @@ -609,6 +647,16 @@ func TestEncode(t *testing.T) { "manageJobsWithoutQueueName": false, "integrations": map[string]any{ "frameworks": []any{"batch/job"}, + "podOptions": map[string]any{ + "namespaceSelector": map[string]any{ + "matchExpressions": []any{map[string]any{ + "key": "kubernetes.io/metadata.name", + "operator": "NotIn", + "values": []any{"kube-system", "kueue-system"}, + }}, + }, + "podSelector": map[string]any{}, + }, }, }, }, diff --git a/pkg/config/validation.go b/pkg/config/validation.go new file mode 100644 index 0000000000..ff1ae81b4d --- /dev/null +++ b/pkg/config/validation.go @@ -0,0 +1,62 @@ +package config + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/validation" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/validation/field" + "sigs.k8s.io/kueue/apis/config/v1beta1" +) + +const ( + errMsgPodOptionsIsNil = "value of Config.Integrations.PodOptions is nil" + errMsgNamespaceSelectorIsNil = "value of Config.Integrations.PodOptions.PodNamespaceSelector is nil" + errMsgProhibitedNamespace = "namespaces with this label cannot be used for pod integration" +) + +var ( + podOptionsPath = 
field.NewPath("integrations", "podOptions") + namespaceSelectorPath = podOptionsPath.Child("namespaceSelector") +) + +func ValidateConfiguration(c v1beta1.Configuration) field.ErrorList { + var allErrs field.ErrorList + + // Validate PodNamespaceSelector for the pod framework + allErrs = append(allErrs, validateNamespaceSelector(c)...) + + return allErrs +} + +func validateNamespaceSelector(c v1beta1.Configuration) field.ErrorList { + var allErrs field.ErrorList + + if c.Integrations.PodOptions == nil { + return field.ErrorList{field.Required(podOptionsPath, errMsgPodOptionsIsNil)} + } + if c.Integrations.PodOptions.NamespaceSelector == nil { + return field.ErrorList{field.Required(namespaceSelectorPath, errMsgNamespaceSelectorIsNil)} + } + + prohibitedNamespaces := []labels.Set{{corev1.LabelMetadataName: "kube-system"}} + + if c.Namespace != nil && *c.Namespace != "" { + prohibitedNamespaces = append(prohibitedNamespaces, labels.Set{corev1.LabelMetadataName: *c.Namespace}) + } + + allErrs = append(allErrs, validation.ValidateLabelSelector(c.Integrations.PodOptions.NamespaceSelector, validation.LabelSelectorValidationOptions{}, namespaceSelectorPath)...) + + selector, err := metav1.LabelSelectorAsSelector(c.Integrations.PodOptions.NamespaceSelector) + if err != nil { + return allErrs + } + + for _, pn := range prohibitedNamespaces { + if selector.Matches(pn) { + allErrs = append(allErrs, field.Invalid(namespaceSelectorPath, pn, errMsgProhibitedNamespace)) + } + } + + return allErrs +} diff --git a/pkg/config/validation_test.go b/pkg/config/validation_test.go new file mode 100644 index 0000000000..19ff7191c9 --- /dev/null +++ b/pkg/config/validation_test.go @@ -0,0 +1,102 @@ +package config + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/utils/ptr" + "sigs.k8s.io/kueue/apis/config/v1beta1" +) + +func TestValidateNamespaceSelector(t *testing.T) { + testCases := map[string]struct { + podIntegrationOptions *v1beta1.PodIntegrationOptions + wantError field.ErrorList + }{ + "nil PodIntegrationOptions": { + podIntegrationOptions: nil, + wantError: field.ErrorList{ + field.Required(podOptionsPath, errMsgPodOptionsIsNil), + }, + }, + + "nil PodIntegrationOptions.PodNamespaceSelector": { + podIntegrationOptions: &v1beta1.PodIntegrationOptions{ + NamespaceSelector: nil, + }, + wantError: field.ErrorList{ + field.Required(namespaceSelectorPath, errMsgNamespaceSelectorIsNil), + }, + }, + "emptyLabelSelector": { + podIntegrationOptions: &v1beta1.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{}, + }, + wantError: field.ErrorList{ + field.Invalid(namespaceSelectorPath, labels.Set{corev1.LabelMetadataName: "kube-system"}, errMsgProhibitedNamespace), + field.Invalid(namespaceSelectorPath, labels.Set{corev1.LabelMetadataName: "kueue-system"}, errMsgProhibitedNamespace), + }, + }, + "prohibited namespace in MatchLabels": { + podIntegrationOptions: &v1beta1.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "kubernetes.io/metadata.name": "kube-system", + }, + }, + }, + wantError: field.ErrorList{ + field.Invalid(namespaceSelectorPath, labels.Set{corev1.LabelMetadataName: "kube-system"}, errMsgProhibitedNamespace), + }, + }, + "prohibited namespace in MatchExpressions with operator In": { + podIntegrationOptions: &v1beta1.PodIntegrationOptions{ + 
NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"kube-system"}, + }, + }, + }, + }, + wantError: field.ErrorList{ + field.Invalid(namespaceSelectorPath, labels.Set{corev1.LabelMetadataName: "kube-system"}, errMsgProhibitedNamespace), + }, + }, + "prohibited namespace in MatchExpressions with operator NotIn": { + podIntegrationOptions: &v1beta1.PodIntegrationOptions{ + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }, + }, + wantError: nil, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + cfg := v1beta1.Configuration{ + Namespace: ptr.To("kueue-system"), + Integrations: &v1beta1.Integrations{ + PodOptions: tc.podIntegrationOptions, + }, + } + gotError := validateNamespaceSelector(cfg) + if diff := cmp.Diff(tc.wantError, gotError); diff != "" { + t.Errorf("unexpected non-nil error (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/controller/jobframework/interface.go b/pkg/controller/jobframework/interface.go index feeb0c2509..de934423cd 100644 --- a/pkg/controller/jobframework/interface.go +++ b/pkg/controller/jobframework/interface.go @@ -63,7 +63,19 @@ type JobWithCustomStop interface { // Stop implements a custom stop procedure. // The function should be idempotent: not do any API calls if the job is already stopped. // Returns whether the Job stopped with this call or an error - Stop(ctx context.Context, c client.Client, podSetsInfo []PodSetInfo) (bool, error) + Stop(ctx context.Context, c client.Client, podSetsInfo []PodSetInfo, eventMsg string) (bool, error) +} + +// JobWithFinalize interface should be implemented by generic jobs, +// when custom finalization logic is needed for a job, after it's finished. +type JobWithFinalize interface { + Finalize(ctx context.Context, c client.Client) error +} + +// JobWithSkip interface should be implemented by generic jobs, +// when reconciliation should be skipped depending on the job's state +type JobWithSkip interface { + Skip() (bool, error) } type JobWithPriorityClass interface { diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index aba810936d..0f6f2f8685 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -69,6 +69,8 @@ type Options struct { ManageJobsWithoutQueueName bool WaitForPodsReady bool KubeServerVersion *kubeversion.ServerVersionFetcher + PodNamespaceSelector *metav1.LabelSelector + PodSelector *metav1.LabelSelector } // Option configures the reconciler. @@ -97,6 +99,22 @@ func WithKubeServerVersion(v *kubeversion.ServerVersionFetcher) Option { } } +// WithPodNamespaceSelector adds rules to reconcile pods only in particular +// namespaces. +func WithPodNamespaceSelector(s *metav1.LabelSelector) Option { + return func(o *Options) { + o.PodNamespaceSelector = s + } +} + +// WithPodSelector adds rules to reconcile pods only with particular +// labels. 
+func WithPodSelector(s *metav1.LabelSelector) Option { + return func(o *Options) { + o.PodSelector = s + } +} + var DefaultOptions = Options{} func NewReconciler( @@ -123,6 +141,12 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques err := r.client.Get(ctx, req.NamespacedName, object) + if jws, implements := job.(JobWithSkip); implements { + if skip, skipErr := jws.Skip(); skip || skipErr != nil { + return ctrl.Result{}, nil + } + } + if apierrors.IsNotFound(err) || !object.GetDeletionTimestamp().IsZero() { workloads := kueue.WorkloadList{} if err := r.client.List(ctx, &workloads, client.InNamespace(req.Namespace), @@ -221,6 +245,10 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques if err != nil { log.Error(err, "Updating workload status") } + // Execute job finalization logic + if err := r.finalizeJob(ctx, job); err != nil { + return ctrl.Result{}, err + } return ctrl.Result{}, nil } @@ -500,7 +528,7 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object clie info := getPodSetsInfoFromWorkload(wl) if jws, implements := job.(JobWithCustomStop); implements { - stoppedNow, err := jws.Stop(ctx, r.client, info) + stoppedNow, err := jws.Stop(ctx, r.client, info, eventMsg) if stoppedNow { r.record.Eventf(object, corev1.EventTypeNormal, "Stopped", eventMsg) } @@ -523,6 +551,16 @@ func (r *JobReconciler) stopJob(ctx context.Context, job GenericJob, object clie return nil } +func (r *JobReconciler) finalizeJob(ctx context.Context, job GenericJob) error { + if jwf, implements := job.(JobWithFinalize); implements { + if err := jwf.Finalize(ctx, r.client); err != nil { + return err + } + } + + return nil +} + func (r *JobReconciler) removeFinalizer(ctx context.Context, wl *kueue.Workload) error { if controllerutil.RemoveFinalizer(wl, kueue.ResourceInUseFinalizerName) { return r.client.Update(ctx, wl) diff --git a/pkg/controller/jobframework/reconciler_test.go b/pkg/controller/jobframework/reconciler_test.go index bcdf9ab455..a188b7edcc 100644 --- a/pkg/controller/jobframework/reconciler_test.go +++ b/pkg/controller/jobframework/reconciler_test.go @@ -57,7 +57,7 @@ func TestIsParentJobManaged(t *testing.T) { UID(parentJobName). Obj(), job: testingjob.MakeJob(childJobName, jobNamespace). - OwnerReference(parentJobName, batchv1.SchemeGroupVersion.WithKind("Job")). + OwnerReference(parentJobName, batchv1.SchemeGroupVersion.WithKind("CronJob")). 
Obj(), wantErr: ErrUnknownWorkloadOwner, }, diff --git a/pkg/controller/jobs/job/job_controller.go b/pkg/controller/jobs/job/job_controller.go index 6eb327d2db..3916f6f620 100644 --- a/pkg/controller/jobs/job/job_controller.go +++ b/pkg/controller/jobs/job/job_controller.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strconv" + "strings" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -57,10 +58,11 @@ const ( func init() { utilruntime.Must(jobframework.RegisterIntegration(FrameworkName, jobframework.IntegrationCallbacks{ - SetupIndexes: SetupIndexes, - NewReconciler: NewReconciler, - SetupWebhook: SetupWebhook, - JobType: &batchv1.Job{}, + SetupIndexes: SetupIndexes, + NewReconciler: NewReconciler, + SetupWebhook: SetupWebhook, + JobType: &batchv1.Job{}, + IsManagingObjectsOwner: isJob, })) } @@ -79,7 +81,12 @@ var NewReconciler = jobframework.NewGenericReconciler( return &Job{} }, func(c client.Client) handler.EventHandler { return &parentWorkloadHandler{client: c} - }) + }, +) + +func isJob(owner *metav1.OwnerReference) bool { + return owner.Kind == "Job" && strings.HasPrefix(owner.APIVersion, "batch/v1") +} type parentWorkloadHandler struct { client client.Client @@ -151,7 +158,7 @@ func (j *Job) Suspend() { j.Spec.Suspend = ptr.To(true) } -func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []jobframework.PodSetInfo) (bool, error) { +func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []jobframework.PodSetInfo, eventMsg string) (bool, error) { stoppedNow := false if !j.IsSuspended() { j.Suspend() diff --git a/pkg/controller/jobs/job/job_controller_test.go b/pkg/controller/jobs/job/job_controller_test.go index 912932292a..7bae237df2 100644 --- a/pkg/controller/jobs/job/job_controller_test.go +++ b/pkg/controller/jobs/job/job_controller_test.go @@ -521,7 +521,7 @@ func TestReconciler(t *testing.T) { "should get error if workload owner is unknown": { job: *utiltestingjob.MakeJob("job", "ns"). ParentWorkload("non-existing-parent-workload"). - OwnerReference("parent", batchv1.SchemeGroupVersion.WithKind("Job")). + OwnerReference("parent", batchv1.SchemeGroupVersion.WithKind("CronJob")). Obj(), wantJob: *utiltestingjob.MakeJob("job", "ns").Obj(), wantErr: jobframework.ErrUnknownWorkloadOwner, diff --git a/pkg/controller/jobs/pod/pod_controller.go b/pkg/controller/jobs/pod/pod_controller.go index f1ed4fd6f8..31bb9c96b1 100644 --- a/pkg/controller/jobs/pod/pod_controller.go +++ b/pkg/controller/jobs/pod/pod_controller.go @@ -28,7 +28,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "sigs.k8s.io/controller-runtime/pkg/client" - + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/jobframework" @@ -36,10 +36,10 @@ import ( ) const ( - SchedulingGateName = "kueue.x-k8s.io/admission" - FrameworkName = "pod" - - gateNotFound = -1 + SchedulingGateName = "kueue.x-k8s.io/admission" + FrameworkName = "pod" + gateNotFound = -1 + ConditionTypeTerminationTarget = "TerminationTarget" ) var ( @@ -73,10 +73,11 @@ type Pod corev1.Pod var _ jobframework.GenericJob = (*Pod)(nil) var _ jobframework.JobWithCustomStop = (*Pod)(nil) +var _ jobframework.JobWithFinalize = (*Pod)(nil) // Object returns the job instance. 
-func (j *Pod) Object() client.Object { - return (*corev1.Pod)(j) +func (p *Pod) Object() client.Object { + return (*corev1.Pod)(p) } func (p *Pod) gateIndex() int { @@ -96,10 +97,7 @@ func (p *Pod) IsSuspended() bool { // Suspend will suspend the job. func (p *Pod) Suspend() { - // TODO: maybe change the framework so this can provide feedback, - // the pod can only be "suspended" by the mutation hook if it's not - // done the only way to potentialy stop its execution is the eviction - // which will also terminate the pod. + // Not implemented because this is not called when JobWithCustomStop is implemented. } // RunWithPodSetsInfo will inject the node affinity and podSet counts extracting from workload to job and unsuspend it. @@ -112,37 +110,15 @@ func (p *Pod) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) error { p.Spec.SchedulingGates = append(p.Spec.SchedulingGates[:idx], p.Spec.SchedulingGates[idx+1:]...) } - // TODO: manage the node selector - // NOTE: it's only possible to add and only if k8s > 1.27 is used, case in which, since if the provided - // selectors are changing a existing key will fail we should be able to "refuse" the assignment + p.Spec.NodeSelector = maps.MergeKeepFirst(podSetsInfo[0].NodeSelector, p.Spec.NodeSelector) - // if k8s < 1.27 TODO: wait for Version check patch - info := podSetsInfo[0] - if false { - if len(info.NodeSelector) > 0 { - return fmt.Errorf("%w: node selectors cannot be changed in k8s < 1.27", jobframework.ErrInvalidPodsetInfo) - } - } else { - ns := p.Spec.NodeSelector - if len(ns) > 0 { - overrideNS := make([]string, 0, len(ns)) - for k, val := range ns { - if newVal, found := info.NodeSelector[k]; found && newVal != val { - overrideNS = append(overrideNS, k) - } - } - if len(overrideNS) > 0 { - return fmt.Errorf("%w: node selectors %s cannot be changed", jobframework.ErrInvalidPodsetInfo, strings.Join(overrideNS, ",")) - } - } - p.Spec.NodeSelector = maps.MergeKeepFirst(podSetsInfo[0].NodeSelector, p.Spec.NodeSelector) - } return nil } // RestorePodSetsInfo will restore the original node affinity and podSet counts of the job. func (p *Pod) RestorePodSetsInfo(nodeSelectors []jobframework.PodSetInfo) bool { + // Not implemented since Pods cannot be updated, they can only be terminated. return false } @@ -198,18 +174,13 @@ func (p *Pod) PodsReady() bool { return false } -// GetGVK returns GVK (Group Version Kind) for the job. -func (p *Pod) GetGVK() schema.GroupVersionKind { +// GVK returns GVK (Group Version Kind) for the job. +func (p *Pod) GVK() schema.GroupVersionKind { return gvk } -func (p *Pod) Stop(ctx context.Context, c client.Client, _ []jobframework.PodSetInfo) (bool, error) { +func (p *Pod) Stop(ctx context.Context, c client.Client, _ []jobframework.PodSetInfo, eventMsg string) (bool, error) { // The podset info is not relevant here, since this should mark the pod's end of life - - // The only alternative to pod deletion looks to be the usage of Eviction API which can - // take into account PodDisruptionBudget and the end result will be the same (the pod gets deleted) - // For now just deleting the pod make better sense in a kueue context. 
- pCopy := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: p.UID, @@ -220,13 +191,13 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []jobframework.PodSet Status: corev1.PodStatus{ Conditions: []corev1.PodCondition{ { - Type: corev1.DisruptionTarget, + Type: ConditionTypeTerminationTarget, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Time{ Time: time.Time{}, }, Reason: "StoppedByKueue", - Message: "stopped by kueue", + Message: eventMsg, }, }, }, @@ -244,3 +215,39 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []jobframework.PodSet func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error { return jobframework.SetupWorkloadOwnerIndex(ctx, indexer, gvk) } + +func (p *Pod) Finalize(ctx context.Context, c client.Client) error { + pod := (*corev1.Pod)(p) + + if controllerutil.RemoveFinalizer(pod, PodFinalizer) { + if err := c.Update(ctx, pod); err != nil { + return err + } + } + + return nil +} + +func (p *Pod) Skip() (bool, error) { + // Skip pod reconciliation, if managed label is not set + if v, ok := p.GetLabels()[ManagedLabelKey]; !ok || v != ManagedLabelValue { + return true, nil + } + + if IsPodOwnerManagedByQueue(p) { + return true, nil + } + + return false, nil +} + +func IsPodOwnerManagedByQueue(p *Pod) bool { + if owner := metav1.GetControllerOf(p); owner != nil { + return jobframework.IsOwnerManagedByKueue(owner) || (owner.Kind == "RayCluster" && strings.HasPrefix(owner.APIVersion, "ray.io/v1alpha1")) + } + return false +} + +func GetWorkloadNameForPod(podName string) string { + return jobframework.GetWorkloadNameForOwnerWithGVK(podName, gvk) +} diff --git a/pkg/controller/jobs/pod/pod_controller_test.go b/pkg/controller/jobs/pod/pod_controller_test.go new file mode 100644 index 0000000000..7cfc14b9d9 --- /dev/null +++ b/pkg/controller/jobs/pod/pod_controller_test.go @@ -0,0 +1,488 @@ +package pod + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + rayjobapi "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + _ "sigs.k8s.io/kueue/pkg/controller/jobs/job" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingutil "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" +) + +func TestPodsReady(t *testing.T) { + testCases := map[string]struct { + pod *corev1.Pod + want bool + }{ + "pod is ready": { + pod: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + StatusConditions( + corev1.PodCondition{ + Type: "Ready", + Status: "True", + }, + ). + Obj(), + want: true, + }, + "pod is not ready": { + pod: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + StatusConditions(). 
+ Obj(), + want: false, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + pod := (*Pod)(tc.pod) + got := pod.PodsReady() + if tc.want != got { + t.Errorf("Unexpected response (want: %v, got: %v)", tc.want, got) + } + }) + } +} + +func TestRunWithPodSetsInfo(t *testing.T) { + testCases := map[string]struct { + pod *Pod + runInfo, restoreInfo []jobframework.PodSetInfo + wantPod *corev1.Pod + wantErr error + }{ + "pod set info > 1": { + pod: (*Pod)(testingutil.MakePod("test-pod", "test-namespace").Obj()), + runInfo: make([]jobframework.PodSetInfo, 2), + wantErr: fmt.Errorf("invalid podset infos: expecting 1 got 2"), + }, + "pod with scheduling gate and empty node selector": { + pod: (*Pod)(testingutil.MakePod("test-pod", "test-namespace"). + KueueSchedulingGate(). + Obj()), + runInfo: []jobframework.PodSetInfo{ + { + NodeSelector: map[string]string{ + "test-key": "test-val", + }, + }, + }, + wantPod: testingutil.MakePod("test-pod", "test-namespace"). + NodeSelector("test-key", "test-val"). + Obj(), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + gotErr := tc.pod.RunWithPodSetsInfo(tc.runInfo) + + if tc.wantErr != nil { + if diff := cmp.Diff(tc.wantErr.Error(), gotErr.Error()); diff != "" { + t.Errorf("error mismatch mismatch (-want +got):\n%s", diff) + } + } else { + if gotErr != nil { + t.Fatalf("unexpected error: %s", gotErr) + } + if diff := cmp.Diff(tc.wantPod.Spec, tc.pod.Spec); diff != "" { + t.Errorf("pod spec mismatch (-want +got):\n%s", diff) + } + } + }) + } +} + +var ( + podCmpOpts = []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.IgnoreFields(corev1.Pod{}, "TypeMeta", "ObjectMeta.ResourceVersion", + "ObjectMeta.DeletionTimestamp"), + } + defaultWorkloadCmpOpts = []cmp.Option{ + cmpopts.EquateEmpty(), + cmpopts.SortSlices(func(a, b kueue.Workload) bool { + return a.Name < b.Name + }), + cmpopts.SortSlices(func(a, b metav1.Condition) bool { + return a.Type < b.Type + }), + cmpopts.IgnoreFields( + kueue.Workload{}, "TypeMeta", "ObjectMeta.OwnerReferences", + "ObjectMeta.Name", "ObjectMeta.ResourceVersion", + ), + cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), + } +) + +func TestReconciler(t *testing.T) { + basePodWrapper := testingutil.MakePod("pod", "ns"). + UID("test-uid"). + Queue("user-queue"). + Request(corev1.ResourceCPU, "1"). + Image("", nil) + + testCases := map[string]struct { + initObjects []client.Object + pod corev1.Pod + wantPod corev1.Pod + workloads []kueue.Workload + wantWorkloads []kueue.Workload + wantErr error + workloadCmpOpts []cmp.Option + }{ + "scheduling gate is removed and node selector is added if workload is admitted": { + initObjects: []client.Object{ + utiltesting.MakeResourceFlavor("unit-test-flavor").Label("kubernetes.io/arch", "arm64").Obj(), + }, + pod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Obj(), + wantPod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + NodeSelector("kubernetes.io/arch", "arm64"). + KueueFinalizer(). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + Obj(), + ). + Admitted(true). 
+ Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota( + utiltesting.MakeAdmission("cq"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(1). + Obj(), + ). + Admitted(true). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "non-matching admitted workload is deleted": { + pod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + Obj(), + wantPod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + StatusConditions(corev1.PodCondition{ + Type: "TerminationTarget", + Status: "True", + Reason: "StoppedByKueue", + Message: "No matching Workload", + }). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 2).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()). + Admitted(true). + Obj(), + }, + wantErr: jobframework.ErrNoMatchingWorkloads, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "the workload is created when queue name is set": { + pod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Queue("test-queue"). + Obj(), + wantPod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Queue("test-queue"). + Obj(), + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets( + *utiltesting.MakePodSet(kueue.DefaultPodSetName, 1). + Request(corev1.ResourceCPU, "1"). + SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}). + Obj(), + ). + Queue("test-queue"). + Priority(0). + Labels(map[string]string{ + controllerconsts.JobUIDLabel: "test-uid", + }). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "the pod reconciliation is skipped when 'kueue.x-k8s.io/managed' label is not set": { + pod: *basePodWrapper. + Clone(). + Obj(), + wantPod: *basePodWrapper. + Clone(). + Obj(), + wantWorkloads: []kueue.Workload{}, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "the pod reconciliation is skipped when it's owner is managed by kueue (Job)": { + pod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). + Obj(), + wantPod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). + Obj(), + wantWorkloads: []kueue.Workload{}, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "the pod reconciliation is skipped when it's owner is managed by kueue (RayCluster)": { + pod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + OwnerReference("parent-ray-cluster", rayjobapi.GroupVersion.WithKind("RayCluster")). + Obj(), + wantPod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + OwnerReference("parent-ray-cluster", rayjobapi.GroupVersion.WithKind("RayCluster")). 
+ Obj(), + wantWorkloads: []kueue.Workload{}, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "pod is stopped when workload is evicted": { + pod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Queue("test-queue"). + Obj(), + wantPod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + KueueSchedulingGate(). + Queue("test-queue"). + StatusConditions(corev1.PodCondition{ + Type: "TerminationTarget", + Status: "True", + Reason: "StoppedByKueue", + Message: "Preempted to accommodate a higher priority Workload", + }). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + Queue("test-queue"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Reason: "Preempted", + Message: "Preempted to accommodate a higher priority Workload", + }). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("job", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + Queue("test-queue"). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Now(), + Reason: "Preempted", + Message: "Preempted to accommodate a higher priority Workload", + }). + Obj(), + }, + workloadCmpOpts: defaultWorkloadCmpOpts, + }, + "pod is finalized when it's succeeded": { + pod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueFinalizer(). + StatusPhase(corev1.PodSucceeded). + Obj(), + wantPod: *basePodWrapper. + Clone(). + SetLabel("kueue.x-k8s.io/managed", "true"). + StatusPhase(corev1.PodSucceeded). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()). + Admitted(true). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("unit-test", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()). + Admitted(true). + Condition(metav1.Condition{ + Type: "Finished", + Status: "True", + Reason: "JobFinished", + Message: "Job finished successfully", + }). + Obj(), + }, + workloadCmpOpts: append( + defaultWorkloadCmpOpts, + // This is required because SSA doesn't work properly for the fake k8s client. + // Reconciler appends "Finished" condition using SSA. Fake client will just + // replace all the older conditions. 
+ // See: https://github.com/kubernetes-sigs/controller-runtime/issues/2341 + cmpopts.IgnoreSliceElements(func(c metav1.Condition) bool { + return c.Type == "Admitted" + }), + ), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + clientBuilder := utiltesting.NewClientBuilder() + if err := SetupIndexes(ctx, utiltesting.AsIndexer(clientBuilder)); err != nil { + t.Fatalf("Could not setup indexes: %v", err) + } + kcBuilder := clientBuilder.WithObjects(append(tc.initObjects, &tc.pod)...) + + for i := range tc.workloads { + kcBuilder = kcBuilder.WithStatusSubresource(&tc.workloads[i]) + } + + kClient := kcBuilder.Build() + for i := range tc.workloads { + if err := ctrl.SetControllerReference(&tc.pod, &tc.workloads[i], kClient.Scheme()); err != nil { + t.Fatalf("Could not setup owner reference in Workloads: %v", err) + } + if err := kClient.Create(ctx, &tc.workloads[i]); err != nil { + t.Fatalf("Could not create workload: %v", err) + } + } + recorder := record.NewBroadcaster().NewRecorder(kClient.Scheme(), corev1.EventSource{Component: "test"}) + reconciler := NewReconciler(kClient, recorder) + + podKey := client.ObjectKeyFromObject(&tc.pod) + _, err := reconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: podKey, + }) + if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" { + t.Errorf("Reconcile returned error (-want,+got):\n%s", diff) + } + + var gotPod corev1.Pod + if err := kClient.Get(ctx, podKey, &gotPod); err != nil { + t.Fatalf("Could not get Pod after reconcile: %v", err) + } + if diff := cmp.Diff(tc.wantPod, gotPod, podCmpOpts...); diff != "" { + t.Errorf("Pod after reconcile (-want,+got):\n%s", diff) + } + var gotWorkloads kueue.WorkloadList + if err := kClient.List(ctx, &gotWorkloads); err != nil { + t.Fatalf("Could not get Workloads after reconcile: %v", err) + } + if diff := cmp.Diff(tc.wantWorkloads, gotWorkloads.Items, tc.workloadCmpOpts...); diff != "" { + t.Errorf("Workloads after reconcile (-want,+got):\n%s", diff) + } + }) + } +} + +func TestIsPodOwnerManagedByQueue(t *testing.T) { + testCases := map[string]struct { + ownerReference metav1.OwnerReference + wantRes bool + }{ + "batch/v1/Job": { + ownerReference: metav1.OwnerReference{ + APIVersion: "batch/v1", + Controller: ptr.To(true), + Kind: "Job", + }, + wantRes: true, + }, + "apps/v1/ReplicaSet": { + ownerReference: metav1.OwnerReference{ + APIVersion: "apps/v1", + Controller: ptr.To(true), + Kind: "ReplicaSet", + }, + wantRes: false, + }, + "ray.io/v1alpha1/RayCluster": { + ownerReference: metav1.OwnerReference{ + APIVersion: "ray.io/v1alpha1", + Controller: ptr.To(true), + Kind: "RayCluster", + }, + wantRes: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + pod := testingutil.MakePod("pod", "ns"). + UID("test-uid"). + Request(corev1.ResourceCPU, "1"). + Image("", nil). 
+ Obj() + + pod.OwnerReferences = append(pod.OwnerReferences, tc.ownerReference) + + if diff := cmp.Diff(tc.wantRes, IsPodOwnerManagedByQueue((*Pod)(pod))); diff != "" { + t.Errorf("Unexpected 'IsPodOwnerManagedByQueue' result (-want,+got):\n%s", diff) + } + }) + } +} + +func TestGetWorkloadNameForPod(t *testing.T) { + wlName := GetWorkloadNameForPod("unit-test") + + if diff := cmp.Diff("pod-unit-test-7bb47", wlName); diff != "" { + t.Errorf("Expected different workload name (-want,+got):\n%s", diff) + } +} diff --git a/pkg/controller/jobs/pod/pod_webhook.go b/pkg/controller/jobs/pod/pod_webhook.go index b8d87f20be..1d1b437695 100644 --- a/pkg/controller/jobs/pod/pod_webhook.go +++ b/pkg/controller/jobs/pod/pod_webhook.go @@ -18,22 +18,46 @@ package pod import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" - "sigs.k8s.io/kueue/pkg/controller/jobframework" ) -type PodWebhook struct{} +const ( + ManagedLabelKey = "kueue.x-k8s.io/managed" + ManagedLabelValue = "true" + PodFinalizer = ManagedLabelKey +) + +type PodWebhook struct { + client client.Client + manageJobsWithoutQueueName bool + namespaceSelector *metav1.LabelSelector + podSelector *metav1.LabelSelector +} // SetupWebhook configures the webhook for pods. -func SetupWebhook(mgr ctrl.Manager, _ ...jobframework.Option) error { - wh := &PodWebhook{} +func SetupWebhook(mgr ctrl.Manager, opts ...jobframework.Option) error { + options := jobframework.DefaultOptions + for _, opt := range opts { + opt(&options) + } + wh := &PodWebhook{ + client: mgr.GetClient(), + manageJobsWithoutQueueName: options.ManageJobsWithoutQueueName, + namespaceSelector: options.PodNamespaceSelector, + podSelector: options.PodSelector, + } return ctrl.NewWebhookManagedBy(mgr). For(&corev1.Pod{}). WithDefaulter(wh). 
@@ -46,8 +70,7 @@ func fromObject(o runtime.Object) *Pod { } // +kubebuilder:webhook:path=/mutate--v1-pod,mutating=true,failurePolicy=ignore,sideEffects=None,groups="",resources=pods,verbs=create,versions=v1,name=mpod.kb.io,admissionReviewVersions=v1 - -//TODO: check if we can use namespace/object selectors to skip this webhook +// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch var _ webhook.CustomDefaulter = &PodWebhook{} @@ -56,7 +79,46 @@ func (w *PodWebhook) Default(ctx context.Context, obj runtime.Object) error { log := ctrl.LoggerFrom(ctx).WithName("pod-webhook").WithValues("pod", klog.KObj(pod)) log.V(5).Info("Applying defaults") - if jobframework.QueueName(pod) != "" { + if IsPodOwnerManagedByQueue(pod) { + log.V(5).Info("Pod owner is managed by queue, skipping") + return nil + } + + // Check for pod label selector match + podSelector, err := metav1.LabelSelectorAsSelector(w.podSelector) + if err != nil { + return fmt.Errorf("failed to parse pod selector: %w", err) + } + if !podSelector.Matches(labels.Set(pod.GetLabels())) { + return nil + } + + // Get pod namespace and check for namespace label selector match + ns := corev1.Namespace{} + err = w.client.Get(ctx, client.ObjectKey{Name: pod.GetNamespace()}, &ns) + if err != nil { + return fmt.Errorf("failed to run mutating webhook on pod %s, error while getting namespace: %w", + pod.GetName(), + err, + ) + } + log.V(5).Info("Found pod namespace", "Namespace.Name", ns.GetName()) + nsSelector, err := metav1.LabelSelectorAsSelector(w.namespaceSelector) + if err != nil { + return fmt.Errorf("failed to parse namespace selector: %w", err) + } + if !nsSelector.Matches(labels.Set(ns.GetLabels())) { + return nil + } + + if jobframework.QueueName(pod) != "" || w.manageJobsWithoutQueueName { + controllerutil.AddFinalizer(obj.(*corev1.Pod), PodFinalizer) + + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + pod.Labels[ManagedLabelKey] = ManagedLabelValue + if pod.gateIndex() == gateNotFound { log.V(5).Info("Adding gate") pod.Spec.SchedulingGates = append(pod.Spec.SchedulingGates, corev1.PodSchedulingGate{Name: SchedulingGateName}) diff --git a/pkg/controller/jobs/pod/pod_webhook_test.go b/pkg/controller/jobs/pod/pod_webhook_test.go new file mode 100644 index 0000000000..46abdd9d8a --- /dev/null +++ b/pkg/controller/jobs/pod/pod_webhook_test.go @@ -0,0 +1,171 @@ +package pod + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" + rayjobapi "github.com/ray-project/kuberay/ray-operator/apis/ray/v1alpha1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + testingutil "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" +) + +func TestDefault(t *testing.T) { + testCases := map[string]struct { + initObjects []client.Object + pod *corev1.Pod + manageJobsWithoutQueueName bool + namespaceSelector *metav1.LabelSelector + podSelector *metav1.LabelSelector + want *corev1.Pod + wantError error + }{ + "pod with queue nil ns selector": { + initObjects: []client.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ns", + Labels: map[string]string{ + "kubernetes.io/metadata.name": "test-ns", + }, + }, + }, + }, + pod: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + Obj(), + want: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). 
+ Obj(), + }, + "pod with queue matching ns selector": { + initObjects: []client.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ns", + Labels: map[string]string{ + "kubernetes.io/metadata.name": "test-ns", + }, + }, + }, + }, + pod: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + Obj(), + namespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system"}, + }, + }, + }, + podSelector: &metav1.LabelSelector{}, + want: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueSchedulingGate(). + KueueFinalizer(). + Obj(), + }, + "pod without queue matching ns selector manage jobs without queue name": { + initObjects: []client.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ns", + Labels: map[string]string{ + "kubernetes.io/metadata.name": "test-ns", + }, + }, + }, + }, + pod: testingutil.MakePod("test-pod", "test-ns"). + Obj(), + manageJobsWithoutQueueName: true, + namespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system"}, + }, + }, + }, + podSelector: &metav1.LabelSelector{}, + want: testingutil.MakePod("test-pod", "test-ns"). + SetLabel("kueue.x-k8s.io/managed", "true"). + KueueSchedulingGate(). + KueueFinalizer(). + Obj(), + }, + "pod with owner managed by kueue (Job)": { + initObjects: []client.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ns", + Labels: map[string]string{ + "kubernetes.io/metadata.name": "test-ns", + }, + }, + }, + }, + pod: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). + Obj(), + want: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). + Obj(), + }, + "pod with owner managed by kueue (RayCluster)": { + initObjects: []client.Object{ + &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-ns", + Labels: map[string]string{ + "kubernetes.io/metadata.name": "test-ns", + }, + }, + }, + }, + pod: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + OwnerReference("parent-ray-cluster", rayjobapi.GroupVersion.WithKind("RayCluster")). + Obj(), + want: testingutil.MakePod("test-pod", "test-ns"). + Queue("test-queue"). + OwnerReference("parent-ray-cluster", rayjobapi.GroupVersion.WithKind("RayCluster")). + Obj(), + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + builder := utiltesting.NewClientBuilder(kubeflow.AddToScheme) + builder = builder.WithObjects(tc.initObjects...) 
+ cli := builder.Build() + + w := &PodWebhook{ + client: cli, + manageJobsWithoutQueueName: tc.manageJobsWithoutQueueName, + namespaceSelector: tc.namespaceSelector, + podSelector: tc.podSelector, + } + + ctx, _ := utiltesting.ContextWithLog(t) + + if err := w.Default(ctx, tc.pod); err != nil { + t.Errorf("failed to set defaults to a v1/pod: %s", err) + } + if diff := cmp.Diff(tc.want, tc.pod); len(diff) != 0 { + t.Errorf("Default() mismatch (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 3db9367a91..6105f03396 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -248,6 +248,11 @@ func (p *PodSetWrapper) NodeSelector(kv map[string]string) *PodSetWrapper { return p } +func (p *PodSetWrapper) SchedulingGates(sg ...corev1.PodSchedulingGate) *PodSetWrapper { + p.Template.Spec.SchedulingGates = sg + return p +} + // AdmissionWrapper wraps an Admission type AdmissionWrapper struct{ kueue.Admission } diff --git a/pkg/util/testingjobs/jobset/wrappers.go b/pkg/util/testingjobs/jobset/wrappers.go index f698c712be..55c7dbd5f6 100644 --- a/pkg/util/testingjobs/jobset/wrappers.go +++ b/pkg/util/testingjobs/jobset/wrappers.go @@ -112,7 +112,7 @@ func (j *JobSetWrapper) PriorityClass(pc string) *JobSetWrapper { return j } -// PriorityClass updates JobSet priorityclass. +// JobsStatus updates JobSet status. func (j *JobSetWrapper) JobsStatus(statuses ...jobsetapi.ReplicatedJobStatus) *JobSetWrapper { j.Status.ReplicatedJobsStatus = statuses return j diff --git a/pkg/util/testingjobs/pod/wrappers.go b/pkg/util/testingjobs/pod/wrappers.go new file mode 100644 index 0000000000..006f1c3cfb --- /dev/null +++ b/pkg/util/testingjobs/pod/wrappers.go @@ -0,0 +1,162 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + "sigs.k8s.io/kueue/pkg/controller/constants" +) + +// PodWrapper wraps a Pod. +type PodWrapper struct { + corev1.Pod +} + +// MakePod creates a wrapper for a pod with a single container. +func MakePod(name, ns string) *PodWrapper { + return &PodWrapper{corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: ns, + Annotations: make(map[string]string, 1), + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "c", + Image: "pause", + Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, + }, + }, + SchedulingGates: make([]corev1.PodSchedulingGate, 0), + NodeSelector: map[string]string{}, + }, + }} +} + +// Obj returns the inner Pod. +func (p *PodWrapper) Obj() *corev1.Pod { + return &p.Pod +} + +// Clone returns deep copy of the Pod. 
+func (p *PodWrapper) Clone() *PodWrapper {
+	return &PodWrapper{Pod: *p.DeepCopy()}
+}
+
+// Queue updates the queue name of the Pod.
+func (p *PodWrapper) Queue(queue string) *PodWrapper {
+	if p.Labels == nil {
+		p.Labels = make(map[string]string)
+	}
+	p.Labels[constants.QueueLabel] = queue
+	return p
+}
+
+// SetLabel sets a label on the Pod.
+func (p *PodWrapper) SetLabel(k, v string) *PodWrapper {
+	if p.Labels == nil {
+		p.Labels = make(map[string]string)
+	}
+	p.Labels[k] = v
+	return p
+}
+
+func (p *PodWrapper) SetAnnotation(key, content string) *PodWrapper {
+	p.Annotations[key] = content
+	return p
+}
+
+// ParentWorkload sets the parent-workload annotation.
+func (p *PodWrapper) ParentWorkload(parentWorkload string) *PodWrapper {
+	p.Annotations[constants.ParentWorkloadAnnotation] = parentWorkload
+	return p
+}
+
+// KueueSchedulingGate adds the kueue scheduling gate to the Pod.
+func (p *PodWrapper) KueueSchedulingGate() *PodWrapper {
+	if p.Spec.SchedulingGates == nil {
+		p.Spec.SchedulingGates = make([]corev1.PodSchedulingGate, 0)
+	}
+	p.Spec.SchedulingGates = append(p.Spec.SchedulingGates, corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"})
+	return p
+}
+
+// KueueFinalizer adds the kueue finalizer to the Pod.
+func (p *PodWrapper) KueueFinalizer() *PodWrapper {
+	if p.ObjectMeta.Finalizers == nil {
+		p.ObjectMeta.Finalizers = make([]string, 0)
+	}
+	p.ObjectMeta.Finalizers = append(p.ObjectMeta.Finalizers, "kueue.x-k8s.io/managed")
+	return p
+}
+
+// NodeSelector adds a node selector to the Pod.
+func (p *PodWrapper) NodeSelector(k, v string) *PodWrapper {
+	p.Spec.NodeSelector[k] = v
+	return p
+}
+
+// Request adds a resource request to the default container.
+func (p *PodWrapper) Request(r corev1.ResourceName, v string) *PodWrapper {
+	p.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v)
+	return p
+}
+
+func (p *PodWrapper) Image(image string, args []string) *PodWrapper {
+	p.Spec.Containers[0].Image = image
+	p.Spec.Containers[0].Args = args
+	return p
+}
+
+// OwnerReference adds an ownerReference to the Pod.
+func (p *PodWrapper) OwnerReference(ownerName string, ownerGVK schema.GroupVersionKind) *PodWrapper {
+	p.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
+		{
+			APIVersion: ownerGVK.GroupVersion().String(),
+			Kind:       ownerGVK.Kind,
+			Name:       ownerName,
+			UID:        types.UID(ownerName),
+			Controller: ptr.To(true),
+		},
+	}
+	return p
+}
+
+// UID updates the uid of the Pod.
+func (p *PodWrapper) UID(uid string) *PodWrapper {
+	p.ObjectMeta.UID = types.UID(uid)
+	return p
+}
+
+// StatusConditions updates status conditions of the Pod.
+func (p *PodWrapper) StatusConditions(conditions ...corev1.PodCondition) *PodWrapper {
+	p.Pod.Status.Conditions = conditions
+	return p
+}
+
+// StatusPhase updates status phase of the Pod.
+func (p *PodWrapper) StatusPhase(ph corev1.PodPhase) *PodWrapper { + p.Pod.Status.Phase = ph + return p +} diff --git a/test/integration/controller/jobs/pod/pod_controller_test.go b/test/integration/controller/jobs/pod/pod_controller_test.go new file mode 100644 index 0000000000..5c32a318b0 --- /dev/null +++ b/test/integration/controller/jobs/pod/pod_controller_test.go @@ -0,0 +1,401 @@ +package pod + +import ( + "fmt" + "slices" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/util/testing" + podtesting "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" + "sigs.k8s.io/kueue/pkg/workload" + "sigs.k8s.io/kueue/test/integration/framework" + "sigs.k8s.io/kueue/test/util" +) + +const ( + podName = "test-pod" +) + +var _ = ginkgo.Describe("Pod controller", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + ginkgo.When("manageJobsWithoutQueueName is disabled", func() { + var defaultFlavor = testing.MakeResourceFlavor("default").Label("kubernetes.io/arch", "arm64").Obj() + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerSetup( + jobframework.WithManageJobsWithoutQueueName(false), + jobframework.WithKubeServerVersion(serverVersionFetcher), + jobframework.WithPodSelector(&metav1.LabelSelector{}), + jobframework.WithPodNamespaceSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }), + )) + gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).To(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, defaultFlavor, true) + fwk.Teardown() + }) + + var ( + ns *corev1.Namespace + wlLookupKey types.NamespacedName + ) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-namespace-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + wlLookupKey = types.NamespacedName{Name: pod.GetWorkloadNameForPod(podName), Namespace: ns.Name} + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + + ginkgo.It("Should reconcile the single pod with the queue name", func() { + pod := podtesting.MakePod(podName, ns.Name).Queue("test-queue").Obj() + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + lookupKey := types.NamespacedName{Name: podName, Namespace: ns.Name} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(hasKueueSchedulingGate(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/admission' scheduling gate set") + + gomega.Expect(hasManagedLabel(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/managed=true' label set") + + gomega.Expect(createdPod.Finalizers).To(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod should have 'kueue.x-k8s.io/managed' finalizer set") + + 
ginkgo.By("checking that workload is created for pod with the queue name") + createdWorkload := &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdWorkload.Spec.PodSets).To(gomega.HaveLen(1)) + + gomega.Expect(createdWorkload.Spec.QueueName).To(gomega.Equal("test-queue"), + "The Workload should have .spec.queueName set") + + ginkgo.By("checking the pod is unsuspended when workload is assigned") + + clusterQueue := testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "1").Obj(), + ).Obj() + admission := testing.MakeAdmission(clusterQueue.Name). + Assignment(corev1.ResourceCPU, "default", "1"). + AssignmentPodCount(createdWorkload.Spec.PodSets[0].Count). + Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, lookupKey, createdPod); err != nil { + return false + } + return !hasKueueSchedulingGate(createdPod) + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Eventually(func() bool { + ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name)) + return ok + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Expect(createdPod.Spec.NodeSelector).Should(gomega.HaveLen(1)) + gomega.Expect(createdPod.Spec.NodeSelector["kubernetes.io/arch"]).Should(gomega.Equal("arm64")) + gomega.Consistently(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return len(createdWorkload.Status.Conditions) == 2 + }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the workload is finished and the pod finalizer is removed when pod is succeeded") + createdPod.Status.Phase = corev1.PodSucceeded + gomega.Expect(k8sClient.Status().Update(ctx, createdPod)).Should(gomega.Succeed()) + gomega.Eventually(func() bool { + err := k8sClient.Get(ctx, wlLookupKey, createdWorkload) + if err != nil || len(createdWorkload.Status.Conditions) == 1 { + return false + } + + return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadFinished) + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + gomega.Eventually(func() bool { + err := k8sClient.Get(ctx, lookupKey, createdPod) + return err == nil && !slices.Contains(createdPod.Finalizers, "kueue.x-k8s.io/managed") + }, util.Timeout, util.Interval).Should(gomega.BeTrue(), + "Expected 'kueue.x-k8s.io/managed' finalizer to be removed") + }) + + ginkgo.It("Should stop the single pod with the queue name if workload is evicted", func() { + ginkgo.By("Creating a pod with queue name") + pod := podtesting.MakePod(podName, ns.Name).Queue("test-queue").Obj() + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + lookupKey := types.NamespacedName{Name: podName, Namespace: ns.Name} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(hasKueueSchedulingGate(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/admission' scheduling gate set") + + 
gomega.Expect(hasManagedLabel(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/managed=true' label set") + + gomega.Expect(createdPod.Finalizers).To(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod should have 'kueue.x-k8s.io/managed' finalizer set") + + ginkgo.By("checking that workload is created for pod with the queue name") + createdWorkload := &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdWorkload.Spec.PodSets).To(gomega.HaveLen(1)) + + gomega.Expect(createdWorkload.Spec.QueueName).To(gomega.Equal("test-queue"), "The Workload should have .spec.queueName set") + + ginkgo.By("checking that pod is unsuspended when workload is assigned") + clusterQueue := testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "1").Obj(), + ).Obj() + admission := testing.MakeAdmission(clusterQueue.Name). + Assignment(corev1.ResourceCPU, "default", "1"). + AssignmentPodCount(createdWorkload.Spec.PodSets[0].Count). + Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, lookupKey, createdPod); err != nil { + return false + } + return !hasKueueSchedulingGate(createdPod) + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Eventually(func() bool { + ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name)) + return ok + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Expect(len(createdPod.Spec.NodeSelector)).Should(gomega.Equal(1)) + gomega.Expect(createdPod.Spec.NodeSelector["kubernetes.io/arch"]).Should(gomega.Equal("arm64")) + gomega.Consistently(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return len(createdWorkload.Status.Conditions) == 2 + }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking that pod is stopped when workload is evicted") + + gomega.Expect( + workload.UpdateStatus(ctx, k8sClient, createdWorkload, kueue.WorkloadEvicted, metav1.ConditionTrue, + kueue.WorkloadEvictedByPreemption, "By test", "evict"), + ).Should(gomega.Succeed()) + util.FinishEvictionForWorkloads(ctx, k8sClient, createdWorkload) + + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, lookupKey, createdPod); err != nil { + return false + } + return !createdPod.DeletionTimestamp.IsZero() + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + gomega.Expect(createdPod.Status.Conditions).Should(gomega.ContainElement(corev1.PodCondition{ + Type: "TerminationTarget", + Status: "True", + Reason: "StoppedByKueue", + Message: "By test", + })) + }) + }) + + ginkgo.When("manageJobsWithoutQueueName is enabled", func() { + defaultFlavor := testing.MakeResourceFlavor("default").Label("kubernetes.io/arch", "arm64").Obj() + + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + ctx, k8sClient = fwk.RunManager(cfg, managerSetup( + jobframework.WithManageJobsWithoutQueueName(true), + jobframework.WithKubeServerVersion(serverVersionFetcher), + 
jobframework.WithPodSelector(&metav1.LabelSelector{}), + jobframework.WithPodNamespaceSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }), + )) + gomega.Expect(k8sClient.Create(ctx, defaultFlavor)).Should(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, defaultFlavor, true) + fwk.Teardown() + }) + + var ( + ns *corev1.Namespace + wlLookupKey types.NamespacedName + ) + + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + wlLookupKey = types.NamespacedName{Name: pod.GetWorkloadNameForPod(podName), Namespace: ns.Name} + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + + ginkgo.It("Should reconcile the single pod without the queue name", func() { + pod := podtesting.MakePod(podName, ns.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + lookupKey := types.NamespacedName{Name: podName, Namespace: ns.Name} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(hasKueueSchedulingGate(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/admission' scheduling gate set") + + gomega.Expect(hasManagedLabel(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/managed=true' label set") + + gomega.Expect(createdPod.Finalizers).To(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod should have 'kueue.x-k8s.io/managed' finalizer set") + + ginkgo.By(fmt.Sprintf("checking that workload '%s' is created", wlLookupKey)) + createdWorkload := &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(createdWorkload.Spec.PodSets).To(gomega.HaveLen(1)) + + ginkgo.By("checking the pod is unsuspended when workload is assigned") + clusterQueue := testing.MakeClusterQueue("cluster-queue"). + ResourceGroup( + *testing.MakeFlavorQuotas("default").Resource(corev1.ResourceCPU, "1").Obj(), + ).Obj() + admission := testing.MakeAdmission(clusterQueue.Name). + Assignment(corev1.ResourceCPU, "default", "1"). + AssignmentPodCount(createdWorkload.Spec.PodSets[0].Count). 
+ Obj() + gomega.Expect(util.SetQuotaReservation(ctx, k8sClient, createdWorkload, admission)).Should(gomega.Succeed()) + util.SyncAdmittedConditionForWorkloads(ctx, k8sClient, createdWorkload) + + gomega.Eventually(func() bool { + if err := k8sClient.Get(ctx, lookupKey, createdPod); err != nil { + return false + } + return !hasKueueSchedulingGate(createdPod) + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Eventually(func() bool { + ok, _ := testing.CheckLatestEvent(ctx, k8sClient, "Started", corev1.EventTypeNormal, fmt.Sprintf("Admitted by clusterQueue %v", clusterQueue.Name)) + return ok + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + gomega.Expect(len(createdPod.Spec.NodeSelector)).Should(gomega.Equal(1)) + gomega.Expect(createdPod.Spec.NodeSelector["kubernetes.io/arch"]).Should(gomega.Equal("arm64")) + gomega.Consistently(func() bool { + if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil { + return false + } + return len(createdWorkload.Status.Conditions) == 2 + }, util.ConsistentDuration, util.Interval).Should(gomega.BeTrue()) + + ginkgo.By("checking the workload is finished and the pod finalizer is removed when pod is succeeded") + createdPod.Status.Phase = corev1.PodSucceeded + gomega.Expect(k8sClient.Status().Update(ctx, createdPod)).Should(gomega.Succeed()) + gomega.Eventually(func() bool { + err := k8sClient.Get(ctx, wlLookupKey, createdWorkload) + if err != nil || len(createdWorkload.Status.Conditions) == 1 { + return false + } + + return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadFinished) + }, util.Timeout, util.Interval).Should(gomega.BeTrue()) + + gomega.Eventually(func() bool { + err := k8sClient.Get(ctx, lookupKey, createdPod) + return err == nil && !slices.Contains(createdPod.Finalizers, "kueue.x-k8s.io/managed") + }, util.Timeout, util.Interval).Should(gomega.BeTrue(), + "Expected 'kueue.x-k8s.io/managed' finalizer to be removed") + }) + + ginkgo.When("Pod owner is managed by Kueue", func() { + var pod *corev1.Pod + ginkgo.BeforeEach(func() { + pod = podtesting.MakePod(podName, ns.Name). + OwnerReference("parent-job", batchv1.SchemeGroupVersion.WithKind("Job")). 
+ Obj() + }) + + ginkgo.It("Should skip the pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + lookupKey := types.NamespacedName{Name: podName, Namespace: ns.Name} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(hasKueueSchedulingGate(createdPod)).To(gomega.BeFalse(), + "Pod shouldn't have 'kueue.x-k8s.io/admission' scheduling gate set") + + gomega.Expect(hasManagedLabel(createdPod)).To(gomega.BeFalse(), + "Pod shouldn't have 'kueue.x-k8s.io/managed=true' label set") + + gomega.Expect(createdPod.Finalizers).NotTo(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod shouldn't have 'kueue.x-k8s.io/managed' finalizer set") + + ginkgo.By(fmt.Sprintf("checking that workload '%s' is not created", wlLookupKey)) + createdWorkload := &kueue.Workload{} + gomega.Consistently(func() error { + return k8sClient.Get(ctx, wlLookupKey, createdWorkload) + }, util.Timeout, util.Interval).Should(testing.BeNotFoundError()) + }) + }) + + }) +}) diff --git a/test/integration/controller/jobs/pod/pod_webhook_test.go b/test/integration/controller/jobs/pod/pod_webhook_test.go new file mode 100644 index 0000000000..952bc586b4 --- /dev/null +++ b/test/integration/controller/jobs/pod/pod_webhook_test.go @@ -0,0 +1,205 @@ +package pod + +import ( + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/discovery" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/util/kubeversion" + testingutil "sigs.k8s.io/kueue/pkg/util/testingjobs/pod" + "sigs.k8s.io/kueue/test/integration/framework" + "sigs.k8s.io/kueue/test/util" +) + +var _ = ginkgo.Describe("Pod Webhook", func() { + var ns *corev1.Namespace + + ginkgo.When("with manageJobsWithoutQueueName disabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + + discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + serverVersionFetcher = kubeversion.NewServerVersionFetcher(discoveryClient) + err = serverVersionFetcher.FetchServerVersion() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ctx, k8sClient = fwk.RunManager(cfg, managerSetup( + jobframework.WithManageJobsWithoutQueueName(false), + jobframework.WithKubeServerVersion(serverVersionFetcher), + jobframework.WithPodSelector(&metav1.LabelSelector{}), + jobframework.WithPodNamespaceSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }), + )) + }) + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.When("The queue-name label is set", func() { + var pod *corev1.Pod + + ginkgo.BeforeEach(func() { + pod = testingutil.MakePod("pod-with-queue-name", ns.Name).Queue("user-queue").Obj() + }) + + ginkgo.It("Should 
inject scheduling gate, 'managed' label and finalizer into created pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(hasKueueSchedulingGate(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/admission' scheduling gate set") + + gomega.Expect(hasManagedLabel(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/managed=true' label set") + + gomega.Expect(createdPod.Finalizers).To(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod should have 'kueue.x-k8s.io/managed' finalizer set") + }) + + ginkgo.It("Should skip a Pod created in the forbidden 'kube-system' namespace", func() { + pod.Namespace = "kube-system" + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: pod.Name, Namespace: "kube-system"} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(hasKueueSchedulingGate(createdPod)).To(gomega.BeFalse(), + "Pod shouldn't have 'kueue.x-k8s.io/admission' scheduling gate set") + + gomega.Expect(hasManagedLabel(createdPod)).To(gomega.BeFalse(), + "Pod shouldn't have 'kueue.x-k8s.io/managed=true' label set") + + gomega.Expect(createdPod.Finalizers).NotTo(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod shouldn't have 'kueue.x-k8s.io/managed' finalizer set") + }) + }) + }) + + ginkgo.When("with manageJobsWithoutQueueName enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { + ginkgo.BeforeAll(func() { + fwk = &framework.Framework{ + CRDPath: crdPath, + WebhookPath: webhookPath, + } + cfg = fwk.Init() + + discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + serverVersionFetcher = kubeversion.NewServerVersionFetcher(discoveryClient) + err = serverVersionFetcher.FetchServerVersion() + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ctx, k8sClient = fwk.RunManager(cfg, managerSetup( + jobframework.WithManageJobsWithoutQueueName(true), + jobframework.WithKubeServerVersion(serverVersionFetcher), + jobframework.WithPodSelector(&metav1.LabelSelector{}), + jobframework.WithPodNamespaceSelector(&metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{"kube-system", "kueue-system"}, + }, + }, + }), + )) + }) + ginkgo.BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-", + }, + } + gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) + }) + + ginkgo.AfterEach(func() { + gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + }) + ginkgo.AfterAll(func() { + fwk.Teardown() + }) + + ginkgo.When("The queue-name label is not set", func() { + var pod *corev1.Pod + + ginkgo.BeforeEach(func() { + pod = testingutil.MakePod("pod-integration", ns.Name).Obj() + }) + + ginkgo.It("Should inject scheduling gate, 'managed' label and finalizer into created pod", func() { + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: pod.Name, Namespace: 
pod.Namespace} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(hasKueueSchedulingGate(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/admission' scheduling gate set") + + gomega.Expect(hasManagedLabel(createdPod)).To(gomega.BeTrue(), + "Pod should have 'kueue.x-k8s.io/managed=true' label set") + + gomega.Expect(createdPod.Finalizers).To(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod should have 'kueue.x-k8s.io/managed' finalizer set") + }) + + ginkgo.It("Should skip a Pod created in the forbidden 'kube-system' namespace", func() { + pod.Namespace = "kube-system" + gomega.Expect(k8sClient.Create(ctx, pod)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: pod.Name, Namespace: "kube-system"} + createdPod := &corev1.Pod{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, createdPod) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Expect(hasKueueSchedulingGate(createdPod)).To(gomega.BeFalse(), + "Pod shouldn't have 'kueue.x-k8s.io/admission' scheduling gate set") + + gomega.Expect(hasManagedLabel(createdPod)).To(gomega.BeFalse(), + "Pod shouldn't have 'kueue.x-k8s.io/managed=true' label set") + + gomega.Expect(createdPod.Finalizers).NotTo(gomega.ContainElement("kueue.x-k8s.io/managed"), + "Pod shouldn't have 'kueue.x-k8s.io/managed' finalizer set") + }) + }) + }) +}) diff --git a/test/integration/controller/jobs/pod/suite_test.go b/test/integration/controller/jobs/pod/suite_test.go new file mode 100644 index 0000000000..ac97c0306c --- /dev/null +++ b/test/integration/controller/jobs/pod/suite_test.go @@ -0,0 +1,95 @@ +package pod + +import ( + "context" + "path/filepath" + "slices" + "testing" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" + config "sigs.k8s.io/kueue/apis/config/v1beta1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/controller/jobframework" + "sigs.k8s.io/kueue/pkg/controller/jobs/job" + "sigs.k8s.io/kueue/pkg/controller/jobs/pod" + "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/util/kubeversion" + "sigs.k8s.io/kueue/pkg/webhooks" + "sigs.k8s.io/kueue/test/integration/framework" + //+kubebuilder:scaffold:imports +) + +var ( + cfg *rest.Config + k8sClient client.Client + serverVersionFetcher *kubeversion.ServerVersionFetcher + ctx context.Context + fwk *framework.Framework + crdPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "crd", "bases") + webhookPath = filepath.Join("..", "..", "..", "..", "..", "config", "components", "webhook") +) + +func TestAPIs(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + + ginkgo.RunSpecs(t, + "Pod Controller Suite", + ) +} + +func managerSetup(opts ...jobframework.Option) framework.ManagerSetup { + return func(mgr manager.Manager, ctx context.Context) { + err := pod.SetupIndexes(ctx, mgr.GetFieldIndexer()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + podReconciler := pod.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(constants.JobControllerName), + opts...) 
+		err = podReconciler.SetupWithManager(mgr)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+		// The Job reconciler is needed for the "Pod owner is managed by Kueue" tests.
+		jobReconciler := job.NewReconciler(
+			mgr.GetClient(),
+			mgr.GetEventRecorderFor(constants.JobControllerName),
+			opts...)
+		err = jobReconciler.SetupWithManager(mgr)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+		cCache := cache.New(mgr.GetClient())
+		queues := queue.NewManager(mgr.GetClient(), cCache)
+
+		failedCtrl, err := core.SetupControllers(mgr, queues, cCache, &config.Configuration{})
+		gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl)
+
+		err = pod.SetupWebhook(mgr, opts...)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		failedWebhook, err := webhooks.Setup(mgr)
+		gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)
+	}
+}
+
+// hasKueueSchedulingGate returns true if the pod has the 'kueue.x-k8s.io/admission'
+// scheduling gate set.
+func hasKueueSchedulingGate(p *corev1.Pod) bool {
+	return slices.Contains(
+		p.Spec.SchedulingGates,
+		corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"},
+	)
+}
+
+// hasManagedLabel returns true if the pod has the 'kueue.x-k8s.io/managed' label set to 'true'.
+func hasManagedLabel(p *corev1.Pod) bool {
+	if l, ok := p.Labels["kueue.x-k8s.io/managed"]; ok && l == "true" {
+		return true
+	}
+	return false
+}
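
For context when reviewing the tests above, the following is a minimal, illustrative sketch (not part of the patch) of how the new PodWrapper builders compose into the defaulted pod shape the webhook tests assert on. The package name and helper function below are hypothetical; the wrapper methods, label, scheduling gate, and finalizer values come from the files added in this patch.

package example // hypothetical package, for illustration only

import (
	corev1 "k8s.io/api/core/v1"

	podtesting "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
)

// managedPod builds a pod the way the webhook tests expect it to look after
// defaulting: queue label, 'managed' label, scheduling gate, and finalizer.
func managedPod(ns string) *corev1.Pod {
	return podtesting.MakePod("test-pod", ns).
		Queue("test-queue").
		SetLabel("kueue.x-k8s.io/managed", "true").
		KueueSchedulingGate().
		KueueFinalizer().
		Obj()
}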