Skip to content

Commit

Permalink
Feat: add retry failed step operation
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Dec 9, 2022
1 parent 418d0a8 commit dec6859
Show file tree
Hide file tree
Showing 2 changed files with 436 additions and 0 deletions.
64 changes: 64 additions & 0 deletions pkg/utils/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -35,6 +36,7 @@ type WorkflowOperator interface {
Rollback(ctx context.Context) error
Restart(ctx context.Context) error
Terminate(ctx context.Context) error
RetryStep(ctx context.Context, step string) error
}

type workflowRunOperator struct {
Expand Down Expand Up @@ -166,6 +168,68 @@ func TerminateWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.Wor
return nil
}

// RetryStep retries the failed step named step in the operator's workflow run
// and, once the retry request has been applied, reports it on the output writer.
func (wo workflowRunOperator) RetryStep(ctx context.Context, step string) error {
	err := RetryWorkflowStep(ctx, wo.cli, wo.run, step)
	if err != nil {
		return err
	}
	return wo.writeOutputF("Successfully retry step[%s] in workflow %s\n", step, wo.run.Name)
}

// RetryWorkflowStep marks a failed step (or sub-step) of a workflow run for
// retry and patches the run's status through cli.
//
// stepName is matched against each top-level step status and then against that
// step's sub-step statuses, in order; the first match wins. The matched status
// must be in the failed phase. How the recorded statuses are pruned depends on
// the run's mode at the matched level:
//   - DAG mode: only the matched status is removed.
//   - any other mode: the matched status and every status after it at the same
//     level are dropped.
//
// After a successful match the run's Terminated, Suspend and Finished flags are
// cleared and a non-zero EndTime is reset, then the status is patched.
//
// An error is returned when stepName is empty, when no step or sub-step has
// that name, when the matched step is not failed, or when the patch fails. On
// any of the validation errors run is left unmodified.
func RetryWorkflowStep(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun, stepName string) error {
	if stepName == "" {
		return fmt.Errorf("step name can not be empty")
	}

	steps := run.Status.Steps
	mode := run.Status.Mode
	found := false

	// Validate and prune first; the run-level flags are only touched once we
	// know the request is valid, so a rejected retry never leaves run half
	// modified.
search:
	for i := range steps {
		if steps[i].Name == stepName {
			if steps[i].Phase != v1alpha1.WorkflowStepPhaseFailed {
				return fmt.Errorf("can not retry a non-failed step")
			}
			if mode.Steps == v1alpha1.WorkflowModeDAG {
				// Drop only the failed step's status.
				run.Status.Steps = append(steps[:i], steps[i+1:]...)
			} else {
				// Drop the failed step's status and everything recorded after it.
				run.Status.Steps = steps[:i]
			}
			found = true
			break search
		}

		subs := steps[i].SubStepsStatus
		for j := range subs {
			if subs[j].Name != stepName {
				continue
			}
			if subs[j].Phase != v1alpha1.WorkflowStepPhaseFailed {
				return fmt.Errorf("can not retry a non-failed step")
			}
			// steps shares its backing array with run.Status.Steps, so writing
			// through steps[i] updates the run itself.
			if mode.SubSteps == v1alpha1.WorkflowModeDAG {
				steps[i].SubStepsStatus = append(subs[:j], subs[j+1:]...)
			} else {
				steps[i].SubStepsStatus = subs[:j]
			}
			found = true
			// Leave both loops: the target has been handled, nothing after it
			// needs to be inspected.
			break search
		}
	}
	if !found {
		return fmt.Errorf("step %s not found", stepName)
	}

	// Re-open the run so it is no longer reported as terminated, suspended or
	// finished.
	run.Status.Terminated = false
	run.Status.Suspend = false
	run.Status.Finished = false
	if !run.Status.EndTime.IsZero() {
		run.Status.EndTime = metav1.Time{}
	}

	// NOTE(review): the closure re-sends the same object on every attempt
	// without re-fetching it; confirm a conflict can actually be resolved by
	// retrying here.
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		return cli.Status().Patch(ctx, run, client.Merge)
	})
}

func (wo workflowRunOperator) writeOutputF(format string, a ...interface{}) error {
if wo.outputWriter == nil {
return nil
Expand Down
Loading

0 comments on commit dec6859

Please sign in to comment.