Skip to content

Commit

Permalink
Add a TaskTimeout optional field to pipelinerun type
Browse files Browse the repository at this point in the history
TaskTimeout is used to timeout all dag tasks, finally tasks excluded

Validates a TasksTimeout if:
	- TasksTimeout > 0
	- Timeout is specified and TasksTimeout <= Timeout
 	- Timeout not specified and TasksTimeout <= default Timeout

Add a builder function for taskTimeout

Defines 2 functions to get taskrun timeout
One for dag tasks and one specifically for finally tasks
  • Loading branch information
souleb committed Mar 22, 2021
1 parent 02f5f42 commit 9d5768a
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 14 deletions.
7 changes: 7 additions & 0 deletions internal/builder/v1beta1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,13 @@ func PipelineRunNilTimeout(prs *v1beta1.PipelineRunSpec) {
prs.Timeout = nil
}

// PipelineRunTasksTimeout sets the timeout to the PipelineRunSpec.
func PipelineRunTasksTimeout(duration time.Duration) PipelineRunSpecOp {
return func(prs *v1beta1.PipelineRunSpec) {
prs.TasksTimeout = &metav1.Duration{Duration: duration}
}
}

// PipelineRunNodeSelector sets the Node selector to the PipelineRunSpec.
func PipelineRunNodeSelector(values map[string]string) PipelineRunSpecOp {
return func(prs *v1beta1.PipelineRunSpec) {
Expand Down
12 changes: 9 additions & 3 deletions internal/builder/v1beta1/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func TestPipelineRun(t *testing.T) {
tb.PipelineRunParam("first-param-string", "first-value"),
tb.PipelineRunParam("second-param-array", "some", "array"),
tb.PipelineRunTimeout(1*time.Hour),
tb.PipelineRunTasksTimeout(50*time.Minute),
tb.PipelineRunResourceBinding("some-resource", tb.PipelineResourceBindingRef("my-special-resource")),
tb.PipelineRunServiceAccountNameTask("foo", "sa-2"),
tb.PipelineRunPipelineRefBundle("/some/registry"),
Expand Down Expand Up @@ -209,7 +210,8 @@ func TestPipelineRun(t *testing.T) {
Name: "second-param-array",
Value: *v1beta1.NewArrayOrString("some", "array"),
}},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
TasksTimeout: &metav1.Duration{Duration: 50 * time.Minute},
Resources: []v1beta1.PipelineResourceBinding{{
Name: "some-resource",
ResourceRef: &v1beta1.PipelineResourceRef{
Expand Down Expand Up @@ -244,6 +246,7 @@ func TestPipelineRunWithPodTemplate(t *testing.T) {
tb.PipelineRunParam("first-param-string", "first-value"),
tb.PipelineRunParam("second-param-array", "some", "array"),
tb.PipelineRunTimeout(1*time.Hour),
tb.PipelineRunTasksTimeout(50*time.Minute),
tb.PipelineRunResourceBinding("some-resource", tb.PipelineResourceBindingRef("my-special-resource")),
tb.PipelineRunServiceAccountNameTask("foo", "sa-2"),
tb.PipelineRunNodeSelector(map[string]string{
Expand Down Expand Up @@ -276,7 +279,8 @@ func TestPipelineRunWithPodTemplate(t *testing.T) {
Name: "second-param-array",
Value: *v1beta1.NewArrayOrString("some", "array"),
}},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
TasksTimeout: &metav1.Duration{Duration: 50 * time.Minute},
Resources: []v1beta1.PipelineResourceBinding{{
Name: "some-resource",
ResourceRef: &v1beta1.PipelineResourceRef{
Expand Down Expand Up @@ -316,6 +320,7 @@ func TestPipelineRunWithResourceSpec(t *testing.T) {
tb.PipelineRunParam("first-param-string", "first-value"),
tb.PipelineRunParam("second-param-array", "some", "array"),
tb.PipelineRunTimeout(1*time.Hour),
tb.PipelineRunTasksTimeout(50*time.Minute),
tb.PipelineRunResourceBinding("some-resource",
tb.PipelineResourceBindingResourceSpec(&resource.PipelineResourceSpec{
Type: v1beta1.PipelineResourceTypeGit,
Expand Down Expand Up @@ -351,7 +356,8 @@ func TestPipelineRunWithResourceSpec(t *testing.T) {
Name: "second-param-array",
Value: *v1beta1.NewArrayOrString("some", "array"),
}},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
Timeout: &metav1.Duration{Duration: 1 * time.Hour},
TasksTimeout: &metav1.Duration{Duration: 50 * time.Minute},
Resources: []v1beta1.PipelineResourceBinding{{
Name: "some-resource",
ResourceSpec: &resource.PipelineResourceSpec{
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ type PipelineRunSpec struct {
// Used for cancelling a pipelinerun (and maybe more later on)
// +optional
Status PipelineRunSpecStatus `json:"status,omitempty"`
// Time after which the Pipeline tasks time out.
// Finally tasks can run beyond this as they are bound to the pipeline timeout.
// +optional
TasksTimeout *metav1.Duration `json:"tasksTimeout,omitempty"`
// Time after which the Pipeline times out. Defaults to never.
// Refer to Go's ParseDuration documentation for expected format: https://golang.org/pkg/time/#ParseDuration
// +optional
Expand Down
19 changes: 19 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1beta1
import (
"context"
"fmt"
"time"

"github.com/google/go-containerregistry/pkg/name"
"github.com/tektoncd/pipeline/pkg/apis/config"
Expand Down Expand Up @@ -82,6 +83,24 @@ func (ps *PipelineRunSpec) Validate(ctx context.Context) (errs *apis.FieldError)
}
}

if ps.TasksTimeout != nil {
// tasksTimeout should be a valid duration of at least 0.
if ps.TasksTimeout.Duration < 0 {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be >= 0", ps.TasksTimeout.Duration.String()), "tasksTimeout"))
}

if ps.Timeout != nil {
if ps.TasksTimeout.Duration > ps.Timeout.Duration {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be <= timeout duration", ps.TasksTimeout.Duration.String()), "tasksTimeout"))
}
} else {
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
if ps.TasksTimeout.Duration > defaultTimeout {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be <= default timeout duration", ps.TasksTimeout.Duration.String()), "tasksTimeout"))
}
}
}

if ps.Status != "" {
if ps.Status != PipelineRunSpecStatusCancelled && ps.Status != PipelineRunSpecStatusPending {
errs = errs.Also(apis.ErrInvalidValue(fmt.Sprintf("%s should be %s or %s", ps.Status, PipelineRunSpecStatusCancelled, PipelineRunSpecStatusPending), "status"))
Expand Down
42 changes: 42 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,35 @@ func TestPipelineRun_Invalid(t *testing.T) {
},
},
want: apis.ErrInvalidValue("-48h0m0s should be >= 0", "spec.timeout"),
}, {
name: "negative pipeline tasksTimeout",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinelineName",
},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
TasksTimeout: &metav1.Duration{Duration: -48 * time.Hour},
},
},
want: apis.ErrInvalidValue("-48h0m0s should be >= 0", "spec.tasksTimeout"),
}, {
name: "pipeline tasksTimeout > pipeline Timeout",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinelineName",
},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
TasksTimeout: &metav1.Duration{Duration: 1 * time.Hour},
Timeout: &metav1.Duration{Duration: 25 * time.Minute},
},
},
want: apis.ErrInvalidValue("1h0m0s should be <= Timeout duration", "spec.tasksTimeout"),
}, {
name: "wrong pipelinerun cancel",
pr: v1beta1.PipelineRun{
Expand Down Expand Up @@ -206,6 +235,19 @@ func TestPipelineRun_Validate(t *testing.T) {
Timeout: &metav1.Duration{Duration: 0},
},
},
}, {
name: "no tasksTimeout",
pr: v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinelineName",
},
Spec: v1beta1.PipelineRunSpec{
PipelineRef: &v1beta1.PipelineRef{
Name: "prname",
},
TasksTimeout: &metav1.Duration{Duration: 0},
},
},
}, {
name: "array param with pipelinespec and taskspec",
pr: v1beta1.PipelineRun{
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/pipeline/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 50 additions & 10 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,11 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
return fmt.Errorf("error creating Run called %s for PipelineTask %s from PipelineRun %s: %w", rprt.RunName, rprt.PipelineTask.Name, pr.Name, err)
}
} else {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr))
if rprt.IsFinalTask(pipelineRunFacts) {
rprt.TaskRun, err = c.createFinallyTaskRun(ctx, rprt, pr, as.StorageBasePath(pr))
} else {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr))
}
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
Expand Down Expand Up @@ -692,7 +696,17 @@ func (c *Reconciler) updateRunsStatusDirectly(pr *v1beta1.PipelineRun) error {
return nil
}

type getTimeoutFunc func(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration

func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string) (*v1beta1.TaskRun, error) {
return c.createTaskRunHelper(ctx, rprt, pr, storageBasePath, getTaskRunTimeout)
}

func (c *Reconciler) createFinallyTaskRun(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string) (*v1beta1.TaskRun, error) {
return c.createTaskRunHelper(ctx, rprt, pr, storageBasePath, getFinallyTaskRunTimeout)
}

func (c *Reconciler) createTaskRunHelper(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) (*v1beta1.TaskRun, error) {
logger := logging.FromContext(ctx)

tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)
Expand All @@ -719,7 +733,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved
Spec: v1beta1.TaskRunSpec{
Params: rprt.PipelineTask.Params,
ServiceAccountName: taskRunSpec.TaskServiceAccountName,
Timeout: getTaskRunTimeout(ctx, pr, rprt),
Timeout: getTimeoutFunc(ctx, pr, rprt),
PodTemplate: taskRunSpec.TaskPodTemplate,
}}

Expand Down Expand Up @@ -926,30 +940,58 @@ func combineTaskRunAndTaskSpecAnnotations(pr *v1beta1.PipelineRun, pipelineTask
return annotations
}

func getTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
func getFinallyTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
var taskRunTimeout = &metav1.Duration{Duration: apisconfig.NoTimeoutDuration}

var timeout time.Duration
if pr.Spec.Timeout == nil {

if pr.Spec.Timeout != nil {
timeout = pr.Spec.Timeout.Duration
} else {
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
timeout = defaultTimeout * time.Minute
} else {
}

// If the value of the timeout is 0 for any resource, there is no timeout.
// It is impossible for pr.Spec.Timeout to be nil, since SetDefault always assigns it with a value.
taskRunTimeout = taskRunTimeoutHelper(timeout, pr, taskRunTimeout, rprt)

return taskRunTimeout
}

func getTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
var taskRunTimeout = &metav1.Duration{Duration: apisconfig.NoTimeoutDuration}

var timeout time.Duration

// TODO @souleb wrap into alpha feature flag
if pr.Spec.TasksTimeout != nil {
timeout = pr.Spec.TasksTimeout.Duration
} else if pr.Spec.Timeout != nil {
timeout = pr.Spec.Timeout.Duration
} else {
defaultTimeout := time.Duration(config.FromContextOrDefaults(ctx).Defaults.DefaultTimeoutMinutes)
timeout = defaultTimeout * time.Minute
}

// If the value of the timeout is 0 for any resource, there is no timeout.
// It is impossible for pr.Spec.Timeout to be nil, since SetDefault always assigns it with a value.
taskRunTimeout = taskRunTimeoutHelper(timeout, pr, taskRunTimeout, rprt)

return taskRunTimeout
}

func taskRunTimeoutHelper(timeout time.Duration, pr *v1beta1.PipelineRun, taskRunTimeout *metav1.Duration, rprt *resources.ResolvedPipelineRunTask) *metav1.Duration {
if timeout != apisconfig.NoTimeoutDuration {
pTimeoutTime := pr.Status.StartTime.Add(timeout)
if time.Now().After(pTimeoutTime) {
// Just in case something goes awry and we're creating the TaskRun after it should have already timed out,
// set the timeout to 1 second.

taskRunTimeout = &metav1.Duration{Duration: time.Until(pTimeoutTime)}
if taskRunTimeout.Duration < 0 {
taskRunTimeout = &metav1.Duration{Duration: 1 * time.Second}
}
} else {
// check if PipelineTask has a timeout specified

if rprt.PipelineTask.Timeout != nil {
taskRunTimeout = &metav1.Duration{Duration: rprt.PipelineTask.Timeout.Duration}
} else {
Expand All @@ -958,11 +1000,9 @@ func getTaskRunTimeout(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resou
}
}

// check if PipelineTask has a timeout specified even if PipelineRun doesn't have a timeout
if timeout == apisconfig.NoTimeoutDuration && rprt.PipelineTask.Timeout != nil {
taskRunTimeout = &metav1.Duration{Duration: rprt.PipelineTask.Timeout.Duration}
}

return taskRunTimeout
}

Expand Down
Loading

0 comments on commit 9d5768a

Please sign in to comment.