Improvements on pipeline cancel #2543

Merged
31 changes: 14 additions & 17 deletions pkg/reconciler/pipelinerun/cancel.go
@@ -27,15 +27,14 @@ import (

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/apis"
)

// cancelPipelineRun marks the PipelineRun as cancelled and any resolved TaskRun(s) too.
func cancelPipelineRun(logger *zap.SugaredLogger, pr *v1alpha1.PipelineRun, pipelineState []*resources.ResolvedPipelineRunTask, clientSet clientset.Interface) error {
func cancelPipelineRun(logger *zap.SugaredLogger, pr *v1alpha1.PipelineRun, clientSet clientset.Interface) error {
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
@@ -45,23 +44,21 @@ func cancelPipelineRun(logger *zap.SugaredLogger, pr *v1alpha1.PipelineRun, pipe
// update pr completed time
pr.Status.CompletionTime = &metav1.Time{Time: time.Now()}
errs := []string{}
for _, rprt := range pipelineState {
if rprt.TaskRun == nil {
// No taskrun yet, pass
continue
}

logger.Infof("cancelling TaskRun %s", rprt.TaskRunName)
// Use Patch to update the TaskRuns since the TaskRun controller may be operating on the
// TaskRuns at the same time and trying to update the entire object may cause a race
b, err := getCancelPatch()
if err != nil {
return fmt.Errorf("couldn't make patch to update TaskRun cancellation: %v", err)
}

// Use Patch to update the TaskRuns since the TaskRun controller may be operating on the
// TaskRuns at the same time and trying to update the entire object may cause a race
b, err := getCancelPatch()
if err != nil {
errs = append(errs, fmt.Errorf("couldn't make patch to update TaskRun cancellation: %v", err).Error())
continue
}
if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Patch(rprt.TaskRunName, types.JSONPatchType, b, ""); err != nil {
errs = append(errs, fmt.Errorf("Failed to patch TaskRun `%s` with cancellation: %s", rprt.TaskRunName, err).Error())
// Loop over the TaskRuns in the PipelineRun status.
// If a TaskRun is not in the status yet we should not cancel it anyways.
Collaborator:

nice!

Collaborator:

Just double-checking: can we guarantee that all started TaskRuns will be in the status? I'm guessing we can, but I wanted to check. I think this would mean that in the same reconcile loop in which we create a TaskRun, we always update the status. (I'm also wondering whether an error partway through could leave us in an inconsistent state, e.g. we create the TaskRun, then a transient networking error prevents us from updating the status?)

Member Author:

Yeah, I thought about that. In my understanding the reconciler would not take the same key from the queue while it is still being processed. When we start to reconcile we get the latest version of the PipelineRun from the API, which should include all running TaskRuns. It might be that a previous reconcile failed to update the status of the PipelineRun, but the same may be true for the TaskRun status too, so I feel we are not introducing a new issue.

Member Author:

The current implementation uses the pipelineState, which requires fetching the Pipeline, TaskRuns, Tasks and PipelineResources, then validating and binding them all together. I was hoping to move the handling of cancellation earlier in the reconcile and thus avoid calculating the pipelineState.

Collaborator:

> It might be that a previous reconcile failed to update the status of the pipeline, but the same may be true for the taskrun status too, so I feel we are not introducing a new issue.

I guess in that case the next reconcile would notice that the status needed to be updated anyway, update it, and then the "is cancelled" block would catch any TaskRuns that need to be cancelled that weren't before 🤔

It seems like eventually it would end up in the right state?

Member Author:

Thank you, you make a very good point.
I agree that the scenario you describe is not covered.

Looking deeper into the code, it looks like it is not covered today either 😅
ResolvePipelineRun is used to build the pipeline run state. That function behaves roughly as follows (see the sketch below):

  • for each task in the spec:
    • look in the PipelineRun status for a matching TaskRun
    • when not found, generate a new name
    • try to fetch the TaskRun; if that fails with NotFound, ignore the error

This means that if we fail to sync the PipelineRun status, we never go and list TaskRuns by owner, so the TaskRuns created during the reconcile cycle where the issue happened become orphaned. A new name is generated for those tasks, so presumably dag.GetSchedulable will return the task again and we will end up running the same task(s) twice.

It is a rather unlikely case, since it requires being able to create a TaskRun and then, a few milliseconds later, being unable to update the PipelineRun status, but it may well happen.
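A minimal sketch of the lookup flow described above, assuming the status map is the only place an existing TaskRun name is recorded; the helper name and naming scheme are illustrative, not the actual ResolvePipelineRun code:

```go
package sketch

import (
	"fmt"

	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
)

// taskRunNameFor mirrors the behaviour described above: reuse the TaskRun name
// recorded in the PipelineRun status for this pipeline task, otherwise generate
// a brand-new one.
func taskRunNameFor(pr *v1alpha1.PipelineRun, pipelineTaskName string) string {
	for trName, trStatus := range pr.Status.TaskRuns {
		if trStatus != nil && trStatus.PipelineTaskName == pipelineTaskName {
			return trName
		}
	}
	// Not in the status: a new name is generated, so a TaskRun created in a
	// reconcile cycle whose status update failed is never looked up again.
	return fmt.Sprintf("%s-%s", pr.Name, pipelineTaskName)
}
```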

I think a way forward would be to ensure that the PipelineRun status is valid.
To make sure that the status is in sync with the list of TaskRuns owned by the PipelineRun, we can run that check before we try to use the PipelineRun status as the source of truth. To do so we only need the PipelineRun status and the object ref of the PipelineRun, so we do not need to go through the resolution, validation, conversion and re-conversion madness.

I will propose a patch that does that and keep this one on hold; once (if) the other patch is approved, it should then be fine to merge this roughly as it is today. Does that sound like a plan?
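A minimal sketch of the consistency check proposed above, using the same clientset as cancel.go and assuming a "tekton.dev/pipelineTask" label for recovering the pipeline task name; the actual fix was later proposed in #2558 and may look quite different:

```go
package sketch

import (
	"fmt"

	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
	clientset "github.com/tektoncd/pipeline/pkg/client/clientset/versioned"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// recoverOrphanedTaskRuns lists the TaskRuns in the PipelineRun's namespace and
// folds any that are owned by this PipelineRun but missing from its status back
// into pr.Status.TaskRuns, so later reconciles (including cancellation) see them.
func recoverOrphanedTaskRuns(pr *v1alpha1.PipelineRun, clientSet clientset.Interface) error {
	trs, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).List(metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("couldn't list TaskRuns in namespace %s: %v", pr.Namespace, err)
	}
	if pr.Status.TaskRuns == nil {
		pr.Status.TaskRuns = map[string]*v1alpha1.PipelineRunTaskRunStatus{}
	}
	for i := range trs.Items {
		tr := &trs.Items[i]
		if !metav1.IsControlledBy(tr, pr) {
			continue // not owned by this PipelineRun
		}
		if _, ok := pr.Status.TaskRuns[tr.Name]; !ok {
			pr.Status.TaskRuns[tr.Name] = &v1alpha1.PipelineRunTaskRunStatus{
				// Assumes the TaskRun carries a label with the pipeline task name.
				PipelineTaskName: tr.Labels["tekton.dev/pipelineTask"],
			}
		}
	}
	return nil
}
```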

Member Author:

@bobcatfish Since, given #2558, this patch does not change the situation compared to today, I wonder if we could go ahead with this and then fix #2558.

Member Author:

I set up a unit test to verify that we do not recover orphaned TaskRuns, assuming that they may happen if the PipelineRun status sync fails.
This WIP fix shows how we could solve the issue: #2558

Collaborator:

> This means that if we fail to sync the PipelineRun status, we never go and list TaskRuns by owner, so the TaskRuns created during the reconcile cycle where the issue happened become orphaned. A new name is generated for the tasks, so presumably dag.GetSchedulable will return the task again and it will end up running the same task(s) twice.

yikes!! nice find :D

> To make sure that the status is in sync with the list of TaskRuns owned by the PipelineRun, we can run this check before we try to use the PipelineRun status as the source of truth.

kk sounds good to me!

Thanks for looking into this and opening the other issue 🙏

for taskRunName := range pr.Status.TaskRuns {
logger.Infof("cancelling TaskRun %s", taskRunName)

if _, err := clientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Patch(taskRunName, types.JSONPatchType, b, ""); err != nil {
errs = append(errs, fmt.Errorf("Failed to patch TaskRun `%s` with cancellation: %s", taskRunName, err).Error())
continue
}
}
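For context on the Patch-based cancellation above: getCancelPatch is not part of this diff, but a JSON patch along these lines is what the call site expects. This is a hedged sketch, assuming v1alpha1.TaskRunSpecStatusCancelled is the value the TaskRun controller reacts to; the real helper lives elsewhere in this package and may be implemented differently:

```go
package sketch

import (
	"encoding/json"

	"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
)

// getCancelPatchSketch builds a JSON patch that sets spec.status on a TaskRun to
// the cancelled value, so the TaskRun controller stops it. Patching only this one
// field avoids racing with the TaskRun controller over the rest of the object.
func getCancelPatchSketch() ([]byte, error) {
	patch := []map[string]interface{}{{
		"op":    "add",
		"path":  "/spec/status",
		"value": v1alpha1.TaskRunSpecStatusCancelled, // assumed constant name
	}}
	return json.Marshal(patch)
}
```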
31 changes: 15 additions & 16 deletions pkg/reconciler/pipelinerun/cancel_test.go
@@ -21,7 +21,6 @@ import (
"testing"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
tb "github.com/tektoncd/pipeline/test/builder"
test "github.com/tektoncd/pipeline/test/v1alpha1"
@@ -32,10 +31,9 @@ import (

func TestCancelPipelineRun(t *testing.T) {
testCases := []struct {
name string
pipelineRun *v1alpha1.PipelineRun
pipelineState []*resources.ResolvedPipelineRunTask
taskRuns []*v1alpha1.TaskRun
name string
pipelineRun *v1alpha1.PipelineRun
taskRuns []*v1alpha1.TaskRun
}{{
name: "no-resolved-taskrun",
pipelineRun: tb.PipelineRun("test-pipeline-run-cancelled", tb.PipelineRunNamespace("foo"),
@@ -44,28 +42,29 @@ func TestCancelPipelineRun(t *testing.T) {
),
),
}, {
name: "1-of-resolved-taskrun",
name: "1-taskrun",
pipelineRun: tb.PipelineRun("test-pipeline-run-cancelled", tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec("test-pipeline",
tb.PipelineRunCancelled,
),
tb.PipelineRunStatus(
tb.PipelineRunTaskRunsStatus("t1", &v1alpha1.PipelineRunTaskRunStatus{
PipelineTaskName: "task-1",
})),
),
pipelineState: []*resources.ResolvedPipelineRunTask{
{TaskRunName: "t1", TaskRun: tb.TaskRun("t1", tb.TaskRunNamespace("foo"))},
{TaskRunName: "t2"},
},
taskRuns: []*v1alpha1.TaskRun{tb.TaskRun("t1", tb.TaskRunNamespace("foo"))},
}, {
name: "resolved-taskruns",
name: "multiple-taskruns",
pipelineRun: tb.PipelineRun("test-pipeline-run-cancelled", tb.PipelineRunNamespace("foo"),
tb.PipelineRunSpec("test-pipeline",
tb.PipelineRunCancelled,
),
tb.PipelineRunStatus(
tb.PipelineRunTaskRunsStatus(
"t1", &v1alpha1.PipelineRunTaskRunStatus{PipelineTaskName: "task-1"}),
tb.PipelineRunTaskRunsStatus(
"t2", &v1alpha1.PipelineRunTaskRunStatus{PipelineTaskName: "task-2"})),
),
pipelineState: []*resources.ResolvedPipelineRunTask{
{TaskRunName: "t1", TaskRun: tb.TaskRun("t1", tb.TaskRunNamespace("foo"))},
{TaskRunName: "t2", TaskRun: tb.TaskRun("t2", tb.TaskRunNamespace("foo"))},
},
taskRuns: []*v1alpha1.TaskRun{tb.TaskRun("t1", tb.TaskRunNamespace("foo")), tb.TaskRun("t2", tb.TaskRunNamespace("foo"))},
}}
for _, tc := range testCases {
@@ -79,7 +78,7 @@ func TestCancelPipelineRun(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, _ := test.SeedTestData(t, ctx, d)
err := cancelPipelineRun(logtesting.TestLogger(t), tc.pipelineRun, tc.pipelineState, c.Pipeline)
err := cancelPipelineRun(logtesting.TestLogger(t), tc.pipelineRun, c.Pipeline)
if err != nil {
t.Fatal(err)
}
1 change: 1 addition & 0 deletions pkg/reconciler/pipelinerun/controller.go
@@ -44,6 +44,7 @@ const (
resyncPeriod = 10 * time.Hour
)

// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(images pipeline.Images) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
logger := logging.FromContext(ctx)
1 change: 1 addition & 0 deletions pkg/reconciler/pipelinerun/metrics.go
@@ -48,6 +48,7 @@ var (
stats.UnitDimensionless)
)

// Recorder holds keys for Tekton metrics
Collaborator:

thanks for adding docs!

type Recorder struct {
initialized bool

24 changes: 11 additions & 13 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -158,9 +158,10 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {

// In case of reconcile errors, we store the error in a multierror, attempt
// to update, and return the original error combined with any update error
var merr error
var merr *multierror.Error

if pr.IsDone() {
switch {
Collaborator:

nice :D

case pr.IsDone():
// We may be reading a version of the object that was stored at an older version
// and may not have had all of the assumed default specified.
pr.SetDefaults(contexts.WithUpgradeViaDefaulting(ctx))
@@ -181,7 +182,13 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
c.Logger.Warnf("Failed to log the metrics : %v", err)
}
}(c.metrics)
} else {
case pr.IsCancelled():
// If the pipelinerun is cancelled, cancel tasks and update status
before := pr.Status.GetCondition(apis.ConditionSucceeded)
merr = multierror.Append(merr, cancelPipelineRun(c.Logger, pr, c.PipelineClientSet))
after := pr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, before, after, pr)
default:
if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil {
c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err)
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "Failed to create tracker for TaskRuns for PipelineRun")
@@ -228,7 +235,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}(c.metrics)
}

return merr
return merr.ErrorOrNil()
}
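A note on the change from merr to merr.ErrorOrNil() above: a nil *multierror.Error stored in an error interface is not equal to nil, so returning merr directly would make a successful reconcile look like a failure. A small self-contained illustration of the go-multierror behaviour (not Tekton code):

```go
package main

import (
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

func main() {
	var merr *multierror.Error // nothing appended

	var asErr error = merr
	fmt.Println(asErr == nil)             // false: typed nil wrapped in an interface
	fmt.Println(merr.ErrorOrNil() == nil) // true: ErrorOrNil collapses the empty case to nil
}
```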

func (c *Reconciler) updatePipelineResults(ctx context.Context, pr *v1alpha1.PipelineRun) {
@@ -527,15 +534,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
}
}

// If the pipelinerun is cancelled, cancel tasks and update status
if pr.IsCancelled() {
before := pr.Status.GetCondition(apis.ConditionSucceeded)
err := cancelPipelineRun(c.Logger, pr, pipelineState, c.PipelineClientSet)
after := pr.Status.GetCondition(apis.ConditionSucceeded)
reconciler.EmitEvent(c.Recorder, before, after, pr)
return err
}

if pipelineState.IsBeforeFirstTaskRun() && pr.HasVolumeClaimTemplate() {
// create workspace PVC from template
if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(pr.Spec.Workspaces, pr.GetOwnerReference(), pr.Namespace); err != nil {
Expand Down