diff --git a/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go b/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go index 1170a04aab..eb85735d5c 100644 --- a/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go +++ b/apis/visibility/v1alpha1/openapi/zz_generated.openapi.go @@ -90,6 +90,10 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadOptions": schema_kueue_apis_visibility_v1alpha1_PendingWorkloadOptions(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary": schema_kueue_apis_visibility_v1alpha1_PendingWorkloadsSummary(ref), "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummaryList": schema_kueue_apis_visibility_v1alpha1_PendingWorkloadsSummaryList(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkload": schema_kueue_apis_visibility_v1alpha1_RunningWorkload(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadOptions": schema_kueue_apis_visibility_v1alpha1_RunningWorkloadOptions(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary": schema_kueue_apis_visibility_v1alpha1_RunningWorkloadsSummary(ref), + "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummaryList": schema_kueue_apis_visibility_v1alpha1_RunningWorkloadsSummaryList(ref), } } @@ -2544,12 +2548,18 @@ func schema_kueue_apis_visibility_v1alpha1_ClusterQueue(ref common.ReferenceCall Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary"), }, }, + "runningWorkloadsSummary": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary"), + }, + }, }, - Required: []string{"pendingWorkloadsSummary"}, + Required: []string{"pendingWorkloadsSummary", "runningWorkloadsSummary"}, }, }, Dependencies: []string{ - "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary"}, + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary"}, } } @@ -2883,3 +2893,179 @@ func schema_kueue_apis_visibility_v1alpha1_PendingWorkloadsSummaryList(ref commo "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary"}, } } + +func schema_kueue_apis_visibility_v1alpha1_RunningWorkload(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "RunningWorkload is a user-facing representation of a running workload that summarizes the relevant information for assumed resources in the cluster queue.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "priority": { + SchemaProps: spec.SchemaProps{ + Description: "Priority indicates the workload's priority", + Default: 0, + Type: []string{"integer"}, + Format: "int32", + }, + }, + "admissionTime": { + SchemaProps: spec.SchemaProps{ + Description: "AdmissionTime indecates the time workloads admitted", + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + }, + Required: []string{"priority", "admissionTime"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, + } +} + +func schema_kueue_apis_visibility_v1alpha1_RunningWorkloadOptions(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "RunningWorkloadOptions are query params used in the visibility queries", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "offset": { + SchemaProps: spec.SchemaProps{ + Description: "Offset indicates position of the first pending workload that should be fetched, starting from 0. 0 by default", + Default: 0, + Type: []string{"integer"}, + Format: "int64", + }, + }, + "limit": { + SchemaProps: spec.SchemaProps{ + Description: "Limit indicates max number of pending workloads that should be fetched. 1000 by default", + Type: []string{"integer"}, + Format: "int64", + }, + }, + }, + Required: []string{"offset"}, + }, + }, + } +} + +func schema_kueue_apis_visibility_v1alpha1_RunningWorkloadsSummary(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "RunningWorkloadsSummary contains a list of running workloads in the context of the query (within LocalQueue or ClusterQueue).", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkload"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkload"}, + } +} + +func schema_kueue_apis_visibility_v1alpha1_RunningWorkloadsSummaryList(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "kind": { + SchemaProps: spec.SchemaProps{ + Description: "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds", + Type: []string{"string"}, + Format: "", + }, + }, + "apiVersion": { + SchemaProps: spec.SchemaProps{ + Description: "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources", + Type: []string{"string"}, + Format: "", + }, + }, + "metadata": { + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta"), + }, + }, + "items": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: map[string]interface{}{}, + Ref: ref("sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary"), + }, + }, + }, + }, + }, + }, + Required: []string{"items"}, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta", "sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary"}, + } +} diff --git a/apis/visibility/v1alpha1/types.go b/apis/visibility/v1alpha1/types.go index 1d7b6e9980..ddfd2e3e89 100644 --- a/apis/visibility/v1alpha1/types.go +++ b/apis/visibility/v1alpha1/types.go @@ -25,11 +25,13 @@ import ( // +k8s:openapi-gen=true // +genclient:nonNamespaced // +genclient:method=GetPendingWorkloadsSummary,verb=get,subresource=pendingworkloads,result=sigs.k8s.io/kueue/apis/visibility/v1alpha1.PendingWorkloadsSummary +// +genclient:method=GetRunningWorkloadsSummary,verb=get,subresource=runningWorkloads,result=sigs.k8s.io/kueue/apis/visibility/v1alpha1.RunningWorkloadsSummary type ClusterQueue struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Summary PendingWorkloadsSummary `json:"pendingWorkloadsSummary"` + PendingWorkloadsSummary PendingWorkloadsSummary `json:"pendingWorkloadsSummary"` + RunningWorkloadsSummary RunningWorkloadsSummary `json:"runningWorkloadsSummary"` } // +kubebuilder:object:root=true @@ -77,6 +79,37 @@ type PendingWorkload struct { PositionInLocalQueue int32 `json:"positionInLocalQueue"` } +// RunningWorkload is a user-facing representation of a running workload that summarizes the relevant information for +// assumed resources in the cluster queue. +type RunningWorkload struct { + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Priority indicates the workload's priority + Priority int32 `json:"priority"` + // AdmissionTime indecates the time workloads admitted + AdmissionTime metav1.Time `json:"admissionTime"` +} + +// +k8s:openapi-gen=true +// +kubebuilder:object:root=true + +// RunningWorkloadsSummary contains a list of running workloads in the context +// of the query (within LocalQueue or ClusterQueue). +type RunningWorkloadsSummary struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Items []RunningWorkload `json:"items"` +} + +// +kubebuilder:object:root=true +type RunningWorkloadsSummaryList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + + Items []RunningWorkloadsSummary `json:"items"` +} + // +k8s:openapi-gen=true // +kubebuilder:object:root=true @@ -102,6 +135,22 @@ type PendingWorkloadsSummaryList struct { // +k8s:conversion-gen:explicit-from=net/url.Values // +k8s:defaulter-gen=true +// RunningWorkloadOptions are query params used in the visibility queries +type RunningWorkloadOptions struct { + metav1.TypeMeta `json:",inline"` + + // Offset indicates position of the first pending workload that should be fetched, starting from 0. 0 by default + Offset int64 `json:"offset"` + + // Limit indicates max number of pending workloads that should be fetched. 1000 by default + Limit int64 `json:"limit,omitempty"` +} + +// +kubebuilder:object:root=true +// +k8s:openapi-gen=true +// +k8s:conversion-gen:explicit-from=net/url.Values +// +k8s:defaulter-gen=true + // PendingWorkloadOptions are query params used in the visibility queries type PendingWorkloadOptions struct { metav1.TypeMeta `json:",inline"` @@ -117,5 +166,7 @@ func init() { SchemeBuilder.Register( &PendingWorkloadsSummary{}, &PendingWorkloadOptions{}, + &RunningWorkloadsSummary{}, + &RunningWorkloadOptions{}, ) } diff --git a/apis/visibility/v1alpha1/zz_generated.conversion.go b/apis/visibility/v1alpha1/zz_generated.conversion.go index be9cf32efc..8c78a03339 100644 --- a/apis/visibility/v1alpha1/zz_generated.conversion.go +++ b/apis/visibility/v1alpha1/zz_generated.conversion.go @@ -39,6 +39,11 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*url.Values)(nil), (*RunningWorkloadOptions)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_url_Values_To_v1alpha1_RunningWorkloadOptions(a.(*url.Values), b.(*RunningWorkloadOptions), scope) + }); err != nil { + return err + } return nil } @@ -66,3 +71,28 @@ func autoConvert_url_Values_To_v1alpha1_PendingWorkloadOptions(in *url.Values, o func Convert_url_Values_To_v1alpha1_PendingWorkloadOptions(in *url.Values, out *PendingWorkloadOptions, s conversion.Scope) error { return autoConvert_url_Values_To_v1alpha1_PendingWorkloadOptions(in, out, s) } + +func autoConvert_url_Values_To_v1alpha1_RunningWorkloadOptions(in *url.Values, out *RunningWorkloadOptions, s conversion.Scope) error { + // WARNING: Field TypeMeta does not have json tag, skipping. + + if values, ok := map[string][]string(*in)["offset"]; ok && len(values) > 0 { + if err := runtime.Convert_Slice_string_To_int64(&values, &out.Offset, s); err != nil { + return err + } + } else { + out.Offset = 0 + } + if values, ok := map[string][]string(*in)["limit"]; ok && len(values) > 0 { + if err := runtime.Convert_Slice_string_To_int64(&values, &out.Limit, s); err != nil { + return err + } + } else { + out.Limit = 0 + } + return nil +} + +// Convert_url_Values_To_v1alpha1_RunningWorkloadOptions is an autogenerated conversion function. +func Convert_url_Values_To_v1alpha1_RunningWorkloadOptions(in *url.Values, out *RunningWorkloadOptions, s conversion.Scope) error { + return autoConvert_url_Values_To_v1alpha1_RunningWorkloadOptions(in, out, s) +} diff --git a/apis/visibility/v1alpha1/zz_generated.deepcopy.go b/apis/visibility/v1alpha1/zz_generated.deepcopy.go index 30ecbe25ab..cea5bbe6e3 100644 --- a/apis/visibility/v1alpha1/zz_generated.deepcopy.go +++ b/apis/visibility/v1alpha1/zz_generated.deepcopy.go @@ -29,7 +29,8 @@ func (in *ClusterQueue) DeepCopyInto(out *ClusterQueue) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - in.Summary.DeepCopyInto(&out.Summary) + in.PendingWorkloadsSummary.DeepCopyInto(&out.PendingWorkloadsSummary) + in.RunningWorkloadsSummary.DeepCopyInto(&out.RunningWorkloadsSummary) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterQueue. @@ -243,3 +244,108 @@ func (in *PendingWorkloadsSummaryList) DeepCopyObject() runtime.Object { } return nil } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RunningWorkload) DeepCopyInto(out *RunningWorkload) { + *out = *in + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.AdmissionTime.DeepCopyInto(&out.AdmissionTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunningWorkload. +func (in *RunningWorkload) DeepCopy() *RunningWorkload { + if in == nil { + return nil + } + out := new(RunningWorkload) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RunningWorkloadOptions) DeepCopyInto(out *RunningWorkloadOptions) { + *out = *in + out.TypeMeta = in.TypeMeta +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunningWorkloadOptions. +func (in *RunningWorkloadOptions) DeepCopy() *RunningWorkloadOptions { + if in == nil { + return nil + } + out := new(RunningWorkloadOptions) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RunningWorkloadOptions) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RunningWorkloadsSummary) DeepCopyInto(out *RunningWorkloadsSummary) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]RunningWorkload, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunningWorkloadsSummary. +func (in *RunningWorkloadsSummary) DeepCopy() *RunningWorkloadsSummary { + if in == nil { + return nil + } + out := new(RunningWorkloadsSummary) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RunningWorkloadsSummary) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RunningWorkloadsSummaryList) DeepCopyInto(out *RunningWorkloadsSummaryList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]RunningWorkloadsSummary, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RunningWorkloadsSummaryList. +func (in *RunningWorkloadsSummaryList) DeepCopy() *RunningWorkloadsSummaryList { + if in == nil { + return nil + } + out := new(RunningWorkloadsSummaryList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RunningWorkloadsSummaryList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index 0716a64185..845336534b 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -136,6 +136,10 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &applyconfigurationvisibilityv1alpha1.PendingWorkloadApplyConfiguration{} case visibilityv1alpha1.SchemeGroupVersion.WithKind("PendingWorkloadsSummary"): return &applyconfigurationvisibilityv1alpha1.PendingWorkloadsSummaryApplyConfiguration{} + case visibilityv1alpha1.SchemeGroupVersion.WithKind("RunningWorkload"): + return &applyconfigurationvisibilityv1alpha1.RunningWorkloadApplyConfiguration{} + case visibilityv1alpha1.SchemeGroupVersion.WithKind("RunningWorkloadsSummary"): + return &applyconfigurationvisibilityv1alpha1.RunningWorkloadsSummaryApplyConfiguration{} } return nil diff --git a/client-go/applyconfiguration/visibility/v1alpha1/clusterqueue.go b/client-go/applyconfiguration/visibility/v1alpha1/clusterqueue.go index dd6baa3506..b1def5ae4c 100644 --- a/client-go/applyconfiguration/visibility/v1alpha1/clusterqueue.go +++ b/client-go/applyconfiguration/visibility/v1alpha1/clusterqueue.go @@ -28,7 +28,8 @@ import ( type ClusterQueueApplyConfiguration struct { v1.TypeMetaApplyConfiguration `json:",inline"` *v1.ObjectMetaApplyConfiguration `json:"metadata,omitempty"` - Summary *PendingWorkloadsSummaryApplyConfiguration `json:"pendingWorkloadsSummary,omitempty"` + PendingWorkloadsSummary *PendingWorkloadsSummaryApplyConfiguration `json:"pendingWorkloadsSummary,omitempty"` + RunningWorkloadsSummary *RunningWorkloadsSummaryApplyConfiguration `json:"runningWorkloadsSummary,omitempty"` } // ClusterQueue constructs an declarative configuration of the ClusterQueue type for use with @@ -199,10 +200,18 @@ func (b *ClusterQueueApplyConfiguration) ensureObjectMetaApplyConfigurationExist } } -// WithSummary sets the Summary field in the declarative configuration to the given value +// WithPendingWorkloadsSummary sets the PendingWorkloadsSummary field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Summary field is set to the value of the last call. -func (b *ClusterQueueApplyConfiguration) WithSummary(value *PendingWorkloadsSummaryApplyConfiguration) *ClusterQueueApplyConfiguration { - b.Summary = value +// If called multiple times, the PendingWorkloadsSummary field is set to the value of the last call. +func (b *ClusterQueueApplyConfiguration) WithPendingWorkloadsSummary(value *PendingWorkloadsSummaryApplyConfiguration) *ClusterQueueApplyConfiguration { + b.PendingWorkloadsSummary = value + return b +} + +// WithRunningWorkloadsSummary sets the RunningWorkloadsSummary field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the RunningWorkloadsSummary field is set to the value of the last call. +func (b *ClusterQueueApplyConfiguration) WithRunningWorkloadsSummary(value *RunningWorkloadsSummaryApplyConfiguration) *ClusterQueueApplyConfiguration { + b.RunningWorkloadsSummary = value return b } diff --git a/client-go/applyconfiguration/visibility/v1alpha1/runningworkload.go b/client-go/applyconfiguration/visibility/v1alpha1/runningworkload.go new file mode 100644 index 0000000000..7e72f15a15 --- /dev/null +++ b/client-go/applyconfiguration/visibility/v1alpha1/runningworkload.go @@ -0,0 +1,196 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + v1 "k8s.io/client-go/applyconfigurations/meta/v1" +) + +// RunningWorkloadApplyConfiguration represents an declarative configuration of the RunningWorkload type for use +// with apply. +type RunningWorkloadApplyConfiguration struct { + *v1.ObjectMetaApplyConfiguration `json:"metadata,omitempty"` + Priority *int32 `json:"priority,omitempty"` + AdmissionTime *metav1.Time `json:"admissionTime,omitempty"` +} + +// RunningWorkloadApplyConfiguration constructs an declarative configuration of the RunningWorkload type for use with +// apply. +func RunningWorkload() *RunningWorkloadApplyConfiguration { + return &RunningWorkloadApplyConfiguration{} +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithName(value string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Name = &value + return b +} + +// WithGenerateName sets the GenerateName field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the GenerateName field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithGenerateName(value string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.GenerateName = &value + return b +} + +// WithNamespace sets the Namespace field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Namespace field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithNamespace(value string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Namespace = &value + return b +} + +// WithUID sets the UID field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UID field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithUID(value types.UID) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.UID = &value + return b +} + +// WithResourceVersion sets the ResourceVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ResourceVersion field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithResourceVersion(value string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ResourceVersion = &value + return b +} + +// WithGeneration sets the Generation field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Generation field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithGeneration(value int64) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Generation = &value + return b +} + +// WithCreationTimestamp sets the CreationTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the CreationTimestamp field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithCreationTimestamp(value metav1.Time) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.CreationTimestamp = &value + return b +} + +// WithDeletionTimestamp sets the DeletionTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionTimestamp field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithDeletionTimestamp(value metav1.Time) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionTimestamp = &value + return b +} + +// WithDeletionGracePeriodSeconds sets the DeletionGracePeriodSeconds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionGracePeriodSeconds field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithDeletionGracePeriodSeconds(value int64) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionGracePeriodSeconds = &value + return b +} + +// WithLabels puts the entries into the Labels field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Labels field, +// overwriting an existing map entries in Labels field with the same key. +func (b *RunningWorkloadApplyConfiguration) WithLabels(entries map[string]string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Labels == nil && len(entries) > 0 { + b.Labels = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Labels[k] = v + } + return b +} + +// WithAnnotations puts the entries into the Annotations field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Annotations field, +// overwriting an existing map entries in Annotations field with the same key. +func (b *RunningWorkloadApplyConfiguration) WithAnnotations(entries map[string]string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Annotations == nil && len(entries) > 0 { + b.Annotations = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Annotations[k] = v + } + return b +} + +// WithOwnerReferences adds the given value to the OwnerReferences field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OwnerReferences field. +func (b *RunningWorkloadApplyConfiguration) WithOwnerReferences(values ...*v1.OwnerReferenceApplyConfiguration) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + if values[i] == nil { + panic("nil value passed to WithOwnerReferences") + } + b.OwnerReferences = append(b.OwnerReferences, *values[i]) + } + return b +} + +// WithFinalizers adds the given value to the Finalizers field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Finalizers field. +func (b *RunningWorkloadApplyConfiguration) WithFinalizers(values ...string) *RunningWorkloadApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + b.Finalizers = append(b.Finalizers, values[i]) + } + return b +} + +func (b *RunningWorkloadApplyConfiguration) ensureObjectMetaApplyConfigurationExists() { + if b.ObjectMetaApplyConfiguration == nil { + b.ObjectMetaApplyConfiguration = &v1.ObjectMetaApplyConfiguration{} + } +} + +// WithPriority sets the Priority field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Priority field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithPriority(value int32) *RunningWorkloadApplyConfiguration { + b.Priority = &value + return b +} + +// WithAdmissionTime sets the AdmissionTime field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the AdmissionTime field is set to the value of the last call. +func (b *RunningWorkloadApplyConfiguration) WithAdmissionTime(value metav1.Time) *RunningWorkloadApplyConfiguration { + b.AdmissionTime = &value + return b +} diff --git a/client-go/applyconfiguration/visibility/v1alpha1/runningworkloadssummary.go b/client-go/applyconfiguration/visibility/v1alpha1/runningworkloadssummary.go new file mode 100644 index 0000000000..756788c18a --- /dev/null +++ b/client-go/applyconfiguration/visibility/v1alpha1/runningworkloadssummary.go @@ -0,0 +1,212 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + v1 "k8s.io/client-go/applyconfigurations/meta/v1" +) + +// RunningWorkloadsSummaryApplyConfiguration represents an declarative configuration of the RunningWorkloadsSummary type for use +// with apply. +type RunningWorkloadsSummaryApplyConfiguration struct { + v1.TypeMetaApplyConfiguration `json:",inline"` + *v1.ObjectMetaApplyConfiguration `json:"metadata,omitempty"` + Items []RunningWorkloadApplyConfiguration `json:"items,omitempty"` +} + +// RunningWorkloadsSummaryApplyConfiguration constructs an declarative configuration of the RunningWorkloadsSummary type for use with +// apply. +func RunningWorkloadsSummary() *RunningWorkloadsSummaryApplyConfiguration { + b := &RunningWorkloadsSummaryApplyConfiguration{} + b.WithKind("RunningWorkloadsSummary") + b.WithAPIVersion("visibility.kueue.x-k8s.io/v1alpha1") + return b +} + +// WithKind sets the Kind field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Kind field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithKind(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.Kind = &value + return b +} + +// WithAPIVersion sets the APIVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the APIVersion field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithAPIVersion(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.APIVersion = &value + return b +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithName(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Name = &value + return b +} + +// WithGenerateName sets the GenerateName field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the GenerateName field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithGenerateName(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.GenerateName = &value + return b +} + +// WithNamespace sets the Namespace field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Namespace field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithNamespace(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Namespace = &value + return b +} + +// WithUID sets the UID field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UID field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithUID(value types.UID) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.UID = &value + return b +} + +// WithResourceVersion sets the ResourceVersion field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ResourceVersion field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithResourceVersion(value string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.ResourceVersion = &value + return b +} + +// WithGeneration sets the Generation field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Generation field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithGeneration(value int64) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.Generation = &value + return b +} + +// WithCreationTimestamp sets the CreationTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the CreationTimestamp field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithCreationTimestamp(value metav1.Time) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.CreationTimestamp = &value + return b +} + +// WithDeletionTimestamp sets the DeletionTimestamp field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionTimestamp field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithDeletionTimestamp(value metav1.Time) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionTimestamp = &value + return b +} + +// WithDeletionGracePeriodSeconds sets the DeletionGracePeriodSeconds field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the DeletionGracePeriodSeconds field is set to the value of the last call. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithDeletionGracePeriodSeconds(value int64) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + b.DeletionGracePeriodSeconds = &value + return b +} + +// WithLabels puts the entries into the Labels field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Labels field, +// overwriting an existing map entries in Labels field with the same key. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithLabels(entries map[string]string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Labels == nil && len(entries) > 0 { + b.Labels = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Labels[k] = v + } + return b +} + +// WithAnnotations puts the entries into the Annotations field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the Annotations field, +// overwriting an existing map entries in Annotations field with the same key. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithAnnotations(entries map[string]string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + if b.Annotations == nil && len(entries) > 0 { + b.Annotations = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.Annotations[k] = v + } + return b +} + +// WithOwnerReferences adds the given value to the OwnerReferences field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OwnerReferences field. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithOwnerReferences(values ...*v1.OwnerReferenceApplyConfiguration) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + if values[i] == nil { + panic("nil value passed to WithOwnerReferences") + } + b.OwnerReferences = append(b.OwnerReferences, *values[i]) + } + return b +} + +// WithFinalizers adds the given value to the Finalizers field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Finalizers field. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithFinalizers(values ...string) *RunningWorkloadsSummaryApplyConfiguration { + b.ensureObjectMetaApplyConfigurationExists() + for i := range values { + b.Finalizers = append(b.Finalizers, values[i]) + } + return b +} + +func (b *RunningWorkloadsSummaryApplyConfiguration) ensureObjectMetaApplyConfigurationExists() { + if b.ObjectMetaApplyConfiguration == nil { + b.ObjectMetaApplyConfiguration = &v1.ObjectMetaApplyConfiguration{} + } +} + +// WithItems adds the given value to the Items field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the Items field. +func (b *RunningWorkloadsSummaryApplyConfiguration) WithItems(values ...*RunningWorkloadApplyConfiguration) *RunningWorkloadsSummaryApplyConfiguration { + for i := range values { + if values[i] == nil { + panic("nil value passed to WithItems") + } + b.Items = append(b.Items, *values[i]) + } + return b +} diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/clusterqueue.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/clusterqueue.go index a4b6fcbf52..ec6698ad94 100644 --- a/client-go/clientset/versioned/typed/visibility/v1alpha1/clusterqueue.go +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/clusterqueue.go @@ -50,6 +50,7 @@ type ClusterQueueInterface interface { Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.ClusterQueue, err error) Apply(ctx context.Context, clusterQueue *visibilityv1alpha1.ClusterQueueApplyConfiguration, opts v1.ApplyOptions) (result *v1alpha1.ClusterQueue, err error) GetPendingWorkloadsSummary(ctx context.Context, clusterQueueName string, options v1.GetOptions) (*v1alpha1.PendingWorkloadsSummary, error) + GetRunningWorkloadsSummary(ctx context.Context, clusterQueueName string, options v1.GetOptions) (*v1alpha1.RunningWorkloadsSummary, error) ClusterQueueExpansion } @@ -209,3 +210,16 @@ func (c *clusterQueues) GetPendingWorkloadsSummary(ctx context.Context, clusterQ Into(result) return } + +// GetRunningWorkloadsSummary takes name of the clusterQueue, and returns the corresponding v1alpha1.RunningWorkloadsSummary object, and an error if there is any. +func (c *clusterQueues) GetRunningWorkloadsSummary(ctx context.Context, clusterQueueName string, options v1.GetOptions) (result *v1alpha1.RunningWorkloadsSummary, err error) { + result = &v1alpha1.RunningWorkloadsSummary{} + err = c.client.Get(). + Resource("clusterqueues"). + Name(clusterQueueName). + SubResource("runningWorkloads"). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} diff --git a/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_clusterqueue.go b/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_clusterqueue.go index d0cc7fa7d7..8956ce734a 100644 --- a/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_clusterqueue.go +++ b/client-go/clientset/versioned/typed/visibility/v1alpha1/fake/fake_clusterqueue.go @@ -152,3 +152,13 @@ func (c *FakeClusterQueues) GetPendingWorkloadsSummary(ctx context.Context, clus } return obj.(*v1alpha1.PendingWorkloadsSummary), err } + +// GetRunningWorkloadsSummary takes name of the clusterQueue, and returns the corresponding runningWorkloadsSummary object, and an error if there is any. +func (c *FakeClusterQueues) GetRunningWorkloadsSummary(ctx context.Context, clusterQueueName string, options v1.GetOptions) (result *v1alpha1.RunningWorkloadsSummary, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootGetSubresourceAction(clusterqueuesResource, "runningWorkloads", clusterQueueName), &v1alpha1.RunningWorkloadsSummary{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.RunningWorkloadsSummary), err +} diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 8498e47815..90e76a7dcb 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -179,7 +179,7 @@ func main() { }() if features.Enabled(features.VisibilityOnDemand) { - go visibility.CreateAndStartVisibilityServer(queues, ctx) + go visibility.CreateAndStartVisibilityServer(queues, cCache, ctx) } setupScheduler(mgr, cCache, queues, &cfg) diff --git a/keps/2145-list-admitted-workloads/README.md b/keps/2145-list-admitted-workloads/README.md new file mode 100644 index 0000000000..306d5d517f --- /dev/null +++ b/keps/2145-list-admitted-workloads/README.md @@ -0,0 +1,145 @@ +# KEP-1834: List Admitted And Not Finished Workloads + + +- [Summary](#summary) +- [Motivation](#motivation) + - [Goals](#goals) + - [Non-Goals](#non-goals) +- [Proposal](#proposal) + - [Risks and Mitigations](#risks-and-mitigations) +- [Design Details](#design-details) + - [Test Plan](#test-plan) + - [Prerequisite testing updates](#prerequisite-testing-updates) + - [Unit Tests](#unit-tests) + - [Integration tests](#integration-tests) + - [Graduation Criteria](#graduation-criteria) +- [Implementation History](#implementation-history) +- [Drawbacks](#drawbacks) +- [Alternatives](#alternatives) + + +## Summary + +Add a new visibility endpoint in Kueue for querying the list of Workloads that are still "running". + +## Motivation + +Jsonpath support in kubectl is limited, and we can not filter resources by condition in this way. By adding a new +visibility endpoint, users can list running workloads by `kubectl get`. + +### Goals + +* Support list workloads that are admitted and not finished. + +### Non-Goals + +## Proposal + +We will add a new visibility endpoint in Kueue. +``` go +// RunningWorkload is a user-facing representation of a running workload that summarizes the relevant information for +// assumed resources in the cluster queue. +type RunningWorkload struct { + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Priority indicates the workload's priority + Priority int32 `json:"priority"` + // AdmissionTime indecates the time workloads admitted + AdmissionTime metav1.Time `json:"admissionTime"` +} + +// +k8s:openapi-gen=true +// +kubebuilder:object:root=true + +// RunningWorkloadsSummary contains a list of running workloads in the context +// of the query (within LocalQueue or ClusterQueue). +type RunningWorkloadsSummary struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Items []RunningWorkload `json:"items"` +} + +// +kubebuilder:object:root=true +type RunningWorkloadsSummaryList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + + Items []RunningWorkloadsSummary `json:"items"` +} + +// +kubebuilder:object:root=true +// +k8s:openapi-gen=true +// +k8s:conversion-gen:explicit-from=net/url.Values +// +k8s:defaulter-gen=true + +// RunningWorkloadOptions are query params used in the visibility queries +type RunningWorkloadOptions struct { + metav1.TypeMeta `json:",inline"` + + // Offset indicates position of the first pending workload that should be fetched, starting from 0. 0 by default + Offset int64 `json:"offset"` + + // Limit indicates max number of pending workloads that should be fetched. 1000 by default + Limit int64 `json:"limit,omitempty"` +} +``` + +Users can list the running workloads by using +``` bash +kubectl get --raw "/apis/visibility.kueue.x-k8s.io/v1alpha1/clusterqueues/cluster-queue/runningworkloads" +``` + +We will show priority and localqueue information in response. Like this: +``` +{"kind":"RunningWorkloadsSummary","apiVersion":"visibility.kueue.x-k8s.io/v1alpha1","metadata":{"creationTimestamp":null},"items":[{"metadata":{"name":"job-sample-job-jz228-ef938","namespace":"default","creationTimestamp":"2024-05-06T02:15:26Z","ownerReferences":[{"apiVersion":"batch/v1","kind":"Job","name":"sample-job-jz228","uid":"2de8a359-4c95-4159-b677-0279066149b6"}]},"priority":0,"admissionTime":"xxxx"}]} +``` + +### Risks and Mitigations + +None. + +## Design Details + + +### Test Plan + + + +[x] I/we understand the owners of the involved components may require updates to +existing tests to make this code solid enough prior to committing the changes necessary +to implement this enhancement. + +##### Prerequisite testing updates + + + +#### Unit Tests + +New unit tests should be added testing the functionality for new api. + +#### Integration tests + +The idea is to enhance the existing integrations tests to check if workload objects are created with correct labels. + +### Graduation Criteria + +## Implementation History + +* 2024-04-29 First draft + +## Drawbacks + +## Alternatives diff --git a/keps/2145-list-admitted-workloads/kep.yaml b/keps/2145-list-admitted-workloads/kep.yaml new file mode 100644 index 0000000000..f9a9385da6 --- /dev/null +++ b/keps/2145-list-admitted-workloads/kep.yaml @@ -0,0 +1,22 @@ +title: List Admitted And Not Finished Workloads +kep-number: 1834 +authors: + - "@kunwuluan" +status: draft +creation-date: 2024-04-29 +reviewers: + - "@alculquicondor" +approvers: + +# The target maturity stage in the current dev cycle for this KEP. +stage: beta + +# The most recent milestone for which work toward delivery of this KEP has been +# done. This can be the current (upcoming) milestone, if it is being actively +# worked on. +latest-milestone: "v0.9" + +# The milestone at which this feature was, or is targeted to be, at each stage. +milestone: + alpha: "v0.9" + beta: "v0.9" diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 29e60de3de..78b3e588c9 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -23,6 +23,8 @@ import ( "sort" "sync" + apierr "k8s.io/apimachinery/pkg/api/errors" + "github.com/go-logr/logr" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,9 +34,11 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/metrics" + utilpriority "sigs.k8s.io/kueue/pkg/util/priority" "sigs.k8s.io/kueue/pkg/workload" ) @@ -674,6 +678,41 @@ func filterLocalQueueUsage(orig FlavorResourceQuantities, resourceGroups []Resou return qFlvUsages } +func workloadOrderingFunc(a, b *workload.Info) bool { + p1 := utilpriority.Priority(a.Obj) + p2 := utilpriority.Priority(b.Obj) + + if p1 != p2 { + return p1 > p2 + } + + return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp) +} + +func (c *Cache) RunningWorkload(name string) ([]*workload.Info, error) { + c.RLock() + defer c.RUnlock() + cq, ok := c.clusterQueues[name] + if !ok { + return []*workload.Info{}, apierr.NewNotFound(v1alpha1.Resource("clusterqueue"), name) + } + + count := 0 + wkls := make([]*workload.Info, 0, len(cq.Workloads)) + for _, wkl := range cq.Workloads { + if !workload.IsAdmitted(wkl.Obj) || workload.IsFinished(wkl.Obj) { + continue + } + wkls = append(wkls, wkl) + count++ + } + wkls = wkls[:count] + sort.Slice(wkls, func(i, j int) bool { + return workloadOrderingFunc(wkls[i], wkls[j]) + }) + return wkls, nil +} + func (c *Cache) cleanupAssumedState(w *kueue.Workload) { k := workload.Key(w) assumedCQName, assumed := c.assumedWorkloads[k] diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index a03d114821..0dbf29ad98 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -40,6 +40,7 @@ const ( PodPriorityClassSource = "scheduling.k8s.io/priorityclass" DefaultPendingWorkloadsLimit = 1000 + DefaultRunningWorkloadsLimit = 1000 IsNegativeErrorMsg string = `must be greater than or equal to 0` ) diff --git a/pkg/visibility/api/install.go b/pkg/visibility/api/install.go index 48f8a9b761..f738960cbe 100644 --- a/pkg/visibility/api/install.go +++ b/pkg/visibility/api/install.go @@ -24,6 +24,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" v1alpha1 "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/queue" apirest "sigs.k8s.io/kueue/pkg/visibility/api/rest" ) @@ -43,9 +44,10 @@ func init() { } // Install installs API scheme defined in apis/v1alpha1 and registers storage -func Install(server *genericapiserver.GenericAPIServer, kueueMgr *queue.Manager) error { +func Install(server *genericapiserver.GenericAPIServer, kueueMgr *queue.Manager, cCache *cache.Cache) error { apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(v1alpha1.GroupVersion.Group, Scheme, ParameterCodec, Codecs) pendingWorkloadsInCqREST := apirest.NewPendingWorkloadsInCqREST(kueueMgr) + runningWorkloadsInCqREST := apirest.NewRunningWorkloadsInCqREST(cCache) cqREST := apirest.NewCqREST() pendingWorkloadsInLqREST := apirest.NewPendingWorkloadsInLqREST(kueueMgr) lqREST := apirest.NewLqREST() @@ -53,6 +55,7 @@ func Install(server *genericapiserver.GenericAPIServer, kueueMgr *queue.Manager) visibilityServerResources := map[string]rest.Storage{ "clusterqueues": cqREST, "clusterqueues/pendingworkloads": pendingWorkloadsInCqREST, + "clusterqueues/runningworkloads": runningWorkloadsInCqREST, "localqueues": lqREST, "localqueues/pendingworkloads": pendingWorkloadsInLqREST, } diff --git a/pkg/visibility/api/rest/pending_workloads_cq_test.go b/pkg/visibility/api/rest/pending_workloads_cq_test.go index d039533cbd..8e060f2542 100644 --- a/pkg/visibility/api/rest/pending_workloads_cq_test.go +++ b/pkg/visibility/api/rest/pending_workloads_cq_test.go @@ -63,8 +63,8 @@ func TestPendingWorkloadsInCQ(t *testing.T) { clusterQueues []*kueue.ClusterQueue queues []*kueue.LocalQueue workloads []*kueue.Workload - req *req - wantResp *resp + req *pendingReq + wantResp *pendingResp wantErrMatch func(error) bool }{ "single ClusterQueue and single LocalQueue setup with two workloads and default query parameters": { @@ -78,11 +78,11 @@ func TestPendingWorkloadsInCQ(t *testing.T) { utiltesting.MakeWorkload("a", nsName).Queue(lqNameA).Priority(highPrio).Creation(now).Obj(), utiltesting.MakeWorkload("b", nsName).Queue(lqNameA).Priority(lowPrio).Creation(now).Obj(), }, - req: &req{ + req: &pendingReq{ queueName: cqNameA, queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -122,11 +122,11 @@ func TestPendingWorkloadsInCQ(t *testing.T) { utiltesting.MakeWorkload("lqB-high-prio", nsName).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("lqB-low-prio", nsName).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Obj(), }, - req: &req{ + req: &pendingReq{ queueName: cqNameA, queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -186,13 +186,13 @@ func TestPendingWorkloadsInCQ(t *testing.T) { utiltesting.MakeWorkload("b", nsName).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("c", nsName).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second * 2)).Obj(), }, - req: &req{ + req: &pendingReq{ queueName: cqNameA, queryParams: &visibility.PendingWorkloadOptions{ Limit: 2, }, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -230,14 +230,14 @@ func TestPendingWorkloadsInCQ(t *testing.T) { utiltesting.MakeWorkload("b", nsName).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("c", nsName).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second * 2)).Obj(), }, - req: &req{ + req: &pendingReq{ queueName: cqNameA, queryParams: &visibility.PendingWorkloadOptions{ Offset: 1, Limit: constants.DefaultPendingWorkloadsLimit, }, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -275,14 +275,14 @@ func TestPendingWorkloadsInCQ(t *testing.T) { utiltesting.MakeWorkload("b", nsName).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("c", nsName).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second * 2)).Obj(), }, - req: &req{ + req: &pendingReq{ queueName: cqNameA, queryParams: &visibility.PendingWorkloadOptions{ Offset: 1, Limit: 1, }, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -301,18 +301,18 @@ func TestPendingWorkloadsInCQ(t *testing.T) { clusterQueues: []*kueue.ClusterQueue{ utiltesting.MakeClusterQueue(cqNameA).Obj(), }, - req: &req{ + req: &pendingReq{ queueName: cqNameA, queryParams: defaultQueryParams, }, - wantResp: &resp{}, + wantResp: &pendingResp{}, }, "nonexistent queue name": { - req: &req{ + req: &pendingReq{ queueName: "nonexistent-queue", queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantErr: errors.NewNotFound(visibility.Resource("clusterqueue"), "nonexistent-queue"), }, wantErrMatch: errors.IsNotFound, diff --git a/pkg/visibility/api/rest/pending_workloads_lq_test.go b/pkg/visibility/api/rest/pending_workloads_lq_test.go index 008d09e34d..73cf9aba52 100644 --- a/pkg/visibility/api/rest/pending_workloads_lq_test.go +++ b/pkg/visibility/api/rest/pending_workloads_lq_test.go @@ -63,8 +63,8 @@ func TestPendingWorkloadsInLQ(t *testing.T) { clusterQueues []*kueue.ClusterQueue queues []*kueue.LocalQueue workloads []*kueue.Workload - req *req - wantResp *resp + req *pendingReq + wantResp *pendingResp wantErrMatch func(error) bool }{ "single ClusterQueue and single LocalQueue setup with two workloads and default query parameters": { @@ -78,12 +78,12 @@ func TestPendingWorkloadsInLQ(t *testing.T) { utiltesting.MakeWorkload("a", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now).Obj(), utiltesting.MakeWorkload("b", nsNameA).Queue(lqNameA).Priority(lowPrio).Creation(now).Obj(), }, - req: &req{ + req: &pendingReq{ nsName: nsNameA, queueName: lqNameA, queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -123,12 +123,12 @@ func TestPendingWorkloadsInLQ(t *testing.T) { utiltesting.MakeWorkload("lqB-high-prio", nsNameA).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("lqB-low-prio", nsNameA).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Obj(), }, - req: &req{ + req: &pendingReq{ nsName: nsNameA, queueName: lqNameA, queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -168,12 +168,12 @@ func TestPendingWorkloadsInLQ(t *testing.T) { utiltesting.MakeWorkload("lqB-high-prio", nsNameA).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("lqB-low-prio", nsNameA).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Obj(), }, - req: &req{ + req: &pendingReq{ nsName: nsNameA, queueName: lqNameB, queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -215,12 +215,12 @@ func TestPendingWorkloadsInLQ(t *testing.T) { utiltesting.MakeWorkload("lqB-high-prio", nsNameB).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("lqB-low-prio", nsNameB).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Obj(), }, - req: &req{ + req: &pendingReq{ nsName: nsNameA, queueName: lqNameA, queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -262,12 +262,12 @@ func TestPendingWorkloadsInLQ(t *testing.T) { utiltesting.MakeWorkload("lqB-high-prio", nsNameB).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("lqB-low-prio", nsNameB).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Obj(), }, - req: &req{ + req: &pendingReq{ nsName: nsNameB, queueName: lqNameB, queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -306,14 +306,14 @@ func TestPendingWorkloadsInLQ(t *testing.T) { utiltesting.MakeWorkload("b", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("c", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second * 2)).Obj(), }, - req: &req{ + req: &pendingReq{ nsName: nsNameA, queueName: lqNameA, queryParams: &visibility.PendingWorkloadOptions{ Limit: 2, }, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -352,7 +352,7 @@ func TestPendingWorkloadsInLQ(t *testing.T) { utiltesting.MakeWorkload("b", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("c", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second * 2)).Obj(), }, - req: &req{ + req: &pendingReq{ nsName: nsNameA, queueName: lqNameA, queryParams: &visibility.PendingWorkloadOptions{ @@ -360,7 +360,7 @@ func TestPendingWorkloadsInLQ(t *testing.T) { Limit: constants.DefaultPendingWorkloadsLimit, }, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -399,7 +399,7 @@ func TestPendingWorkloadsInLQ(t *testing.T) { utiltesting.MakeWorkload("b", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second)).Obj(), utiltesting.MakeWorkload("c", nsNameA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second * 2)).Obj(), }, - req: &req{ + req: &pendingReq{ nsName: nsNameA, queueName: lqNameA, queryParams: &visibility.PendingWorkloadOptions{ @@ -407,7 +407,7 @@ func TestPendingWorkloadsInLQ(t *testing.T) { Limit: 1, }, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantPendingWorkloads: []visibility.PendingWorkload{ { ObjectMeta: v1.ObjectMeta{ @@ -424,11 +424,11 @@ func TestPendingWorkloadsInLQ(t *testing.T) { }, }, "nonexistent queue name": { - req: &req{ + req: &pendingReq{ queueName: "nonexistent-queue", queryParams: defaultQueryParams, }, - wantResp: &resp{ + wantResp: &pendingResp{ wantErr: errors.NewNotFound(visibility.Resource("localqueue"), "invalid-name"), }, wantErrMatch: errors.IsNotFound, diff --git a/pkg/visibility/api/rest/running_workloads_cq.go b/pkg/visibility/api/rest/running_workloads_cq.go new file mode 100644 index 0000000000..dc9a635845 --- /dev/null +++ b/pkg/visibility/api/rest/running_workloads_cq.go @@ -0,0 +1,91 @@ +// Copyright 2023 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rest + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/registry/rest" + ctrl "sigs.k8s.io/controller-runtime" + + "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + + _ "k8s.io/metrics/pkg/apis/metrics/install" +) + +type runningWorkloadsInCqREST struct { + c *cache.Cache + log logr.Logger +} + +var _ rest.Storage = &runningWorkloadsInCqREST{} +var _ rest.GetterWithOptions = &runningWorkloadsInCqREST{} +var _ rest.Scoper = &runningWorkloadsInCqREST{} + +func NewRunningWorkloadsInCqREST(c *cache.Cache) *runningWorkloadsInCqREST { + return &runningWorkloadsInCqREST{ + c: c, + log: ctrl.Log.WithName("running-workload-in-cq"), + } +} + +// New implements rest.Storage interface +func (m *runningWorkloadsInCqREST) New() runtime.Object { + return &v1alpha1.RunningWorkloadsSummary{} +} + +// Destroy implements rest.Storage interface +func (m *runningWorkloadsInCqREST) Destroy() {} + +// Get implements rest.GetterWithOptions interface +// It fetches information about running workloads and returns according to query params +func (m *runningWorkloadsInCqREST) Get(ctx context.Context, name string, opts runtime.Object) (runtime.Object, error) { + runningWorkloadOpts, ok := opts.(*v1alpha1.RunningWorkloadOptions) + if !ok { + return nil, fmt.Errorf("invalid options object: %#v", opts) + } + limit := runningWorkloadOpts.Limit + offset := runningWorkloadOpts.Offset + + wls := make([]v1alpha1.RunningWorkload, 0, limit) + runningWorkloadsInfo, err := m.c.RunningWorkload(name) + if err != nil { + return nil, err + } + + for index := int(offset); index < int(offset+limit) && index < len(runningWorkloadsInfo); index++ { + wlInfo := runningWorkloadsInfo[index] + wls = append(wls, *newRunningWorkload(wlInfo)) + } + return &v1alpha1.RunningWorkloadsSummary{Items: wls}, nil +} + +// NewGetOptions creates a new options object +func (m *runningWorkloadsInCqREST) NewGetOptions() (runtime.Object, bool, string) { + // If no query parameters were passed the generated defaults function are not executed so it's necessary to set default values here as well + return &v1alpha1.RunningWorkloadOptions{ + Limit: constants.DefaultRunningWorkloadsLimit, + }, false, "" +} + +// NamespaceScoped implements rest.Scoper interface +func (m *runningWorkloadsInCqREST) NamespaceScoped() bool { + return false +} diff --git a/pkg/visibility/api/rest/running_workloads_cq_test.go b/pkg/visibility/api/rest/running_workloads_cq_test.go new file mode 100644 index 0000000000..5706dfe18c --- /dev/null +++ b/pkg/visibility/api/rest/running_workloads_cq_test.go @@ -0,0 +1,397 @@ +// Copyright 2024 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rest + +import ( + "context" + "sort" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1" + "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" +) + +func TestRunningWorkloadsInCQ(t *testing.T) { + const ( + nsName = "foo" + cqNameA = "cqA" + lqNameA = "lqA" + lqNameB = "lqB" + lowPrio = 50 + highPrio = 100 + ) + + var ( + defaultQueryParams = &visibility.RunningWorkloadOptions{ + Offset: 0, + Limit: constants.DefaultRunningWorkloadsLimit, + } + ) + + var ( + q1 = utiltesting.MakeFlavorQuotas("flavor1").Resource("cpu", "10").Obj() + ) + + podSets := []kueue.PodSet{ + *utiltesting.MakePodSet("driver", 1). + Request(corev1.ResourceCPU, "1"). + Obj(), + } + podSetFlavors := []kueue.PodSetAssignment{ + { + Name: "driver", + Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{ + corev1.ResourceCPU: "flavor1", + }, + ResourceUsage: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + } + + var ( + adA = utiltesting.MakeAdmission(cqNameA).PodSets(podSetFlavors...).Obj() + ) + + scheme := runtime.NewScheme() + if err := kueue.AddToScheme(scheme); err != nil { + t.Fatalf("Failed adding kueue scheme: %s", err) + } + if err := visibility.AddToScheme(scheme); err != nil { + t.Fatalf("Failed adding kueue scheme: %s", err) + } + + now := time.Now() + cases := map[string]struct { + clusterQueues []*kueue.ClusterQueue + workloads []*kueue.Workload + req *runningReq + wantResp *runningResp + wantErrMatch func(error) bool + }{ + "single ClusterQueue and single LocalQueue setup with two workloads and default query parameters": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("a", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Creation(now).Admitted(true).Obj(), + utiltesting.MakeWorkload("b", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(lowPrio).Creation(now).Admitted(true).Obj(), + }, + req: &runningReq{ + queueName: cqNameA, + queryParams: defaultQueryParams, + }, + wantResp: &runningResp{ + wantRunningWorkloads: []visibility.RunningWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "a", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now), + }, + Priority: highPrio, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now), + }, + Priority: lowPrio, + }}, + }, + }, + "single ClusterQueue and two LocalQueue setup with four workloads and default query parameters": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("lqA-high-prio", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Creation(now).Admitted(true).Obj(), + utiltesting.MakeWorkload("lqA-low-prio", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(lowPrio).Creation(now).Admitted(true).Obj(), + utiltesting.MakeWorkload("lqB-high-prio", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameB).Priority(highPrio).Creation(now.Add(time.Second)).Admitted(true).Obj(), + utiltesting.MakeWorkload("lqB-low-prio", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameB).Priority(lowPrio).Creation(now.Add(time.Second)).Admitted(true).Obj(), + }, + req: &runningReq{ + queueName: cqNameA, + queryParams: defaultQueryParams, + }, + wantResp: &runningResp{ + wantRunningWorkloads: []visibility.RunningWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqA-high-prio", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now), + }, + Priority: highPrio, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqB-high-prio", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second)), + }, + Priority: highPrio, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqA-low-prio", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now), + }, + Priority: lowPrio, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqB-low-prio", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second)), + }, + Priority: lowPrio, + }}, + }, + }, + "single ClusterQueue and two LocalQueue setup with four running workloads and two pending workloads and default query parameters": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("lqA-1", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Creation(now).Admitted(true).Obj(), + utiltesting.MakeWorkload("lqA-2", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Creation(now.Add(time.Second * 1)).Admitted(true).Obj(), + utiltesting.MakeWorkload("lqB-1", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameB).Creation(now.Add(time.Second * 2)).Admitted(true).Obj(), + utiltesting.MakeWorkload("lqB-2", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameB).Creation(now.Add(time.Second * 3)).Admitted(true).Obj(), + utiltesting.MakeWorkload("lqA-pending", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Creation(now.Add(time.Second * 4)).Obj(), + utiltesting.MakeWorkload("lqB-pending", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameB).Creation(now.Add(time.Second * 5)).Obj(), + utiltesting.MakeWorkload("lqA-finished", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Creation(now.Add(time.Second * 6)).Finished().Obj(), + utiltesting.MakeWorkload("lqB-finished", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameB).Creation(now.Add(time.Second * 7)).Finished().Obj(), + }, + req: &runningReq{ + queueName: cqNameA, + queryParams: defaultQueryParams, + }, + wantResp: &runningResp{ + wantRunningWorkloads: []visibility.RunningWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqA-1", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now), + }, + Priority: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqB-1", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second * 2)), + }, + Priority: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqA-2", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second * 1)), + }, + Priority: 0, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "lqB-2", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second * 3)), + }, + Priority: 0, + }}, + }, + }, + "limit query parameter set": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("a", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Admitted(true).Creation(now).Obj(), + utiltesting.MakeWorkload("b", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Admitted(true).Creation(now.Add(time.Second)).Obj(), + utiltesting.MakeWorkload("c", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Admitted(true).Creation(now.Add(time.Second * 2)).Obj(), + }, + req: &runningReq{ + queueName: cqNameA, + queryParams: &visibility.RunningWorkloadOptions{ + Limit: 2, + }, + }, + wantResp: &runningResp{ + wantRunningWorkloads: []visibility.RunningWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "a", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now), + }, + Priority: highPrio, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second)), + }, + Priority: highPrio, + }}, + }, + }, + "offset query parameter set": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("a", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Admitted(true).Creation(now).Obj(), + utiltesting.MakeWorkload("b", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Admitted(true).Creation(now.Add(time.Second)).Obj(), + utiltesting.MakeWorkload("c", nsName).PodSets(podSets...).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Admitted(true).Creation(now.Add(time.Second * 2)).Obj(), + }, + req: &runningReq{ + queueName: cqNameA, + queryParams: &visibility.RunningWorkloadOptions{ + Offset: 1, + Limit: constants.DefaultRunningWorkloadsLimit, + }, + }, + wantResp: &runningResp{ + wantRunningWorkloads: []visibility.RunningWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second)), + }, + Priority: highPrio, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: "c", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second * 2)), + }, + Priority: highPrio, + }}, + }, + }, + "limit offset query parameters set": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + workloads: []*kueue.Workload{ + utiltesting.MakeWorkload("a", nsName).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Creation(now).Admitted(true).Obj(), + utiltesting.MakeWorkload("b", nsName).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second)).Admitted(true).Obj(), + utiltesting.MakeWorkload("c", nsName).ReserveQuota(adA).Queue(lqNameA).Priority(highPrio).Creation(now.Add(time.Second * 2)).Admitted(true).Obj(), + }, + req: &runningReq{ + queueName: cqNameA, + queryParams: &visibility.RunningWorkloadOptions{ + Offset: 1, + Limit: 1, + }, + }, + wantResp: &runningResp{ + wantRunningWorkloads: []visibility.RunningWorkload{ + { + ObjectMeta: v1.ObjectMeta{ + Name: "b", + Namespace: nsName, + CreationTimestamp: v1.NewTime(now.Add(time.Second)), + }, + Priority: highPrio, + }}, + }, + }, + "empty cluster queue": { + clusterQueues: []*kueue.ClusterQueue{ + utiltesting.MakeClusterQueue(cqNameA).ResourceGroup(*q1).Obj(), + }, + req: &runningReq{ + queueName: cqNameA, + queryParams: defaultQueryParams, + }, + wantResp: &runningResp{}, + }, + "nonexistent queue name": { + req: &runningReq{ + queueName: "nonexistent-queue", + queryParams: defaultQueryParams, + }, + wantResp: &runningResp{ + wantErr: errors.NewNotFound(visibility.Resource("clusterqueue"), "nonexistent-queue"), + }, + wantErrMatch: errors.IsNotFound, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + client := utiltesting.NewFakeClient() + cCache := cache.New(client, cache.WithPodsReadyTracking(false)) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + runningWorkloadsInCqREST := NewRunningWorkloadsInCqREST(cCache) + for _, cq := range tc.clusterQueues { + if err := cCache.AddClusterQueue(ctx, cq); err != nil { + t.Fatalf("Adding cluster queue %s: %v", cq.Name, err) + } + } + for _, w := range tc.workloads { + cCache.AddOrUpdateWorkload(w) + } + + info, err := runningWorkloadsInCqREST.Get(ctx, tc.req.queueName, tc.req.queryParams) + switch { + case tc.wantErrMatch != nil: + if !tc.wantErrMatch(err) { + t.Errorf("Error differs: (-want,+got):\n%s", cmp.Diff(tc.wantResp.wantErr.Error(), err.Error())) + } + case err != nil: + t.Error(err) + default: + runningWorkloadsInfo := info.(*visibility.RunningWorkloadsSummary) + less := func(a, b visibility.RunningWorkload) bool { + p1 := a.Priority + p2 := b.Priority + + if p1 != p2 { + return p1 > p2 + } + + return a.CreationTimestamp.Before(&b.CreationTimestamp) + } + sort.Slice(tc.wantResp.wantRunningWorkloads, func(i, j int) bool { + return less(tc.wantResp.wantRunningWorkloads[i], tc.wantResp.wantRunningWorkloads[j]) + }) + if diff := cmp.Diff(tc.wantResp.wantRunningWorkloads, runningWorkloadsInfo.Items, cmpopts.EquateEmpty(), cmpopts.IgnoreFields(visibility.RunningWorkload{}, "AdmissionTime")); diff != "" { + t.Errorf("Running workloads differ: (-want,+got):\n%s", diff) + } + } + }) + } +} diff --git a/pkg/visibility/api/rest/test_utils.go b/pkg/visibility/api/rest/test_utils.go index 4a69b1fe93..c12058f32c 100644 --- a/pkg/visibility/api/rest/test_utils.go +++ b/pkg/visibility/api/rest/test_utils.go @@ -18,13 +18,23 @@ import ( visibility "sigs.k8s.io/kueue/apis/visibility/v1alpha1" ) -type req struct { +type pendingReq struct { nsName string queueName string queryParams *visibility.PendingWorkloadOptions } -type resp struct { +type pendingResp struct { wantErr error wantPendingWorkloads []visibility.PendingWorkload } + +type runningReq struct { + queueName string + queryParams *visibility.RunningWorkloadOptions +} + +type runningResp struct { + wantErr error + wantRunningWorkloads []visibility.RunningWorkload +} diff --git a/pkg/visibility/api/rest/utils.go b/pkg/visibility/api/rest/utils.go index 81b73a7c10..386a193897 100644 --- a/pkg/visibility/api/rest/utils.go +++ b/pkg/visibility/api/rest/utils.go @@ -44,3 +44,34 @@ func newPendingWorkload(wlInfo *workload.Info, positionInLq int32, positionInCq PositionInLocalQueue: positionInLq, } } + +func newRunningWorkload(wlInfo *workload.Info) *v1alpha1.RunningWorkload { + ownerReferences := make([]metav1.OwnerReference, 0, len(wlInfo.Obj.OwnerReferences)) + for _, ref := range wlInfo.Obj.OwnerReferences { + ownerReferences = append(ownerReferences, metav1.OwnerReference{ + APIVersion: ref.APIVersion, + Kind: ref.Kind, + Name: ref.Name, + UID: ref.UID, + }) + } + admittedTime, err := workload.GetAdmissionTime(wlInfo.Obj) + if err != nil { + // this should never happen + return nil + } + var priority int32 = 0 + if wlInfo.Obj.Spec.Priority != nil { + priority = *wlInfo.Obj.Spec.Priority + } + return &v1alpha1.RunningWorkload{ + ObjectMeta: metav1.ObjectMeta{ + Name: wlInfo.Obj.Name, + Namespace: wlInfo.Obj.Namespace, + OwnerReferences: ownerReferences, + CreationTimestamp: wlInfo.Obj.CreationTimestamp, + }, + Priority: priority, + AdmissionTime: metav1.NewTime(admittedTime), + } +} diff --git a/pkg/visibility/server.go b/pkg/visibility/server.go index 59b21b9003..a5e3e8ed77 100644 --- a/pkg/visibility/server.go +++ b/pkg/visibility/server.go @@ -22,6 +22,7 @@ import ( "sigs.k8s.io/kueue/apis/visibility/v1alpha1" generatedopenapi "sigs.k8s.io/kueue/apis/visibility/v1alpha1/openapi" + "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/queue" "sigs.k8s.io/kueue/pkg/visibility/api" @@ -46,7 +47,7 @@ type server struct { // +kubebuilder:rbac:groups=flowcontrol.apiserver.k8s.io,resources=flowschemas/status,verbs=patch // CreateAndStartVisibilityServer creates visibility server injecting KueueManager and starts it -func CreateAndStartVisibilityServer(kueueMgr *queue.Manager, ctx context.Context) { +func CreateAndStartVisibilityServer(kueueMgr *queue.Manager, cCache *cache.Cache, ctx context.Context) { config := newVisibilityServerConfig() if err := applyVisibilityServerOptions(config); err != nil { setupLog.Error(err, "Unable to apply VisibilityServerOptions") @@ -57,7 +58,7 @@ func CreateAndStartVisibilityServer(kueueMgr *queue.Manager, ctx context.Context setupLog.Error(err, "Unable to create visibility server") } - if err := api.Install(visibilityServer, kueueMgr); err != nil { + if err := api.Install(visibilityServer, kueueMgr, cCache); err != nil { setupLog.Error(err, "Unable to install visibility.kueue.x-k8s.io/v1alpha1 API") } diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index cbf9cf4c74..780f64302a 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -557,6 +557,19 @@ func IsAdmitted(w *kueue.Workload) bool { return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadAdmitted) } +func GetAdmissionTime(w *kueue.Workload) (time.Time, error) { + for _, cond := range w.Status.Conditions { + if cond.Type != kueue.WorkloadAdmitted { + continue + } + if cond.Status != metav1.ConditionTrue { + return time.Time{}, fmt.Errorf("workload not admitted") + } + return cond.LastTransitionTime.Time, nil + } + return time.Time{}, fmt.Errorf("workload not admitted") +} + // IsFinished returns true if the workload is finished. func IsFinished(w *kueue.Workload) bool { return apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadFinished)