From e87f227914125389f29ff4f1fb681ff459f7a4eb Mon Sep 17 00:00:00 2001 From: FogDong Date: Mon, 13 Feb 2023 12:10:51 +0800 Subject: [PATCH] resolve the comment Signed-off-by: FogDong --- makefiles/e2e.mk | 3 +- pkg/cue/model/sets/operation.go | 5 +++ pkg/providers/kube/handle_test.go | 19 ++++++++ pkg/utils/operation.go | 75 ++++++++++++++++++++++++++----- pkg/utils/operation_test.go | 44 +++++++++++++----- 5 files changed, 121 insertions(+), 25 deletions(-) diff --git a/makefiles/e2e.mk b/makefiles/e2e.mk index f37ad43..6d2dea2 100644 --- a/makefiles/e2e.mk +++ b/makefiles/e2e.mk @@ -12,7 +12,8 @@ e2e-setup-controller: --set image.tag=latest \ --set image.pullPolicy=IfNotPresent \ --wait vela-workflow \ - ./charts/vela-workflow + ./charts/vela-workflow \ + --debug .PHONY: end-e2e end-e2e: diff --git a/pkg/cue/model/sets/operation.go b/pkg/cue/model/sets/operation.go index 8fc0e78..5966336 100644 --- a/pkg/cue/model/sets/operation.go +++ b/pkg/cue/model/sets/operation.go @@ -298,6 +298,11 @@ func strategyUnify(base cue.Value, patch cue.Value, params *UnifyParams, patchOp baseInst := cuecontext.New().BuildFile(openBase) patchInst := cuecontext.New().BuildFile(patchFile) + // s, _ := ToString(patchInst) + // fmt.Println("======patch", s) + // s, _ = ToString(baseInst) + // fmt.Println("======base", s) + ret := baseInst.Unify(patchInst) _, err = toString(ret, removeTmpVar) diff --git a/pkg/providers/kube/handle_test.go b/pkg/providers/kube/handle_test.go index 6c78fec..04cf461 100644 --- a/pkg/providers/kube/handle_test.go +++ b/pkg/providers/kube/handle_test.go @@ -192,6 +192,25 @@ patch: { sub, err := v.LookupValue("value") Expect(err).ToNot(HaveOccurred()) Expect(sub.Error()).To(BeNil()) + v, err = v.MakeValue(` + cluster: "" + patch: { + metadata: name: "test-app-1" + spec: { + containers: [{ + // +patchStrategy=retainKeys + image: "nginx:latest" + }] + } + }`) + Expect(err).ToNot(HaveOccurred()) + err = v.FillObject(sub, "value") + Expect(err).ToNot(HaveOccurred()) + err = p.Apply(mCtx, ctx, v, nil) + Expect(err).ToNot(HaveOccurred()) + sub2, err := v.LookupValue("value") + Expect(err).ToNot(HaveOccurred()) + Expect(sub2.Error()).To(BeNil()) pod := &corev1.Pod{} Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/utils/operation.go b/pkg/utils/operation.go index d520aac..3bff32b 100644 --- a/pkg/utils/operation.go +++ b/pkg/utils/operation.go @@ -41,19 +41,31 @@ import ( // WorkflowOperator is operation handler for workflow's suspend/resume/rollback/restart/terminate type WorkflowOperator interface { Suspend(ctx context.Context) error - Resume(ctx context.Context, step string) error + Resume(ctx context.Context) error Rollback(ctx context.Context) error - Restart(ctx context.Context, step string) error + Restart(ctx context.Context) error Terminate(ctx context.Context) error } +// WorkflowStepOperator is operation handler for workflow steps' operations +type WorkflowStepOperator interface { + Resume(ctx context.Context, step string) error + Restart(ctx context.Context, step string) error +} + type workflowRunOperator struct { cli client.Client outputWriter io.Writer run *v1alpha1.WorkflowRun } -// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and application +type workflowRunStepOperator struct { + cli client.Client + outputWriter io.Writer + run *v1alpha1.WorkflowRun +} + +// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and workflow run func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowOperator { return workflowRunOperator{ cli: cli, @@ -62,6 +74,15 @@ func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.Workfl } } +// NewWorkflowRunStepOperator get an workflow step operator with k8sClient, ioWriter(optional, useful for cli) and workflow run +func NewWorkflowRunStepOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowStepOperator { + return workflowRunStepOperator{ + cli: cli, + outputWriter: w, + run: run, + } +} + // Suspend suspend workflow func (wo workflowRunOperator) Suspend(ctx context.Context) error { run := wo.run @@ -77,11 +98,29 @@ func (wo workflowRunOperator) Suspend(ctx context.Context) error { return err } - return wo.writeOutputF("Successfully suspend workflow: %s\n", run.Name) + return writeOutputF(wo.outputWriter, "Successfully suspend workflow: %s\n", run.Name) } // Resume resume a suspended workflow -func (wo workflowRunOperator) Resume(ctx context.Context, step string) error { +func (wo workflowRunOperator) Resume(ctx context.Context) error { + run := wo.run + if run.Status.Terminated { + return fmt.Errorf("can not resume a terminated workflow") + } + + if run.Status.Suspend { + if err := ResumeWorkflow(ctx, wo.cli, run, ""); err != nil { + return err + } + } + return writeOutputF(wo.outputWriter, "Successfully resume workflow: %s\n", run.Name) +} + +// Resume resume a suspended workflow from a specific step +func (wo workflowRunStepOperator) Resume(ctx context.Context, step string) error { + if step == "" { + return fmt.Errorf("step can not be empty") + } run := wo.run if run.Status.Terminated { return fmt.Errorf("can not resume a terminated workflow") @@ -92,7 +131,7 @@ func (wo workflowRunOperator) Resume(ctx context.Context, step string) error { return err } } - return wo.writeOutputF("Successfully resume workflow: %s\n", run.Name) + return writeOutputF(wo.outputWriter, "Successfully resume workflow %s from step %s\n", run.Name, step) } // ResumeWorkflow resume workflow @@ -140,12 +179,24 @@ func (wo workflowRunOperator) Rollback(ctx context.Context) error { } // Restart restart workflow -func (wo workflowRunOperator) Restart(ctx context.Context, step string) error { +func (wo workflowRunOperator) Restart(ctx context.Context) error { + run := wo.run + if err := RestartWorkflow(ctx, wo.cli, run, ""); err != nil { + return err + } + return writeOutputF(wo.outputWriter, "Successfully restart workflow: %s\n", run.Name) +} + +// Restart restart workflow from a specific step +func (wo workflowRunStepOperator) Restart(ctx context.Context, step string) error { + if step == "" { + return fmt.Errorf("step can not be empty") + } run := wo.run if err := RestartWorkflow(ctx, wo.cli, run, step); err != nil { return err } - return wo.writeOutputF("Successfully restart workflow: %s\n", run.Name) + return writeOutputF(wo.outputWriter, "Successfully restart workflow %s from step %s\n", run.Name, step) } // RestartWorkflow restart workflow @@ -179,7 +230,7 @@ func (wo workflowRunOperator) Terminate(ctx context.Context) error { if err := TerminateWorkflow(ctx, wo.cli, run); err != nil { return err } - return wo.writeOutputF("Successfully terminate workflow: %s\n", run.Name) + return writeOutputF(wo.outputWriter, "Successfully terminate workflow: %s\n", run.Name) } // TerminateWorkflow terminate workflow @@ -465,10 +516,10 @@ func findDependency(stepName string, dependsOn map[string][]string) []string { return dependency } -func (wo workflowRunOperator) writeOutputF(format string, a ...interface{}) error { - if wo.outputWriter == nil { +func writeOutputF(outputWriter io.Writer, format string, a ...interface{}) error { + if outputWriter == nil { return nil } - _, err := fmt.Fprintf(wo.outputWriter, format, a...) + _, err := fmt.Fprintf(outputWriter, format, a...) return err } diff --git a/pkg/utils/operation_test.go b/pkg/utils/operation_test.go index 159549e..d533a91 100644 --- a/pkg/utils/operation_test.go +++ b/pkg/utils/operation_test.go @@ -418,12 +418,22 @@ func TestResumeWorkflowRun(t *testing.T) { err = cli.Delete(ctx, tc.run) r.NoError(err) }() - operator := NewWorkflowRunOperator(cli, nil, tc.run) - err = operator.Resume(ctx, tc.step) - if tc.expectedErr != "" { - r.Error(err) - r.Equal(tc.expectedErr, err.Error()) - return + if tc.step == "" { + operator := NewWorkflowRunOperator(cli, nil, tc.run) + err = operator.Resume(ctx) + if tc.expectedErr != "" { + r.Error(err) + r.Equal(tc.expectedErr, err.Error()) + return + } + } else { + operator := NewWorkflowRunStepOperator(cli, nil, tc.run) + err = operator.Resume(ctx, tc.step) + if tc.expectedErr != "" { + r.Error(err) + r.Equal(tc.expectedErr, err.Error()) + return + } } r.NoError(err) run := &v1alpha1.WorkflowRun{} @@ -1037,13 +1047,23 @@ func TestRestartRunStep(t *testing.T) { r.NoError(err) }() } - operator := NewWorkflowRunOperator(cli, nil, tc.run) - err = operator.Restart(ctx, tc.stepName) - if tc.expectedErr != "" { - r.Contains(err.Error(), tc.expectedErr) - return + if tc.stepName == "" { + operator := NewWorkflowRunOperator(cli, nil, tc.run) + err = operator.Restart(ctx) + if tc.expectedErr != "" { + r.Contains(err.Error(), tc.expectedErr) + return + } + r.NoError(err) + } else { + operator := NewWorkflowRunStepOperator(cli, nil, tc.run) + err = operator.Restart(ctx, tc.stepName) + if tc.expectedErr != "" { + r.Contains(err.Error(), tc.expectedErr) + return + } + r.NoError(err) } - r.NoError(err) run := &v1alpha1.WorkflowRun{} err = cli.Get(ctx, client.ObjectKey{Name: tc.run.Name}, run) r.NoError(err)