Skip to content

Commit

Permalink
fix reservation on mutil-scheduler (#431)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Aug 2, 2022
1 parent a89cd98 commit ecead7c
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 19 deletions.
39 changes: 20 additions & 19 deletions pkg/scheduler/eventhandlers/reservation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/profile"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
Expand Down Expand Up @@ -66,12 +67,11 @@ func AddScheduleEventHandler(sched *scheduler.Scheduler, internalHandler Schedul
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *schedulingv1alpha1.Reservation:
// scheduler is always responsible for schedulingv1alpha1.reservation object
return !reservation.IsReservationScheduled(t) && !reservation.IsReservationFailed(t)
return !reservation.IsReservationScheduled(t) && !reservation.IsReservationFailed(t) && isResponsibleForReservation(sched.Profiles, t)
case cache.DeletedFinalStateUnknown:
if _, ok := t.Obj.(*schedulingv1alpha1.Reservation); ok {
if r, ok := t.Obj.(*schedulingv1alpha1.Reservation); ok {
// DeletedFinalStateUnknown object can be stale, so just try to cleanup without check.
return true
return isResponsibleForReservation(sched.Profiles, r)
}
klog.Errorf("unable to convert object %T to *schedulingv1alpha1.Reservation in %T", t.Obj, sched)
return false
Expand Down Expand Up @@ -304,28 +304,29 @@ func handleExpiredReservation(sched *scheduler.Scheduler, internalHandler Schedu
return
}
reservePod := reservation.NewReservePod(r)
if len(reservation.GetReservationNodeName(r)) > 0 {
err := internalHandler.GetCache().RemovePod(reservePod)

// in case the pod has expired before scheduling cache initialized, or the pod just finished scheduling cycle and
// deleted, both we need to check if pod is cached
_, err = internalHandler.GetCache().GetPod(reservePod)
if err == nil {
err = internalHandler.GetCache().RemovePod(reservePod)
if err != nil {
klog.Errorf("failed to remove reserve pod in scheduler cache, reservation %v, err: %v", klog.KObj(r), err)
klog.Errorf("failed to remove expired reserve pod in scheduler cache, reservation %v, err: %s",
klog.KObj(r), err)
}
internalHandler.MoveAllToActiveOrBackoffQueue(assignedPodDelete)
} else { // otherwise, try dequeue the reserve pod from the scheduling queue
// in case the pod just finished scheduling cycle and deleted, also check if pod is cached
_, err := internalHandler.GetCache().GetPod(reservePod)
if err == nil {
klog.V(5).InfoS("reserve pod is just scheduled and deleted, remove it in cache", "reservation", klog.KObj(r))
err = internalHandler.GetCache().RemovePod(reservePod)
if err != nil {
klog.Errorf("failed to remove reserve pod in scheduler cache, reservation %v, err: %v", klog.KObj(r), err)
}
internalHandler.MoveAllToActiveOrBackoffQueue(assignedPodDelete)
}
}

if len(reservation.GetReservationNodeName(r)) <= 0 {
// pod is unscheduled, try dequeue the reserve pod from the scheduling queue
err = internalHandler.GetQueue().Delete(reservePod)
if err != nil {
klog.Errorf("failed to delete reserve pod in scheduling queue, reservation %v, err: %v", klog.KObj(r), err)
klog.Errorf("failed to delete expired reserve pod in scheduling queue, reservation %v, err: %v", klog.KObj(r), err)
}
}
klog.V(4).InfoS("handle expired reservation", "reservation", klog.KObj(r), "phase", r.Status.Phase)
}

func isResponsibleForReservation(profiles profile.Map, r *schedulingv1alpha1.Reservation) bool {
return profiles.HandlesSchedulerName(reservation.GetReservationSchedulerName(r))
}
101 changes: 101 additions & 0 deletions pkg/scheduler/eventhandlers/reservation_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/profile"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake"
Expand Down Expand Up @@ -440,6 +443,50 @@ func Test_updateReservationInSchedulingQueue(t *testing.T) {
},
},
},
{
name: "update new reservation successfully",
args: args{
internalHandler: &fakeSchedulerInternalHandler{},
oldObj: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "r-0",
UID: "456",
ResourceVersion: "0",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{},
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod-0",
},
},
},
Expires: &metav1.Time{Time: now.Add(30 * time.Minute)},
},
},
newObj: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "r-0",
UID: "456",
ResourceVersion: "1",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{},
Owners: []schedulingv1alpha1.ReservationOwner{
{
Object: &corev1.ObjectReference{
Kind: "Pod",
Name: "pod-0",
},
},
},
Expires: &metav1.Time{Time: now.Add(30 * time.Minute)},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -597,3 +644,57 @@ func Test_handleExpiredReservation(t *testing.T) {
})
}
}

var _ framework.Framework = &fakeFramework{}

type fakeFramework struct {
framework.Framework
}

func Test_isResponsibleForReservation(t *testing.T) {
type args struct {
profiles profile.Map
r *schedulingv1alpha1.Reservation
}
tests := []struct {
name string
args args
want bool
}{
{
name: "not responsible when profile is empty",
args: args{
profiles: profile.Map{},
r: &schedulingv1alpha1.Reservation{},
},
want: false,
},
{
name: "responsible when scheduler name matched the profile",
args: args{
profiles: profile.Map{
"test-scheduler": &fakeFramework{},
},
r: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reserve-sample",
},
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
SchedulerName: "test-scheduler",
},
},
},
},
},
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := isResponsibleForReservation(tt.args.profiles, tt.args.r)
assert.Equal(t, tt.want, got)
})
}
}
7 changes: 7 additions & 0 deletions pkg/scheduler/plugins/reservation/rpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,10 @@ func GetReservePodNodeName(pod *corev1.Pod) string {
func GetReservationNameFromReservePod(pod *corev1.Pod) string {
return pod.Annotations[AnnotationReservationName]
}

func GetReservationSchedulerName(r *schedulingv1alpha1.Reservation) string {
if r == nil || r.Spec.Template == nil || len(r.Spec.Template.Spec.SchedulerName) <= 0 {
return corev1.DefaultSchedulerName
}
return r.Spec.Template.Spec.SchedulerName
}
47 changes: 47 additions & 0 deletions pkg/scheduler/plugins/reservation/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,50 @@ func Test_matchReservationOwners(t *testing.T) {
})
}
}

func TestGetReservationSchedulerName(t *testing.T) {
tests := []struct {
name string
arg *schedulingv1alpha1.Reservation
want string
}{
{
name: "empty reservation",
arg: nil,
want: corev1.DefaultSchedulerName,
},
{
name: "empty template",
arg: &schedulingv1alpha1.Reservation{},
want: corev1.DefaultSchedulerName,
},
{
name: "empty scheduler name",
arg: &schedulingv1alpha1.Reservation{
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{},
},
},
want: corev1.DefaultSchedulerName,
},
{
name: "get scheduler name successfully",
arg: &schedulingv1alpha1.Reservation{
Spec: schedulingv1alpha1.ReservationSpec{
Template: &corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
SchedulerName: "test-scheduler",
},
},
},
},
want: "test-scheduler",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := GetReservationSchedulerName(tt.arg)
assert.Equal(t, tt.want, got)
})
}
}

0 comments on commit ecead7c

Please sign in to comment.