Skip to content

Commit

Permalink
Add a TaskTimeout optional field to pipelinerun type
Browse files Browse the repository at this point in the history
TaskTimeout is used to timeout all dag tasks, finally tasks excluded

Validates a TasksTimeout if:
	- TasksTimeout > 0
	- Timeout is specified and TasksTimeout <= Timeout
 	- Timeout not specified and TasksTimeout <= default Timeout

Add a builder function for taskTimeout

Defines 2 functions to get taskrun timeout
One for dag tasks and one specifically for finally tasks
  • Loading branch information
souleb committed Mar 18, 2021
1 parent 02f5f42 commit b7f3bf9
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 14 deletions.
7 changes: 7 additions & 0 deletions internal/builder/v1beta1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,13 @@ func PipelineRunNilTimeout(prs *v1beta1.PipelineRunSpec) {
prs.Timeout = nil
}

// PipelineRunTasksTimeout sets the timeout to the PipelineRunSpec.
func PipelineRunTasksTimeout(duration time.Duration) PipelineRunSpecOp {
return func(prs *v1beta1.PipelineRunSpec) {
prs.TasksTimeout = &metav1.Duration{Duration: duration}
}
}

// PipelineRunNodeSelector sets the Node selector to the PipelineRunSpec.
func PipelineRunNodeSelector(values map[string]string) PipelineRunSpecOp {
return func(prs *v1beta1.PipelineRunSpec) {
Expand Down
12 changes: 9 additions & 3 deletions internal/builder/v1beta1/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func TestPipelineRun(t *testing.T) {
tb.PipelineRunParam("first-param-string", "first-value"),
tb.PipelineRunParam("second-param-array", "some", "array"),
tb.PipelineRunTimeout(1*time.Hour),
tb.PipelineRunTasksTimeout(50*time.Minute),
tb.PipelineRunResourceBinding("some-resource", tb.PipelineResourceBindingRef("my-special-resource")),
tb.PipelineRunServiceAccountNameTask("foo", "sa-2"),
tb.PipelineRunPipelineRefBundle("/some/registry"),
Expand Down Expand Up @@ -209,7 +210,8 @@ func TestPipelineRun(t *testing.T) {
Name: "second-param-array",
Value: *v1beta1.NewArrayOrString("some", "array"),
}},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
TasksTimeout: &metav1.Duration{Duration: 50 * time.Minute},
Resources: []v1beta1.PipelineResourceBinding{{
Name: "some-resource",
ResourceRef: &v1beta1.PipelineResourceRef{
Expand Down Expand Up @@ -244,6 +246,7 @@ func TestPipelineRunWithPodTemplate(t *testing.T) {
tb.PipelineRunParam("first-param-string", "first-value"),
tb.PipelineRunParam("second-param-array", "some", "array"),
tb.PipelineRunTimeout(1*time.Hour),
tb.PipelineRunTasksTimeout(50*time.Minute),
tb.PipelineRunResourceBinding("some-resource", tb.PipelineResourceBindingRef("my-special-resource")),
tb.PipelineRunServiceAccountNameTask("foo", "sa-2"),
tb.PipelineRunNodeSelector(map[string]string{
Expand Down Expand Up @@ -276,7 +279,8 @@ func TestPipelineRunWithPodTemplate(t *testing.T) {
Name: "second-param-array",
Value: *v1beta1.NewArrayOrString("some", "array"),
}},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
TasksTimeout: &metav1.Duration{Duration: 50 * time.Minute},
Resources: []v1beta1.PipelineResourceBinding{{
Name: "some-resource",
ResourceRef: &v1beta1.PipelineResourceRef{
Expand Down Expand Up @@ -316,6 +320,7 @@ func TestPipelineRunWithResourceSpec(t *testing.T) {
tb.PipelineRunParam("first-param-string", "first-value"),
tb.PipelineRunParam("second-param-array", "some", "array"),
tb.PipelineRunTimeout(1*time.Hour),
tb.PipelineRunTasksTimeout(50*time.Minute),
tb.PipelineRunResourceBinding("some-resource",
tb.PipelineResourceBindingResourceSpec(&resource.PipelineResourceSpec{
Type: v1beta1.PipelineResourceTypeGit,
Expand Down Expand Up @@ -351,7 +356,8 @@ func TestPipelineRunWithResourceSpec(t *testing.T) {
Name: "second-param-array",
Value: *v1beta1.NewArrayOrString("some", "array"),
}},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
TasksTimeout: &metav1.Duration{Duration: 50 * time.Minute},
Resources: []v1beta1.PipelineResourceBinding{{
Name: "some-resource",
ResourceSpec: &resource.PipelineResourceSpec{
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type PipelineRunSpec struct {
// Used for cancelling a pipelinerun (and maybe more later on)
// +optional
Status PipelineRunSpecStatus `json:"status,omitempty"`
// Time after which the Pipeline dag tasks time out.
// +optional
TasksTimeout *metav1.Duration `json:"tasksTimeout,omitempty"`
// Time after which the Pipeline times out. Defaults to never.
// Refer to Go's ParseDuration documentation for expected format: https://golang.org/pkg/time/#ParseDuration
// +optional
Expand Down
19 changes: 19 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1beta1
import (
"context"
"fmt"
"time"

"github.com/google/go-containerregistry/pkg/name"
"github.com/tektoncd/pipeline/pkg/apis/config"
Expand Down Expand Up @@ -82,6 +83,24 @@ func (ps *PipelineRunSpec) Validate(ctx context.Context) (errs *apis.FieldError)
}
}

if ps.TasksTimeout != nil {
// tasksTimeout should be a valid duration of at least 0.
if ps.TasksTimeout.Duration < 0 {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be >= 0", ps.TasksTimeout.Duration.String()), "tasksTimeout"))
}

if ps.Timeout != nil {
if ps.TasksTimeout.Duration > ps.Timeout.Duration {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be <= Timeout duration", ps.TasksTimeout.Duration.String()), "tasksTimeout"))
}
} else {
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
if ps.TasksTimeout.Duration > defaultTimeout {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be <= default Timeout duration", ps.TasksTimeout.Duration.String()), "tasksTimeout"))
}
}
}

if ps.Status != "" {
if ps.Status != PipelineRunSpecStatusCancelled && ps.Status != PipelineRunSpecStatusPending {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s or %s", ps.Status, PipelineRunSpecStatusCancelled, PipelineRunSpecStatusPending), "status"))
Expand Down
42 changes: 42 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,35 @@ func TestPipelineRun_Invalid(t *testing.T) {
},
},
want: apis.ErrInvalidValue("-48h0m0s should be >= 0", "spec.timeout"),
}, {
name: "negative pipeline tasksTimeout",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinelineName",
},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
TasksTimeout: &metav1.Duration{Duration: -48 * time.Hour},
},
},
want: apis.ErrInvalidValue("-48h0m0s should be >= 0", "spec.tasksTimeout"),
}, {
name: "pipeline tasksTimeout > pipeline Timeout",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinelineName",
},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
TasksTimeout: &metav1.Duration{Duration: 1 * time.Hour},
Timeout: &metav1.Duration{Duration: 25 * time.Minute},
},
},
want: apis.ErrInvalidValue("1h0m0s should be <= Timeout duration", "spec.tasksTimeout"),
}, {
name: "wrong pipelinerun cancel",
pr: v1beta1.PipelineRun{
Expand Down Expand Up @@ -206,6 +235,19 @@ func TestPipelineRun_Validate(t *testing.T) {
Timeout: &metav1.Duration{Duration: 0},
},
},
}, {
name: "no tasksTimeout",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinelineName",
},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
TasksTimeout: &metav1.Duration{Duration: 0},
},
},
}, {
name: "array param with pipelinespec and taskspec",
pr: v1beta1.PipelineRun{
Expand Down
60 changes: 50 additions & 10 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,11 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
return fmt.Errorf("error creating Run called %s for PipelineTask %s from PipelineRun %s: %w", rprt.RunName, rprt.PipelineTask.Name, pr.Name, err)
}
} else {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr))
if rprt.IsFinalTask(pipelineRunFacts) {
rprt.TaskRun, err = c.createFinallyTaskRun(ctx, rprt, pr, as.StorageBasePath(pr))
} else {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr))
}
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
Expand Down Expand Up @@ -692,7 +696,17 @@ func (c *Reconciler) updateRunsStatusDirectly(pr *v1beta1.PipelineRun) error {
return nil
}

type getTimeoutFunc func(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration

func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string) (*v1beta1.TaskRun, error) {
return c.createTaskRunHelper(ctx, rprt, pr, storageBasePath, getTaskRunTimeout)
}

func (c *Reconciler) createFinallyTaskRun(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string) (*v1beta1.TaskRun, error) {
return c.createTaskRunHelper(ctx, rprt, pr, storageBasePath, getFinallyTaskRunTimeout)
}

func (c *Reconciler) createTaskRunHelper(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) (*v1beta1.TaskRun, error) {
logger := logging.FromContext(ctx)

tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)
Expand All @@ -719,7 +733,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved
Spec: v1beta1.TaskRunSpec{
Params: rprt.PipelineTask.Params,
ServiceAccountName: taskRunSpec.TaskServiceAccountName,
Timeout: getTaskRunTimeout(ctx, pr, rprt),
Timeout: getTimeoutFunc(ctx, pr, rprt),
PodTemplate: taskRunSpec.TaskPodTemplate,
}}

Expand Down Expand Up @@ -926,30 +940,58 @@ func combineTaskRunAndTaskSpecAnnotations(pr *v1beta1.PipelineRun, pipelineTask
return annotations
}

func getTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
func getFinallyTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
var taskRunTimeout = &metav1.Duration{Duration: apisconfig.NoTimeoutDuration}

var timeout time.Duration
if pr.Spec.Timeout == nil {

if pr.Spec.Timeout != nil {
timeout = pr.Spec.Timeout.Duration
} else {
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
timeout = defaultTimeout * time.Minute
} else {
}

// If the value of the timeout is 0 for any resource, there is no timeout.
// It is impossible for pr.Spec.Timeout to be nil, since SetDefault always assigns it with a value.
taskRunTimeout = taskRunTimeoutHelper(timeout, pr, taskRunTimeout, rprt)

return taskRunTimeout
}

func getTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
var taskRunTimeout = &metav1.Duration{Duration: apisconfig.NoTimeoutDuration}

var timeout time.Duration

// TODO @souleb wrap into alpha feature flag
if pr.Spec.TasksTimeout != nil {
timeout = pr.Spec.TasksTimeout.Duration
} else if pr.Spec.Timeout != nil {
timeout = pr.Spec.Timeout.Duration
} else {
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
timeout = defaultTimeout * time.Minute
}

// If the value of the timeout is 0 for any resource, there is no timeout.
// It is impossible for pr.Spec.Timeout to be nil, since SetDefault always assigns it with a value.
taskRunTimeout = taskRunTimeoutHelper(timeout, pr, taskRunTimeout, rprt)

return taskRunTimeout
}

func taskRunTimeoutHelper(timeout time.Duration, pr *v1beta1.PipelineRun, taskRunTimeout *metav1.Duration, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
if timeout != apisconfig.NoTimeoutDuration {
pTimeoutTime := pr.Status.StartTime.Add(timeout)
if time.Now().After(pTimeoutTime) {
// Just in case something goes awry and we're creating the TaskRun after it should have already timed out,
// set the timeout to 1 second.

taskRunTimeout = &metav1.Duration{Duration: time.Until(pTimeoutTime)}
if taskRunTimeout.Duration < 0 {
taskRunTimeout = &metav1.Duration{Duration: 1 * time.Second}
}
} else {
// check if PipelineTask has a timeout specified

if rprt.PipelineTask.Timeout != nil {
taskRunTimeout = &metav1.Duration{Duration: rprt.PipelineTask.Timeout.Duration}
} else {
Expand All @@ -958,11 +1000,9 @@ func getTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resou
}
}

// check if PipelineTask has a timeout specified even if PipelineRun doesn't have a timeout
if timeout == apisconfig.NoTimeoutDuration && rprt.PipelineTask.Timeout != nil {
taskRunTimeout = &metav1.Duration{Duration: rprt.PipelineTask.Timeout.Duration}
}

return taskRunTimeout
}

Expand Down
100 changes: 99 additions & 1 deletion pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,7 +2071,44 @@ func TestGetTaskRunTimeout(t *testing.T) {
},
},
expected: &metav1.Duration{Duration: 2 * time.Minute},
}}
}, {
name: "taskstimeout specified in pr",
pr: tb.PipelineRun(prName, tb.PipelineRunNamespace(ns),
tb.PipelineRunSpec(p, tb.PipelineRunTasksTimeout(20*time.Minute)),
tb.PipelineRunStatus(tb.PipelineRunStartTime(time.Now())),
),
rprt: &resources.ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Timeout: nil,
},
},
expected: &metav1.Duration{Duration: 20 * time.Minute},
}, {
name: "40m timeout duration, 20m taskstimeout duration",
pr: tb.PipelineRun(prName, tb.PipelineRunNamespace(ns),
tb.PipelineRunSpec(p, tb.PipelineRunTimeout(40*time.Minute), tb.PipelineRunTasksTimeout(20*time.Minute)),
tb.PipelineRunStatus(tb.PipelineRunStartTime(time.Now())),
),
rprt: &resources.ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Timeout: nil,
},
},
expected: &metav1.Duration{Duration: 20 * time.Minute},
}, {
name: "taskrun being created with taskstimeout for PipelineTask",
pr: tb.PipelineRun(prName, tb.PipelineRunNamespace(ns),
tb.PipelineRunSpec(p, tb.PipelineRunTasksTimeout(20*time.Minute)),
tb.PipelineRunStatus(tb.PipelineRunStartTime(time.Now())),
),
rprt: &resources.ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{
Timeout: &metav1.Duration{Duration: 2 * time.Minute},
},
},
expected: &metav1.Duration{Duration: 2 * time.Minute},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand All @@ -2082,6 +2119,67 @@ func TestGetTaskRunTimeout(t *testing.T) {
}
}

func TestGetFinallyTaskRunTimeout(t *testing.T) {
prName := "pipelinerun-finallyTimeouts"
ns := "foo"
p := "pipeline"

tcs := []struct {
name string
pr *v1beta1.PipelineRun
rprt *resources.ResolvedPipelineRunTask
expected *metav1.Duration
}{{
name: "nil timeout duration",
pr: tb.PipelineRun(prName, tb.PipelineRunNamespace(ns),
tb.PipelineRunSpec(p, tb.PipelineRunNilTimeout),
tb.PipelineRunStatus(tb.PipelineRunStartTime(time.Now())),
),
rprt: &resources.ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{},
},
expected: &metav1.Duration{Duration: 60 * time.Minute},
}, {
name: "timeout specified in pr",
pr: tb.PipelineRun(prName, tb.PipelineRunNamespace(ns),
tb.PipelineRunSpec(p, tb.PipelineRunTimeout(20*time.Minute)),
tb.PipelineRunStatus(tb.PipelineRunStartTime(time.Now())),
),
rprt: &resources.ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{},
},
expected: &metav1.Duration{Duration: 20 * time.Minute},
}, {
name: "taskrun being created after timeout expired",
pr: tb.PipelineRun(prName, tb.PipelineRunNamespace(ns),
tb.PipelineRunSpec(p, tb.PipelineRunTimeout(1*time.Minute)),
tb.PipelineRunStatus(tb.PipelineRunStartTime(time.Now().Add(-2*time.Minute)))),
rprt: &resources.ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{},
},
expected: &metav1.Duration{Duration: 1 * time.Second},
}, {
name: "40m timeout duration, 20m taskstimeout duration",
pr: tb.PipelineRun(prName, tb.PipelineRunNamespace(ns),
tb.PipelineRunSpec(p, tb.PipelineRunTimeout(40*time.Minute), tb.PipelineRunTasksTimeout(20*time.Minute)),
tb.PipelineRunStatus(tb.PipelineRunStartTime(time.Now())),
),
rprt: &resources.ResolvedPipelineRunTask{
PipelineTask: &v1beta1.PipelineTask{},
},
expected: &metav1.Duration{Duration: 40 * time.Minute},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
if d := cmp.Diff(getFinallyTaskRunTimeout(context.TODO(), tc.pr, tc.rprt), tc.expected); d != "" {
t.Errorf("Unexpected finally task run timeout. Diff %s", diff.PrintWantGot(d))
}
})
}
}

// TestReconcileAndPropagateCustomPipelineTaskRunSpec tests that custom PipelineTaskRunSpec declared
// in PipelineRun is propagated to created TaskRuns
func TestReconcileAndPropagateCustomPipelineTaskRunSpec(t *testing.T) {
Expand Down
Loading

0 comments on commit b7f3bf9

Please sign in to comment.