feat(retries) Design and implement retries
closes #221
joseblas committed Apr 29, 2019
1 parent 8f16ebe commit d986c2f
Showing 11 changed files with 521 additions and 315 deletions.
28 changes: 18 additions & 10 deletions docs/pipelines.md
@@ -10,6 +10,7 @@ This document defines `Pipelines` and their capabilities.
- [Pipeline Tasks](#pipeline-tasks)
- [From](#from)
- [RunAfter](#runafter)
- [Retries](#retries)
- [Ordering](#ordering)
- [Examples](#examples)

@@ -41,6 +42,7 @@ following fields:
- [`runAfter`](#runAfter) - Used when the [Pipeline Task](#pipeline-task)
should be executed after another Pipeline Task, but there is no
[output linking](#from) required
- [`retries`](#retries) - Used when the task should be retried if it fails, for
  example because of a network error or a missing dependency. Retries do not
  apply to cancellations.

[kubernetes-overview]:
https://kubernetes.io/docs/concepts/overview/working-with-objects/kubernetes-objects/#required-fields
@@ -140,16 +142,6 @@ tasks:
name: build-push
```

There is an optional attribute called `retries`, which declares how many times that task should be retried in case of failure,

```yaml
tasks:
- name: build-the-image
retries: 1
taskRef:
name: build-push
```

[Declared `PipelineResources`](#declared-resources) can be given to `Task`s in
the `Pipeline` as inputs and outputs, for example:

@@ -265,6 +257,22 @@ is no output from `test-app`, so `build-app` uses `runAfter` to indicate that
`test-app` should run before it, regardless of the order they appear in the
spec.

#### retries

Sometimes a retry policy is needed for tasks that fail for reasons such as network errors, missing dependencies, or upload problems.
For a retry to be triggered, the failure must be reflected as `False` (`corev1.ConditionFalse`) in the `Succeeded` condition of the `TaskRun` status; cancellations do not trigger retries.
For this purpose there is an optional attribute called `retries`, which declares how many times the task should be retried in case of failure:

```yaml
tasks:
- name: build-the-image
retries: 1
taskRef:
name: build-push
```

In this example, the task `build-the-image` will be executed; if the first run fails, a second run will be triggered. If that run also fails, no further runs will be triggered.
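Each failed attempt is archived in the `TaskRun` status. As an illustrative sketch only (the `retriesStatus` field comes from this change; the surrounding values are hypothetical), the status of `build-the-image` after one failed attempt and a successful retry might look like:

```yaml
status:
  conditions:
    - type: Succeeded
      status: "True" # the retry succeeded
  retriesStatus: # one archived status per failed attempt
    - conditions:
        - type: Succeeded
          status: "False"
```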

## Ordering

The [Pipeline Tasks](#pipeline-tasks) in a `Pipeline` can be connected and run
2 changes: 1 addition & 1 deletion pkg/apis/pipeline/v1alpha1/pipeline_types.go
@@ -73,7 +73,7 @@ type PipelineTask struct {
Name string `json:"name"`
TaskRef TaskRef `json:"taskRef"`

// Number of retries to be run
// Retries represents how many times this task should be retried in case of task failure
// +optional
Retries int `json:"retries,omitempty"`

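The tests later in this commit configure retries through a `tb.Retries(1)` builder option, which is not shown in this diff. A minimal sketch, assuming it follows the existing `PipelineTaskOp` pattern of the test builder package:

```go
// Retries is a sketch of the hypothetical test builder option used in the
// tests below; it assumes the test/builder package's PipelineTaskOp
// convention and simply sets the new Retries field on the PipelineTask.
func Retries(retries int) PipelineTaskOp {
	return func(pt *v1alpha1.PipelineTask) {
		pt.Retries = retries
	}
}
```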
10 changes: 4 additions & 6 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
@@ -278,6 +278,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
func(name string) (*v1alpha1.TaskRun, error) {
return c.taskRunLister.TaskRuns(pr.Namespace).Get(name)
},
func(name string) (v1alpha1.TaskInterface, error) {
return c.clusterTaskLister.Get(name)
},
@@ -348,11 +349,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
}
}

err = resources.ResolveTaskRuns(c.taskRunLister.TaskRuns(pr.Namespace).Get, pipelineState)
if err != nil {
return fmt.Errorf("Error getting TaskRuns for Pipeline %s: %s", p.Name, err)
}

// If the pipelinerun is cancelled, cancel tasks and update status
if pr.IsCancelled() {
return cancelPipelineRun(pr, pipelineState, c.PipelineClientSet)
@@ -449,15 +445,17 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] = pr.Name

tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)

if tr != nil {
// the TaskRun already exists, so this is a retry
addRetryHistory(tr)
tr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
})
return tr, nil
return c.PipelineClientSet.TektonV1alpha1().TaskRuns(pr.Namespace).UpdateStatus(tr)
}

tr = &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: rprt.TaskRunName,
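`addRetryHistory`, called above, is not visible in this diff's hunks. A plausible sketch, assuming it archives the failed attempt's status into `RetriesStatus` before the `Succeeded` condition is reset to `Unknown` for the next attempt:

```go
// addRetryHistory (sketch, assumed implementation): snapshot the TaskRun's
// current failed status, clear the snapshot's own retry history so entries
// do not nest recursively, and append the snapshot to RetriesStatus.
func addRetryHistory(tr *v1alpha1.TaskRun) {
	newStatus := *tr.Status.DeepCopy()
	newStatus.RetriesStatus = nil
	tr.Status.RetriesStatus = append(tr.Status.RetriesStatus, newStatus)
}
```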
56 changes: 13 additions & 43 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
@@ -691,11 +691,10 @@ func TestReconcileWithTimeout(t *testing.T) {
}
func TestReconcileCancelledPipelineRun(t *testing.T) {
ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hello-world"),
tb.PipelineTask("hello-world-1", "hello-world", tb.Retries(1)),
))}
prs := []*v1alpha1.PipelineRun{tb.PipelineRun("test-pipeline-run-with-timeout", "foo",
tb.PipelineRunSpec("test-pipeline",
//tb.PipelineRunServiceAccount("test-sa"),
tb.PipelineRunCancelled,
),
)}
@@ -806,16 +805,19 @@ func TestReconcilePropagateLabels(t *testing.T) {
func TestReconcileWithTimeoutAndRetry(t *testing.T) {

tcs := []struct {
name string
retries int
name string
retries int
conditionSucceeded corev1.ConditionStatus
}{
{
name: "One try has to be done",
retries: 1,
name: "One try has to be done",
retries: 1,
conditionSucceeded: corev1.ConditionFalse,
},
{
name: "No more retries are needed",
retries: 2,
name: "No more retries are needed",
retries: 2,
conditionSucceeded: corev1.ConditionUnknown,
},
}

Expand Down Expand Up @@ -843,7 +845,7 @@ func TestReconcileWithTimeoutAndRetry(t *testing.T) {
tb.PodName("my-pod-name"),
tb.Condition(apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
Status: corev1.ConditionFalse,
}),
tb.Retry(v1alpha1.TaskRunStatus{
Status: duckv1beta1.Status{
@@ -887,46 +889,14 @@
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

fmt.Printf("TaskRun %v \n", reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(apis.ConditionSucceeded))

if len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus) != tc.retries {
t.Fatalf(" %d retry expected but %d ", tc.retries, len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus))
}

if status := reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(apis.ConditionSucceeded).Status; status != corev1.ConditionUnknown {
t.Fatalf("Succedded expected to be Unknown but is %s", status)
if status := reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(apis.ConditionSucceeded).Status; status != tc.conditionSucceeded {
t.Fatalf("Succedded expected to be %s but is %s", tc.conditionSucceeded, status)
}

// Check that the expected TaskRun was created
actual := clients.Pipeline.Actions()[0].(ktesting.UpdateAction).GetObject().(*v1alpha1.PipelineRun)
if actual == nil {
t.Errorf("Expected a TaskRun to be created, but it wasn't.")
}

// The TaskRun timeout should be less than or equal to the PipelineRun timeout.
if actual.Spec.Timeout.Duration > prs[0].Spec.Timeout.Duration {
t.Errorf("TaskRun timeout %s should be less than or equal to PipelineRun timeout %s", actual.Spec.Timeout.Duration.String(), prs[0].Spec.Timeout.Duration.String())
}

err = c.Reconciler.Reconcile(context.Background(), "foo/test-pipeline-retry-run-with-timeout")
if err != nil {
t.Errorf("Did not expect to see error when reconciling completed PipelineRun but saw %s", err)
}

// Check that the PipelineRun was reconciled correctly
reconciledRun2, err3 := clients.Pipeline.TektonV1alpha1().PipelineRuns("foo").Get("test-pipeline-retry-run-with-timeout", metav1.GetOptions{})
if err3 != nil {
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

if reconciledRun2 != nil {
fmt.Println("Second recon ", reconciledRun2.Status)
}

if reconciledRun != nil {
fmt.Printf("Retries %v \n", len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus))
fmt.Printf("condition %v \n", reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(apis.ConditionSucceeded))
}
})
}
}
@@ -65,13 +65,12 @@ type PipelineRunState []*ResolvedPipelineRunTask
func (state PipelineRunState) IsDone() (isDone bool) {
isDone = true
for _, t := range state {
if t.TaskRun == nil {
if t.TaskRun == nil || t.PipelineTask == nil {
return false
}
status := t.TaskRun.Status.GetCondition(apis.ConditionSucceeded)
retriesDone := len(t.TaskRun.Status.RetriesStatus)
retries := t.PipelineTask.Retries
println(status, retries, retriesDone)
isDone = isDone && (status.IsTrue() || status.IsFalse() && retriesDone > retries)
if !isDone {
return
@@ -82,15 +81,20 @@ func (state PipelineRunState) IsDone() (isDone bool) {

// GetNextTasks will return the next ResolvedPipelineRunTasks to execute, which are the ones in the
// list of candidateTasks which aren't yet indicated in state to be running.
func (state *PipelineRunState) GetNextTasks(candidateTasks map[string]v1alpha1.PipelineTask) []*ResolvedPipelineRunTask {
func (state PipelineRunState) GetNextTasks(candidateTasks map[string]v1alpha1.PipelineTask) []*ResolvedPipelineRunTask {
tasks := []*ResolvedPipelineRunTask{}
for _, t := range *state {
for _, t := range state {
if _, ok := candidateTasks[t.PipelineTask.Name]; ok && t.TaskRun == nil {
tasks = append(tasks, t)
}
if _, ok := candidateTasks[t.PipelineTask.Name]; ok && t.TaskRun != nil {
if len(t.TaskRun.Status.RetriesStatus) < t.PipelineTask.Retries {
tasks = append(tasks, t)
status := t.TaskRun.Status.GetCondition(apis.ConditionSucceeded)
if status != nil && status.IsFalse() {
if !(t.TaskRun.IsCancelled() || status.Reason == "TaskRunCancelled") {
if len(t.TaskRun.Status.RetriesStatus) < t.PipelineTask.Retries {
tasks = append(tasks, t)
}
}
}
}
}
@@ -239,6 +243,11 @@ func ResolvePipelineRun(
rprt.ResolvedTaskResources = rtr

taskRun, err := getTaskRun(rprt.TaskRunName)
if err != nil {
if !errors.IsNotFound(err) {
return nil, fmt.Errorf("error retrieving TaskRun %s: %s", rprt.TaskRunName, err)
}
}
if taskRun != nil {
rprt.TaskRun = taskRun
}
@@ -248,24 +257,6 @@
return state, nil
}

// ResolveTaskRuns will go through all tasks in state and check if there are existing TaskRuns
// for each of them by calling getTaskRun.
func ResolveTaskRuns(getTaskRun GetTaskRun, state PipelineRunState) error {
for _, rprt := range state {
// Check if we have already started a TaskRun for this task
taskRun, err := getTaskRun(rprt.TaskRunName)
if err != nil {
// If the TaskRun isn't found, it just means it hasn't been run yet
if !errors.IsNotFound(err) {
return fmt.Errorf("error retrieving TaskRun %s: %s", rprt.TaskRunName, err)
}
} else {
rprt.TaskRun = taskRun
}
}
return nil
}

// getTaskRunName should return a unique name for a `TaskRun` if one has not already been defined, and the existing one otherwise.
func getTaskRunName(taskRunsStatus map[string]*v1alpha1.PipelineRunTaskRunStatus, ptName, prName string) string {
for k, v := range taskRunsStatus {
