Skip to content

Commit

Permalink
Adds an initial implementation for conditionals
Browse files Browse the repository at this point in the history
In this implementation, condition evaluations aka ConditionChecks are
backed by TaskRuns. All conditionChecks associated with a `PipelineTask`
have to succeed before the task is executed. If a ConditionCheck fails,
the PipelineTask's associated TaskRun is marked failed i.e. its
`Status.ConditionSucceeded` is False. However, the PipelineRun itself
is not marked as failed.

Also, add more comments for condition_types and more tests for condition
validation and status updates

Add more reconcile tests for conditions
  • Loading branch information
dibyom committed Jul 22, 2019
1 parent a1e61c6 commit e3959f0
Show file tree
Hide file tree
Showing 12 changed files with 1,520 additions and 95 deletions.
13 changes: 7 additions & 6 deletions pkg/apis/pipeline/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package pipeline

// GroupName is the Kubernetes resource group name for Pipeline types.
const (
GroupName = "tekton.dev"
TaskLabelKey = "/task"
TaskRunLabelKey = "/taskRun"
PipelineLabelKey = "/pipeline"
PipelineRunLabelKey = "/pipelineRun"
PipelineTaskLabelKey = "/pipelineTask"
GroupName = "tekton.dev"
TaskLabelKey = "/task"
TaskRunLabelKey = "/taskRun"
PipelineLabelKey = "/pipeline"
PipelineRunLabelKey = "/pipelineRun"
PipelineTaskLabelKey = "/pipelineTask"
PipelineRunConditionCheckKey = "/pipelineConditionCheck"
)
3 changes: 2 additions & 1 deletion pkg/apis/pipeline/v1alpha1/condition_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,5 @@ func TestCondition_Invalidate(t *testing.T) {
}
})
}
}
}

3 changes: 3 additions & 0 deletions pkg/reconciler/v1alpha1/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/knative/pkg/tracker"
pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
clustertaskinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/clustertask"
conditioninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/condition"
pipelineinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipeline"
resourceinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipelineresource"
pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/pipelinerun"
Expand Down Expand Up @@ -54,6 +55,7 @@ func NewController(
pipelineRunInformer := pipelineruninformer.Get(ctx)
pipelineInformer := pipelineinformer.Get(ctx)
resourceInformer := resourceinformer.Get(ctx)
conditionInformer := conditioninformer.Get(ctx)
timeoutHandler := reconciler.NewTimeoutHandler(ctx.Done(), logger)

opt := reconciler.Options{
Expand All @@ -72,6 +74,7 @@ func NewController(
clusterTaskLister: clusterTaskInformer.Lister(),
taskRunLister: taskRunInformer.Lister(),
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
}
impl := controller.NewImpl(c, c.Logger, pipelineRunControllerName)
Expand Down
228 changes: 162 additions & 66 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
apisconfig "github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
artifacts "github.com/tektoncd/pipeline/pkg/artifacts"
"github.com/tektoncd/pipeline/pkg/artifacts"
listers "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/v1alpha1/pipeline/dag"
Expand Down Expand Up @@ -58,6 +58,9 @@ const (
// ReasonCouldntGetResource indicates that the reason for the failure status is that the
// associated PipelineRun's bound PipelineResources couldn't all be retrieved
ReasonCouldntGetResource = "CouldntGetResource"
// ReasonCouldntGetCondition indicates that the reason for the failure status is that the
// associated Pipeline's Conditions couldn't all be retrieved
ReasonCouldntGetCondition = "CouldntGetCondition"
// ReasonFailedValidation indicates that the reason for failure status is
// that pipelinerun failed runtime validation
ReasonFailedValidation = "PipelineValidationFailed"
Expand Down Expand Up @@ -89,6 +92,7 @@ type Reconciler struct {
taskLister listers.TaskLister
clusterTaskLister listers.ClusterTaskLister
resourceLister listers.PipelineResourceLister
conditionLister listers.ConditionLister
tracker tracker.Interface
configStore configStore
timeoutHandler *reconciler.TimeoutSet
Expand Down Expand Up @@ -259,6 +263,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
return c.clusterTaskLister.Get(name)
},
c.resourceLister.PipelineResources(pr.Namespace).Get,
func(name string) (*v1alpha1.Condition, error) {
return c.conditionLister.Conditions(pr.Namespace).Get(name)
},
p.Spec.Tasks, providedResources,
)
if err != nil {
Expand All @@ -280,6 +287,14 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
Message: fmt.Sprintf("PipelineRun %s can't be Run; it tries to bind Resources that don't exist: %s",
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
})
case *resources.ConditionNotFoundError:
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: ReasonCouldntGetCondition,
Message: fmt.Sprintf("PipelineRun %s can't be Run; it contains Conditions that don't exist: %s",
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
})
default:
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Expand Down Expand Up @@ -334,6 +349,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
if err != nil {
c.Logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
}

rprts := pipelineState.GetNextTasks(candidateTasks)

var as artifacts.ArtifactStorageInterface
Expand All @@ -344,11 +360,21 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er

for _, rprt := range rprts {
if rprt != nil {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr))
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return xerrors.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr))
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return xerrors.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}
} else if !rprt.ResolvedConditionChecks.HasStarted() {
for _, rcc := range rprt.ResolvedConditionChecks {
rcc.ConditionCheck, err = c.makeConditionCheckContainer(c.Logger, rprt, rcc, pr)
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "ConditionCheckCreationFailed", "Failed to create TaskRun %q: %v", rcc.ConditionCheckName, err)
return xerrors.Errorf("error creating ConditionCheck container called %s for PipelineTask %s from PipelineRun %s: %w", rcc.ConditionCheckName, rprt.PipelineTask.Name, pr.Name, err)
}
}
}
}
}
Expand All @@ -365,21 +391,54 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er

func updateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask) {
for _, rprt := range pipelineState {
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil {
continue
}
var prtrs *v1alpha1.PipelineRunTaskRunStatus
if rprt.TaskRun != nil {
prtrs := pr.Status.TaskRuns[rprt.TaskRun.Name]
if prtrs == nil {
prtrs = &v1alpha1.PipelineRunTaskRunStatus{
PipelineTaskName: rprt.PipelineTask.Name,
}
pr.Status.TaskRuns[rprt.TaskRun.Name] = prtrs
prtrs = pr.Status.TaskRuns[rprt.TaskRun.Name]
}
if prtrs == nil {
prtrs = &v1alpha1.PipelineRunTaskRunStatus{
PipelineTaskName: rprt.PipelineTask.Name,
}
}

if rprt.TaskRun != nil {
prtrs.Status = &rprt.TaskRun.Status
}

if len(rprt.ResolvedConditionChecks) > 0 {
cStatus := make(map[string]*v1alpha1.PipelineRunConditionCheckStatus)
for _, c := range rprt.ResolvedConditionChecks {
cStatus[c.ConditionCheckName] = &v1alpha1.PipelineRunConditionCheckStatus{
ConditionName: c.Condition.Name,
}
if c.ConditionCheck != nil {
ccStatus := v1alpha1.ConditionCheckStatus(c.ConditionCheck.Status)
cStatus[c.ConditionCheckName].Status = &ccStatus
}
}
prtrs.ConditionChecks = cStatus
if rprt.ResolvedConditionChecks.IsComplete() && !rprt.ResolvedConditionChecks.IsSuccess() {
if prtrs.Status == nil {
prtrs.Status = &v1alpha1.TaskRunStatus{}
}
prtrs.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: resources.ReasonConditionCheckFailed,
Message: fmt.Sprintf("ConditionChecks failed for Task %s in PipelineRun %s", rprt.TaskRunName, pr.Name),
})
}
}
pr.Status.TaskRuns[rprt.TaskRunName] = prtrs
}
}

func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1alpha1.PipelineRun) error {
for taskRunName := range pr.Status.TaskRuns {
// TODO(dibyom): Add conditionCheck statuses here
prtrs := pr.Status.TaskRuns[taskRunName]
tr, err := c.taskRunLister.TaskRuns(pr.Namespace).Get(taskRunName)
if err != nil {
Expand All @@ -391,61 +450,11 @@ func (c *Reconciler) updateTaskRunsStatusDirectly(pr *v1alpha1.PipelineRun) erro
prtrs.Status = &tr.Status
}
}

return nil
}

func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, storageBasePath string) (*v1alpha1.TaskRun, error) {
var taskRunTimeout = &metav1.Duration{Duration: apisconfig.NoTimeoutDuration}

var timeout time.Duration
if pr.Spec.Timeout == nil {
timeout = config.DefaultTimeoutMinutes
} else {
timeout = pr.Spec.Timeout.Duration
}
// If the value of the timeout is 0 for any resource, there is no timeout.
// It is impossible for pr.Spec.Timeout to be nil, since SetDefault always assigns it with a value.
if timeout != apisconfig.NoTimeoutDuration {
pTimeoutTime := pr.Status.StartTime.Add(timeout)
if time.Now().After(pTimeoutTime) {
// Just in case something goes awry and we're creating the TaskRun after it should have already timed out,
// set the timeout to 1 second.
taskRunTimeout = &metav1.Duration{Duration: time.Until(pTimeoutTime)}
if taskRunTimeout.Duration < 0 {
taskRunTimeout = &metav1.Duration{Duration: 1 * time.Second}
}
} else {
taskRunTimeout = &metav1.Duration{Duration: timeout}
}
}

// If service account is configured for a given PipelineTask, override PipelineRun's seviceAccount
serviceAccount := pr.Spec.ServiceAccount
for _, sa := range pr.Spec.ServiceAccounts {
if sa.TaskName == rprt.PipelineTask.Name {
serviceAccount = sa.ServiceAccount
}
}

// Propagate labels from PipelineRun to TaskRun.
labels := make(map[string]string, len(pr.ObjectMeta.Labels)+1)
for key, val := range pr.ObjectMeta.Labels {
labels[key] = val
}
labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] = pr.Name
if rprt.PipelineTask.Name != "" {
labels[pipeline.GroupName+pipeline.PipelineTaskLabelKey] = rprt.PipelineTask.Name
}

// Propagate annotations from PipelineRun to TaskRun.
annotations := make(map[string]string, len(pr.ObjectMeta.Annotations)+1)
for key, val := range pr.ObjectMeta.Annotations {
annotations[key] = val
}

tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)

if tr != nil {
//is a retry
addRetryHistory(tr)
Expand All @@ -463,8 +472,8 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
Name: rprt.TaskRunName,
Namespace: pr.Namespace,
OwnerReferences: pr.GetOwnerReference(),
Labels: labels,
Annotations: annotations,
Labels: getTaskrunLabels(pr, rprt.PipelineTask.Name), // Propagate labels from PipelineRun to TaskRun.
Annotations: getTaskrunAnnotations(pr), // Propagate annotations from PipelineRun to TaskRun.
},
Spec: v1alpha1.TaskRunSpec{
TaskRef: &v1alpha1.TaskRef{
Expand All @@ -474,8 +483,11 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
Inputs: v1alpha1.TaskRunInputs{
Params: rprt.PipelineTask.Params,
},
ServiceAccount: serviceAccount,
Timeout: taskRunTimeout,
ServiceAccount: getServiceAccount(pr, rprt.PipelineTask.Name),
NodeSelector: pr.Spec.NodeSelector,
Tolerations: pr.Spec.Tolerations,
Affinity: pr.Spec.Affinity,
Timeout: getTaskRunTimeout(pr),
PodTemplate: podTemplate,
}}

Expand All @@ -484,6 +496,26 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
return c.PipelineClientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Create(tr)
}

func getTaskrunAnnotations(pr *v1alpha1.PipelineRun) map[string]string {
annotations := make(map[string]string, len(pr.ObjectMeta.Annotations)+1)
for key, val := range pr.ObjectMeta.Annotations {
annotations[key] = val
}
return annotations
}

func getTaskrunLabels(pr *v1alpha1.PipelineRun, pipelineTaskName string) map[string]string {
labels := make(map[string]string, len(pr.ObjectMeta.Labels)+1)
for key, val := range pr.ObjectMeta.Labels {
labels[key] = val
}
labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] = pr.Name
if pipelineTaskName != "" {
labels[pipeline.GroupName+pipeline.PipelineTaskLabelKey] = pipelineTaskName
}
return labels
}

func addRetryHistory(tr *v1alpha1.TaskRun) {
newStatus := *tr.Status.DeepCopy()
newStatus.RetriesStatus = nil
Expand Down Expand Up @@ -527,3 +559,67 @@ func (c *Reconciler) updateLabelsAndAnnotations(pr *v1alpha1.PipelineRun) (*v1al
}
return newPr, nil
}

func (c *Reconciler) makeConditionCheckContainer(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, rcc *resources.ResolvedConditionCheck, pr *v1alpha1.PipelineRun) (*v1alpha1.ConditionCheck, error) {
labels := getTaskrunLabels(pr, rprt.PipelineTask.Name)
labels[pipeline.GroupName+pipeline.PipelineRunConditionCheckKey] = rcc.ConditionCheckName

tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: rcc.ConditionCheckName,
Namespace: pr.Namespace,
OwnerReferences: pr.GetOwnerReference(),
Labels: labels,
Annotations: getTaskrunAnnotations(pr), // Propagate annotations from PipelineRun to TaskRun.
},
Spec: v1alpha1.TaskRunSpec{
TaskSpec: rcc.ConditionToTaskSpec(),
ServiceAccount: getServiceAccount(pr, rprt.PipelineTask.Name),
Timeout: getTaskRunTimeout(pr),
NodeSelector: pr.Spec.NodeSelector,
Tolerations: pr.Spec.Tolerations,
Affinity: pr.Spec.Affinity,
}}

cctr, err := c.PipelineClientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Create(tr)
cc := v1alpha1.ConditionCheck(*cctr)
return &cc, err
}

func getTaskRunTimeout(pr *v1alpha1.PipelineRun) *metav1.Duration {
var taskRunTimeout = &metav1.Duration{Duration: apisconfig.NoTimeoutDuration}

var timeout time.Duration
if pr.Spec.Timeout == nil {
timeout = config.DefaultTimeoutMinutes
} else {
timeout = pr.Spec.Timeout.Duration
}
// If the value of the timeout is 0 for any resource, there is no timeout.
// It is impossible for pr.Spec.Timeout to be nil, since SetDefault always assigns it with a value.
if timeout != apisconfig.NoTimeoutDuration {
pTimeoutTime := pr.Status.StartTime.Add(timeout)
if time.Now().After(pTimeoutTime) {
// Just in case something goes awry and we're creating the TaskRun after it should have already timed out,
// set the timeout to 1 second.
taskRunTimeout = &metav1.Duration{Duration: time.Until(pTimeoutTime)}
if taskRunTimeout.Duration < 0 {
taskRunTimeout = &metav1.Duration{Duration: 1 * time.Second}
}
} else {
taskRunTimeout = &metav1.Duration{Duration: timeout}
}
}
return taskRunTimeout
}

func getServiceAccount(pr *v1alpha1.PipelineRun, pipelineTaskName string) string {
// If service account is configured for a given PipelineTask, override PipelineRun's seviceAccount
serviceAccount := pr.Spec.ServiceAccount
for _, sa := range pr.Spec.ServiceAccounts {
if sa.TaskName == pipelineTaskName {
serviceAccount = sa.ServiceAccount
}
}
return serviceAccount
}
Loading

0 comments on commit e3959f0

Please sign in to comment.