koord-descheduler: fix object limiter by move it to reconciler (#2088)
Signed-off-by: songtao98 <songtao2603060@gmail.com>
songtao98 committed Jun 17, 2024
1 parent ae25a92 commit 8e25776
Showing 6 changed files with 379 additions and 268 deletions.
pkg/descheduler/controllers/migration/arbitrator/arbitrator.go
@@ -46,7 +46,6 @@ var enqueueLog = klog.Background().WithName("eventHandler").WithName("arbitrator
type MigrationFilter interface {
Filter(pod *corev1.Pod) bool
PreEvictionFilter(pod *corev1.Pod) bool
TrackEvictedPod(pod *corev1.Pod)
}

type Arbitrator interface {
@@ -146,10 +145,6 @@ func (a *arbitratorImpl) PreEvictionFilter(pod *corev1.Pod) bool {
return a.filter.defaultFilterPlugin.PreEvictionFilter(pod)
}

func (a *arbitratorImpl) TrackEvictedPod(pod *corev1.Pod) {
a.filter.trackEvictedPod(pod)
}

// sort stably sorts jobs, outputs the sorted results and corresponding ranking map.
func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
for _, sortFn := range a.sorts {
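TrackEvictedPod is dropped from the MigrationFilter interface and from arbitratorImpl above; per the commit title, the per-workload eviction tracking now happens in the migration controller's reconciler, whose files are not shown in this excerpt. The following is only a minimal sketch of what that wiring might look like, assuming a Reconciler type that carries the limiter state removed from filter.go below; the field and method names here are hypothetical, not the actual implementation.

```go
// Hypothetical sketch only: the same per-workload token-bucket bookkeeping
// that this commit removes from the arbitrator's filter, attached to the
// migration controller's Reconciler instead. Names and fields are assumptions.
package migration

import (
	"sync"

	gocache "github.com/patrickmn/go-cache"
	"golang.org/x/time/rate"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

type Reconciler struct {
	objectLimiters map[types.UID]*rate.Limiter
	limiterCache   *gocache.Cache
	limiterLock    sync.Mutex
}

// trackEvictedPod charges one token against the owning workload's limiter so
// that repeated migrations of the same workload are throttled.
func (r *Reconciler) trackEvictedPod(pod *corev1.Pod, limit rate.Limit) {
	ownerRef := metav1.GetControllerOf(pod)
	if ownerRef == nil || r.objectLimiters == nil || r.limiterCache == nil {
		return
	}

	r.limiterLock.Lock()
	defer r.limiterLock.Unlock()

	limiter := r.objectLimiters[ownerRef.UID]
	if limiter == nil {
		limiter = rate.NewLimiter(limit, 1) // burst of 1, as in the removed filter code
		r.objectLimiters[ownerRef.UID] = limiter
	} else if limiter.Limit() != limit {
		limiter.SetLimit(limit)
	}

	limiter.Allow() // consume one token for this eviction
	r.limiterCache.Set(string(ownerRef.UID), 0, gocache.DefaultExpiration)
}
```

The point of the sketch is only the shape of the move: the limiter map, cache, and lock travel with whichever component actually issues the evictions.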
94 changes: 0 additions & 94 deletions pkg/descheduler/controllers/migration/arbitrator/filter.go
@@ -20,10 +20,7 @@ import (
"context"
"fmt"
"sync"
"time"

gocache "github.com/patrickmn/go-cache"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@@ -60,9 +57,6 @@ type filter struct {

args *deschedulerconfig.MigrationControllerArgs
controllerFinder controllerfinder.Interface
objectLimiters map[types.UID]*rate.Limiter
limiterCache *gocache.Cache
limiterLock sync.Mutex

arbitratedPodMigrationJobs map[types.UID]bool
arbitratedMapLock sync.Mutex
@@ -83,7 +77,6 @@ func newFilter(args *deschedulerconfig.MigrationControllerArgs, handle framework
if err := f.initFilters(args, handle); err != nil {
return nil, err
}
f.initObjectLimiters()
return f, nil
}

@@ -129,7 +122,6 @@ func (f *filter) initFilters(args *deschedulerconfig.MigrationControllerArgs, ha
return err
}
retriablePodFilters := podutil.WrapFilterFuncs(
f.filterLimitedObject,
f.filterMaxMigratingPerNode,
f.filterMaxMigratingPerNamespace,
f.filterMaxMigratingOrUnavailablePerWorkload,
@@ -417,92 +409,6 @@ func mergeUnavailableAndMigratingPods(unavailablePods, migratingPods map[types.N
}
}

func (f *filter) trackEvictedPod(pod *corev1.Pod) {
if f.objectLimiters == nil || f.limiterCache == nil {
return
}
ownerRef := metav1.GetControllerOf(pod)
if ownerRef == nil {
return
}

objectLimiterArgs, ok := f.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
if !ok || objectLimiterArgs.Duration.Seconds() == 0 {
return
}

var maxMigratingReplicas int
if expectedReplicas, err := f.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
maxMigrating := objectLimiterArgs.MaxMigrating
if maxMigrating == nil {
maxMigrating = f.args.MaxMigratingPerWorkload
}
maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating)
}
if maxMigratingReplicas == 0 {
return
}

f.limiterLock.Lock()
defer f.limiterLock.Unlock()

uid := ownerRef.UID
limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds())
limiter := f.objectLimiters[uid]
if limiter == nil {
limiter = rate.NewLimiter(limit, 1)
f.objectLimiters[uid] = limiter
} else if limiter.Limit() != limit {
limiter.SetLimit(limit)
}

if !limiter.AllowN(f.clock.Now(), 1) {
klog.Infof("The workload %s/%s/%s has been frequently descheduled recently and needs to be limited for f period of time", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)
}
f.limiterCache.Set(string(uid), 0, gocache.DefaultExpiration)
}

func (f *filter) filterLimitedObject(pod *corev1.Pod) bool {
if f.objectLimiters == nil || f.limiterCache == nil {
return true
}
objectLimiterArgs, ok := f.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
if !ok || objectLimiterArgs.Duration.Duration == 0 {
return true
}
if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil {
f.limiterLock.Lock()
defer f.limiterLock.Unlock()
if limiter := f.objectLimiters[ownerRef.UID]; limiter != nil {
if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 {
klog.V(4).InfoS("Pod fails the following checks", "pod", klog.KObj(pod), "checks", "limitedObject",
"owner", fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion))
return false
}
}
}
return true
}

func (f *filter) initObjectLimiters() {
var trackExpiration time.Duration
for _, v := range f.args.ObjectLimiters {
if v.Duration.Duration > trackExpiration {
trackExpiration = v.Duration.Duration
}
}
if trackExpiration > 0 {
f.objectLimiters = make(map[types.UID]*rate.Limiter)
limiterExpiration := trackExpiration + trackExpiration/2
f.limiterCache = gocache.New(limiterExpiration, limiterExpiration)
f.limiterCache.OnEvicted(func(s string, _ interface{}) {
f.limiterLock.Lock()
defer f.limiterLock.Unlock()
delete(f.objectLimiters, types.UID(s))
})
}
}

func (f *filter) checkJobPassedArbitration(uid types.UID) bool {
f.arbitratedMapLock.Lock()
defer f.arbitratedMapLock.Unlock()
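For readers unfamiliar with golang.org/x/time/rate, here is a small standalone sketch (not part of the diff) of the token-bucket arithmetic the removed code relied on: the replenish rate is maxMigrating divided by the window in seconds, the burst is 1, trackEvictedPod consumes a token per eviction, and filterLimitedObject rejects a pod while fewer than one token remains.

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	maxMigrating := 10
	window := 1 * time.Second

	// Same formula as the removed trackEvictedPod: tokens replenish at
	// maxMigrating per window, i.e. 10 tokens/s here.
	limit := rate.Limit(maxMigrating) / rate.Limit(window.Seconds())
	limiter := rate.NewLimiter(limit, 1) // burst of 1, as in the removed code

	for i := 1; i <= 3; i++ {
		limiter.AllowN(time.Now(), 1)      // an eviction consumes a token
		allowed := limiter.Tokens()-1 >= 0 // filterLimitedObject's check
		fmt.Printf("after eviction %d: next migration allowed=%v\n", i, allowed)
	}

	// Tokens refill at 10/s but are capped by the burst of 1, so after a
	// short pause the workload becomes migratable again.
	time.Sleep(200 * time.Millisecond)
	fmt.Printf("after 200ms: next migration allowed=%v\n", limiter.Tokens()-1 >= 0)
}
```

In the removed code the window comes from args.ObjectLimiters[MigrationLimitObjectWorkload].Duration and maxMigrating from util.GetMaxMigrating against the workload's expected replicas.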
150 changes: 0 additions & 150 deletions pkg/descheduler/controllers/migration/arbitrator/filter_test.go
@@ -30,14 +30,12 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config/v1alpha2"
)

func TestFilterExistingMigrationJob(t *testing.T) {
@@ -985,154 +983,6 @@ func TestFilterExpectedReplicas(t *testing.T) {
}
}

func TestFilterObjectLimiter(t *testing.T) {
ownerReferences1 := []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Controller: pointer.Bool(true),
Kind: "StatefulSet",
Name: "test-1",
UID: uuid.NewUUID(),
},
}
otherOwnerReferences := metav1.OwnerReference{
APIVersion: "apps/v1",
Controller: pointer.Bool(true),
Kind: "StatefulSet",
Name: "test-2",
UID: uuid.NewUUID(),
}
testObjectLimiters := config.ObjectLimiterMap{
config.MigrationLimitObjectWorkload: {
Duration: metav1.Duration{Duration: 1 * time.Second},
MaxMigrating: &intstr.IntOrString{Type: intstr.Int, IntVal: 10},
},
}

tests := []struct {
name string
objectLimiters config.ObjectLimiterMap
totalReplicas int32
sleepDuration time.Duration
pod *corev1.Pod
evictedPodsCount int
evictedWorkload *metav1.OwnerReference
want bool
}{
{
name: "less than default maxMigrating",
totalReplicas: 100,
objectLimiters: testObjectLimiters,
sleepDuration: 100 * time.Millisecond,
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: ownerReferences1,
},
},
evictedPodsCount: 6,
want: true,
},
{
name: "exceeded default maxMigrating",
totalReplicas: 100,
objectLimiters: testObjectLimiters,
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: ownerReferences1,
},
},
evictedPodsCount: 11,
want: false,
},
{
name: "other than workload",
totalReplicas: 100,
objectLimiters: testObjectLimiters,
sleepDuration: 100 * time.Millisecond,
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: ownerReferences1,
},
},
evictedPodsCount: 11,
evictedWorkload: &otherOwnerReferences,
want: true,
},
{
name: "disable objectLimiters",
totalReplicas: 100,
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: ownerReferences1,
},
},
evictedPodsCount: 11,
objectLimiters: config.ObjectLimiterMap{
config.MigrationLimitObjectWorkload: config.MigrationObjectLimiter{
Duration: metav1.Duration{Duration: 0},
},
},
want: true,
},
{
name: "default limiter",
totalReplicas: 100,
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
OwnerReferences: ownerReferences1,
},
},
evictedPodsCount: 1,
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scheme := runtime.NewScheme()
_ = v1alpha1.AddToScheme(scheme)
_ = clientgoscheme.AddToScheme(scheme)
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()

var v1beta2args v1alpha2.MigrationControllerArgs
v1alpha2.SetDefaults_MigrationControllerArgs(&v1beta2args)
var args config.MigrationControllerArgs
err := v1alpha2.Convert_v1alpha2_MigrationControllerArgs_To_config_MigrationControllerArgs(&v1beta2args, &args, nil)
if err != nil {
panic(err)
}
a := filter{client: fakeClient, args: &args, clock: clock.RealClock{}}

controllerFinder := &fakeControllerFinder{}
if tt.objectLimiters != nil {
a.args.ObjectLimiters = tt.objectLimiters
}

a.initObjectLimiters()
if tt.totalReplicas > 0 {
controllerFinder.replicas = tt.totalReplicas
}
a.controllerFinder = controllerFinder
if tt.evictedPodsCount > 0 {
for i := 0; i < tt.evictedPodsCount; i++ {
pod := tt.pod.DeepCopy()
if tt.evictedWorkload != nil {
pod.OwnerReferences = []metav1.OwnerReference{
*tt.evictedWorkload,
}
}
a.trackEvictedPod(pod)
if tt.sleepDuration > 0 {
time.Sleep(tt.sleepDuration)
}
}
}
got := a.filterLimitedObject(tt.pod)
assert.Equal(t, tt.want, got)
})
}
}

func TestArbitratedMap(t *testing.T) {
f := filter{
arbitratedPodMigrationJobs: map[types.UID]bool{},
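The removed test configures MaxMigrating as an intstr.IntOrString, and the removed trackEvictedPod resolves it against the workload's expected replicas via util.GetMaxMigrating. That helper's body is not shown in this excerpt, but resolving an int-or-percent value presumably follows the standard apimachinery pattern; a small illustrative sketch:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// An integer MaxMigrating resolves to itself…
	asInt := intstr.FromInt(10)
	// …while a percentage resolves relative to the total replica count.
	asPercent := intstr.FromString("20%")

	for _, v := range []intstr.IntOrString{asInt, asPercent} {
		resolved, err := intstr.GetScaledValueFromIntOrPercent(&v, 100, false)
		if err != nil {
			panic(err)
		}
		fmt.Printf("MaxMigrating=%s with 100 replicas => %d\n", v.String(), resolved)
	}
}
```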