Skip to content

Commit

Permalink
use a lock instead of a channel semaphore
Browse files Browse the repository at this point in the history
Signed-off-by: Olga Shestopalova <oshestopalova1@gmail.com>
  • Loading branch information
olyazavr committed May 15, 2024
1 parent 957424b commit 6a65e03
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
19 changes: 9 additions & 10 deletions pkg/kubelet/eviction/eviction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ type managerImpl struct {
thresholdsLastUpdated time.Time
// whether can support local storage capacity isolation
localStorageCapacityIsolation bool
// softEvictionSemaphore prevents more than 1 pod from being soft evicted at once
softEvictionSemaphore chan bool
// softEvictionLock prevents more than 1 pod from being soft evicted at once
softEvictionLock *sync.Mutex
}

// ensure it implements the required interface
Expand Down Expand Up @@ -139,7 +139,7 @@ func NewManager(
splitContainerImageFs: nil,
thresholdNotifiers: []ThresholdNotifier{},
localStorageCapacityIsolation: localStorageCapacityIsolation,
softEvictionSemaphore: make(chan bool, 1),
softEvictionLock: &sync.Mutex{},
}
return manager, manager
}
Expand Down Expand Up @@ -412,14 +412,13 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
for i := range activePods {
pod := activePods[i]
gracePeriodOverride := int64(immediateEvictionGracePeriodSeconds)
var semaphore chan bool
var lock *sync.Mutex
if !isHardEvictionThreshold(thresholdToReclaim) {
semaphore = m.softEvictionSemaphore
if len(m.softEvictionSemaphore) != 0 {
lock = m.softEvictionLock
if !m.softEvictionLock.TryLock() {
klog.InfoS("Eviction manager: soft eviction already in progress, will not soft evict another pod")
return nil, nil
}
m.softEvictionSemaphore <- true
gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
}
message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc, thresholds, observations)
Expand All @@ -432,7 +431,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
Message: message,
}
}
if m.evictPod(pod, gracePeriodOverride, message, annotations, condition, semaphore) {
if m.evictPod(pod, gracePeriodOverride, message, annotations, condition, lock) {
metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
return []*v1.Pod{pod}, nil
}
Expand Down Expand Up @@ -603,7 +602,7 @@ func (m *managerImpl) containerEphemeralStorageLimitEviction(podStats statsapi.P
return false
}

func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg string, annotations map[string]string, condition *v1.PodCondition, semaphore chan bool) bool {
func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg string, annotations map[string]string, condition *v1.PodCondition, lock *sync.Mutex) bool {
// If the pod is marked as critical and static, and support for critical pod annotations is enabled,
// do not evict such pods. Static pods are not re-admitted after evictions.
// https://github.com/kubernetes/kubernetes/issues/40573 has more details.
Expand All @@ -615,7 +614,7 @@ func (m *managerImpl) evictPod(pod *v1.Pod, gracePeriodOverride int64, evictMsg
m.recorder.AnnotatedEventf(pod, annotations, v1.EventTypeWarning, Reason, evictMsg)
// this is a non-blocking call, it will return right away; the lock (if non-nil) will be unlocked once the pod is actually killed
klog.V(3).InfoS("Evicting pod", "pod", klog.KObj(pod), "podUID", pod.UID, "message", evictMsg)
err := m.killPodFunc(pod, true, &gracePeriodOverride, semaphore, func(status *v1.PodStatus) {
err := m.killPodFunc(pod, true, &gracePeriodOverride, lock, func(status *v1.PodStatus) {
status.Phase = v1.PodFailed
status.Reason = Reason
status.Message = evictMsg
Expand Down
19 changes: 10 additions & 9 deletions pkg/kubelet/eviction/eviction_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/utils/clock"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -59,16 +60,16 @@ type mockPodKiller struct {
}

// killPodNow records the pod that was killed
func (m *mockPodKiller) killPodNow(pod *v1.Pod, evict bool, gracePeriodOverride *int64, semaphore chan bool, statusFn func(*v1.PodStatus)) error {
m.killPodNowNoSemaphoreRelease(pod, evict, gracePeriodOverride, semaphore, statusFn)
if semaphore != nil {
<-semaphore
func (m *mockPodKiller) killPodNow(pod *v1.Pod, evict bool, gracePeriodOverride *int64, lock *sync.Mutex, statusFn func(*v1.PodStatus)) error {
_ = m.killPodNowLongShutdown(pod, evict, gracePeriodOverride, lock, statusFn)
if lock != nil {
lock.Unlock()
}
return nil
}

// killPodNowNoSemaphoreRelease records the pod that was killed, and does not release the semaphore, simulating a long pod shutdown
func (m *mockPodKiller) killPodNowNoSemaphoreRelease(pod *v1.Pod, evict bool, gracePeriodOverride *int64, semaphore chan bool, statusFn func(*v1.PodStatus)) error {
// killPodNowLongShutdown records the pod that was killed, and does not unlock the lock, simulating a long pod shutdown
func (m *mockPodKiller) killPodNowLongShutdown(pod *v1.Pod, evict bool, gracePeriodOverride *int64, lock *sync.Mutex, statusFn func(*v1.PodStatus)) error {
m.pod = pod
m.statusFn = statusFn
m.evict = evict
Expand Down Expand Up @@ -3027,7 +3028,7 @@ func TestHardEvictPodThatHasBeenSoftEvictedOnceHardThresholdReached(t *testing.T
}
summaryProvider := &fakeSummaryProvider{result: summaryStatsMaker("600Mi", podStats)}
manager := newManagerImpl(fakeClock, podKiller.killPodNow, config, summaryProvider, nodeRef)
manager.killPodFunc = podKiller.killPodNowNoSemaphoreRelease
manager.killPodFunc = podKiller.killPodNowLongShutdown
fakeClock.Step(1 * time.Minute)

// first run doesn't meet the grace period
Expand Down Expand Up @@ -3086,7 +3087,7 @@ func TestHardEvictPodThatHasBeenSoftEvictedOnceHardThresholdReached(t *testing.T
t.Errorf("Manager chose to kill pod: %v, but should have chosen %v", podKiller.pod.Name, podToEvict.Name)
}
observedGracePeriod = *podKiller.gracePeriodOverride
if observedGracePeriod != int64(0) {
if observedGracePeriod != int64(1) {
t.Errorf("Manager chose to kill pod with incorrect grace period. Expected: %d, actual: %d", 1, observedGracePeriod)
}
}
Expand All @@ -3104,6 +3105,6 @@ func newManagerImpl(clock clock.WithTicker, killPodFunc KillPodFuncAsync, config
containerGC: diskGC,
nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
thresholdsFirstObservedAt: thresholdsObservedAt{},
softEvictionSemaphore: make(chan bool, 1),
softEvictionLock: &sync.Mutex{},
}
}
5 changes: 3 additions & 2 deletions pkg/kubelet/eviction/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eviction

import (
"context"
"sync"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -107,8 +108,8 @@ type KillPodFunc func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, f
// pod - the pod to kill
// status - the desired status to associate with the pod (i.e. why its killed)
// gracePeriodOverride - the grace period override to use instead of what is on the pod spec
// semaphore - if non-nil, will be released once the kill is completed
type KillPodFuncAsync func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, semaphore chan bool, fn func(*v1.PodStatus)) error
// lock - if non-nil, will be released once the kill is completed
type KillPodFuncAsync func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, lock *sync.Mutex, fn func(*v1.PodStatus)) error

// MirrorPodFunc returns the mirror pod for the given static pod and
// whether it was known to the pod manager.
Expand Down
12 changes: 5 additions & 7 deletions pkg/kubelet/pod_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1647,9 +1647,9 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID, status *podSyncStatus

// killPodNowAsync returns a KillPodFunc that can be used to kill a pod.
// It is intended to be injected into other modules that need to kill a pod.
// The kill will happen async, and once it is complete, the semaphore will be drained
// The kill will happen async, and once it is complete, the lock will be unlocked
func killPodNowAsync(podWorkers PodWorkers, recorder record.EventRecorder) eviction.KillPodFuncAsync {
return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, semaphore chan bool, statusFn func(*v1.PodStatus)) error {
return func(pod *v1.Pod, isEvicted bool, gracePeriodOverride *int64, lock *sync.Mutex, statusFn func(*v1.PodStatus)) error {
go func() {
ch := make(chan struct{}, 1)
podWorkers.UpdatePod(UpdatePodOptions{
Expand All @@ -1663,13 +1663,11 @@ func killPodNowAsync(podWorkers PodWorkers, recorder record.EventRecorder) evict
},
})

if semaphore == nil {
if lock == nil {
return
}
select {
case <-ch:
<-semaphore
}
<-ch
lock.Unlock()
}()
return nil
}
Expand Down

0 comments on commit 6a65e03

Please sign in to comment.