Skip to content

Commit

Permalink
Implement fix for PipelineRuns getting stuck in the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
RafaeLeal committed Mar 24, 2023
1 parent 40763d8 commit 3e69fb8
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 11 deletions.
13 changes: 13 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,19 @@ func (pr *PipelineRun) HasTimedOut(ctx context.Context, c clock.PassiveClock) bo
return false
}

// HasTimedOutForALongTime returns true if a pipelinerun has exceeed its spec.Timeout based its status.StartTime
// by a large margin
func (pr *PipelineRun) HasTimedOutForALongTime(ctx context.Context, c clock.PassiveClock) bool {
if !pr.HasTimedOut(ctx, c) {
return false
}
timeout := pr.PipelineTimeout(ctx)
startTime := pr.Status.StartTime
runtime := c.Since(startTime.Time)
// We are arbitrarily defining large margin as doubling the spec.timeout
return runtime >= 2*timeout
}

// HaveTasksTimedOut returns true if a pipelinerun has exceeded its spec.Timeouts.Tasks
func (pr *PipelineRun) HaveTasksTimedOut(ctx context.Context, c clock.PassiveClock) bool {
timeout := pr.TasksTimeout()
Expand Down
61 changes: 61 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,67 @@ func TestPipelineRunHasStarted(t *testing.T) {
}
}

func TestPipelineRunHasTimedOutForALongTime(t *testing.T) {
tcs := []struct {
name string
timeout time.Duration
starttime time.Time
expected bool
}{{
name: "has timed out for a long time",
timeout: 1 * time.Hour,
starttime: now.Add(-2 * time.Hour),
expected: true,
}, {
name: "has timed out for not a long time",
timeout: 1 * time.Hour,
starttime: now.Add(-90 * time.Minute),
expected: false,
}, {
name: "has not timed out",
timeout: 1 * time.Hour,
starttime: now.Add(-30 * time.Minute),
expected: false,
}, {
name: "has no timeout specified",
timeout: 0 * time.Second,
starttime: now.Add(-24 * time.Hour),
expected: false,
}}

for _, tc := range tcs {
t.Run("pipeline.timeout "+tc.name, func(t *testing.T) {
pr := &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: v1beta1.PipelineRunSpec{
Timeout: &metav1.Duration{Duration: tc.timeout},
},
Status: v1beta1.PipelineRunStatus{PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
StartTime: &metav1.Time{Time: tc.starttime},
}},
}
if pr.HasTimedOutForALongTime(context.Background(), testClock) != tc.expected {
t.Errorf("Expected HasTimedOut to be %t when using pipeline.timeout", tc.expected)
}
})
t.Run("pipeline.timeouts.pipeline "+tc.name, func(t *testing.T) {
pr := &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: v1beta1.PipelineRunSpec{
Timeouts: &v1beta1.TimeoutFields{Pipeline: &metav1.Duration{Duration: tc.timeout}},
},
Status: v1beta1.PipelineRunStatus{PipelineRunStatusFields: v1beta1.PipelineRunStatusFields{
StartTime: &metav1.Time{Time: tc.starttime},
}},
}

if pr.HasTimedOutForALongTime(context.Background(), testClock) != tc.expected {
t.Errorf("Expected HasTimedOut to be %t when using pipeline.timeouts.pipeline", tc.expected)
}
})
}
}

func TestPipelineRunHasTimedOut(t *testing.T) {
tcs := []struct {
name string
Expand Down
14 changes: 14 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,20 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
// Read the initial condition
before := pr.Status.GetCondition(apis.ConditionSucceeded)

// Check if we are failing to mark this as timed out for a while. If we are, mark immediately and finish the
// reconcile. We are assuming here that if the PipelineRun has timed out for a long time, it had time to run the
// before and it kept failing. One reason that can happen is exceeding etcd request size limit. Finishing it early
// makes sure the request size is manageable
if pr.HasTimedOutForALongTime(ctx, c.Clock) && !isPipelineRunTimeoutConditionSet(pr) {
if err := timeoutPipelineRun(ctx, logger, pr, c.PipelineClientSet); err != nil {
return err
}
if err := c.finishReconcileUpdateEmitEvents(ctx, pr, before, nil); err != nil {
return err
}
return controller.NewPermanentError(errors.New("PipelineRun has timed out for a long time"))
}

if !pr.HasStarted() && !pr.IsPending() {
pr.Status.InitializeConditions(c.Clock)
// In case node time was not synchronized, when controller has been scheduled to other nodes.
Expand Down
152 changes: 149 additions & 3 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"testing"
"time"

"sigs.k8s.io/yaml"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/go-containerregistry/pkg/registry"
Expand Down Expand Up @@ -1575,7 +1577,7 @@ status:
reason: Running
status: Unknown
type: Succeeded
startTime: "2021-12-31T00:00:00Z"
startTime: "2021-12-31T11:00:00Z"
childReferences:
- name: test-pipeline-run-custom-task-hello-world-1
pipelineTaskName: hello-world-1
Expand Down Expand Up @@ -2229,7 +2231,7 @@ spec:
serviceAccountName: test-sa
timeout: 12h0m0s
status:
startTime: "2021-12-31T00:00:00Z"
startTime: "2021-12-31T11:00:00Z"
`)}
ts := []*v1beta1.Task{simpleHelloWorldTask}

Expand Down Expand Up @@ -2291,7 +2293,7 @@ spec:
timeouts:
pipeline: 12h0m0s
status:
startTime: "2021-12-31T00:00:00Z"
startTime: "2021-12-31T11:00:00Z"
childReferences:
- name: test-pipeline-run-with-timeout-hello-world-1
pipelineTaskName: hello-world-1
Expand Down Expand Up @@ -2351,6 +2353,148 @@ spec:
}
}

func TestReconcileWithTimeoutForALongTimeAndEtcdLimit_Pipeline(t *testing.T) {
timeout := 12 * time.Hour
testCases := []struct {
name string
startTime time.Time
wantError error
}{
{
name: "pipelinerun has timed out for way too much time",
startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-3 * timeout),
wantError: errors.New("PipelineRun has timed out for a long time"),
},
{
name: "pipelinerun has timed out for a long time",
startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-2 * timeout),
wantError: errors.New("PipelineRun has timed out for a long time"),
},
{
name: "pipelinerun has timed out for a while",
startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-(3 / 2) * timeout),
wantError: errors.New("etcdserver: request too large"),
},
{
name: "pipelinerun has just timed out",
startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-timeout),
wantError: errors.New("etcdserver: request too large"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ps := []*v1beta1.Pipeline{parse.MustParseV1beta1Pipeline(t, `
metadata:
name: test-pipeline
namespace: foo
spec:
tasks:
- name: hello-world-1
taskRef:
name: hello-world
- name: hello-world-2
taskRef:
name: hello-world
`)}
prs := []*v1beta1.PipelineRun{parse.MustParseV1beta1PipelineRun(t, `
metadata:
name: test-pipeline-run-with-timeout
namespace: foo
spec:
pipelineRef:
name: test-pipeline
serviceAccountName: test-sa
timeouts:
pipeline: 12h0m0s
status:
startTime: "2021-12-30T00:00:00Z"
`)}
ts := []*v1beta1.Task{simpleHelloWorldTask}

trs := []*v1beta1.TaskRun{mustParseTaskRunWithObjectMeta(t, taskRunObjectMeta("test-pipeline-run-with-timeout-hello-world-1", "foo", "test-pipeline-run-with-timeout",
"test-pipeline", "hello-world-1", false), `
spec:
resources: {}
serviceAccountName: test-sa
taskRef:
name: hello-world
kind: Task
`)}
start := metav1.NewTime(tc.startTime)
prs[0].Status.StartTime = &start

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

wantEvents := []string{
"Warning Failed PipelineRun \"test-pipeline-run-with-timeout\" failed to finish within \"12h0m0s\"",
}

// this limit is just enough to set the timeout condition, but not enough for extra metadata.
etcdRequestSizeLimit := 650
prt.TestAssets.Clients.Pipeline.PrependReactor("update", "pipelineruns", withEtcdRequestSizeLimit(t, etcdRequestSizeLimit))

c := prt.TestAssets.Controller
clients := prt.TestAssets.Clients
reconcileError := c.Reconciler.Reconcile(prt.TestAssets.Ctx, "foo/test-pipeline-run-with-timeout")
if tc.wantError != nil {
if reconcileError == nil {
t.Fatalf("expected error %q, but got nil", tc.wantError.Error())
}
if reconcileError.Error() != tc.wantError.Error() {
t.Fatalf("Expected error: %s Got: %s", tc.wantError, reconcileError)
}
return
}
if reconcileError != nil {
t.Fatalf("Reconcile error: %s", reconcileError)
}
prt.Test.Logf("Getting reconciled run")
reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").Get(prt.TestAssets.Ctx, "test-pipeline-run-with-timeout", metav1.GetOptions{})
if err != nil {
prt.Test.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}
prt.Test.Logf("Getting events")
// Check generated events match what's expected
if err := k8sevent.CheckEventsOrdered(prt.Test, prt.TestAssets.Recorder.Events, "test-pipeline-run-with-timeout", wantEvents); err != nil {
prt.Test.Errorf(err.Error())
}

// The PipelineRun should be timed out.
if reconciledRun.Status.GetCondition(apis.ConditionSucceeded).Reason != "PipelineRunTimeout" {
t.Errorf("Expected PipelineRun to be timed out, but condition reason is %s", reconciledRun.Status.GetCondition(apis.ConditionSucceeded))
}
})
}
}

// withEtcdRequestSizeLimit calculates the yaml marshal of the payload and gives an `etcdserver: request too large` when
// the limit is reached
func withEtcdRequestSizeLimit(t *testing.T, limit int) ktesting.ReactionFunc {
t.Helper()
return func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(ktesting.UpdateAction).GetObject()
bytes, err := yaml.Marshal(obj)
if err != nil {
t.Fatalf("returned a unserializable status: %+v", obj)
}

if len(bytes) > limit {
t.Logf("request size: %d\nrequest limit: %d\n", len(bytes), limit)
t.Logf("payload:\n%s\n", string(bytes))
return true, nil, errors.New("etcdserver: request too large")
}
return false, nil, nil
}
}

func TestReconcileWithTimeouts_Tasks(t *testing.T) {
// TestReconcileWithTimeouts_Tasks runs "Reconcile" on a PipelineRun with timeouts.tasks configured.
// It verifies that reconcile is successful, no TaskRun is created, the PipelineTask is marked as skipped, and the
Expand Down Expand Up @@ -6349,11 +6493,13 @@ func (prt PipelineRunTest) reconcileRun(namespace, pipelineRunName string, wantE
} else if reconcileError != nil {
prt.Test.Fatalf("Error reconciling: %s", reconcileError)
}
prt.Test.Logf("Getting reconciled run")
// Check that the PipelineRun was reconciled correctly
reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns(namespace).Get(prt.TestAssets.Ctx, pipelineRunName, metav1.GetOptions{})
if err != nil {
prt.Test.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}
prt.Test.Logf("Getting events")

// Check generated events match what's expected
if len(wantEvents) > 0 {
Expand Down
25 changes: 17 additions & 8 deletions pkg/reconciler/pipelinerun/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,29 @@ func init() {
}
}

// setPipelineRunTimeoutCondition sets the status of the PipelineRun to timed out.
func setPipelineRunTimeoutCondition(ctx context.Context, pr *v1beta1.PipelineRun) {
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonTimedOut.String(),
Message: fmt.Sprintf("PipelineRun %q failed to finish within %q", pr.Name, pr.PipelineTimeout(ctx).String()),
})
}

// isPipelineRunTimeoutConditionSet returns true when the pipelinerun has the pipelinerun timed out reason
func isPipelineRunTimeoutConditionSet(pr *v1beta1.PipelineRun) bool {
condition := pr.Status.GetCondition(apis.ConditionSucceeded)
return condition.IsFalse() && condition.Reason == v1beta1.PipelineRunReasonTimedOut.String()
}

// timeoutPipelineRun marks the PipelineRun as timed out and any resolved TaskRun(s) too.
func timeoutPipelineRun(ctx context.Context, logger *zap.SugaredLogger, pr *v1beta1.PipelineRun, clientSet clientset.Interface) error {
errs := timeoutPipelineTasks(ctx, logger, pr, clientSet)

// If we successfully timed out all the TaskRuns and Runs, we can consider the PipelineRun timed out.
if len(errs) == 0 {
reason := v1beta1.PipelineRunReasonTimedOut.String()

pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: reason,
Message: fmt.Sprintf("PipelineRun %q failed to finish within %q", pr.Name, pr.PipelineTimeout(ctx).String()),
})
setPipelineRunTimeoutCondition(ctx, pr)
// update pr completed time
pr.Status.CompletionTime = &metav1.Time{Time: time.Now()}
} else {
Expand Down

0 comments on commit 3e69fb8

Please sign in to comment.