Skip to content

Commit

Permalink
Extend visibility ClusterQueue PendingWorkloads endpoint, add e2e tes…
Browse files Browse the repository at this point in the history
…ts, add unit tests, change query parameters types: int -> int32 (#1362)
  • Loading branch information
PBundyra committed Nov 27, 2023
1 parent 23063a7 commit cb1c985
Show file tree
Hide file tree
Showing 8 changed files with 573 additions and 68 deletions.
35 changes: 34 additions & 1 deletion apis/visibility/v1alpha1/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion apis/visibility/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ import (
type PendingWorkload struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

// Priority indicates the workload's priority
Priority int32 `json:"priority"`

// LocalQueueName indicates the name of the LocalQueue the workload is submitted to
LocalQueueName string `json:"localQueueName"`

// PositionInClusterQueue indicates the workload's position in the ClusterQueue, starting from 0
PositionInClusterQueue int32 `json:"positionInClusterQueue"`

// PositionInLocalQueue indicates the workload's position in the LocalQueue, starting from 0
PositionInLocalQueue int32 `json:"positionInLocalQueue"`
}

// +genclient
Expand Down Expand Up @@ -78,7 +90,7 @@ type PendingWorkloadsSummaryList struct {
type PendingWorkloadOptions struct {
metav1.TypeMeta `json:",inline"`

// Offset indicates position of the first pending workload that should be fetched starting from 0. 0 by default
// Offset indicates position of the first pending workload that should be fetched, starting from 0. 0 by default
Offset int64 `json:"offset"`

// Limit indicates max number of pending workloads that should be fetched. 1000 by default
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 18 additions & 14 deletions pkg/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,30 +571,34 @@ func (m *Manager) getClusterQueue(cqName string) ClusterQueue {
return m.clusterQueues[cqName]
}

// PendingWorkloadsInfo computes a head of the new snapshot. Length of the head is set by the maxCount parameter
func (m *Manager) PendingWorkloadsInfo(cqName string) []kueue.ClusterQueuePendingWorkload {
pendingWlsInfo := make([]kueue.ClusterQueuePendingWorkload, 0)
func (m *Manager) PendingWorkloadsInfo(cqName string) []*workload.Info {
cq := m.getClusterQueue(cqName)
if cq == nil {
return pendingWlsInfo
return nil
}
return cq.Snapshot()
}

// UpdateSnapshot computes the new snapshot and replaces if it differs from the
// previous version. It returns true if the snapshot was actually updated.
func (m *Manager) UpdateSnapshot(cqName string, maxCount int32) bool {
cq := m.getClusterQueue(cqName)
if cq == nil {
return false
}
for _, info := range cq.Snapshot() {
newSnapshot := make([]kueue.ClusterQueuePendingWorkload, 0)
for index, info := range cq.Snapshot() {
if int32(index) >= maxCount {
break
}
if info == nil {
continue
}
pendingWlsInfo = append(pendingWlsInfo, kueue.ClusterQueuePendingWorkload{
newSnapshot = append(newSnapshot, kueue.ClusterQueuePendingWorkload{
Name: info.Obj.Name,
Namespace: info.Obj.Namespace,
})
}
return pendingWlsInfo
}

// UpdateSnapshot computes the new snapshot and replaces if it differs from the
// previous version. It returns true if the snapshot was actually updated.
func (m *Manager) UpdateSnapshot(cqName string, maxCount int32) bool {
pendingWlsInfo := m.PendingWorkloadsInfo(cqName)
newSnapshot := pendingWlsInfo[:min(maxCount, int32(len(pendingWlsInfo)))]
prevSnapshot := m.GetSnapshot(cqName)
if !equality.Semantic.DeepEqual(prevSnapshot, newSnapshot) {
m.setSnapshot(cqName, newSnapshot)
Expand Down
35 changes: 27 additions & 8 deletions pkg/queue/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,30 +1093,49 @@ func TestGetPendingWorkloadsInfo(t *testing.T) {

cases := map[string]struct {
cqName string
wantPendingWorkloadsInfo []kueue.ClusterQueuePendingWorkload
wantPendingWorkloadsInfo []*workload.Info
}{
"Invalid ClusterQueue name": {
cqName: "invalid",
wantPendingWorkloadsInfo: make([]kueue.ClusterQueuePendingWorkload, 0),
wantPendingWorkloadsInfo: nil,
},
"ClusterQueue with 2 pending workloads": {
cqName: "cq",
wantPendingWorkloadsInfo: []kueue.ClusterQueuePendingWorkload{
wantPendingWorkloadsInfo: []*workload.Info{
{
Name: "a",
Namespace: "",
Obj: &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "a",
Namespace: "",
},
Spec: kueue.WorkloadSpec{
QueueName: "foo",
},
},
},
{
Name: "b",
Namespace: "",
Obj: &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "b",
Namespace: "",
},
Spec: kueue.WorkloadSpec{
QueueName: "foo",
},
},
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
pendingWorkloadsInfo := manager.PendingWorkloadsInfo(tc.cqName)
if diff := cmp.Diff(tc.wantPendingWorkloadsInfo, pendingWorkloadsInfo, ignoreTypeMeta); diff != "" {
if diff := cmp.Diff(tc.wantPendingWorkloadsInfo, pendingWorkloadsInfo,
ignoreTypeMeta,
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "CreationTimestamp"),
cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "PodSets"),
cmpopts.IgnoreFields(workload.Info{}, "TotalRequests"),
); diff != "" {
t.Errorf("GetPendingWorkloadsInfo returned wrong heads (-want,+got):\n%s", diff)
}
})
Expand Down
34 changes: 24 additions & 10 deletions pkg/visibility/api/rest/pending_workload_CQ.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (m *pendingWorkloadsInCqREST) New() runtime.Object {
// Destroy implements rest.Storage interface
func (m *pendingWorkloadsInCqREST) Destroy() {}

// Get implements rest.Getter interface
// Get implements rest.GetterWithOptions interface
// It fetches information about pending workloads and returns according to query params
func (m *pendingWorkloadsInCqREST) Get(ctx context.Context, name string, opts runtime.Object) (runtime.Object, error) {
pendingWorkloadOpts, ok := opts.(*v1alpha1.PendingWorkloadOptions)
Expand All @@ -66,15 +66,29 @@ func (m *pendingWorkloadsInCqREST) Get(ctx context.Context, name string, opts ru
offset := pendingWorkloadOpts.Offset

wls := make([]v1alpha1.PendingWorkload, 0, limit)
allPendingWorkloads := m.queueMgr.PendingWorkloadsInfo(name)
for index := offset; index < offset+limit && index < int64(len(allPendingWorkloads)); index++ {
wlInfo := allPendingWorkloads[index]
wls = append(wls, v1alpha1.PendingWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: wlInfo.Name,
Namespace: wlInfo.Namespace,
},
})
pendingWorkloadsInfo := m.queueMgr.PendingWorkloadsInfo(name)
localQueuePositions := make(map[string]int32, 0)

for index := 0; index < int(offset+limit) && index < len(pendingWorkloadsInfo); index++ {
// Update positions in LocalQueue
wlInfo := pendingWorkloadsInfo[index]
queueName := wlInfo.Obj.Spec.QueueName
positionInLocalQueue := localQueuePositions[queueName]
localQueuePositions[queueName]++

if index >= int(offset) {
// Add a workload to results
wls = append(wls, v1alpha1.PendingWorkload{
ObjectMeta: metav1.ObjectMeta{
Name: wlInfo.Obj.Name,
Namespace: wlInfo.Obj.Namespace,
},
PositionInClusterQueue: int32(index),
Priority: *wlInfo.Obj.Spec.Priority,
LocalQueueName: queueName,
PositionInLocalQueue: positionInLocalQueue,
})
}
}
return &v1alpha1.PendingWorkloadsSummary{Items: wls}, nil
}
Expand Down
Loading

0 comments on commit cb1c985

Please sign in to comment.