Skip to content

Commit

Permalink
fix reservation on pod patch failed (#428)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Aug 2, 2022
1 parent 82dc2ac commit 9e8fc01
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 33 deletions.
9 changes: 5 additions & 4 deletions apis/extension/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)
Expand Down Expand Up @@ -52,8 +53,8 @@ func GetCustomUsageThresholds(node *corev1.Node) (*CustomUsageThresholds, error)
}

type ReservationAllocated struct {
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
Name string `json:"name,omitempty"`
UID types.UID `json:"uid,omitempty"`
}

func GetReservationAllocated(pod *corev1.Pod) (*ReservationAllocated, error) {
Expand All @@ -77,8 +78,8 @@ func SetReservationAllocated(pod *corev1.Pod, r *schedulingv1alpha1.Reservation)
pod.Annotations = map[string]string{}
}
reservationAllocated := &ReservationAllocated{
Namespace: r.Namespace,
Name: r.Name,
Name: r.Name,
UID: r.UID,
}
data, _ := json.Marshal(reservationAllocated) // assert no error
pod.Annotations[AnnotationReservationAllocated] = string(data)
Expand Down
8 changes: 4 additions & 4 deletions pkg/scheduler/eventhandlers/reservation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ func updateReservationInCache(sched *scheduler.Scheduler, internalHandler Schedu
}

// nodeName update of the same reservations is not allowed and may corrupt the cache
if oldR.Status.NodeName != newR.Status.NodeName {
klog.Errorf("updateReservationInCache failed, update on status.nodeName is forbidden, old %s, new %s",
oldR.Status.NodeName, newR.Status.NodeName)
if reservation.GetReservationNodeName(oldR) != reservation.GetReservationNodeName(newR) {
klog.Errorf("updateReservationInCache failed, update on existing nodeName is forbidden, old %s, new %s",
reservation.GetReservationNodeName(oldR), reservation.GetReservationNodeName(newR))
return
}

Expand Down Expand Up @@ -304,7 +304,7 @@ func handleExpiredReservation(sched *scheduler.Scheduler, internalHandler Schedu
return
}
reservePod := reservation.NewReservePod(r)
if len(r.Status.NodeName) > 0 {
if len(reservation.GetReservationNodeName(r)) > 0 {
err := internalHandler.GetCache().RemovePod(reservePod)
if err != nil {
klog.Errorf("failed to remove reserve pod in scheduler cache, reservation %v, err: %v", klog.KObj(r), err)
Expand Down
12 changes: 9 additions & 3 deletions pkg/scheduler/plugins/reservation/garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,20 @@ func (p *Plugin) syncPodDeleted(pod *corev1.Pod) {
// assert pod != nil
reservationAllocated, err := apiext.GetReservationAllocated(pod)
if err != nil {
klog.V(3).InfoS("failed to get reservation allocation info of the pod",
klog.V(3).InfoS("failed to parse reservation allocation info of the pod",
"pod", klog.KObj(pod), "err", err)
return
}
// pod does not allocate any reservation
// Most pods have no reservation allocated.
if reservationAllocated == nil {
return
}

// pod has allocated reservation, should remove allocation info in the reservation
err = retryOnConflictOrTooManyRequests(func() error {
r, err1 := p.lister.Get(reservationAllocated.Name)
if errors.IsNotFound(err1) {
klog.V(4).InfoS("skip sync for reservation not found", "reservation", klog.KObj(r))
klog.V(5).InfoS("skip sync for reservation not found", "reservation", klog.KObj(r))
return nil
} else if err1 != nil {
klog.V(4).InfoS("failed to get reservation",
Expand All @@ -149,6 +150,11 @@ func (p *Plugin) syncPodDeleted(pod *corev1.Pod) {
"reservation", klog.KObj(r))
return nil
}
// got different versions of the reservation; still check if the reservation was allocated by this pod
if r.UID != reservationAllocated.UID {
klog.V(4).InfoS("failed to get original reservation, got reservation with a different UID",
"reservation", reservationAllocated.Name, "old UID", reservationAllocated.UID, "current UID", r.UID)
}
err1 = removeReservationAllocated(r, pod)
if err1 != nil {
klog.V(4).InfoS("failed to remove reservation allocated",
Expand Down
18 changes: 17 additions & 1 deletion pkg/scheduler/plugins/reservation/garbage_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,8 @@ func Test_syncPodDeleted(t *testing.T) {
Annotations: map[string]string{
apiext.AnnotationReservationAllocated: `
{
"Name": "test-reserve-0"
"name": "test-reserve-0",
"uid": "aaabbbccc"
}
`,
},
Expand All @@ -493,6 +494,7 @@ func Test_syncPodDeleted(t *testing.T) {
testReservation := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reserve-0",
UID: "aaabbbccc",
},
Spec: schedulingv1alpha1.ReservationSpec{
Expires: &metav1.Time{Time: now.Add(10 * time.Hour)},
Expand Down Expand Up @@ -525,6 +527,8 @@ func Test_syncPodDeleted(t *testing.T) {
NodeName: "test-node-0",
},
}
testReservationDifferent := testReservation.DeepCopy()
testReservationDifferent.UID = "xxxyyyzzz"
type fields struct {
lister *fakeReservationLister
client *fakeReservationClient
Expand Down Expand Up @@ -599,6 +603,18 @@ func Test_syncPodDeleted(t *testing.T) {
client: &fakeReservationClient{},
},
},
{
name: "get different versions of the reservation",
arg: testPod,
fields: fields{
lister: &fakeReservationLister{
reservations: map[string]*schedulingv1alpha1.Reservation{
testReservationDifferent.Name: testReservationDifferent,
},
},
client: &fakeReservationClient{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/scheduler/plugins/reservation/rcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newAvailableCache(rList ...*schedulingv1alpha1.Reservation) *AvailableCache
}
meta := newReservationInfo(r)
a.reservations[GetReservationKey(r)] = meta
nodeName := getReservationNodeName(r)
nodeName := GetReservationNodeName(r)
a.nodeToR[nodeName] = append(a.nodeToR[nodeName], meta)
}
return a
Expand All @@ -118,18 +118,18 @@ func (a *AvailableCache) Add(r *schedulingv1alpha1.Reservation) {
defer a.lock.Unlock()
meta := newReservationInfo(r)
a.reservations[GetReservationKey(r)] = meta
nodeName := getReservationNodeName(r)
nodeName := GetReservationNodeName(r)
a.nodeToR[nodeName] = append(a.nodeToR[nodeName], meta)
}

func (a *AvailableCache) Delete(r *schedulingv1alpha1.Reservation) {
a.lock.Lock()
defer a.lock.Unlock()
if r == nil || len(r.Status.NodeName) <= 0 {
if r == nil || len(GetReservationNodeName(r)) <= 0 {
return
}
delete(a.reservations, GetReservationKey(r))
nodeName := getReservationNodeName(r)
nodeName := GetReservationNodeName(r)
rOnNode := a.nodeToR[nodeName]
for i, rInfo := range rOnNode {
if rInfo.Reservation.Name == r.Name && rInfo.Reservation.Namespace == r.Namespace {
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/plugins/reservation/rpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ func NewReservePod(r *schedulingv1alpha1.Reservation) *corev1.Pod {
reservePod.Annotations[AnnotationReservePod] = "true"
reservePod.Annotations[AnnotationReservationName] = r.Name // for search inversely

// annotate node name
// annotate the node name specified by the reservation, if any
if len(reservePod.Spec.NodeName) > 0 {
// if the reservation specifies a nodeName, annotate it and cleanup spec.nodeName for other plugins not
// processing the nodeName before binding
reservePod.Annotations[AnnotationReservationNode] = reservePod.Spec.NodeName
reservePod.Spec.NodeName = ""
}
// use reservation status.nodeName as the real scheduled result
if len(r.Status.NodeName) > 0 {
reservePod.Spec.NodeName = r.Status.NodeName
if nodeName := GetReservationNodeName(r); len(nodeName) > 0 {
reservePod.Spec.NodeName = nodeName
}

return reservePod
Expand Down
43 changes: 29 additions & 14 deletions pkg/scheduler/plugins/reservation/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func StatusNodeNameIndexFunc(obj interface{}) ([]string, error) {

// IsReservationActive checks if the reservation is scheduled and its status is Available/Waiting.
func IsReservationActive(r *schedulingv1alpha1.Reservation) bool {
return r != nil && len(r.Status.NodeName) > 0 &&
return r != nil && len(GetReservationNodeName(r)) > 0 &&
(r.Status.Phase == schedulingv1alpha1.ReservationAvailable || r.Status.Phase == schedulingv1alpha1.ReservationWaiting)
}

// IsReservationScheduled checks if the reservation is scheduled on a node and its status is Available.
func IsReservationScheduled(r *schedulingv1alpha1.Reservation) bool {
return r != nil && len(r.Status.NodeName) > 0 && r.Status.Phase == schedulingv1alpha1.ReservationAvailable
return r != nil && len(GetReservationNodeName(r)) > 0 && r.Status.Phase == schedulingv1alpha1.ReservationAvailable
}

func IsReservationFailed(r *schedulingv1alpha1.Reservation) bool {
Expand All @@ -81,6 +81,14 @@ func IsReservationExpired(r *schedulingv1alpha1.Reservation) bool {
return false
}

// GetReservationNodeName reports the node name stored in the reservation's
// status (r.Status.NodeName). The result is the empty string when no node name
// has been set there. r is dereferenced without a nil check, so callers must
// pass a non-nil reservation.
func GetReservationNodeName(r *schedulingv1alpha1.Reservation) string {
	nodeName := r.Status.NodeName
	return nodeName
}

// SetReservationNodeName stores nodeName in the reservation's status
// (r.Status.NodeName), overwriting whatever value was there before.
// r is dereferenced without a nil check, so callers must pass a non-nil
// reservation.
func SetReservationNodeName(r *schedulingv1alpha1.Reservation, nodeName string) {
	r.Status.NodeName = nodeName
}

func ValidateReservation(r *schedulingv1alpha1.Reservation) error {
if r == nil || r.Spec.Template == nil {
return fmt.Errorf("the reservation misses the template spec")
Expand Down Expand Up @@ -119,10 +127,8 @@ func isReservationNeedCleanup(r *schedulingv1alpha1.Reservation) bool {
}

func setReservationAvailable(r *schedulingv1alpha1.Reservation, nodeName string) {
// TBD: how to annotate nodeName
r.Spec.Template.Spec.NodeName = nodeName

r.Status.NodeName = nodeName
// record the scheduled node in the reservation status only
SetReservationNodeName(r, nodeName)
r.Status.Phase = schedulingv1alpha1.ReservationAvailable
r.Status.CurrentOwners = make([]corev1.ObjectReference, 0)

Expand Down Expand Up @@ -220,12 +226,25 @@ func setReservationUnschedulable(r *schedulingv1alpha1.Reservation, msg string)
}

func setReservationAllocated(r *schedulingv1alpha1.Reservation, pod *corev1.Pod) {
r.Status.CurrentOwners = append(r.Status.CurrentOwners, getPodOwner(pod))
owner := getPodOwner(pod)
requests, _ := resourceapi.PodRequestsAndLimits(pod)
if r.Status.Allocated == nil {
r.Status.Allocated = requests
// avoid duplication (it happens if pod allocated annotation was missing)
idx := -1
for i, current := range r.Status.CurrentOwners {
if matchObjectRef(pod, &current) {
idx = i
}
}
if idx < 0 {
r.Status.CurrentOwners = append(r.Status.CurrentOwners, owner)
if r.Status.Allocated == nil {
r.Status.Allocated = requests
} else {
r.Status.Allocated = quotav1.Add(r.Status.Allocated, requests)
}
} else {
r.Status.Allocated = quotav1.Add(r.Status.Allocated, requests)
// keep old allocated
r.Status.CurrentOwners[idx] = owner
}
}

Expand Down Expand Up @@ -258,10 +277,6 @@ func getReservationRequests(r *schedulingv1alpha1.Reservation) corev1.ResourceLi
return requests
}

// getReservationNodeName returns the value of r.Status.NodeName.
// r is dereferenced without a nil check.
func getReservationNodeName(r *schedulingv1alpha1.Reservation) string {
	name := r.Status.NodeName
	return name
}

func getUnschedulableMessage(filteredNodeStatusMap framework.NodeToStatusMap) string {
var msg strings.Builder
// format: [node=xxx msg=yyy failedPlugin=zzz]
Expand Down

0 comments on commit 9e8fc01

Please sign in to comment.