Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: support resume a specific suspend step in workflow #133

Merged
merged 2 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions charts/vela-workflow/templates/workflow-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ spec:
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
args:
- "-test.coverprofile=/workspace/data/e2e-profile.out"
- "__DEVEL__E2E"
- "-test.run=E2EMain"
- "-test.coverpkg=$(go list ./pkg/...| tr '
' ','| sed 's/,$//g')"
{{ if .Values.admissionWebhooks.enabled }}
- "--use-webhook=true"
- "--webhook-port={{ .Values.webhookService.port }}"
Expand All @@ -132,8 +137,8 @@ spec:
- "--max-workflow-wait-backoff-time={{ .Values.workflow.backoff.maxTime.waitState }}"
- "--max-workflow-failed-backoff-time={{ .Values.workflow.backoff.maxTime.failedState }}"
- "--max-workflow-step-error-retry-times={{ .Values.workflow.step.errorRetryTimes }}"
- "--feature-gates=EnableWatchEventListener={{- .Values.enableWatchEventListener | toString -}}"
- "--feature-gates=EnablePatchStatusAtOnce={{- .Values.enablePatchStatusAtOnce | toString -}}"
- "--feature-gates=EnableWatchEventListener={{- .Values.workflow.enableWatchEventListener | toString -}}"
- "--feature-gates=EnablePatchStatusAtOnce={{- .Values.workflow.enablePatchStatusAtOnce | toString -}}"
- "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}"
- "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}"
{{ if .Values.backup.enable }}
Expand Down
3 changes: 2 additions & 1 deletion makefiles/e2e.mk
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ e2e-setup-controller:
--set image.tag=latest \
--set image.pullPolicy=IfNotPresent \
--wait vela-workflow \
./charts/vela-workflow
./charts/vela-workflow \
--debug

.PHONY: end-e2e
end-e2e:
Expand Down
5 changes: 5 additions & 0 deletions pkg/cue/model/sets/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ func strategyUnify(base cue.Value, patch cue.Value, params *UnifyParams, patchOp
baseInst := cuecontext.New().BuildFile(openBase)
patchInst := cuecontext.New().BuildFile(patchFile)

// s, _ := ToString(patchInst)
// fmt.Println("======patch", s)
// s, _ = ToString(baseInst)
// fmt.Println("======base", s)

ret := baseInst.Unify(patchInst)

_, err = toString(ret, removeTmpVar)
Expand Down
19 changes: 19 additions & 0 deletions pkg/providers/kube/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,25 @@ patch: {
sub, err := v.LookupValue("value")
Expect(err).ToNot(HaveOccurred())
Expect(sub.Error()).To(BeNil())
v, err = v.MakeValue(`
cluster: ""
patch: {
metadata: name: "test-app-1"
spec: {
containers: [{
// +patchStrategy=retainKeys
image: "nginx:latest"
}]
}
}`)
Expect(err).ToNot(HaveOccurred())
err = v.FillObject(sub, "value")
Expect(err).ToNot(HaveOccurred())
err = p.Apply(mCtx, ctx, v, nil)
Expect(err).ToNot(HaveOccurred())
sub2, err := v.LookupValue("value")
Expect(err).ToNot(HaveOccurred())
Expect(sub2.Error()).To(BeNil())

pod := &corev1.Pod{}
Expect(err).ToNot(HaveOccurred())
Expand Down
96 changes: 82 additions & 14 deletions pkg/utils/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,29 @@ type WorkflowOperator interface {
Suspend(ctx context.Context) error
Resume(ctx context.Context) error
Rollback(ctx context.Context) error
Restart(ctx context.Context, step string) error
Restart(ctx context.Context) error
Terminate(ctx context.Context) error
}

// WorkflowStepOperator is operation handler for workflow steps' operations
type WorkflowStepOperator interface {
Resume(ctx context.Context, step string) error
Restart(ctx context.Context, step string) error
}

type workflowRunOperator struct {
cli client.Client
outputWriter io.Writer
run *v1alpha1.WorkflowRun
}

// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and application
type workflowRunStepOperator struct {
cli client.Client
outputWriter io.Writer
run *v1alpha1.WorkflowRun
}

// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and workflow run
func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowOperator {
return workflowRunOperator{
cli: cli,
Expand All @@ -62,6 +74,15 @@ func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.Workfl
}
}

// NewWorkflowRunStepOperator get an workflow step operator with k8sClient, ioWriter(optional, useful for cli) and workflow run
func NewWorkflowRunStepOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowStepOperator {
return workflowRunStepOperator{
cli: cli,
outputWriter: w,
run: run,
}
}

// Suspend suspend workflow
func (wo workflowRunOperator) Suspend(ctx context.Context) error {
run := wo.run
Expand All @@ -77,7 +98,7 @@ func (wo workflowRunOperator) Suspend(ctx context.Context) error {
return err
}

return wo.writeOutputF("Successfully suspend workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully suspend workflow: %s\n", run.Name)
}

// Resume resume a suspended workflow
Expand All @@ -88,27 +109,62 @@ func (wo workflowRunOperator) Resume(ctx context.Context) error {
}

if run.Status.Suspend {
if err := ResumeWorkflow(ctx, wo.cli, run); err != nil {
if err := ResumeWorkflow(ctx, wo.cli, run, ""); err != nil {
return err
}
}
return writeOutputF(wo.outputWriter, "Successfully resume workflow: %s\n", run.Name)
}

// Resume resume a suspended workflow from a specific step
func (wo workflowRunStepOperator) Resume(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
run := wo.run
if run.Status.Terminated {
return fmt.Errorf("can not resume a terminated workflow")
}

if run.Status.Suspend {
if err := ResumeWorkflow(ctx, wo.cli, run, step); err != nil {
return err
}
}
return wo.writeOutputF("Successfully resume workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully resume workflow %s from step %s\n", run.Name, step)
}

// ResumeWorkflow resume workflow
func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun) error {
func ResumeWorkflow(ctx context.Context, cli client.Client, run *v1alpha1.WorkflowRun, stepName string) error {
run.Status.Suspend = false
steps := run.Status.Steps
found := stepName == ""

for i, step := range steps {
if step.Type == wfTypes.WorkflowStepTypeSuspend && step.Phase == v1alpha1.WorkflowStepPhaseRunning {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
if stepName == "" {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
} else if stepName == step.Name {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
found = true
break
}
}
for j, sub := range step.SubStepsStatus {
if sub.Type == wfTypes.WorkflowStepTypeSuspend && sub.Phase == v1alpha1.WorkflowStepPhaseRunning {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
if stepName == "" {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
} else if stepName == sub.Name {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
found = true
break
}
}
}
}
if !found {
return fmt.Errorf("can not find step %s", stepName)
}
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return cli.Status().Patch(ctx, run, client.Merge)
}); err != nil {
Expand All @@ -123,12 +179,24 @@ func (wo workflowRunOperator) Rollback(ctx context.Context) error {
}

// Restart restart workflow
func (wo workflowRunOperator) Restart(ctx context.Context, step string) error {
func (wo workflowRunOperator) Restart(ctx context.Context) error {
run := wo.run
if err := RestartWorkflow(ctx, wo.cli, run, ""); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully restart workflow: %s\n", run.Name)
}

// Restart restart workflow from a specific step
func (wo workflowRunStepOperator) Restart(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
run := wo.run
if err := RestartWorkflow(ctx, wo.cli, run, step); err != nil {
return err
}
return wo.writeOutputF("Successfully restart workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully restart workflow %s from step %s\n", run.Name, step)
}

// RestartWorkflow restart workflow
Expand Down Expand Up @@ -162,7 +230,7 @@ func (wo workflowRunOperator) Terminate(ctx context.Context) error {
if err := TerminateWorkflow(ctx, wo.cli, run); err != nil {
return err
}
return wo.writeOutputF("Successfully terminate workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully terminate workflow: %s\n", run.Name)
}

// TerminateWorkflow terminate workflow
Expand Down Expand Up @@ -448,10 +516,10 @@ func findDependency(stepName string, dependsOn map[string][]string) []string {
return dependency
}

func (wo workflowRunOperator) writeOutputF(format string, a ...interface{}) error {
if wo.outputWriter == nil {
func writeOutputF(outputWriter io.Writer, format string, a ...interface{}) error {
if outputWriter == nil {
return nil
}
_, err := fmt.Fprintf(wo.outputWriter, format, a...)
_, err := fmt.Fprintf(outputWriter, format, a...)
return err
}
Loading