Skip to content

Commit

Permalink
resolve the comment
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Feb 14, 2023
1 parent 43e1f86 commit e87f227
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 25 deletions.
3 changes: 2 additions & 1 deletion makefiles/e2e.mk
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ e2e-setup-controller:
--set image.tag=latest \
--set image.pullPolicy=IfNotPresent \
--wait vela-workflow \
./charts/vela-workflow
./charts/vela-workflow \
--debug

.PHONY: end-e2e
end-e2e:
Expand Down
5 changes: 5 additions & 0 deletions pkg/cue/model/sets/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ func strategyUnify(base cue.Value, patch cue.Value, params *UnifyParams, patchOp
baseInst := cuecontext.New().BuildFile(openBase)
patchInst := cuecontext.New().BuildFile(patchFile)

// s, _ := ToString(patchInst)
// fmt.Println("======patch", s)
// s, _ = ToString(baseInst)
// fmt.Println("======base", s)

ret := baseInst.Unify(patchInst)

_, err = toString(ret, removeTmpVar)
Expand Down
19 changes: 19 additions & 0 deletions pkg/providers/kube/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,25 @@ patch: {
sub, err := v.LookupValue("value")
Expect(err).ToNot(HaveOccurred())
Expect(sub.Error()).To(BeNil())
v, err = v.MakeValue(`
cluster: ""
patch: {
metadata: name: "test-app-1"
spec: {
containers: [{
// +patchStrategy=retainKeys
image: "nginx:latest"
}]
}
}`)
Expect(err).ToNot(HaveOccurred())
err = v.FillObject(sub, "value")
Expect(err).ToNot(HaveOccurred())
err = p.Apply(mCtx, ctx, v, nil)
Expect(err).ToNot(HaveOccurred())
sub2, err := v.LookupValue("value")
Expect(err).ToNot(HaveOccurred())
Expect(sub2.Error()).To(BeNil())

pod := &corev1.Pod{}
Expect(err).ToNot(HaveOccurred())
Expand Down
75 changes: 63 additions & 12 deletions pkg/utils/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,31 @@ import (
// WorkflowOperator is operation handler for workflow's suspend/resume/rollback/restart/terminate
type WorkflowOperator interface {
Suspend(ctx context.Context) error
Resume(ctx context.Context, step string) error
Resume(ctx context.Context) error
Rollback(ctx context.Context) error
Restart(ctx context.Context, step string) error
Restart(ctx context.Context) error
Terminate(ctx context.Context) error
}

// WorkflowStepOperator is operation handler for workflow steps' operations
type WorkflowStepOperator interface {
Resume(ctx context.Context, step string) error
Restart(ctx context.Context, step string) error
}

type workflowRunOperator struct {
cli client.Client
outputWriter io.Writer
run *v1alpha1.WorkflowRun
}

// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and application
type workflowRunStepOperator struct {
cli client.Client
outputWriter io.Writer
run *v1alpha1.WorkflowRun
}

// NewWorkflowRunOperator get an workflow operator with k8sClient, ioWriter(optional, useful for cli) and workflow run
func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowOperator {
return workflowRunOperator{
cli: cli,
Expand All @@ -62,6 +74,15 @@ func NewWorkflowRunOperator(cli client.Client, w io.Writer, run *v1alpha1.Workfl
}
}

// NewWorkflowRunStepOperator get an workflow step operator with k8sClient, ioWriter(optional, useful for cli) and workflow run
func NewWorkflowRunStepOperator(cli client.Client, w io.Writer, run *v1alpha1.WorkflowRun) WorkflowStepOperator {
return workflowRunStepOperator{
cli: cli,
outputWriter: w,
run: run,
}
}

// Suspend suspend workflow
func (wo workflowRunOperator) Suspend(ctx context.Context) error {
run := wo.run
Expand All @@ -77,11 +98,29 @@ func (wo workflowRunOperator) Suspend(ctx context.Context) error {
return err
}

return wo.writeOutputF("Successfully suspend workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully suspend workflow: %s\n", run.Name)
}

// Resume resume a suspended workflow
func (wo workflowRunOperator) Resume(ctx context.Context, step string) error {
func (wo workflowRunOperator) Resume(ctx context.Context) error {
run := wo.run
if run.Status.Terminated {
return fmt.Errorf("can not resume a terminated workflow")
}

if run.Status.Suspend {
if err := ResumeWorkflow(ctx, wo.cli, run, ""); err != nil {
return err
}
}
return writeOutputF(wo.outputWriter, "Successfully resume workflow: %s\n", run.Name)
}

// Resume resume a suspended workflow from a specific step
func (wo workflowRunStepOperator) Resume(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
run := wo.run
if run.Status.Terminated {
return fmt.Errorf("can not resume a terminated workflow")
Expand All @@ -92,7 +131,7 @@ func (wo workflowRunOperator) Resume(ctx context.Context, step string) error {
return err
}
}
return wo.writeOutputF("Successfully resume workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully resume workflow %s from step %s\n", run.Name, step)
}

// ResumeWorkflow resume workflow
Expand Down Expand Up @@ -140,12 +179,24 @@ func (wo workflowRunOperator) Rollback(ctx context.Context) error {
}

// Restart restart workflow
func (wo workflowRunOperator) Restart(ctx context.Context, step string) error {
func (wo workflowRunOperator) Restart(ctx context.Context) error {
run := wo.run
if err := RestartWorkflow(ctx, wo.cli, run, ""); err != nil {
return err
}
return writeOutputF(wo.outputWriter, "Successfully restart workflow: %s\n", run.Name)
}

// Restart restart workflow from a specific step
func (wo workflowRunStepOperator) Restart(ctx context.Context, step string) error {
if step == "" {
return fmt.Errorf("step can not be empty")
}
run := wo.run
if err := RestartWorkflow(ctx, wo.cli, run, step); err != nil {
return err
}
return wo.writeOutputF("Successfully restart workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully restart workflow %s from step %s\n", run.Name, step)
}

// RestartWorkflow restart workflow
Expand Down Expand Up @@ -179,7 +230,7 @@ func (wo workflowRunOperator) Terminate(ctx context.Context) error {
if err := TerminateWorkflow(ctx, wo.cli, run); err != nil {
return err
}
return wo.writeOutputF("Successfully terminate workflow: %s\n", run.Name)
return writeOutputF(wo.outputWriter, "Successfully terminate workflow: %s\n", run.Name)
}

// TerminateWorkflow terminate workflow
Expand Down Expand Up @@ -465,10 +516,10 @@ func findDependency(stepName string, dependsOn map[string][]string) []string {
return dependency
}

func (wo workflowRunOperator) writeOutputF(format string, a ...interface{}) error {
if wo.outputWriter == nil {
func writeOutputF(outputWriter io.Writer, format string, a ...interface{}) error {
if outputWriter == nil {
return nil
}
_, err := fmt.Fprintf(wo.outputWriter, format, a...)
_, err := fmt.Fprintf(outputWriter, format, a...)
return err
}
44 changes: 32 additions & 12 deletions pkg/utils/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,22 @@ func TestResumeWorkflowRun(t *testing.T) {
err = cli.Delete(ctx, tc.run)
r.NoError(err)
}()
operator := NewWorkflowRunOperator(cli, nil, tc.run)
err = operator.Resume(ctx, tc.step)
if tc.expectedErr != "" {
r.Error(err)
r.Equal(tc.expectedErr, err.Error())
return
if tc.step == "" {
operator := NewWorkflowRunOperator(cli, nil, tc.run)
err = operator.Resume(ctx)
if tc.expectedErr != "" {
r.Error(err)
r.Equal(tc.expectedErr, err.Error())
return
}
} else {
operator := NewWorkflowRunStepOperator(cli, nil, tc.run)
err = operator.Resume(ctx, tc.step)
if tc.expectedErr != "" {
r.Error(err)
r.Equal(tc.expectedErr, err.Error())
return
}
}
r.NoError(err)
run := &v1alpha1.WorkflowRun{}
Expand Down Expand Up @@ -1037,13 +1047,23 @@ func TestRestartRunStep(t *testing.T) {
r.NoError(err)
}()
}
operator := NewWorkflowRunOperator(cli, nil, tc.run)
err = operator.Restart(ctx, tc.stepName)
if tc.expectedErr != "" {
r.Contains(err.Error(), tc.expectedErr)
return
if tc.stepName == "" {
operator := NewWorkflowRunOperator(cli, nil, tc.run)
err = operator.Restart(ctx)
if tc.expectedErr != "" {
r.Contains(err.Error(), tc.expectedErr)
return
}
r.NoError(err)
} else {
operator := NewWorkflowRunStepOperator(cli, nil, tc.run)
err = operator.Restart(ctx, tc.stepName)
if tc.expectedErr != "" {
r.Contains(err.Error(), tc.expectedErr)
return
}
r.NoError(err)
}
r.NoError(err)
run := &v1alpha1.WorkflowRun{}
err = cli.Get(ctx, client.ObjectKey{Name: tc.run.Name}, run)
r.NoError(err)
Expand Down

0 comments on commit e87f227

Please sign in to comment.