Skip to content

Commit

Permalink
Add support for single pods (#1072)
Browse files Browse the repository at this point in the history
* [jobs/pod] Add support for pods

* Improve support for pods

* Add namespace/pod label filtering for Default pod
  webhook. Add PodIntegrationOptions configuration
  field containing namespace and pod label selectors.

* Simplify RunWithPodSetsInfo method for the pod
  controller.

* Create a new interface JobWithFinalize. Jobs implementing
  it supports custom finalization logic.

* Add k8s version check. If the pod integration is
  enabled on k8s server versions < 1.27, Kueue pod
  will stop with an error message.

* Add JobWithSkip interface. Jobs that implement
  this interface can introduce custom
  reconciliation skip logic.

* Add IsPodOwnerManagedByQueue function.
  Defaulting webhook will skip a pod if it's owner
  is managed by Kueue. Reconciler will skip such a
  pod even if 'managed' label is set.

* Add integration tests for the pod controller
  and webhook.

* Address PR comments

* Add tests for pod controller interacting with
  scheduler

* Change IsManagingObjectsOwner functions for
  Kubeflow jobs.

* Update webhook paths for integration tests.

* Add parent check for kubeflow PaddleJob resource

* Update helm chart.

* Replace patchesStrategicMerge with patches in
  webhook kustomization.yaml

* Update unit/integration tests.

* Change pod webhook failure policy from Ignore to
  fail, add webhook namespace selector patch.

* Merge tests in validation_test.go into a single
  test for exported ValidateConfiguration function.

* Update JobWithSkip interface.

* Add missing licence comments.

* Rewrite some of the integration tests messages
  and value validations.

* Update unit tests.

* Add integration test, change stop pod condition

---------

Co-authored-by: Traian Schiau <traian_schiau@epam.com>
  • Loading branch information
achernevskii and trasc committed Sep 27, 2023
1 parent 5029293 commit a68805f
Show file tree
Hide file tree
Showing 37 changed files with 3,151 additions and 58 deletions.
10 changes: 10 additions & 0 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,17 @@ type Integrations struct {
// - "kubeflow.org/pytorchjob"
// - "kubeflow.org/tfjob"
// - "kubeflow.org/xgboostjob"
// - "pod"
Frameworks []string `json:"frameworks,omitempty"`
// PodOptions defines kueue controller behaviour for pod objects
PodOptions *PodIntegrationOptions `json:"podOptions,omitempty"`
}

type PodIntegrationOptions struct {
// NamespaceSelector can be used to omit some namespaces from pod reconciliation
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`
// PodSelector can be used to choose what pods to reconcile
PodSelector *metav1.LabelSelector `json:"podSelector,omitempty"`
}

type QueueVisibility struct {
Expand Down
22 changes: 22 additions & 0 deletions apis/config/v1beta1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,26 @@ func SetDefaults_Configuration(cfg *Configuration) {
MaxCount: DefaultClusterQueuesMaxCount,
}
}

if cfg.Integrations.PodOptions == nil {
cfg.Integrations.PodOptions = &PodIntegrationOptions{}
}

if cfg.Integrations.PodOptions.NamespaceSelector == nil {
matchExpressionsValues := []string{"kube-system", *cfg.Namespace}

cfg.Integrations.PodOptions.NamespaceSelector = &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: matchExpressionsValues,
},
},
}
}

if cfg.Integrations.PodOptions.PodSelector == nil {
cfg.Integrations.PodOptions.PodSelector = &metav1.LabelSelector{}
}
}
38 changes: 34 additions & 4 deletions apis/config/v1beta1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,42 @@ func TestSetDefaults_Configuration(t *testing.T) {
}
defaultIntegrations := &Integrations{
Frameworks: []string{job.FrameworkName},
PodOptions: &PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", "kueue-system"},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
}
defaultQueueVisibility := &QueueVisibility{
UpdateIntervalSeconds: DefaultQueueVisibilityUpdateIntervalSeconds,
ClusterQueues: &ClusterQueueVisibility{
MaxCount: 10,
},
}

overwriteNamespaceIntegrations := &Integrations{
Frameworks: []string{job.FrameworkName},
PodOptions: &PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", overwriteNamespace},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
}

podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute}

Expand Down Expand Up @@ -217,7 +246,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
WebhookSecretName: ptr.To(DefaultWebhookSecretName),
},
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
Integrations: overwriteNamespaceIntegrations,
QueueVisibility: defaultQueueVisibility,
},
},
Expand All @@ -235,7 +264,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
Enable: ptr.To(false),
},
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
Integrations: overwriteNamespaceIntegrations,
QueueVisibility: defaultQueueVisibility,
},
},
Expand All @@ -260,7 +289,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
QPS: ptr.To[float32](123.0),
Burst: ptr.To[int32](456),
},
Integrations: defaultIntegrations,
Integrations: overwriteNamespaceIntegrations,
QueueVisibility: defaultQueueVisibility,
},
},
Expand All @@ -279,7 +308,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
Enable: ptr.To(false),
},
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
Integrations: overwriteNamespaceIntegrations,
QueueVisibility: defaultQueueVisibility,
},
},
Expand Down Expand Up @@ -377,6 +406,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: &Integrations{
Frameworks: []string{"a", "b"},
PodOptions: defaultIntegrations.PodOptions,
},
QueueVisibility: defaultQueueVisibility,
},
Expand Down
30 changes: 30 additions & 0 deletions apis/config/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions charts/kueue/templates/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/status
verbs:
- get
- patch
- apiGroups:
- ""
resources:
Expand Down
53 changes: 53 additions & 0 deletions charts/kueue/templates/webhook/webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ webhooks:
resources:
- mpijobs
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: '{{ include "kueue.fullname" . }}-webhook-service'
namespace: '{{ .Release.Namespace }}'
path: /mutate--v1-pod
failurePolicy: Fail
name: mpod.kb.io
namespaceSelector:
matchExpressions:
- key: kubernetes.io/metadata.name
operator: NotIn
values:
- kube-system
- '{{ .Release.Namespace }}'
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE
resources:
- pods
sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
Expand Down Expand Up @@ -272,3 +298,30 @@ webhooks:
resources:
- rayjobs
sideEffects: None
- admissionReviewVersions:
- v1
clientConfig:
service:
name: '{{ include "kueue.fullname" . }}-webhook-service'
namespace: '{{ .Release.Namespace }}'
path: /validate--v1-pod
failurePolicy: Fail
name: vpod.kb.io
namespaceSelector:
matchExpressions:
- key: kubernetes.io/metadata.name
operator: NotIn
values:
- kube-system
- '{{ .Release.Namespace }}'
rules:
- apiGroups:
- ""
apiVersions:
- v1
operations:
- CREATE
- UPDATE
resources:
- pods
sideEffects: None
7 changes: 7 additions & 0 deletions charts/kueue/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ managerConfig:
- "kubeflow.org/pytorchjob"
- "kubeflow.org/tfjob"
- "kubeflow.org/xgboostjob"
# - "pod"
# podOptions:
# namespaceSelector:
# matchExpressions:
# - key: kubernetes.io/metadata.name
# operator: NotIn
# values: [ kube-system, kueue-system ]
# ports definition for metricsService and webhookService.
metricsService:
ports:
Expand Down
26 changes: 24 additions & 2 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -66,8 +67,9 @@ import (
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
errPodIntegration = errors.New("pod integration only supported in Kubernetes 1.27 or newer")
)

func init() {
Expand Down Expand Up @@ -239,6 +241,21 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
log.Error(err, "Unable to create controller")
return err
}
if name == "pod" {
v := serverVersionFetcher.GetServerVersion()
if v.String() == "" || v.LessThan(kubeversion.KubeVersion1_27) {
setupLog.Error(errPodIntegration,
"Failed to configure reconcilers",
"kubernetesVersion", v)
os.Exit(1)
}

opts = append(
opts,
jobframework.WithPodNamespaceSelector(cfg.Integrations.PodOptions.NamespaceSelector),
jobframework.WithPodSelector(cfg.Integrations.PodOptions.PodSelector),
)
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
Expand Down Expand Up @@ -299,6 +316,11 @@ func setupServerVersionFetcher(mgr ctrl.Manager, kubeConfig *rest.Config) *kubev
os.Exit(1)
}

if err := serverVersionFetcher.FetchServerVersion(); err != nil {
setupLog.Error(err, "failed to fetch kubernetes server version")
os.Exit(1)
}

return serverVersionFetcher
}

Expand Down
14 changes: 13 additions & 1 deletion cmd/kueue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,18 @@ integrations:
// referencing job.FrameworkName ensures the link of job package
// therefore the batch/framework should be registered
Frameworks: []string{job.FrameworkName},
PodOptions: &config.PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", "kueue-system"},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
},
QueueVisibility: &config.QueueVisibility{
UpdateIntervalSeconds: config.DefaultQueueVisibilityUpdateIntervalSeconds,
Expand All @@ -110,7 +122,7 @@ integrations:
{
name: "bad integrations config",
configFile: badIntegrationsConfig,
wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/paddlejob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"kubeflow.org/xgboostjob\", \"ray.io/rayjob\""),
wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/paddlejob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"kubeflow.org/xgboostjob\", \"pod\", \"ray.io/rayjob\""),
},
}

Expand Down
7 changes: 7 additions & 0 deletions config/components/manager/controller_manager_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ integrations:
- "kubeflow.org/pytorchjob"
- "kubeflow.org/tfjob"
- "kubeflow.org/xgboostjob"
# - "pod"
# podOptions:
# namespaceSelector:
# matchExpressions:
# - key: kubernetes.io/metadata.name
# operator: NotIn
# values: [ kube-system, kueue-system ]
18 changes: 18 additions & 0 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,24 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
resources:
- pods/status
verbs:
- get
- patch
- apiGroups:
- ""
resources:
Expand Down

0 comments on commit a68805f

Please sign in to comment.