Skip to content
Permalink
Browse files

revert cache to original state on evict and bind errors

This change ensures that node, task and job info will remain unchanged in case of an error during SchedulerCache.Bind and SchedulerCache.Evict calls.
Previously, if an error occurred during the binding phase (e.g. "Selected node NotReady"), the task could get stuck in the Binding status indefinitely while the real pod remained in the Pending status.

- SchedulerCache.Evict and SchedulerCache.Bind now revert the task status when NodeInfo.UpdateTask or NodeInfo.AddTask, respectively, returns an error.
- Modified the behavior of NodeInfo.AddTask: AddTask now updates the task's node name upon successful addition, similar to how JobInfo.UpdateTaskStatus updates the task's status.
- JobInfo.UpdateTaskStatus now handles the previously unchecked error returned by JobInfo.DeleteTaskInfo.
- NodeInfo.UpdateTask now logs at FATAL level when a supposedly impossible situation occurs: failing to add a task to the node info right after successfully removing it.

Might be related to #891
  • Loading branch information
mateuszlitwin committed Nov 20, 2019
1 parent 791c395 commit 272857bed63ac8ea6fe9665b066f301a3f41cc6e
@@ -229,7 +229,7 @@ func (ji *JobInfo) addTaskIndex(ti *TaskInfo) {
ji.TaskStatusIndex[ti.Status][ti.UID] = ti
}

// AddTaskInfo is used to add a task to a job
// AddTaskInfo is used to add a task to a job.
func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
ji.Tasks[ti.UID] = ti
ji.addTaskIndex(ti)
@@ -241,16 +241,22 @@ func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
}
}

// UpdateTaskStatus is used to update task's status in a job
// UpdateTaskStatus is used to update task's status in a job.
//
// If error occurs both task and job are guaranteed to be in the original state.
func (ji *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error {
if err := validateStatusUpdate(task.Status, status); err != nil {
return err
}

// Remove the task from the task list firstly
ji.DeleteTaskInfo(task)
// First remove the task (if exist) from the task list.
if _, found := ji.Tasks[task.UID]; found {
if err := ji.DeleteTaskInfo(task); err != nil {
return err
}
}

// Update task's status to the target status
// Update task's status to the target status once task addition is guaranteed to succeed.
task.Status = status
ji.AddTaskInfo(task)

@@ -268,6 +274,8 @@ func (ji *JobInfo) deleteTaskIndex(ti *TaskInfo) {
}

// DeleteTaskInfo is used to delete a task from a job
//
// If error occurs both task and job are guaranteed to be in the original state.
func (ji *JobInfo) DeleteTaskInfo(ti *TaskInfo) error {
if task, found := ji.Tasks[ti.UID]; found {
ji.TotalRequest.Sub(task.Resreq)
@@ -171,7 +171,14 @@ func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) error {
}

// AddTask is used to add a task in nodeInfo object
//
// If error occurs both task and node are guaranteed to be in the original state.
func (ni *NodeInfo) AddTask(task *TaskInfo) error {
if len(task.NodeName) > 0 && len(ni.Name) > 0 && task.NodeName != ni.Name {
return fmt.Errorf("task <%v/%v> already on different node <%v>",
task.Namespace, task.Name, task.NodeName)
}

key := PodKey(task.Pod)
if _, found := ni.Tasks[key]; found {
return fmt.Errorf("task <%v/%v> already on node <%v>",
@@ -200,12 +207,17 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
ni.Used.Add(ti.Resreq)
}

// Update task node name upon successful task addition.
task.NodeName = ni.Name
ti.NodeName = ni.Name
ni.Tasks[key] = ti

return nil
}

// RemoveTask used to remove a task from nodeInfo object
// RemoveTask used to remove a task from nodeInfo object.
//
// If error occurs both task and node are guaranteed to be in the original state.
func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
key := PodKey(ti.Pod)

@@ -234,13 +246,20 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
return nil
}

// UpdateTask is used to update a task in nodeInfo object
// UpdateTask is used to update a task in nodeInfo object.
//
// If error occurs both task and node are guaranteed to be in the original state.
func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {
if err := ni.RemoveTask(ti); err != nil {
return err
}

return ni.AddTask(ti)
if err := ni.AddTask(ti); err != nil {
// This should never happen if task removal was successful,
// because only possible error during task addition is when task is still on a node.
glog.Fatalf("Failed to add Task <%s,%s> to Node <%s> during task update",
ti.Namespace, ti.Name, ni.Name)
}
return nil
}

// String returns nodeInfo details in string format
@@ -417,13 +417,14 @@ func (sc *SchedulerCache) findJobAndTask(taskInfo *kbapi.TaskInfo) (*kbapi.JobIn
return job, task, nil
}

// Evict will evict the pod
// Evict will evict the pod.
//
// If error occurs both task and job are guaranteed to be in the original state.
func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

job, task, err := sc.findJobAndTask(taskInfo)

if err != nil {
return err
}
@@ -434,13 +435,21 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
task.UID, task.NodeName)
}

err = job.UpdateTaskStatus(task, kbapi.Releasing)
if err != nil {
originalStatus := task.Status
if err := job.UpdateTaskStatus(task, kbapi.Releasing); err != nil {
return err
}

// Add new task to node.
if err := node.UpdateTask(task); err != nil {
// After failing to update task to a node we need to revert task status from Releasing,
// otherwise task might be stuck in the Releasing state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
glog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to update Task on Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
}
return err
}

@@ -482,7 +491,6 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error
defer sc.Mutex.Unlock()

job, task, err := sc.findJobAndTask(taskInfo)

if err != nil {
return err
}
@@ -493,16 +501,21 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error
task.UID, hostname)
}

err = job.UpdateTaskStatus(task, kbapi.Binding)
if err != nil {
originalStatus := task.Status
if err := job.UpdateTaskStatus(task, kbapi.Binding); err != nil {
return err
}

// Set `.nodeName` to the hostname
task.NodeName = hostname

// Add task to the node.
if err := node.AddTask(task); err != nil {
// After failing to add task to a node we need to revert task status from Binding,
// otherwise task might be stuck in the Binding state indefinitely.
if err := job.UpdateTaskStatus(task, originalStatus); err != nil {
glog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
"from %s to %s after failing to add Task to Node <%s>: %v",
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
sc.resyncTask(task)
}
return err
}

@@ -206,8 +206,6 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error {
return fmt.Errorf("failed to find job %s when binding", task.Job)
}

task.NodeName = hostname

if node, found := ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
@@ -253,8 +251,6 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error {
return fmt.Errorf("failed to find job %s", task.Job)
}

task.NodeName = hostname

if node, found := ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
@@ -123,8 +123,6 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
task.Job, s.ssn.UID)
}

task.NodeName = hostname

if node, found := s.ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
glog.Errorf("Failed to pipeline task <%v/%v> to node <%v> in Session <%v>: %v",

0 comments on commit 272857b

Please sign in to comment.
You can’t perform that action at this time.