Skip to content

Commit

Permalink
Feat: support resume a specific suspend step in workflow
Browse files Browse the repository at this point in the history
Signed-off-by: FogDong <dongtianxin.tx@alibaba-inc.com>
  • Loading branch information
FogDong committed Feb 16, 2023
1 parent 319b5c2 commit 1279017
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 198 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ require (
github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c
github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8
github.com/kubevela/prism v1.7.0-alpha.1
github.com/kubevela/workflow v0.4.1
github.com/kubevela/workflow v0.4.1-0.20230215100259-edc78492f107
github.com/kyokomi/emoji v2.2.4+incompatible
github.com/mitchellh/hashstructure/v2 v2.0.1
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/oam-dev/cluster-gateway v1.7.0-alpha.1
github.com/oam-dev/cluster-register v1.0.4-0.20220928064144-5f76a9d7ca8c
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1119,8 +1119,8 @@ github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8 h1:jWkEQVVovRqONGoJ+W
github.com/kubevela/pkg v0.0.0-20230206074514-7c05c32743e8/go.mod h1:zJTitvYbj1Vg4l4FvqjDRJEjufT6GRKs8m+fY3V9d3E=
github.com/kubevela/prism v1.7.0-alpha.1 h1:oeZFn1Oy6gxSSFzMTfsWjLOCKaaooMVm1JGNK4j4Mlo=
github.com/kubevela/prism v1.7.0-alpha.1/go.mod h1:AJSDfdA+RkRSnWx3xEcogbmOTpX+l7RSIwqVHxwUtaI=
github.com/kubevela/workflow v0.4.1 h1:lYeWE9KgSSkb368u8G7cGfyzCz41Am8MdxgViRFJxXE=
github.com/kubevela/workflow v0.4.1/go.mod h1:AX/WL3G/YBkpmNpA/SKKm9M3Y0T9y95gZA8mFWylkyM=
github.com/kubevela/workflow v0.4.1-0.20230215100259-edc78492f107 h1:KaNaPokvPAOiwJy8qx2ilLu7dXznATK7N+LE+2yv8aY=
github.com/kubevela/workflow v0.4.1-0.20230215100259-edc78492f107/go.mod h1:U94Hz5rlHPAatN+Birhumly26zjAguMumdhrYk+e5mo=
github.com/kulti/thelper v0.4.0/go.mod h1:vMu2Cizjy/grP+jmsvOFDx1kYP6+PD1lqg4Yu5exl2U=
github.com/kunwardeep/paralleltest v1.0.3/go.mod h1:vLydzomDFpk7yu5UX02RmP0H8QfRPOV/oFhWN85Mjb4=
github.com/kylelemons/godebug v0.0.0-20160406211939-eadb3ce320cb/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
Expand Down Expand Up @@ -1260,8 +1260,8 @@ github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQ
github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
github.com/mitchellh/hashstructure/v2 v2.0.1 h1:L60q1+q7cXE4JeEJJKMnh2brFIe3rZxCihYAB61ypAY=
github.com/mitchellh/hashstructure/v2 v2.0.1/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4=
github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down
104 changes: 4 additions & 100 deletions pkg/apiserver/domain/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,10 @@ type PipelineRunService interface {
ListPipelineRuns(ctx context.Context, base apis.PipelineBase) (apis.ListPipelineRunResponse, error)
DeletePipelineRun(ctx context.Context, meta apis.PipelineRunMeta) error
CleanPipelineRuns(ctx context.Context, base apis.PipelineBase) error
StopPipelineRun(ctx context.Context, pipeline apis.PipelineRunBase) error
GetPipelineRunOutput(ctx context.Context, meta apis.PipelineRun, step string) (apis.GetPipelineRunOutputResponse, error)
GetPipelineRunInput(ctx context.Context, meta apis.PipelineRun, step string) (apis.GetPipelineRunInputResponse, error)
GetPipelineRunLog(ctx context.Context, meta apis.PipelineRun, step string) (apis.GetPipelineRunLogResponse, error)
ResumePipelineRun(ctx context.Context, meta apis.PipelineRunMeta) error
ResumePipelineRun(ctx context.Context, meta apis.PipelineRunMeta, step string) error
TerminatePipelineRun(ctx context.Context, meta apis.PipelineRunMeta) error
}

Expand Down Expand Up @@ -332,18 +331,6 @@ func (p pipelineServiceImpl) DeletePipeline(ctx context.Context, pl apis.Pipelin
return nil
}

// StopPipelineRun will stop a pipelineRun
func (p pipelineRunServiceImpl) StopPipelineRun(ctx context.Context, pipelineRun apis.PipelineRunBase) error {
run, err := p.checkRunNotFinished(ctx, pipelineRun)
if err != nil {
return err
}
if err := p.terminatePipelineRun(ctx, run); err != nil {
return err
}
return nil
}

func (p pipelineRunServiceImpl) GetPipelineRunOutput(ctx context.Context, pipelineRun apis.PipelineRun, stepName string) (apis.GetPipelineRunOutputResponse, error) {
outputsSpec := make(map[string]v1alpha1.StepOutputs)
stepOutputs := make([]apis.StepOutputBase, 0)
Expand Down Expand Up @@ -1267,42 +1254,7 @@ func (p pipelineRunServiceImpl) checkRunNotFinished(ctx context.Context, pipelin
return &run, nil
}

func (p pipelineRunServiceImpl) terminatePipelineRun(ctx context.Context, run *v1alpha1.WorkflowRun) error {
run.Status.Terminated = true
run.Status.Suspend = false
steps := run.Status.Steps
for i, step := range steps {
switch step.Phase {
case v1alpha1.WorkflowStepPhaseFailed:
if step.Reason != wfTypes.StatusReasonFailedAfterRetries && step.Reason != wfTypes.StatusReasonTimeout {
steps[i].Reason = wfTypes.StatusReasonTerminate
}
case v1alpha1.WorkflowStepPhaseRunning:
steps[i].Phase = v1alpha1.WorkflowStepPhaseFailed
steps[i].Reason = wfTypes.StatusReasonTerminate
default:
}
for j, sub := range step.SubStepsStatus {
switch sub.Phase {
case v1alpha1.WorkflowStepPhaseFailed:
if sub.Reason != wfTypes.StatusReasonFailedAfterRetries && sub.Reason != wfTypes.StatusReasonTimeout {
steps[i].SubStepsStatus[j].Phase = wfTypes.StatusReasonTerminate
}
case v1alpha1.WorkflowStepPhaseRunning:
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseFailed
steps[i].SubStepsStatus[j].Reason = wfTypes.StatusReasonTerminate
default:
}
}
}

if err := p.KubeClient.Status().Patch(ctx, run, client.Merge); err != nil {
return err
}
return nil
}

func (p pipelineRunServiceImpl) ResumePipelineRun(ctx context.Context, meta apis.PipelineRunMeta) error {
func (p pipelineRunServiceImpl) ResumePipelineRun(ctx context.Context, meta apis.PipelineRunMeta, step string) error {
project := ctx.Value(&apis.CtxKeyProject).(*model.Project)
run := v1alpha1.WorkflowRun{}
if err := p.KubeClient.Get(ctx, types.NamespacedName{
Expand All @@ -1316,22 +1268,7 @@ func (p pipelineRunServiceImpl) ResumePipelineRun(ctx context.Context, meta apis
return bcode.ErrPipelineRunFinished
}

run.Status.Suspend = false
steps := run.Status.Steps
for i, step := range steps {
if step.Type == wfTypes.WorkflowStepTypeSuspend && step.Phase == v1alpha1.WorkflowStepPhaseRunning {
steps[i].Phase = v1alpha1.WorkflowStepPhaseSucceeded
}
for j, sub := range step.SubStepsStatus {
if sub.Type == wfTypes.WorkflowStepTypeSuspend && sub.Phase == v1alpha1.WorkflowStepPhaseRunning {
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseSucceeded
}
}
}
if err := p.KubeClient.Status().Patch(ctx, &run, client.Merge); err != nil {
return err
}
return nil
return wfUtils.ResumeWorkflow(ctx, p.KubeClient, &run, step)
}

func (p pipelineRunServiceImpl) TerminatePipelineRun(ctx context.Context, meta apis.PipelineRunMeta) error {
Expand All @@ -1347,40 +1284,7 @@ func (p pipelineRunServiceImpl) TerminatePipelineRun(ctx context.Context, meta a
return bcode.ErrPipelineRunFinished
}

// set the pipeline run terminated to true
run.Status.Terminated = true
// set the pipeline run suspend to false
run.Status.Suspend = false
steps := run.Status.Steps
for i, step := range steps {
switch step.Phase {
case v1alpha1.WorkflowStepPhaseFailed:
if step.Reason != wfTypes.StatusReasonFailedAfterRetries && step.Reason != wfTypes.StatusReasonTimeout {
steps[i].Reason = wfTypes.StatusReasonTerminate
}
case v1alpha1.WorkflowStepPhaseRunning:
steps[i].Phase = v1alpha1.WorkflowStepPhaseFailed
steps[i].Reason = wfTypes.StatusReasonTerminate
default:
}
for j, sub := range step.SubStepsStatus {
switch sub.Phase {
case v1alpha1.WorkflowStepPhaseFailed:
if sub.Reason != wfTypes.StatusReasonFailedAfterRetries && sub.Reason != wfTypes.StatusReasonTimeout {
steps[i].SubStepsStatus[j].Reason = wfTypes.StatusReasonTerminate
}
case v1alpha1.WorkflowStepPhaseRunning:
steps[i].SubStepsStatus[j].Phase = v1alpha1.WorkflowStepPhaseFailed
steps[i].SubStepsStatus[j].Reason = wfTypes.StatusReasonTerminate
default:
}
}
}

if err := p.KubeClient.Status().Patch(ctx, &run, client.Merge); err != nil {
return err
}
return nil
return wfUtils.TerminateWorkflow(ctx, p.KubeClient, &run)
}

func checkPipelineSpec(spec model.WorkflowSpec) error {
Expand Down
69 changes: 6 additions & 63 deletions pkg/apiserver/domain/service/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strconv"
"strings"

workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
"helm.sh/helm/v3/pkg/time"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -35,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

workflowv1alpha1 "github.com/kubevela/workflow/api/v1alpha1"
wfContext "github.com/kubevela/workflow/pkg/context"
"github.com/kubevela/workflow/pkg/cue/model/value"
wfTypes "github.com/kubevela/workflow/pkg/types"
Expand All @@ -52,6 +52,7 @@ import (
"github.com/oam-dev/kubevela/pkg/oam"
pkgUtils "github.com/oam-dev/kubevela/pkg/utils"
"github.com/oam-dev/kubevela/pkg/utils/apply"
"github.com/oam-dev/kubevela/pkg/workflow/operation"
)

// LogSourceResource Read the step logs from the pod stdout.
Expand All @@ -76,7 +77,7 @@ type WorkflowService interface {
ListWorkflowRecords(ctx context.Context, workflow *model.Workflow, page, pageSize int) (*apisv1.ListWorkflowRecordsResponse, error)
DetailWorkflowRecord(ctx context.Context, workflow *model.Workflow, recordName string) (*apisv1.DetailWorkflowRecordResponse, error)
SyncWorkflowRecord(ctx context.Context) error
ResumeRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName string) error
ResumeRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, stepName string) error
TerminateRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName string) error
RollbackRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, revisionName string) (*apisv1.WorkflowRecordBase, error)
GetWorkflowRecordLog(ctx context.Context, record *model.WorkflowRecord, step string) (apisv1.GetPipelineRunLogResponse, error)
Expand Down Expand Up @@ -756,13 +757,13 @@ func (w *workflowServiceImpl) GetWorkflowRecord(ctx context.Context, workflow *m
return res[0].(*model.WorkflowRecord), nil
}

func (w *workflowServiceImpl) ResumeRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName string) error {
func (w *workflowServiceImpl) ResumeRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, stepName string) error {
oamApp, err := w.checkRecordRunning(ctx, appModel, workflow.EnvName)
if err != nil {
return err
}

if err := ResumeWorkflow(ctx, w.KubeClient, oamApp); err != nil {
if err := operation.ResumeWorkflow(ctx, w.KubeClient, oamApp, stepName); err != nil {
return err
}

Expand All @@ -778,7 +779,7 @@ func (w *workflowServiceImpl) TerminateRecord(ctx context.Context, appModel *mod
if err != nil {
return err
}
if err := TerminateWorkflow(ctx, w.KubeClient, oamApp); err != nil {
if err := operation.TerminateWorkflow(ctx, w.KubeClient, oamApp); err != nil {
return err
}
if err := w.syncWorkflowStatus(ctx, appModel.PrimaryKey(), oamApp, recordName, oamApp.Name, nil); err != nil {
Expand All @@ -788,64 +789,6 @@ func (w *workflowServiceImpl) TerminateRecord(ctx context.Context, appModel *mod
return nil
}

// ResumeWorkflow resume workflow
func ResumeWorkflow(ctx context.Context, kubecli client.Client, app *v1beta1.Application) error {
app.Status.Workflow.Suspend = false
steps := app.Status.Workflow.Steps
for i, step := range steps {
if step.Type == wfTypes.WorkflowStepTypeSuspend && step.Phase == workflowv1alpha1.WorkflowStepPhaseRunning {
steps[i].Phase = workflowv1alpha1.WorkflowStepPhaseSucceeded
}
for j, sub := range step.SubStepsStatus {
if sub.Type == wfTypes.WorkflowStepTypeSuspend && sub.Phase == workflowv1alpha1.WorkflowStepPhaseRunning {
steps[i].SubStepsStatus[j].Phase = workflowv1alpha1.WorkflowStepPhaseSucceeded
}
}
}
if err := kubecli.Status().Patch(ctx, app, client.Merge); err != nil {
return err
}
return nil
}

// TerminateWorkflow terminate workflow
func TerminateWorkflow(ctx context.Context, kubecli client.Client, app *v1beta1.Application) error {
// set the workflow terminated to true
app.Status.Workflow.Terminated = true
// set the workflow suspend to false
app.Status.Workflow.Suspend = false
steps := app.Status.Workflow.Steps
for i, step := range steps {
switch step.Phase {
case workflowv1alpha1.WorkflowStepPhaseFailed:
if step.Reason != wfTypes.StatusReasonFailedAfterRetries && step.Reason != wfTypes.StatusReasonTimeout {
steps[i].Reason = wfTypes.StatusReasonTerminate
}
case workflowv1alpha1.WorkflowStepPhaseRunning:
steps[i].Phase = workflowv1alpha1.WorkflowStepPhaseFailed
steps[i].Reason = wfTypes.StatusReasonTerminate
default:
}
for j, sub := range step.SubStepsStatus {
switch sub.Phase {
case workflowv1alpha1.WorkflowStepPhaseFailed:
if sub.Reason != wfTypes.StatusReasonFailedAfterRetries && sub.Reason != wfTypes.StatusReasonTimeout {
steps[i].SubStepsStatus[j].Reason = wfTypes.StatusReasonTerminate
}
case workflowv1alpha1.WorkflowStepPhaseRunning:
steps[i].SubStepsStatus[j].Phase = workflowv1alpha1.WorkflowStepPhaseFailed
steps[i].SubStepsStatus[j].Reason = wfTypes.StatusReasonTerminate
default:
}
}
}

if err := kubecli.Status().Patch(ctx, app, client.Merge); err != nil {
return err
}
return nil
}

func (w *workflowServiceImpl) RollbackRecord(ctx context.Context, appModel *model.Application, workflow *model.Workflow, recordName, revisionVersion string) (*apisv1.WorkflowRecordBase, error) {
if revisionVersion == "" {
// find the latest complete revision version
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/domain/service/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ var _ = Describe("Test workflow service functions", func() {

err = workflowService.ResumeRecord(ctx, &model.Application{
Name: appName,
}, &model.Workflow{Name: ResumeWorkflow, EnvName: "resume"}, "workflow-resume-1")
}, &model.Workflow{Name: ResumeWorkflow, EnvName: "resume"}, "workflow-resume-1", "")
Expect(err).Should(BeNil())

record, err := workflowService.DetailWorkflowRecord(ctx, &model.Workflow{Name: ResumeWorkflow, AppPrimaryKey: appName}, "workflow-resume-1")
Expand Down
1 change: 1 addition & 0 deletions pkg/apiserver/interfaces/api/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ func (c *application) GetWebServiceRoute() *restful.WebService {
Param(ws.PathParameter("appName", "identifier of the application.").DataType("string").Required(true)).
Param(ws.PathParameter("workflowName", "identifier of the workflow").DataType("string")).
Param(ws.PathParameter("record", "identifier of the workflow record").DataType("string")).
Param(ws.QueryParameter("step", "resume the workflow with specific step").DataType("string")).
Metadata(restfulspec.KeyOpenAPITags, tags).
Filter(c.appCheckFilter).
Filter(c.WorkflowAPI.workflowCheckFilter).
Expand Down
9 changes: 7 additions & 2 deletions pkg/apiserver/interfaces/api/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func initPipelineRoutes(ws *restful.WebService, n *project) {
ws.Route(ws.POST("/{projectName}/pipelines/{pipelineName}/runs/{runName}/resume").To(n.resumePipelineRun).
Doc("resume suspend pipeline run").
Filter(n.RBACService.CheckPerm("project/pipeline/pipelineRun", "resume")).
Param(ws.QueryParameter("step", "resume from specific step name").DataType("string")).
Returns(200, "OK", apis.EmptyResponse{}).
Returns(400, "Bad Request", bcode.Bcode{}).
Writes(apis.EmptyResponse{}).Do(meta, projParam, pipelineParam, runParam))
Expand Down Expand Up @@ -379,7 +380,11 @@ func (n *project) runPipeline(req *restful.Request, res *restful.Response) {

func (n *project) stopPipeline(req *restful.Request, res *restful.Response) {
pipelineRun := req.Request.Context().Value(&apis.CtxKeyPipelineRun).(*apis.PipelineRun)
err := n.PipelineRunService.StopPipelineRun(req.Request.Context(), pipelineRun.PipelineRunBase)
err := n.PipelineRunService.TerminatePipelineRun(req.Request.Context(), apis.PipelineRunMeta{
PipelineName: pipelineRun.PipelineName,
Project: pipelineRun.Project,
PipelineRunName: pipelineRun.PipelineRunName,
})
if err != nil {
klog.Errorf("stop pipeline failure %s", err.Error())
bcode.ReturnError(req, res, err)
Expand Down Expand Up @@ -469,7 +474,7 @@ func (n *project) resumePipelineRun(req *restful.Request, res *restful.Response)
PipelineName: pipeline.Name,
Project: apis.NameAlias{Name: project.Name, Alias: project.Alias},
PipelineRunName: run.PipelineRunName,
})
}, req.QueryParameter("step"))
if err != nil {
bcode.ReturnError(req, res, err)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/interfaces/api/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (w *Workflow) detailWorkflowRecord(req *restful.Request, res *restful.Respo
func (w *Workflow) resumeWorkflowRecord(req *restful.Request, res *restful.Response) {
app := req.Request.Context().Value(&apis.CtxKeyApplication).(*model.Application)
workflow := req.Request.Context().Value(&apis.CtxKeyWorkflow).(*model.Workflow)
err := w.WorkflowService.ResumeRecord(req.Request.Context(), app, workflow, req.PathParameter("record"))
err := w.WorkflowService.ResumeRecord(req.Request.Context(), app, workflow, req.PathParameter("record"), req.QueryParameter("step"))
if err != nil {
bcode.ReturnError(req, res, err)
return
Expand Down
Loading

0 comments on commit 1279017

Please sign in to comment.