Skip to content

Commit

Permalink
PipelineRunState cleanup: tests + comments
Browse files Browse the repository at this point in the history
This commit cleans up PipelineRunState logic in preparation for a subsequent commit affecting retries.
It adds tests for DAGExecutionQueue and ensures that that function does not return any results if the PipelineRun is cancelled,
since we do not want to continue to schedule tasks if the PipelineRun is cancelled. (This is already documented and implemented elsewhere.)
It also fixes comments to correctly describe the behavior of GetFinalTasks
No functional changes.
  • Loading branch information
lbernick authored and tekton-robot committed Mar 21, 2022
1 parent 2c025fb commit e055233
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 19 deletions.
23 changes: 14 additions & 9 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Expand Up @@ -290,22 +290,28 @@ func (facts *PipelineRunFacts) IsRunning() bool {
return false
}

// IsGracefullyCancelled returns true if the PipelineRun won't be scheduling any new Task because it was gracefully cancelled
// IsCancelled returns true if the PipelineRun was cancelled
func (facts *PipelineRunFacts) IsCancelled() bool {
return facts.SpecStatus == v1beta1.PipelineRunSpecStatusCancelledDeprecated ||
facts.SpecStatus == v1beta1.PipelineRunSpecStatusCancelled
}

// IsGracefullyCancelled returns true if the PipelineRun was gracefully cancelled
func (facts *PipelineRunFacts) IsGracefullyCancelled() bool {
return facts.SpecStatus == v1beta1.PipelineRunSpecStatusCancelledRunFinally
}

// IsGracefullyStopped returns true if the PipelineRun won't be scheduling any new Task because it was gracefully stopped
// IsGracefullyStopped returns true if the PipelineRun was gracefully stopped
func (facts *PipelineRunFacts) IsGracefullyStopped() bool {
return facts.SpecStatus == v1beta1.PipelineRunSpecStatusStoppedRunFinally
}

// DAGExecutionQueue returns a list of DAG tasks which needs to be scheduled next
func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) {
tasks := PipelineRunState{}
// when pipeline run is stopping, gracefully cancelled or stopped, do not schedule any new task and only
var tasks PipelineRunState
// when pipeline run is stopping, cancelled, gracefully cancelled or stopped, do not schedule any new task and only
// wait for all running tasks to complete and report their status
if !facts.IsStopping() && !facts.IsGracefullyCancelled() && !facts.IsGracefullyStopped() {
if !facts.IsStopping() && !facts.IsCancelled() && !facts.IsGracefullyCancelled() && !facts.IsGracefullyStopped() {
// candidateTasks is initialized to DAG root nodes to start pipeline execution
// candidateTasks is derived based on successfully finished tasks and/or skipped tasks
candidateTasks, err := dag.GetSchedulable(facts.TasksGraph, facts.successfulOrSkippedDAGTasks()...)
Expand All @@ -318,13 +324,12 @@ func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) {
}

// GetFinalTasks returns a list of final tasks without any taskRun associated with it
// GetFinalTasks returns final tasks only when all DAG tasks have finished executing successfully or skipped or
// any one DAG task resulted in failure
// GetFinalTasks returns final tasks only when all DAG tasks have finished executing successfully or have been skipped
func (facts *PipelineRunFacts) GetFinalTasks() PipelineRunState {
tasks := PipelineRunState{}
finalCandidates := sets.NewString()
// check either pipeline has finished executing all DAG pipelineTasks
// or any one of the DAG pipelineTask has failed
// check either pipeline has finished executing all DAG pipelineTasks,
// where "finished executing" means succeeded, failed, or skipped.
if facts.checkDAGTasksDone() {
// return list of tasks with all final tasks
for _, t := range facts.State {
Expand Down
191 changes: 181 additions & 10 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
Expand Up @@ -220,7 +220,7 @@ func TestPipelineRunFacts_CheckDAGTasksDoneDone(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
d, err := dagFromState(tc.state)
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err)
t.Fatalf("Unexpected error while building DAG for state %v: %v", tc.state, err)
}
facts := PipelineRunFacts{
State: tc.state,
Expand Down Expand Up @@ -600,6 +600,177 @@ func TestGetNextTaskWithRetries(t *testing.T) {
}
}

// TestDAGExecutionQueue tests the DAGExecutionQueue function for PipelineTasks
// in different states (without dependencies on each other) and the PipelineRun in different states.
func TestDAGExecutionQueue(t *testing.T) {
createdTask := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "createdtask",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
TaskRunName: "createdtask",
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}
createdRun := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "createdrun",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
RunName: "createdrun",
CustomTask: true,
}
runningTask := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "runningtask",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
TaskRunName: "runningtask",
TaskRun: newTaskRun(trs[0]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}
runningRun := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "runningrun",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
RunName: "runningrun",
Run: newRun(runs[0]),
CustomTask: true,
}
successfulTask := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "successfultask",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
TaskRunName: "successfultask",
TaskRun: makeSucceeded(trs[0]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}
successfulRun := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "successfulrun",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
RunName: "successfulrun",
Run: makeRunSucceeded(runs[0]),
CustomTask: true,
}
failedTask := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "failedtask",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
TaskRunName: "failedtask",
TaskRun: makeFailed(trs[0]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}
failedRun := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "failedrun",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
RunName: "failedrun",
Run: makeRunFailed(runs[0]),
CustomTask: true,
}
failedTaskWithRetries := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "failedtaskwithretries",
TaskRef: &v1beta1.TaskRef{Name: "task"},
Retries: 1,
},
TaskRunName: "failedtaskwithretries",
TaskRun: makeFailed(trs[0]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}
failedRunWithRetries := ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Name: "failedrunwithretries",
TaskRef: &v1beta1.TaskRef{Name: "task"},
Retries: 1,
},
RunName: "failedrunwithretries",
Run: makeRunFailed(runs[0]),
CustomTask: true,
}
tcs := []struct {
name string
state PipelineRunState
specStatus v1beta1.PipelineRunSpecStatus
want PipelineRunState
}{{
name: "cancelled",
specStatus: v1beta1.PipelineRunSpecStatusCancelled,
state: PipelineRunState{
&createdTask, &createdRun,
&runningTask, &runningRun, &successfulTask, &successfulRun,
&failedTaskWithRetries, &failedRunWithRetries,
},
}, {
name: "gracefully cancelled",
specStatus: v1beta1.PipelineRunSpecStatusCancelledRunFinally,
state: PipelineRunState{
&createdTask, &createdRun,
&runningTask, &runningRun, &successfulTask, &successfulRun,
&failedTaskWithRetries, &failedRunWithRetries,
},
}, {
name: "gracefully stopped",
specStatus: v1beta1.PipelineRunSpecStatusStoppedRunFinally,
state: PipelineRunState{
&createdTask, &createdRun, &runningTask, &runningRun, &successfulTask, &successfulRun,
&failedTaskWithRetries, &failedRunWithRetries,
},
}, {
name: "running",
state: PipelineRunState{
&createdTask, &createdRun, &runningTask, &runningRun,
&failedTaskWithRetries, &failedRunWithRetries, &successfulTask, &successfulRun,
},
want: PipelineRunState{&createdTask, &createdRun, &failedTaskWithRetries, &failedRunWithRetries},
}, {
name: "stopped",
state: PipelineRunState{
&createdTask, &createdRun, &runningTask, &runningRun,
&successfulTask, &successfulRun, &failedTask, &failedRun,
},
}, {
name: "all tasks finished",
state: PipelineRunState{&successfulTask, &successfulRun, &failedTask, &failedRun},
}}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
d, err := dagFromState(tc.state)
if err != nil {
t.Fatalf("Unexpected error while building DAG for state %v: %v", tc.state, err)
}
facts := PipelineRunFacts{
State: tc.state,
SpecStatus: tc.specStatus,
TasksGraph: d,
FinalTasksGraph: &dag.Graph{},
}
queue, err := facts.DAGExecutionQueue()
if err != nil {
t.Errorf("unexpected error getting DAG execution queue: %s", err)
}
if d := cmp.Diff(tc.want, queue); d != "" {
t.Errorf("Didn't get expected execution queue: %s", diff.PrintWantGot(d))
}
})
}
}

func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
largePipelineState := buildPipelineStateWithLargeDepencyGraph(t)
tcs := []struct {
Expand Down Expand Up @@ -685,7 +856,7 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
d, err := dagFromState(tc.state)
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err)
t.Fatalf("Unexpected error while building DAG for state %v: %v", tc.state, err)
}
facts := PipelineRunFacts{
State: tc.state,
Expand Down Expand Up @@ -910,11 +1081,11 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) {
for _, tc := range tcs {
dagGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.DAGTasks), v1beta1.PipelineTaskList(tc.DAGTasks).Deps())
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for pipelineTasks %v: %v", tc.DAGTasks, err)
t.Fatalf("Unexpected error while building DAG for pipelineTasks %v: %v", tc.DAGTasks, err)
}
finalGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.finalTasks), map[string][]string{})
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for final pipelineTasks %v: %v", tc.finalTasks, err)
t.Fatalf("Unexpected error while building DAG for final pipelineTasks %v: %v", tc.finalTasks, err)
}
t.Run(tc.name, func(t *testing.T) {
facts := PipelineRunFacts{
Expand Down Expand Up @@ -1231,11 +1402,11 @@ func TestGetPipelineConditionStatus(t *testing.T) {
}
d, err := dagFromState(tc.state)
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err)
t.Fatalf("Unexpected error while building DAG for state %v: %v", tc.state, err)
}
dfinally, err := dagFromState(tc.finallyState)
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for finally state %v: %v", tc.finallyState, err)
t.Fatalf("Unexpected error while building DAG for finally state %v: %v", tc.finallyState, err)
}
facts := PipelineRunFacts{
State: tc.state,
Expand Down Expand Up @@ -1374,11 +1545,11 @@ func TestGetPipelineConditionStatus_WithFinalTasks(t *testing.T) {
}
d, err := dag.Build(v1beta1.PipelineTaskList(tc.dagTasks), v1beta1.PipelineTaskList(tc.dagTasks).Deps())
if err != nil {
t.Fatalf("Unexpected error while buildig graph for DAG tasks %v: %v", tc.dagTasks, err)
t.Fatalf("Unexpected error while building graph for DAG tasks %v: %v", tc.dagTasks, err)
}
df, err := dag.Build(v1beta1.PipelineTaskList(tc.finalTasks), map[string][]string{})
if err != nil {
t.Fatalf("Unexpected error while buildig graph for final tasks %v: %v", tc.finalTasks, err)
t.Fatalf("Unexpected error while building graph for final tasks %v: %v", tc.finalTasks, err)
}
facts := PipelineRunFacts{
State: tc.state,
Expand All @@ -1404,7 +1575,7 @@ func TestGetPipelineConditionStatus_WithFinalTasks(t *testing.T) {
func TestGetPipelineConditionStatus_PipelineTimeouts(t *testing.T) {
d, err := dagFromState(oneFinishedState)
if err != nil {
t.Fatalf("Unexpected error while buildig DAG for state %v: %v", oneFinishedState, err)
t.Fatalf("Unexpected error while building DAG for state %v: %v", oneFinishedState, err)
}
pr := &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "pipelinerun-no-tasks-started"},
Expand Down Expand Up @@ -1670,7 +1841,7 @@ func TestPipelineRunFacts_GetPipelineTaskStatus(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
d, err := dag.Build(v1beta1.PipelineTaskList(tc.dagTasks), v1beta1.PipelineTaskList(tc.dagTasks).Deps())
if err != nil {
t.Fatalf("Unexpected error while buildig graph for DAG tasks %v: %v", tc.dagTasks, err)
t.Fatalf("Unexpected error while building graph for DAG tasks %v: %v", tc.dagTasks, err)
}
facts := PipelineRunFacts{
State: tc.state,
Expand Down

0 comments on commit e055233

Please sign in to comment.