Skip to content

Commit

Permalink
Implement fix for PipelineRuns getting stuck in the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
RafaeLeal committed Feb 1, 2023
1 parent bcabd01 commit bcfc8b2
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 8 deletions.
13 changes: 13 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,19 @@ func (pr *PipelineRun) HasTimedOut(ctx context.Context, c clock.PassiveClock) bo
return false
}

// HasTimedOutForALongTime returns true if a pipelinerun has exceeed its spec.Timeout based its status.StartTime
// by a large margin
func (pr *PipelineRun) HasTimedOutForALongTime(ctx context.Context, c clock.PassiveClock) bool {
if !pr.HasTimedOut(ctx, c) {
return false
}
timeout := pr.PipelineTimeout(ctx)
startTime := pr.Status.StartTime
runtime := c.Since(startTime.Time)
// We are arbitrarily defining large margin as doubling the spec.timeout
return runtime >= 2*timeout
}

// HaveTasksTimedOut returns true if a pipelinerun has exceeded its spec.Timeouts.Tasks
func (pr *PipelineRun) HaveTasksTimedOut(ctx context.Context, c clock.PassiveClock) bool {
timeout := pr.TasksTimeout()
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,18 @@ func (c *Reconciler) ReconcileKind(ctx context.Context, pr *v1beta1.PipelineRun)
// Read the initial condition
before := pr.Status.GetCondition(apis.ConditionSucceeded)

// Check if we are failing to mark this as timed out for a while. If we are, mark immediately and finish the
// reconcile
if pr.HasTimedOutForALongTime(ctx, c.Clock) && !isPipelineRunTimeoutConditionSet(pr) {
setPipelineRunTimeoutCondition(ctx, pr)

if err := c.finishReconcileUpdateEmitEvents(ctx, pr, before, nil); err != nil {
return err
}

return nil
}

if !pr.HasStarted() && !pr.IsPending() {
pr.Status.InitializeConditions(c.Clock)
// In case node time was not synchronized, when controller has been scheduled to other nodes.
Expand Down
150 changes: 150 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"testing"
"time"

"sigs.k8s.io/yaml"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/go-containerregistry/pkg/registry"
Expand Down Expand Up @@ -2783,6 +2785,152 @@ spec:
}
}

func TestReconcileWithTimeoutForALongTimeAndEtcdLimit_Pipeline(t *testing.T) {
timeout := 12 * time.Hour
testCases := []struct {
name string
startTime time.Time
wantError error
}{
{
name: "pipelinerun has timed out for way too much time",
startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-3 * timeout),
},
{
name: "pipelinerun has timed out for a long time",
startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-2 * timeout),
},
{
name: "pipelinerun has timed out for a while",
startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-(3 / 2) * timeout),
wantError: errors.New("etcdserver: request too large"),
},
{
name: "pipelinerun has just timed out",
startTime: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC).Add(-timeout),
wantError: errors.New("etcdserver: request too large"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

ps := []*v1beta1.Pipeline{parse.MustParseV1beta1Pipeline(t, `
metadata:
name: test-pipeline
namespace: foo
spec:
tasks:
- name: hello-world-1
taskRef:
name: hello-world
- name: hello-world-2
taskRef:
name: hello-world
`)}
prs := []*v1beta1.PipelineRun{parse.MustParseV1beta1PipelineRun(t, `
metadata:
name: test-pipeline-run-with-timeout
namespace: foo
spec:
pipelineRef:
name: test-pipeline
serviceAccountName: test-sa
timeouts:
pipeline: 12h0m0s
status:
startTime: "2021-12-30T00:00:00Z"
`)}
ts := []*v1beta1.Task{simpleHelloWorldTask}

trs := []*v1beta1.TaskRun{mustParseTaskRunWithObjectMeta(t, taskRunObjectMeta("test-pipeline-run-with-timeout-hello-world-1", "foo", "test-pipeline-run-with-timeout",
"test-pipeline", "hello-world-1", false), `
spec:
resources: {}
serviceAccountName: test-sa
taskRef:
name: hello-world
kind: Task
`)}
start := metav1.NewTime(tc.startTime)
prs[0].Status.StartTime = &start

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

wantEvents := []string{
"Warning Failed PipelineRun \"test-pipeline-run-with-timeout\" failed to finish within \"12h0m0s\"",
}

// this limit is just enough to set the timeout condition, but not enough for extra metadata.
etcdRequestSizeLimit := 650
prt.TestAssets.Clients.Pipeline.PrependReactor("update", "pipelineruns", withEtcdRequestSizeLimit(t, etcdRequestSizeLimit))

c := prt.TestAssets.Controller
clients := prt.TestAssets.Clients
reconcileError := c.Reconciler.Reconcile(prt.TestAssets.Ctx, "foo/test-pipeline-run-with-timeout")
if tc.wantError != nil {
if reconcileError == nil {
t.Fatalf("expected error %q, but got nil", tc.wantError.Error())
}
if reconcileError.Error() != tc.wantError.Error() {
t.Fatalf("Expected error: %s Got: %s", tc.wantError, reconcileError)
}
return
}
if reconcileError != nil {
t.Fatalf("Reconcile error: %s", reconcileError)
}
prt.Test.Logf("Getting reconciled run")
reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns("foo").Get(prt.TestAssets.Ctx, "test-pipeline-run-with-timeout", metav1.GetOptions{})
if err != nil {
prt.Test.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}
prt.Test.Logf("Getting events")
// Check generated events match what's expected
if err := k8sevent.CheckEventsOrdered(prt.Test, prt.TestAssets.Recorder.Events, "test-pipeline-run-with-timeout", wantEvents); err != nil {
prt.Test.Errorf(err.Error())
}

// The PipelineRun should be timed out.
if reconciledRun.Status.GetCondition(apis.ConditionSucceeded).Reason != "PipelineRunTimeout" {
t.Errorf("Expected PipelineRun to be timed out, but condition reason is %s", reconciledRun.Status.GetCondition(apis.ConditionSucceeded))
}
})
}
}

// withEtcdRequestSizeLimit calculates the yaml marshal of the payload and gives an `etcdserver: request too large` when
// the limit is reached
func withEtcdRequestSizeLimit(t *testing.T, limit int) ktesting.ReactionFunc {
return func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
obj := action.(ktesting.UpdateAction).GetObject()
bytes, err := yaml.Marshal(obj)
if err != nil {
t.Fatalf("returned a unserializable status: %+v", obj)
}

if len(bytes) > limit {
t.Logf("request size: %d\nrequest limit: %d\n", len(bytes), limit)
t.Logf("payload:\n%s\n", string(bytes))
// actionBytes, err := yaml.Marshal(action)
// if err != nil {
// t.Fatalf("returned a unserializable action: %+v", action)
// }
// t.Logf("action:\n%s\n", string(actionBytes))
return true, nil, errors.New("etcdserver: request too large")
}
// return true, obj, nil
return false, nil, nil
}
}

func TestReconcileWithTimeouts_Tasks(t *testing.T) {
// TestReconcileWithTimeouts_Tasks runs "Reconcile" on a PipelineRun with timeouts.tasks configured.
// It verifies that reconcile is successful, no TaskRun is created, the PipelineTask is marked as skipped, and the
Expand Down Expand Up @@ -7243,11 +7391,13 @@ func (prt PipelineRunTest) reconcileRun(namespace, pipelineRunName string, wantE
} else if reconcileError != nil {
prt.Test.Fatalf("Error reconciling: %s", reconcileError)
}
prt.Test.Logf("Getting reconciled run")
// Check that the PipelineRun was reconciled correctly
reconciledRun, err := clients.Pipeline.TektonV1beta1().PipelineRuns(namespace).Get(prt.TestAssets.Ctx, pipelineRunName, metav1.GetOptions{})
if err != nil {
prt.Test.Fatalf("Somehow had error getting reconciled run out of fake client: %s", err)
}
prt.Test.Logf("Getting events")

// Check generated events match what's expected
if len(wantEvents) > 0 {
Expand Down
25 changes: 17 additions & 8 deletions pkg/reconciler/pipelinerun/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,20 +81,29 @@ func init() {
}
}

// setPipelineRunTimeoutCondition sets the status of the PipelineRun to timed out.
func setPipelineRunTimeoutCondition(ctx context.Context, pr *v1beta1.PipelineRun) {
pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: v1beta1.PipelineRunReasonTimedOut.String(),
Message: fmt.Sprintf("PipelineRun %q failed to finish within %q", pr.Name, pr.PipelineTimeout(ctx).String()),
})
}

// isPipelineRunTimeoutConditionSet returns true when the pipelinerun has the pipelinerun timed out reason
func isPipelineRunTimeoutConditionSet(pr *v1beta1.PipelineRun) bool {
condition := pr.Status.GetCondition(apis.ConditionSucceeded)
return condition.IsFalse() && condition.Reason == v1beta1.PipelineRunReasonTimedOut.String()
}

// timeoutPipelineRun marks the PipelineRun as timed out and any resolved TaskRun(s) too.
func timeoutPipelineRun(ctx context.Context, logger *zap.SugaredLogger, pr *v1beta1.PipelineRun, clientSet clientset.Interface) error {
errs := timeoutPipelineTasks(ctx, logger, pr, clientSet)

// If we successfully timed out all the TaskRuns and Runs, we can consider the PipelineRun timed out.
if len(errs) == 0 {
reason := v1beta1.PipelineRunReasonTimedOut.String()

pr.Status.SetCondition(&apis.Condition{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: reason,
Message: fmt.Sprintf("PipelineRun %q failed to finish within %q", pr.Name, pr.PipelineTimeout(ctx).String()),
})
setPipelineRunTimeoutCondition(ctx, pr)
// update pr completed time
pr.Status.CompletionTime = &metav1.Time{Time: time.Now()}
} else {
Expand Down

0 comments on commit bcfc8b2

Please sign in to comment.