Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
  • Loading branch information
tenzen-y committed Feb 10, 2024
1 parent a42a66c commit 6054a7d
Show file tree
Hide file tree
Showing 23 changed files with 415 additions and 84 deletions.
9 changes: 7 additions & 2 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,19 @@ type MultiKueue struct {

type RequeuingStrategy struct {
// Timestamp defines the timestamp used for requeuing a Workload
// that was evicted due to Pod readiness.
// that was evicted due to Pod readiness. The possible values are:
//
// - `Eviction` (default): indicates from Workload .status.conditions.
// - `Creation`: indicates from Workload .metadata.creationTimestamp.
//
// Defaults to Eviction.
// +optional
Timestamp *RequeuingTimestamp `json:"timestamp,omitempty"`

// BackoffLimitCount defines the maximum number of requeuing retries.
// When the number is reached, the workload is deactivated (`.spec.active`=`false`).
// The duration until the workload is deactivated is calculated by "Rand+SUM[k=0,n](1.41891^k)" seconds,
// where "n" means "workloadStatus.requeueState.count-1" and "Rand" means the jitter.
// Given that "backoffLimitCount" equals "30", the result approximately equals 24 hours.
//
// Defaults to null.
// +optional
Expand Down
2 changes: 1 addition & 1 deletion apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type WorkloadStatus struct {
// changed once set.
Admission *Admission `json:"admission,omitempty"`

// requeueState holds the state of the requeued Workload according to the requeueing strategy.
// requeueState holds the state of the requeued Workload according to the requeuing strategy.
//
// +optional
RequeueState *RequeueState `json:"requeueState,omitempty"`
Expand Down
6 changes: 1 addition & 5 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,7 @@ func setupServerVersionFetcher(mgr ctrl.Manager, kubeConfig *rest.Config) *kubev
}

func blockForPodsReady(cfg *configapi.Configuration) bool {
return waitForPodsReady(cfg) && cfg.WaitForPodsReady.BlockAdmission != nil && *cfg.WaitForPodsReady.BlockAdmission
}

func waitForPodsReady(cfg *configapi.Configuration) bool {
return cfg.WaitForPodsReady != nil && cfg.WaitForPodsReady.Enable
return config.WaitForPodsReadyIsEnabled(cfg) && cfg.WaitForPodsReady.BlockAdmission != nil && *cfg.WaitForPodsReady.BlockAdmission
}

func podsReadyRequeuingTimestamp(cfg *configapi.Configuration) configapi.RequeuingTimestamp {
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,7 @@ func Load(scheme *runtime.Scheme, configFile string) (ctrl.Options, configapi.Co
addTo(&options, &cfg)
return options, cfg, err
}

// WaitForPodsReadyIsEnabled reports whether the waitForPodsReady feature is
// switched on in cfg. It returns false when cfg itself or its WaitForPodsReady
// section is nil, so callers may pass a partially populated configuration
// without guarding against a nil dereference.
func WaitForPodsReadyIsEnabled(cfg *configapi.Configuration) bool {
	if cfg == nil || cfg.WaitForPodsReady == nil {
		return false
	}
	return cfg.WaitForPodsReady.Enable
}
38 changes: 37 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ apiVersion: config.kueue.x-k8s.io/v1beta1
kind: Configuration
waitForPodsReady:
enable: true
requeuingStrategy:
timestamp: Creation
backoffLimitCount: 10
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -543,7 +546,8 @@ multiKueue:
BlockAdmission: ptr.To(true),
Timeout: &metav1.Duration{Duration: 5 * time.Minute},
RequeuingStrategy: &configapi.RequeuingStrategy{
Timestamp: ptr.To(configapi.EvictionTimestamp),
Timestamp: ptr.To(configapi.CreationTimestamp),
BackoffLimitCount: ptr.To[int32](10),
},
},
ClientConnection: defaultClientConnection,
Expand Down Expand Up @@ -938,3 +942,35 @@ func TestEncode(t *testing.T) {
})
}
}

// TestWaitForPodsReadyIsEnabled verifies that WaitForPodsReadyIsEnabled only
// reports true when the waitForPodsReady section exists and has enable=true.
func TestWaitForPodsReadyIsEnabled(t *testing.T) {
	cases := map[string]struct {
		cfg  *configapi.Configuration
		want bool
	}{
		"cfg.waitForPodsReady is null": {
			cfg:  &configapi.Configuration{},
			want: false,
		},
		"cfg.waitForPodsReady.enable is false": {
			cfg: &configapi.Configuration{
				WaitForPodsReady: &configapi.WaitForPodsReady{},
			},
			want: false,
		},
		"cfg.waitForPodsReady.enable is true": {
			cfg: &configapi.Configuration{
				WaitForPodsReady: &configapi.WaitForPodsReady{
					Enable: true,
				},
			},
			want: true,
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			got := WaitForPodsReadyIsEnabled(tc.cfg)
			if tc.want != got {
				t.Errorf("Unexpected result from WaitForPodsReadyIsEnabled\nwant: %v\ngot: %v\n", tc.want, got)
			}
		})
	}
}
17 changes: 17 additions & 0 deletions pkg/config/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,15 @@ var (
integrationsFrameworksPath = integrationsPath.Child("frameworks")
podOptionsPath = integrationsPath.Child("podOptions")
namespaceSelectorPath = podOptionsPath.Child("namespaceSelector")
waitForPodsReadyPath = field.NewPath("waitForPodsReady")
requeuingStrategyPath = waitForPodsReadyPath.Child("requeuingStrategy")
)

func validate(c *configapi.Configuration) field.ErrorList {
var allErrs field.ErrorList

allErrs = append(allErrs, validateWaitForPodsReady(c)...)

allErrs = append(allErrs, validateQueueVisibility(c)...)

// Validate PodNamespaceSelector for the pod framework
Expand All @@ -52,6 +56,19 @@ func validate(c *configapi.Configuration) field.ErrorList {
return allErrs
}

// validateWaitForPodsReady checks the waitForPodsReady section of the
// configuration. Nothing is validated unless the feature is enabled. When a
// requeuing strategy timestamp is set, it must be one of the known
// RequeuingTimestamp values; otherwise a NotSupported error is reported for
// waitForPodsReady.requeuingStrategy.timestamp.
func validateWaitForPodsReady(c *configapi.Configuration) field.ErrorList {
	var allErrs field.ErrorList
	if !WaitForPodsReadyIsEnabled(c) {
		return allErrs
	}

	strategy := c.WaitForPodsReady.RequeuingStrategy
	if strategy == nil || strategy.Timestamp == nil {
		// The timestamp is optional, so there is nothing to check.
		return allErrs
	}

	switch *strategy.Timestamp {
	case configapi.CreationTimestamp, configapi.EvictionTimestamp:
		// Supported value.
	default:
		supported := []configapi.RequeuingTimestamp{configapi.CreationTimestamp, configapi.EvictionTimestamp}
		allErrs = append(allErrs, field.NotSupported(requeuingStrategyPath.Child("timestamp"), strategy.Timestamp, supported))
	}
	return allErrs
}

func validateQueueVisibility(cfg *configapi.Configuration) field.ErrorList {
var allErrs field.ErrorList
if cfg.QueueVisibility != nil {
Expand Down
29 changes: 29 additions & 0 deletions pkg/config/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,35 @@ func TestValidate(t *testing.T) {
},
wantErr: nil,
},
"no supported waitForPodsReady.requeuingStrategy.timestamp": {
cfg: &configapi.Configuration{
Integrations: defaultIntegrations,
WaitForPodsReady: &configapi.WaitForPodsReady{
Enable: true,
RequeuingStrategy: &configapi.RequeuingStrategy{
Timestamp: ptr.To[configapi.RequeuingTimestamp]("NoSupported"),
},
},
},
wantErr: field.ErrorList{
&field.Error{
Type: field.ErrorTypeNotSupported,
Field: "waitForPodsReady.requeuingStrategy.timestamp",
},
},
},
"supported waitForPodsReady.requeuingStrategy.timestamp": {
cfg: &configapi.Configuration{
Integrations: defaultIntegrations,
WaitForPodsReady: &configapi.WaitForPodsReady{
Enable: true,
RequeuingStrategy: &configapi.RequeuingStrategy{
Timestamp: ptr.To(configapi.CreationTimestamp),
},
},
},
wantErr: nil,
},
}

for name, tc := range testCases {
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/core/clusterqueue_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
Expand All @@ -47,6 +48,7 @@ func TestUpdateCqStatusIfChanged(t *testing.T) {
*utiltesting.MakeWorkload("beta", "").Queue(lqName).Obj(),
},
}
fakeClock := testingclock.NewFakeClock(time.Now())

testCases := map[string]struct {
cqStatus kueue.ClusterQueueStatus
Expand Down Expand Up @@ -198,7 +200,7 @@ func TestUpdateCqStatusIfChanged(t *testing.T) {
qManager: qManager,
}
if tc.newWl != nil {
r.qManager.AddOrUpdateWorkload(tc.newWl)
r.qManager.AddOrUpdateWorkload(ctx, tc.newWl, fakeClock)
}
err := r.updateCqStatusIfChanged(ctx, cq, tc.newConditionStatus, tc.newReason, tc.newMessage)
if err != nil {
Expand Down
23 changes: 16 additions & 7 deletions pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (

ctrl "sigs.k8s.io/controller-runtime"

config "sigs.k8s.io/kueue/apis/config/v1beta1"
configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/config"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/queue"
)
Expand All @@ -31,7 +32,7 @@ const updateChBuffer = 10

// SetupControllers sets up the core controllers. It returns the name of the
// controller that failed to create and an error, if any.
func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *config.Configuration) (string, error) {
func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache, cfg *configapi.Configuration) (string, error) {
rfRec := NewResourceFlavorReconciler(mgr.GetClient(), qManager, cc)
if err := rfRec.SetupWithManager(mgr, cfg); err != nil {
return "ResourceFlavor", err
Expand Down Expand Up @@ -65,27 +66,35 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache
if err := NewWorkloadReconciler(mgr.GetClient(), qManager, cc,
mgr.GetEventRecorderFor(constants.WorkloadControllerName),
WithWorkloadUpdateWatchers(qRec, cqRec),
WithPodsReadyTimeout(podsReadyTimeout(cfg))).SetupWithManager(mgr, cfg); err != nil {
WithPodsReadyTimeout(podsReadyTimeout(cfg)),
WithRequeuingBackoffLimitCount(requeuingBackoffLimitCount(cfg))).SetupWithManager(mgr, cfg); err != nil {
return "Workload", err
}
return "", nil
}

func podsReadyTimeout(cfg *config.Configuration) *time.Duration {
if cfg.WaitForPodsReady != nil && cfg.WaitForPodsReady.Enable && cfg.WaitForPodsReady.Timeout != nil {
func podsReadyTimeout(cfg *configapi.Configuration) *time.Duration {
if config.WaitForPodsReadyIsEnabled(cfg) && cfg.WaitForPodsReady.Timeout != nil {
return &cfg.WaitForPodsReady.Timeout.Duration
}
return nil
}

func queueVisibilityUpdateInterval(cfg *config.Configuration) time.Duration {
// requeuingBackoffLimitCount returns the BackoffLimitCount configured under
// waitForPodsReady.requeuingStrategy. It returns nil when waitForPodsReady is
// not enabled or no requeuing strategy is configured.
func requeuingBackoffLimitCount(cfg *configapi.Configuration) *int32 {
	if !config.WaitForPodsReadyIsEnabled(cfg) {
		return nil
	}
	strategy := cfg.WaitForPodsReady.RequeuingStrategy
	if strategy == nil {
		return nil
	}
	return strategy.BackoffLimitCount
}

func queueVisibilityUpdateInterval(cfg *configapi.Configuration) time.Duration {
if cfg.QueueVisibility != nil {
return time.Duration(cfg.QueueVisibility.UpdateIntervalSeconds) * time.Second
}
return 0
}

func queueVisibilityClusterQueuesMaxCount(cfg *config.Configuration) int32 {
func queueVisibilityClusterQueuesMaxCount(cfg *configapi.Configuration) int32 {
if cfg.QueueVisibility != nil && cfg.QueueVisibility.ClusterQueues != nil {
return cfg.QueueVisibility.ClusterQueues.MaxCount
}
Expand Down

0 comments on commit 6054a7d

Please sign in to comment.