Skip to content

Commit

Permalink
Feat: add retry failed step operation
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Dec 12, 2022
1 parent a8868ee commit 84f71c7
Show file tree
Hide file tree
Showing 2 changed files with 829 additions and 0 deletions.
221 changes: 221 additions & 0 deletions pkg/utils/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ package utils

import (
"context"
"encoding/json"
"fmt"
"io"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
wfTypes "github.com/kubevela/workflow/pkg/types"
)

Expand All @@ -35,6 +39,7 @@ type WorkflowOperator interface {
Rollback(ctx context.Context) error
Restart(ctx context.Context) error
Terminate(ctx context.Context) error
RetryStep(ctx context.Context, step string) error
}

type workflowRunOperator struct {
Expand Down Expand Up @@ -166,6 +171,222 @@ func TerminateWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.Wor
return nil
}

// RetryStep retry a failed step
func (wo workflowRunOperator) RetryStep(ctx context.Context, step string) error {
run := wo.run
if err := RetryWorkflowStep(ctx, wo.cli, run, step); err != nil {
return err
}
return wo.writeOutputF("Successfully retry step[%s] in workflow %s\n", step, run.Name)
}

// RetryWorkflowStep retry a failed workflow step
func RetryWorkflowStep(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun, stepName string) error {
if stepName == "" {
return fmt.Errorf("step name can not be empty")
}
run.Status.Terminated = false
run.Status.Suspend = false
run.Status.Finished = false
if !run.Status.EndTime.IsZero() {
run.Status.EndTime = metav1.Time{}
}
stepStatus := run.Status.Steps
mode := run.Status.Mode
found := false

var steps []v1alpha1.WorkflowStep
if run.Spec.WorkflowSpec != nil {
steps = run.Spec.WorkflowSpec.Steps
} else {
workflow := &v1alpha1.Workflow{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: run.Namespace, Name: run.Spec.WorkflowRef}, workflow); err != nil {
return err
}
steps = workflow.Steps
}

dependency := getStepDependency(ctx, cli, steps, stepName)
for i, step := range stepStatus {
if step.Name == stepName {
if step.Phase != v1alpha1.WorkflowStepPhaseFailed {
return fmt.Errorf("can not retry a non-failed step")
}
if mode.Steps == v1alpha1.WorkflowModeDAG {
run.Status.Steps = deleteStepStatus(dependency, stepStatus, stepName)
} else {
run.Status.Steps = run.Status.Steps[:i]
}
found = true
break
}
for j, sub := range step.SubStepsStatus {
if sub.Name == stepName {
if sub.Phase != v1alpha1.WorkflowStepPhaseFailed {
return fmt.Errorf("can not retry a non-failed step")
}
if mode.SubSteps == v1alpha1.WorkflowModeDAG {
stepStatus[i].SubStepsStatus = deleteSubStepStatus(dependency, step.SubStepsStatus, stepName)
} else {
stepStatus[i].SubStepsStatus = stepStatus[i].SubStepsStatus[:j]
}
found = true
break
}
}
}
if !found {
return fmt.Errorf("step %s not found", stepName)
}
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return cli.Status().Update(ctx, run)
}); err != nil {
return err
}

if run.Status.ContextBackend != nil {
cm := &corev1.ConfigMap{}
if err := cli.Get(ctx, client.ObjectKey{Namespace: run.Namespace, Name: run.Status.ContextBackend.Name}, cm); err != nil {
return err
}
vars := make(map[string]any)
if err := json.Unmarshal([]byte(cm.Data[wfContext.ConfigMapKeyVars]), &vars); err != nil {
return err
}
clearContextVars(steps, vars, stepName, dependency)
b, err := json.Marshal(vars)
if err != nil {
return err
}
cm.Data[wfContext.ConfigMapKeyVars] = string(b)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return cli.Update(ctx, cm)
}); err != nil {
return err
}
}
return nil
}

func clearContextVars(steps []v1alpha1.WorkflowStep, vars map[string]any, stepName string, dependency []string) {
for _, step := range steps {
if step.Name == stepName || stringsContain(dependency, step.Name) {
for _, output := range step.Outputs {
delete(vars, output.Name)
}
}
for _, sub := range step.SubSteps {
if sub.Name == stepName || stringsContain(dependency, sub.Name) {
for _, output := range sub.Outputs {
delete(vars, output.Name)
}
}
}
}
}

func deleteStepStatus(dependency []string, steps []v1alpha1.WorkflowStepStatus, stepName string) []v1alpha1.WorkflowStepStatus {
status := make([]v1alpha1.WorkflowStepStatus, 0)
for _, step := range steps {
if !stringsContain(dependency, step.Name) && step.Name != stepName {
status = append(status, step)
}
}
return status
}

func deleteSubStepStatus(dependency []string, subSteps []v1alpha1.StepStatus, stepName string) []v1alpha1.StepStatus {
status := make([]v1alpha1.StepStatus, 0)
for _, step := range subSteps {
if !stringsContain(dependency, step.Name) && step.Name != stepName {
status = append(status, step)
}
}
return status
}

func stringsContain(items []string, source string) bool {
for _, item := range items {
if item == source {
return true
}
}
return false
}

func getStepDependency(ctx context.Context, cli client.Client, steps []v1alpha1.WorkflowStep, stepName string) []string {
dependsOn := make(map[string][]string)
stepOutputs := make(map[string]string)
for _, step := range steps {
for _, output := range step.Outputs {
stepOutputs[output.Name] = step.Name
}
dependsOn[step.Name] = step.DependsOn
for _, sub := range step.SubSteps {
for _, output := range sub.Outputs {
stepOutputs[output.Name] = sub.Name
}
dependsOn[sub.Name] = sub.DependsOn
}
}
for _, step := range steps {
for _, input := range step.Inputs {
if name, ok := stepOutputs[input.From]; ok && !stringsContain(dependsOn[step.Name], name) {
dependsOn[step.Name] = append(dependsOn[step.Name], name)
}
}
for _, sub := range step.SubSteps {
for _, input := range sub.Inputs {
if name, ok := stepOutputs[input.From]; ok && !stringsContain(dependsOn[sub.Name], name) {
dependsOn[sub.Name] = append(dependsOn[sub.Name], name)
}
}
}
}
return findDependency(stepName, dependsOn)
}

func mergeUniqueStringSlice(a, b []string) []string {
for _, item := range b {
if !stringsContain(a, item) {
a = append(a, item)
}
}
return a
}

func findOutputsDependency(outputs []string, steps []v1alpha1.WorkflowStep) []string {
dependency := make([]string, 0)
for _, step := range steps {
for _, input := range step.Inputs {
if stringsContain(outputs, input.From) {
dependency = append(dependency, step.Name)
}
}
for _, sub := range step.SubSteps {
for _, input := range sub.Inputs {
if stringsContain(outputs, input.From) {
dependency = append(dependency, sub.Name)
}
}
}
}
return dependency
}

func findDependency(stepName string, dependsOn map[string][]string) []string {
dependency := make([]string, 0)
for step, deps := range dependsOn {
for _, dep := range deps {
if dep == stepName {
dependency = append(dependency, step)
dependency = append(dependency, findDependency(step, dependsOn)...)
}
}
}
return dependency
}

func (wo workflowRunOperator) writeOutputF(format string, a ...interface{}) error {
if wo.outputWriter == nil {
return nil
Expand Down
Loading

0 comments on commit 84f71c7

Please sign in to comment.