Skip to content

Commit

Permalink
Improve support for pods
Browse files Browse the repository at this point in the history
* Add namespace/pod label filtering for Default pod
  webhook. Add PodIntegrationOptions configuration
  field containing namespace and pod label selectors.

* Simplify RunWithPodSetsInfo method for the pod
  controller.

* Create a new interface JobWithFinalize. Jobs implementing
  it support custom finalization logic.

* Add k8s version check. If the pod integration is
  enabled on k8s server versions < 1.27, Kueue pod
  will stop with an error message.

* Add JobWithSkip interface. Jobs that implement
  this interface can introduce custom
  reconciliation skip logic.

* Add IsPodOwnerManagedByQueue function.
  Defaulting webhook will skip a pod if its owner
  is managed by Kueue. Reconciler will skip such a
  pod even if 'managed' label is set.

* Add integration tests for the pod controller
  and webhook.
  • Loading branch information
achernevskii committed Sep 8, 2023
1 parent da86eba commit 31fd8e1
Show file tree
Hide file tree
Showing 26 changed files with 2,077 additions and 77 deletions.
10 changes: 10 additions & 0 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,5 +224,15 @@ type Integrations struct {
// - "jobset.x-k8s.io/jobset"
// - "kubeflow.org/pytorchjob"
// - "kubeflow.org/tfjob"
// - "pod"
Frameworks []string `json:"frameworks,omitempty"`
// PodOptions defines kueue controller behaviour for pod objects
PodOptions *PodIntegrationOptions `json:"podOptions,omitempty"`
}

// PodIntegrationOptions holds the label selectors that configure the pod
// integration: one applied to namespaces and one applied to pods.
//
// Both fields are optional. When left unset, configuration defaulting fills
// them in: NamespaceSelector receives a NotIn requirement on the
// "kubernetes.io/metadata.name" label listing "kube-system" and the
// configured Kueue namespace, and PodSelector receives an empty selector.
type PodIntegrationOptions struct {
// NamespaceSelector can be used to omit some namespaces from pod reconciliation.
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`
// PodSelector can be used to choose what pods to reconcile.
PodSelector *metav1.LabelSelector `json:"podSelector,omitempty"`
}
22 changes: 22 additions & 0 deletions apis/config/v1beta1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,26 @@ func SetDefaults_Configuration(cfg *Configuration) {
if cfg.Integrations.Frameworks == nil {
cfg.Integrations.Frameworks = []string{job.FrameworkName}
}

if cfg.Integrations.PodOptions == nil {
cfg.Integrations.PodOptions = &PodIntegrationOptions{}
}

if cfg.Integrations.PodOptions.NamespaceSelector == nil {
matchExpressionsValues := []string{"kube-system", *cfg.Namespace}

cfg.Integrations.PodOptions.NamespaceSelector = &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: matchExpressionsValues,
},
},
}
}

if cfg.Integrations.PodOptions.PodSelector == nil {
cfg.Integrations.PodOptions.PodSelector = &metav1.LabelSelector{}
}
}
38 changes: 34 additions & 4 deletions apis/config/v1beta1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,36 @@ func TestSetDefaults_Configuration(t *testing.T) {
}
defaultIntegrations := &Integrations{
Frameworks: []string{job.FrameworkName},
PodOptions: &PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", "kueue-system"},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
}

overwriteNamespaceIntegrations := &Integrations{
Frameworks: []string{job.FrameworkName},
PodOptions: &PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", overwriteNamespace},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
}

podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
podsReadyTimeoutOverwrite := metav1.Duration{Duration: time.Minute}

Expand Down Expand Up @@ -206,7 +235,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
WebhookSecretName: ptr.To(DefaultWebhookSecretName),
},
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
Integrations: overwriteNamespaceIntegrations,
},
},
"should not default InternalCertManagement": {
Expand All @@ -223,7 +252,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
Enable: ptr.To(false),
},
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
Integrations: overwriteNamespaceIntegrations,
},
},
"should not default values in custom ClientConnection": {
Expand All @@ -247,7 +276,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
QPS: ptr.To[float32](123.0),
Burst: ptr.To[int32](456),
},
Integrations: defaultIntegrations,
Integrations: overwriteNamespaceIntegrations,
},
},
"should default empty custom ClientConnection": {
Expand All @@ -265,7 +294,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
Enable: ptr.To(false),
},
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
Integrations: overwriteNamespaceIntegrations,
},
},
"defaulting waitForPodsReady.timeout": {
Expand Down Expand Up @@ -359,6 +388,7 @@ func TestSetDefaults_Configuration(t *testing.T) {
ClientConnection: defaultClientConnection,
Integrations: &Integrations{
Frameworks: []string{"a", "b"},
PodOptions: defaultIntegrations.PodOptions,
},
},
},
Expand Down
30 changes: 30 additions & 0 deletions apis/config/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 26 additions & 6 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
"fmt"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
schedulingv1 "k8s.io/api/scheduling/v1"
Expand All @@ -37,6 +33,9 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -67,8 +66,9 @@ import (
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
podIntegrationErr = errors.New("pod integration only supported in Kubernetes 1.27 or newer")
)

func init() {
Expand Down Expand Up @@ -246,6 +246,26 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
log.Error(err, "Unable to create controller")
return err
}
if name == "pod" {
err := serverVersionFetcher.FetchServerVersion()
if err != nil {
setupLog.Error(err, "failed to fetch kubernetes server version")
os.Exit(1)
}
v := serverVersionFetcher.GetServerVersion()
if v.String() == "" || v.LessThan(kubeversion.KubeVersion1_27) {
setupLog.Error(podIntegrationErr,
"Failed to configure reconcilers",
"kubernetesVersion", v)
os.Exit(1)
}

opts = append(
opts,
jobframework.WithPodNamespaceSelector(cfg.Integrations.PodOptions.NamespaceSelector),
jobframework.WithPodSelector(cfg.Integrations.PodOptions.PodSelector),
)
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
Expand Down
17 changes: 14 additions & 3 deletions cmd/kueue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@ limitations under the License.
package main

import (
"fmt"
"os"
"path/filepath"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/utils/ptr"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/job"
)

Expand Down Expand Up @@ -99,13 +98,25 @@ integrations:
// referencing job.FrameworkName ensures the link of job package
// therefore the batch/framework should be registered
Frameworks: []string{job.FrameworkName},
PodOptions: &config.PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", "kueue-system"},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
},
},
},
{
name: "bad integrations config",
configFile: badIntegrationsConfig,
wantError: field.NotSupported(field.NewPath("integrations", "frameworks"), "unregistered/jobframework", jobframework.GetIntegrationsList()),
wantError: fmt.Errorf("integrations.frameworks: Unsupported value: \"unregistered/jobframework\": supported values: \"batch/job\", \"jobset.x-k8s.io/jobset\", \"kubeflow.org/mpijob\", \"kubeflow.org/pytorchjob\", \"kubeflow.org/tfjob\", \"pod\", \"ray.io/rayjob\""),
},
}

Expand Down
6 changes: 6 additions & 0 deletions config/components/manager/controller_manager_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@ integrations:
- "kubeflow.org/pytorchjob"
- "kubeflow.org/tfjob"
# - "pod"
podOptions:
namespaceSelector:
matchExpressions:
- key: kubernetes.io/metadata.name
operator: NotIn
values: [ kube-system, kueue-system ]
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co
return options, cfg, err
}
}

errs := ValidateConfiguration(cfg)
if ea := errs.ToAggregate(); ea != nil {
return options, cfg, ea
}

addTo(&options, &cfg)
return options, cfg, err
}
50 changes: 49 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,18 @@ integrations:

defaultIntegrations := &configapi.Integrations{
Frameworks: []string{job.FrameworkName},
PodOptions: &configapi.PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", "kueue-system"},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
}

testcases := []struct {
Expand Down Expand Up @@ -311,7 +323,21 @@ integrations:
ManageJobsWithoutQueueName: false,
InternalCertManagement: enableDefaultInternalCertManagement,
ClientConnection: defaultClientConnection,
Integrations: defaultIntegrations,
Integrations: &configapi.Integrations{
Frameworks: []string{job.FrameworkName},
PodOptions: &configapi.PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", "kueue-tenant-a"},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
},
},
wantOptions: defaultControlOptions,
},
Expand Down Expand Up @@ -514,6 +540,18 @@ integrations:
// referencing job.FrameworkName ensures the link of job package
// therefore the batch/framework should be registered
Frameworks: []string{job.FrameworkName},
PodOptions: &configapi.PodIntegrationOptions{
NamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: "kubernetes.io/metadata.name",
Operator: metav1.LabelSelectorOpNotIn,
Values: []string{"kube-system", "kueue-system"},
},
},
},
PodSelector: &metav1.LabelSelector{},
},
},
},
wantOptions: ctrl.Options{
Expand Down Expand Up @@ -609,6 +647,16 @@ func TestEncode(t *testing.T) {
"manageJobsWithoutQueueName": false,
"integrations": map[string]any{
"frameworks": []any{"batch/job"},
"podOptions": map[string]any{
"namespaceSelector": map[string]any{
"matchExpressions": []any{map[string]any{
"key": "kubernetes.io/metadata.name",
"operator": "NotIn",
"values": []any{"kube-system", "kueue-system"},
}},
},
"podSelector": map[string]any{},
},
},
},
},
Expand Down

0 comments on commit 31fd8e1

Please sign in to comment.