Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multikueue] Manage worker cluster unavailability #1681

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ type MultiKueue struct {
// them if their local counterpart no longer exists.
// +optional
Origin *string `json:"origin,omitempty"`

// WorkerLostTimeout defines how long a local workload's multikueue admission check state is kept Ready
// after the connection with its reserving worker cluster is lost.
trasc marked this conversation as resolved.
Show resolved Hide resolved
//
// Defaults to 15 minutes.
// +optional
WorkerLostTimeout *metav1.Duration `json:"workerLostTimeout,omitempty"`
}

type RequeuingStrategy struct {
Expand Down
4 changes: 4 additions & 0 deletions apis/config/v1beta1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
defaultJobFrameworkName = "batch/job"
DefaultMultiKueueGCInterval = time.Minute
DefaultMultiKueueOrigin = "multikueue"
DefaultMultiKueueWorkerLostTimeout = 15 * time.Minute
)

func getOperatorNamespace() string {
Expand Down Expand Up @@ -175,4 +176,7 @@ func SetDefaults_Configuration(cfg *Configuration) {
if ptr.Deref(cfg.MultiKueue.Origin, "") == "" {
cfg.MultiKueue.Origin = ptr.To(DefaultMultiKueueOrigin)
}
if cfg.MultiKueue.WorkerLostTimeout == nil {
cfg.MultiKueue.WorkerLostTimeout = &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}
}
}
20 changes: 12 additions & 8 deletions apis/config/v1beta1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
}

defaultMultiKueue := &MultiKueue{
GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval},
Origin: ptr.To(DefaultMultiKueueOrigin),
GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval},
Origin: ptr.To(DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout},
}

podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
Expand Down Expand Up @@ -498,8 +499,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Enable: ptr.To(false),
},
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{Duration: time.Second},
Origin: ptr.To("multikueue-manager1"),
GCInterval: &metav1.Duration{Duration: time.Second},
Origin: ptr.To("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: time.Minute},
},
},
want: &Configuration{
Expand All @@ -512,8 +514,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{Duration: time.Second},
Origin: ptr.To("multikueue-manager1"),
GCInterval: &metav1.Duration{Duration: time.Second},
Origin: ptr.To("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: time.Minute},
},
},
},
Expand All @@ -537,8 +540,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{},
Origin: ptr.To("multikueue-manager1"),
GCInterval: &metav1.Duration{},
Origin: ptr.To("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute},
},
},
},
Expand Down
5 changes: 5 additions & 0 deletions apis/config/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration),
multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)),
multikueue.WithWorkerLostTimeout(cfg.MultiKueue.WorkerLostTimeout.Duration),
); err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
os.Exit(1)
Expand Down
5 changes: 3 additions & 2 deletions cmd/kueue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ integrations:
},
},
MultiKueue: &config.MultiKueue{
GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval},
Origin: ptr.To(config.DefaultMultiKueueOrigin),
GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval},
Origin: ptr.To(config.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueWorkerLostTimeout},
},
},
},
Expand Down
16 changes: 10 additions & 6 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ namespace: kueue-system
multiKueue:
gcInterval: 1m30s
origin: multikueue-manager1
workerLostTimeout: 10m
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -333,8 +334,9 @@ multiKueue:
}

defaultMultiKueue := &configapi.MultiKueue{
GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval},
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval},
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout},
}

testcases := []struct {
Expand Down Expand Up @@ -800,8 +802,9 @@ multiKueue:
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
MultiKueue: &configapi.MultiKueue{
GCInterval: &metav1.Duration{Duration: 90 * time.Second},
Origin: ptr.To("multikueue-manager1"),
GCInterval: &metav1.Duration{Duration: 90 * time.Second},
Origin: ptr.To("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: 10 * time.Minute},
},
},
wantOptions: defaultControlOptions,
Expand Down Expand Up @@ -914,8 +917,9 @@ func TestEncode(t *testing.T) {
"clusterQueues": map[string]any{"maxCount": int64(10)},
},
"multiKueue": map[string]any{
"gcInterval": "1m0s",
"origin": "multikueue",
"gcInterval": "1m0s",
"origin": "multikueue",
"workerLostTimeout": "15m0s",
},
},
},
Expand Down
52 changes: 29 additions & 23 deletions pkg/controller/admissionchecks/multikueue/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,20 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
return reconcile.Result{}, client.IgnoreNotFound(err)
}

inactiveReason := ""

log.V(2).Info("Reconcile AdmissionCheck")

newCondition := metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "Active",
Message: "The admission check is active",
}

if cfg, err := a.helper.ConfigFromRef(ctx, ac.Spec.Parameters); err != nil {
inactiveReason = fmt.Sprintf("Cannot load the AdmissionChecks parameters: %s", err.Error())
newCondition.Status = metav1.ConditionFalse
newCondition.Reason = "BadConfig"
newCondition.Message = fmt.Sprintf("Cannot load the AdmissionChecks parameters: %s", err.Error())

} else {
var missingClusters []string
var inactiveClusters []string
Expand All @@ -90,28 +99,25 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
}
}
}
unusableClustersCount := len(missingClusters) + len(inactiveClusters)
if unusableClustersCount > 0 {
if unusableClustersCount < len(cfg.Spec.Clusters) {
// keep it partially active
newCondition.Reason = "SomeActiveClusters"
} else {
newCondition.Status = metav1.ConditionFalse
newCondition.Reason = "NoUsableClusters"
}

var messageParts []string
if len(missingClusters) > 0 {
messageParts = []string{fmt.Sprintf("Missing clusters: %v", missingClusters)}
}
if len(inactiveClusters) > 0 {
messageParts = append(messageParts, fmt.Sprintf("Inactive clusters: %v", inactiveClusters))
var messageParts []string
if len(missingClusters) > 0 {
messageParts = []string{fmt.Sprintf("Missing clusters: %v", missingClusters)}
}
if len(inactiveClusters) > 0 {
messageParts = append(messageParts, fmt.Sprintf("Inactive clusters: %v", inactiveClusters))
}
newCondition.Message = strings.Join(messageParts, ", ")
}
inactiveReason = strings.Join(messageParts, ", ")
}

newCondition := metav1.Condition{
Type: kueue.AdmissionCheckActive,
}
if len(inactiveReason) == 0 {
newCondition.Status = metav1.ConditionTrue
newCondition.Reason = "Active"
newCondition.Message = "The admission check is active"
} else {
newCondition.Status = metav1.ConditionFalse
newCondition.Reason = "Inactive"
newCondition.Message = inactiveReason
}

oldCondition := apimeta.FindStatusCondition(ac.Status.Conditions, kueue.AdmissionCheckActive)
Expand Down
41 changes: 35 additions & 6 deletions pkg/controller/admissionchecks/multikueue/admissioncheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestReconcile(t *testing.T) {
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "Inactive",
Reason: "BadConfig",
Message: `Cannot load the AdmissionChecks parameters: multikueueconfigs.kueue.x-k8s.io "config1" not found`,
}).
Obj(),
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestReconcile(t *testing.T) {
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "Inactive",
Reason: "NoUsableClusters",
Message: "Missing clusters: [worker1]",
}).
Obj(),
Expand Down Expand Up @@ -123,13 +123,13 @@ func TestReconcile(t *testing.T) {
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "Inactive",
Reason: "NoUsableClusters",
Message: "Inactive clusters: [worker1]",
}).
Obj(),
},
},
"missing and inactive cluster": {
"all clusters missing or inactive": {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
Expand All @@ -143,7 +143,7 @@ func TestReconcile(t *testing.T) {

clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionTrue, "ByTest", "by test").Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(),
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
Expand All @@ -152,7 +152,36 @@ func TestReconcile(t *testing.T) {
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "Inactive",
Reason: "NoUsableClusters",
Message: "Missing clusters: [worker3], Inactive clusters: [worker1 worker2]",
}).
Obj(),
},
},
"partially active": {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Obj(),
},
configs: []kueuealpha.MultiKueueConfig{
*utiltesting.MakeMultiKueueConfig("config1").Clusters("worker1", "worker2", "worker3").Obj(),
},

clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionTrue, "ByTest", "by test").Obj(),
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "SomeActiveClusters",
Message: "Missing clusters: [worker3], Inactive clusters: [worker1]",
}).
Obj(),
Expand Down
26 changes: 19 additions & 7 deletions pkg/controller/admissionchecks/multikueue/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
)

// Fallback values that SetupControllers starts from before applying the
// SetupOption functions passed by the caller.
const (
defaultGCInterval = time.Minute
defaultOrigin = "multikueue"
defaultGCInterval = time.Minute
defaultOrigin = "multikueue"
// NOTE(review): the identifier is misspelled ("defaul" instead of "default").
// It is referenced under this spelling elsewhere, so any rename has to be
// done at every use site at once.
// NOTE(review): this fallback is 5 minutes, while the configuration API
// default (DefaultMultiKueueWorkerLostTimeout) is 15 minutes - confirm the
// difference is intended.
defaulWorkerLostTimeout = 5 * time.Minute
)

// SetupOptions holds the settings SetupControllers starts from; each field
// can be overridden through a SetupOption.
type SetupOptions struct {
gcInterval time.Duration
origin string
gcInterval time.Duration
origin string
// workerLostTimeout is the time for which the multikueue admission check
// is kept in Ready state after the connection to the admitting worker
// cluster is lost. It is set through WithWorkerLostTimeout.
workerLostTimeout time.Duration
}

// SetupOption is a function that modifies the SetupOptions it is given.
type SetupOption func(o *SetupOptions)
Expand All @@ -49,10 +51,20 @@ func WithOrigin(origin string) SetupOption {
}
}

// WithWorkerLostTimeout returns a SetupOption that records d as the
// worker-lost timeout: the time for which the multikueue admission check
// is kept in Ready state after the connection to the admitting worker
// cluster is lost.
func WithWorkerLostTimeout(d time.Duration) SetupOption {
	apply := func(opts *SetupOptions) {
		opts.workerLostTimeout = d
	}
	return apply
}

func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
options := &SetupOptions{
gcInterval: defaultGCInterval,
origin: defaultOrigin,
gcInterval: defaultGCInterval,
origin: defaultOrigin,
workerLostTimeout: defaulWorkerLostTimeout,
}

for _, o := range opts {
Expand All @@ -76,6 +88,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
return err
}

wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin)
wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout)
return wlRec.setupWithManager(mgr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestWlReconcileJobset(t *testing.T) {
cRec.remoteClients["worker1"] = w1remoteClient

helper, _ := newMultiKueueStoreHelper(managerClient)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaulWorkerLostTimeout)

_, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "wl1", Namespace: TestNamespace}})
if gotErr != nil {
Expand Down