Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #37293 #37379 upstream release 1.4 #37938

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions plugin/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ func (s *Scheduler) scheduleOne() {
assumed.Spec.NodeName = dest
if err := s.config.SchedulerCache.AssumePod(&assumed); err != nil {
glog.Errorf("scheduler cache AssumePod failed: %v", err)
// TODO: This means that a given pod is already in cache (which means it
// is either assumed or already added). This is most probably result of a
// BUG in retrying logic. As a temporary workaround (which doesn't fully
// fix the problem, but should reduce its impact), we simply return here,
// as binding doesn't make sense anyway.
// This should be fixed properly though.
return
}

go func() {
Expand Down
100 changes: 61 additions & 39 deletions plugin/pkg/scheduler/schedulercache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func (cache *schedulerCache) AssumePod(pod *api.Pod) error {

// assumePod exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) assumePod(pod *api.Pod, now time.Time) error {
cache.mu.Lock()
defer cache.mu.Unlock()

key, err := getPodKey(pod)
if err != nil {
return err
}

cache.mu.Lock()
defer cache.mu.Unlock()
if _, ok := cache.podStates[key]; ok {
return fmt.Errorf("pod state wasn't initial but get assumed. Pod key: %v", key)
return fmt.Errorf("pod %v state wasn't initial but get assumed", key)
}

cache.addPod(pod)
Expand All @@ -141,7 +141,11 @@ func (cache *schedulerCache) ForgetPod(pod *api.Pod) error {
cache.mu.Lock()
defer cache.mu.Unlock()

_, ok := cache.podStates[key]
currState, ok := cache.podStates[key]
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
return fmt.Errorf("pod %v state was assumed on a different node", key)
}

switch {
// Only assumed pod can be forgotten.
case ok && cache.assumedPods[key]:
Expand All @@ -152,7 +156,38 @@ func (cache *schedulerCache) ForgetPod(pod *api.Pod) error {
delete(cache.assumedPods, key)
delete(cache.podStates, key)
default:
return fmt.Errorf("pod state wasn't assumed but get forgotten. Pod key: %v", key)
return fmt.Errorf("pod %v state wasn't assumed but get forgotten", key)
}
return nil
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *api.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.addPod(pod)
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *api.Pod) error {
if err := cache.removePod(oldPod); err != nil {
return err
}
cache.addPod(newPod)
return nil
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) removePod(pod *api.Pod) error {
n := cache.nodes[pod.Spec.NodeName]
if err := n.removePod(pod); err != nil {
return err
}
if len(n.pods) == 0 && n.node == nil {
delete(cache.nodes, pod.Spec.NodeName)
}
return nil
}
Expand All @@ -166,9 +201,16 @@ func (cache *schedulerCache) AddPod(pod *api.Pod) error {
cache.mu.Lock()
defer cache.mu.Unlock()

_, ok := cache.podStates[key]
currState, ok := cache.podStates[key]
switch {
case ok && cache.assumedPods[key]:
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
// The pod was added to a different node than it was assumed to.
glog.Warningf("Pod %v assumed to a different node than added to.", key)
// Clean this up.
cache.removePod(currState.pod)
cache.addPod(pod)
}
delete(cache.assumedPods, key)
cache.podStates[key].deadline = nil
case !ok:
Expand All @@ -193,44 +235,20 @@ func (cache *schedulerCache) UpdatePod(oldPod, newPod *api.Pod) error {
cache.mu.Lock()
defer cache.mu.Unlock()

_, ok := cache.podStates[key]
currState, ok := cache.podStates[key]
switch {
// An assumed pod won't have Update/Remove event. It needs to have Add event
// before Update event, in which case the state would change from Assumed to Added.
case ok && !cache.assumedPods[key]:
if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
glog.Errorf("Pod %v updated on a different node than previously added to.", key)
glog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
}
if err := cache.updatePod(oldPod, newPod); err != nil {
return err
}
default:
return fmt.Errorf("pod state wasn't added but get updated. Pod key: %v", key)
}
return nil
}

func (cache *schedulerCache) updatePod(oldPod, newPod *api.Pod) error {
if err := cache.removePod(oldPod); err != nil {
return err
}
cache.addPod(newPod)
return nil
}

func (cache *schedulerCache) addPod(pod *api.Pod) {
n, ok := cache.nodes[pod.Spec.NodeName]
if !ok {
n = NewNodeInfo()
cache.nodes[pod.Spec.NodeName] = n
}
n.addPod(pod)
}

func (cache *schedulerCache) removePod(pod *api.Pod) error {
n := cache.nodes[pod.Spec.NodeName]
if err := n.removePod(pod); err != nil {
return err
}
if len(n.pods) == 0 && n.node == nil {
delete(cache.nodes, pod.Spec.NodeName)
return fmt.Errorf("pod %v state wasn't added but get updated", key)
}
return nil
}
Expand All @@ -244,12 +262,16 @@ func (cache *schedulerCache) RemovePod(pod *api.Pod) error {
cache.mu.Lock()
defer cache.mu.Unlock()

cachedstate, ok := cache.podStates[key]
currState, ok := cache.podStates[key]
switch {
// An assumed pod won't have Delete/Remove event. It needs to have Add event
// before Remove event, in which case the state would change from Assumed to Added.
case ok && !cache.assumedPods[key]:
err := cache.removePod(cachedstate.pod)
if currState.pod.Spec.NodeName != pod.Spec.NodeName {
glog.Errorf("Pod %v removed from a different node than previously added to.", key)
glog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
}
err := cache.removePod(currState.pod)
if err != nil {
return err
}
Expand Down