Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: support resume a specific suspend step in workflow #133

Merged
merged 2 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions pkg/utils/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
// WorkflowOperator is operation handler for workflow's suspend/resume/rollback/restart/terminate
type WorkflowOperator interface {
Suspend(ctx context.Context) error
Resume(ctx context.Context) error
Resume(ctx context.Context, step string) error
FogDong marked this conversation as resolved.
Show resolved Hide resolved
Rollback(ctx context.Context) error
Restart(ctx context.Context, step string) error
Terminate(ctx context.Context) error
Expand Down Expand Up @@ -81,34 +81,51 @@ func (wo workflowRunOperator) Suspend(ctx context.Context) error {
}

// Resume resume a suspended workflow
func (wo workflowRunOperator) Resume(ctx context.Context) error {
func (wo workflowRunOperator) Resume(ctx context.Context, step string) error {
run := wo.run
if run.Status.Terminated {
return fmt.Errorf("can not resume a terminated workflow")
}

if run.Status.Suspend {
if err := ResumeWorkflow(ctx, wo.cli, run); err != nil {
if err := ResumeWorkflow(ctx, wo.cli, run, step); err != nil {
return err
}
}
return wo.writeOutputF("Successfully resume workflow: %s\n", run.Name)
}

// ResumeWorkflow resume workflow
func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun) error {
func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun, stepName string) error {
run.Status.Suspend = false
steps := run.Status.Steps
found := stepName == ""

for i, step := range steps {
if step.Type == wfTypes.WorkflowStepTypeSuspend && step.Phase == v1alpha1.WorkflowStepPhaseRunning {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
if stepName == "" {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
} else if stepName == step.Name {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
found = true
break
}
}
for j, sub := range step.SubStepsStatus {
if sub.Type == wfTypes.WorkflowStepTypeSuspend && sub.Phase == v1alpha1.WorkflowStepPhaseRunning {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
if stepName == "" {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
} else if stepName == sub.Name {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
found = true
break
}
}
}
}
if !found {
return fmt.Errorf("can not find step %s", stepName)
}
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return cli.Status().Patch(ctx, run, client.Merge)
}); err != nil {
Expand Down
121 changes: 117 additions & 4 deletions pkg/utils/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,10 @@ func TestResumeWorkflowRun(t *testing.T) {
ctx := context.Background()

testCases := map[string]struct {
run *v1alpha1.WorkflowRun
expected *v1alpha1.WorkflowRun
run *v1alpha1.WorkflowRun
step string
expected *v1alpha1.WorkflowRun
expectedErr string
}{
"not suspend": {
run: &v1alpha1.WorkflowRun{
Expand Down Expand Up @@ -258,6 +260,18 @@ func TestResumeWorkflowRun(t *testing.T) {
},
},
},
"step not found": {
run: &v1alpha1.WorkflowRun{
ObjectMeta: metav1.ObjectMeta{
Name: "suspend",
},
Status: v1alpha1.WorkflowRunStatus{
Suspend: true,
},
},
step: "not-found",
expectedErr: "can not find step not-found",
},
"suspend step": {
run: &v1alpha1.WorkflowRun{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -300,6 +314,100 @@ func TestResumeWorkflowRun(t *testing.T) {
},
},
},
"resume the specific step": {
step: "step1",
run: &v1alpha1.WorkflowRun{
ObjectMeta: metav1.ObjectMeta{
Name: "resume-specific-step",
},
Status: v1alpha1.WorkflowRunStatus{
Suspend: true,
Steps: []v1alpha1.WorkflowStepStatus{
{
StepStatus: v1alpha1.StepStatus{
Name: "step1",
Type: wfTypes.WorkflowStepTypeSuspend,
Phase: v1alpha1.WorkflowStepPhaseRunning,
},
},
{
StepStatus: v1alpha1.StepStatus{
Name: "step2",
Type: wfTypes.WorkflowStepTypeSuspend,
Phase: v1alpha1.WorkflowStepPhaseRunning,
},
},
},
},
},
expected: &v1alpha1.WorkflowRun{
Status: v1alpha1.WorkflowRunStatus{
Steps: []v1alpha1.WorkflowStepStatus{
{
StepStatus: v1alpha1.StepStatus{
Name: "step1",
Type: wfTypes.WorkflowStepTypeSuspend,
Phase: v1alpha1.WorkflowStepPhaseSucceeded,
},
},
{
StepStatus: v1alpha1.StepStatus{
Name: "step2",
Type: wfTypes.WorkflowStepTypeSuspend,
Phase: v1alpha1.WorkflowStepPhaseRunning,
},
},
},
},
},
},
"resume the specific sub step": {
step: "sub-step1",
run: &v1alpha1.WorkflowRun{
ObjectMeta: metav1.ObjectMeta{
Name: "resume-specific-sub-step",
},
Status: v1alpha1.WorkflowRunStatus{
Suspend: true,
Steps: []v1alpha1.WorkflowStepStatus{
{
StepStatus: v1alpha1.StepStatus{
Name: "step1",
Type: wfTypes.WorkflowStepTypeSuspend,
Phase: v1alpha1.WorkflowStepPhaseRunning,
},
SubStepsStatus: []v1alpha1.StepStatus{
{
Name: "sub-step1",
Type: wfTypes.WorkflowStepTypeSuspend,
Phase: v1alpha1.WorkflowStepPhaseRunning,
},
},
},
},
},
},
expected: &v1alpha1.WorkflowRun{
Status: v1alpha1.WorkflowRunStatus{
Steps: []v1alpha1.WorkflowStepStatus{
{
StepStatus: v1alpha1.StepStatus{
Name: "step1",
Type: wfTypes.WorkflowStepTypeSuspend,
Phase: v1alpha1.WorkflowStepPhaseRunning,
},
SubStepsStatus: []v1alpha1.StepStatus{
{
Name: "sub-step1",
Type: wfTypes.WorkflowStepTypeSuspend,
Phase: v1alpha1.WorkflowStepPhaseSucceeded,
},
},
},
},
},
},
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
Expand All @@ -311,13 +419,18 @@ func TestResumeWorkflowRun(t *testing.T) {
r.NoError(err)
}()
operator := NewWorkflowRunOperator(cli, nil, tc.run)
err = operator.Resume(ctx)
err = operator.Resume(ctx, tc.step)
if tc.expectedErr != "" {
r.Error(err)
r.Equal(tc.expectedErr, err.Error())
return
}
r.NoError(err)
run := &v1alpha1.WorkflowRun{}
err = cli.Get(ctx, client.ObjectKey{Name: tc.run.Name}, run)
r.NoError(err)
r.Equal(false, run.Status.Suspend)
r.Equal(tc.expected.Status, run.Status)
r.Equal(tc.expected.Status, run.Status, name)
})
}
}
Expand Down