feat: allow user to restart only failed job #4972

Merged (12 commits, Feb 21, 2020)
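The PR adds an OnlyFailedJobs option to manual workflow run requests and makes it mutually exclusive with Resync (see the request check added to postWorkflowRunHandler further down). A minimal sketch of that validation, assuming a hypothetical trimmed-down stand-in struct instead of the real sdk.WorkflowNodeRunManual type (only the two boolean fields that appear in this diff are modeled, and checkManualOptions is an illustrative helper name, not part of the PR):

package main

import "fmt"

// workflowNodeRunManual is a hypothetical stand-in for sdk.WorkflowNodeRunManual,
// reduced to the two options touched by this PR.
type workflowNodeRunManual struct {
	OnlyFailedJobs bool
	Resync         bool
}

// checkManualOptions mirrors the request check added to postWorkflowRunHandler:
// a caller may ask for a resync or for restarting only failed jobs, not both.
func checkManualOptions(m *workflowNodeRunManual) error {
	if m != nil && m.OnlyFailedJobs && m.Resync {
		return fmt.Errorf("you cannot resync a workflow and run only failed jobs")
	}
	return nil
}

func main() {
	opts := &workflowNodeRunManual{OnlyFailedJobs: true}
	if err := checkManualOptions(opts); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("restart of failed jobs only accepted")
}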
1 change: 0 additions & 1 deletion engine/api/api_routes.go
@@ -265,7 +265,6 @@ func (api *API) InitRouter() {
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}", Scope(sdk.AuthConsumerScopeRun), r.GET(api.getWorkflowRunHandler /*, AllowServices(true)*/, EnableTracing()), r.DELETE(api.deleteWorkflowRunHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/stop", Scope(sdk.AuthConsumerScopeRun), r.POSTEXECUTE(api.stopWorkflowRunHandler, EnableTracing(), MaintenanceAware()))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/vcs/resync", Scope(sdk.AuthConsumerScopeRun), r.POSTEXECUTE(api.postResyncVCSWorkflowRunHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/resync", Scope(sdk.AuthConsumerScopeRun), r.POST(api.resyncWorkflowRunHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/artifacts", Scope(sdk.AuthConsumerScopeRun), r.GET(api.getWorkflowRunArtifactsHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/nodes/{nodeRunID}", Scope(sdk.AuthConsumerScopeRun), r.GET(api.getWorkflowNodeRunHandler))
r.Handle("/project/{key}/workflows/{permWorkflowName}/runs/{number}/nodes/{nodeRunID}/stop", Scope(sdk.AuthConsumerScopeRun), r.POSTEXECUTE(api.stopWorkflowNodeRunHandler, MaintenanceAware()))
2 changes: 1 addition & 1 deletion engine/api/workflow/dao_run.go
@@ -483,7 +483,7 @@ func CanBeRun(workflowRun *sdk.WorkflowRun, workflowNodeRun *sdk.WorkflowNodeRun
for _, ancestorID := range ancestorsID {
nodeRuns, ok := workflowRun.WorkflowNodeRuns[ancestorID]
if ok && (len(nodeRuns) == 0 || !sdk.StatusIsTerminated(nodeRuns[0].Status) ||
nodeRuns[0].Status == "" || nodeRuns[0].Status == sdk.StatusNeverBuilt) {
nodeRuns[0].Status == sdk.StatusNeverBuilt) {
return false
}
}
4 changes: 2 additions & 2 deletions engine/api/workflow/dao_run_test.go
@@ -61,20 +61,20 @@ func TestCanBeRun(t *testing.T) {

wnr := &sdk.WorkflowNodeRun{
WorkflowNodeID: node1.ID,
Status: sdk.StatusSuccess, // a node run always has a status
}

ts := []struct {
status string
canBeRun bool
}{
{status: sdk.StatusBuilding, canBeRun: false},
{status: "", canBeRun: false},
{status: sdk.StatusSuccess, canBeRun: true},
}

for _, tc := range ts {
wnrs[nodeRoot.ID][0].Status = tc.status
test.Equal(t, workflow.CanBeRun(wr, wnr), tc.canBeRun)
test.Equal(t, tc.canBeRun, workflow.CanBeRun(wr, wnr))
}
}

1 change: 0 additions & 1 deletion engine/api/workflow/execute_node_job_run.go
@@ -643,7 +643,6 @@ func RestartWorkflowNodeJob(ctx context.Context, db gorp.SqlExecutor, wNodeJob s
}
}
}

nodeRun, errNR := LoadAndLockNodeRunByID(ctx, db, wNodeJob.WorkflowNodeRunID)
if errNR != nil {
return errNR
77 changes: 70 additions & 7 deletions engine/api/workflow/execute_node_run.go
@@ -131,16 +131,41 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
}

stagesTerminated := 0
var previousNodeRun *sdk.WorkflowNodeRun
if nr.Manual != nil && nr.Manual.OnlyFailedJobs {
var err error
previousNodeRun, err = checkRunOnlyFailedJobs(wr, nr)
if err != nil {
return report, err
}
}

//Browse stages
for stageIndex := range nr.Stages {
stage := &nr.Stages[stageIndex]
log.Debug("workflow.executeNodeRun> checking stage %s (status=%s)", stage.Name, stage.Status)
//Initialize stage status at waiting
if stage.Status == "" {
stage.Status = sdk.StatusWaiting
var previousStage sdk.Stage
// Find previous stage
if previousNodeRun != nil {
for i := range previousNodeRun.Stages {
if previousNodeRun.Stages[i].ID == stage.ID {
previousStage = previousNodeRun.Stages[i]
break
}
}
}

if stageIndex == 0 {
newStatus = sdk.StatusWaiting
if previousNodeRun == nil || previousStage.Status == sdk.StatusFail || !sdk.StatusIsTerminated(previousStage.Status) {
stage.Status = sdk.StatusWaiting
if stageIndex == 0 {
newStatus = sdk.StatusWaiting
}
} else if sdk.StatusIsTerminated(previousStage.Status) {
// If stage terminated, recopy it
nr.Stages[stageIndex] = previousStage
stagesTerminated++
continue
}

if len(stage.Jobs) == 0 {
@@ -149,7 +174,7 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
//Add job to Queue
//Insert data in workflow_node_run_job
log.Debug("workflow.executeNodeRun> stage %s call addJobsToQueue", stage.Name)
r, err := addJobsToQueue(ctx, db, stage, wr, nr)
r, err := addJobsToQueue(ctx, db, stage, wr, nr, &previousStage)
report, err = report.Merge(ctx, r, err)
if err != nil {
return report, err
@@ -344,7 +369,36 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
return report, nil
}

func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage, wr *sdk.WorkflowRun, nr *sdk.WorkflowNodeRun) (*ProcessorReport, error) {
func checkRunOnlyFailedJobs(wr *sdk.WorkflowRun, nr *sdk.WorkflowNodeRun) (*sdk.WorkflowNodeRun, error) {
var previousNR *sdk.WorkflowNodeRun
nrs, ok := wr.WorkflowNodeRuns[nr.WorkflowNodeID]
if !ok {
return nil, sdk.WrapError(sdk.ErrWorkflowNodeNotFound, "node %d not found in workflow run %d", nr.WorkflowNodeID, wr.ID)
}
for i := range nrs {
if nrs[i].SubNumber < nr.SubNumber {
previousNR = &nrs[i]
break
}
}

if previousNR == nil {
return nil, sdk.WrapError(sdk.ErrNotFound, "unable to find a previous execution of this pipeline")
}

if len(previousNR.Stages) != len(nr.Stages) {
return nil, sdk.NewErrorFrom(sdk.ErrForbidden, "you cannot rerun a pipeline that has a different number of stages")
}

for i, s := range nr.Stages {
if len(s.Jobs) != len(previousNR.Stages[i].Jobs) {
return nil, sdk.NewErrorFrom(sdk.ErrForbidden, "you cannot rerun a pipeline that has a different number of jobs")
}
}
return previousNR, nil
}

func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage, wr *sdk.WorkflowRun, nr *sdk.WorkflowNodeRun, previousStage *sdk.Stage) (*ProcessorReport, error) {
var end func()
ctx, end = observability.Span(ctx, "workflow.addJobsToQueue")
defer end()
@@ -378,9 +432,19 @@ func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage,
skippedOrDisabledJobs := 0
failedJobs := 0
//Browse the jobs
jobLoop:
for j := range stage.Jobs {
job := &stage.Jobs[j]

if previousStage != nil {
for _, rj := range previousStage.RunJobs {
if rj.Job.PipelineActionID == job.PipelineActionID && rj.Status != sdk.StatusFail && sdk.StatusIsTerminated(rj.Status) {
stage.RunJobs = append(stage.RunJobs, rj)
continue jobLoop
}
}
}

// errors generated in the loop will be added to job run spawn info
spawnErrs := sdk.MultiError{}

@@ -1039,7 +1103,6 @@ func getVCSInfos(ctx context.Context, db gorp.SqlExecutor, store cache.Store, pr
if vcsInfos.Hash != "" && (vcsInfos.Author == "" || vcsInfos.Message == "") {
commit, errCm := client.Commit(ctx, vcsInfos.Repository, vcsInfos.Hash)
if errCm != nil {
log.Warning(ctx, "unable to ")
return nil, sdk.WrapError(errCm, "cannot get commit infos for %s %s", vcsInfos.Repository, vcsInfos.Hash)
}
vcsInfos.Author = commit.Author.Name
19 changes: 10 additions & 9 deletions engine/api/workflow/process_node.go
@@ -415,7 +415,7 @@ func createWorkflowNodeRun(wr *sdk.WorkflowRun, n *sdk.Node, parents []*sdk.Work
}

// CREATE RUN
run := sdk.WorkflowNodeRun{
nodeRun := sdk.WorkflowNodeRun{
WorkflowID: wr.WorkflowID,
LastModified: time.Now(),
Start: time.Now(),
@@ -424,26 +424,27 @@ func createWorkflowNodeRun(wr *sdk.WorkflowRun, n *sdk.Node, parents []*sdk.Work
WorkflowRunID: wr.ID,
WorkflowNodeID: n.ID,
WorkflowNodeName: n.Name,
Status: string(sdk.StatusWaiting),
Status: sdk.StatusWaiting,
Stages: stages,
Header: wr.Header,
}

if run.SubNumber >= wr.LastSubNumber {
wr.LastSubNumber = run.SubNumber
if nodeRun.SubNumber >= wr.LastSubNumber {
wr.LastSubNumber = nodeRun.SubNumber
}
if n.Context.ApplicationID != 0 {
run.ApplicationID = n.Context.ApplicationID
nodeRun.ApplicationID = n.Context.ApplicationID
}

parentsIDs := make([]int64, len(parents))
for i := range parents {
parentsIDs[i] = parents[i].ID
}
run.SourceNodeRuns = parentsIDs
run.HookEvent = hookEvent
run.Manual = manual
return &run
nodeRun.SourceNodeRuns = parentsIDs
nodeRun.HookEvent = hookEvent
nodeRun.Manual = manual

return &nodeRun
}

func computePipelineParameters(wr *sdk.WorkflowRun, n *sdk.Node, manual *sdk.WorkflowNodeRunManual) []sdk.Parameter {
56 changes: 12 additions & 44 deletions engine/api/workflow_run.go
@@ -264,50 +264,6 @@ func (api *API) getLatestWorkflowRunHandler() service.Handler {
}
}

func (api *API) resyncWorkflowRunHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
key := vars["key"]
name := vars["permWorkflowName"]
number, err := requestVarInt(r, "number")
if err != nil {
return err
}

proj, err := project.Load(api.mustDB(), api.Cache, key, project.LoadOptions.WithIntegrations)
if err != nil {
return sdk.WrapError(err, "unable to load projet")
}

wf, err := workflow.Load(ctx, api.mustDB(), api.Cache, proj, name, workflow.LoadOptions{Minimal: true, DeepPipeline: false})
if err != nil {
return sdk.WrapError(err, "cannot load workflow %s/%s", key, name)
}
if wf.FromRepository != "" {
return sdk.WithStack(sdk.ErrWorkflowAsCodeResync)
}

run, err := workflow.LoadRun(ctx, api.mustDB(), key, name, number, workflow.LoadRunOptions{})
if err != nil {
return sdk.WrapError(err, "Unable to load last workflow run [%s/%d]", name, number)
}

tx, errT := api.mustDB().Begin()
if errT != nil {
return sdk.WrapError(errT, "resyncWorkflowRunHandler> Cannot start transaction")
}

if err := workflow.Resync(ctx, tx, api.Cache, proj, run); err != nil {
return sdk.WrapError(err, "Cannot resync pipelines")
}

if err := tx.Commit(); err != nil {
return sdk.WrapError(err, "Cannot commit transaction")
}
return service.WriteJSON(w, run, http.StatusOK)
}
}

func (api *API) getWorkflowRunHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
@@ -853,6 +809,11 @@ func (api *API) postWorkflowRunHandler() service.Handler {
return err
}

// Request check
if opts.Manual != nil && opts.Manual.OnlyFailedJobs && opts.Manual.Resync {
return sdk.WrapError(sdk.ErrWrongRequest, "You cannot resync a workflow and run only failed jobs")
}

// CHECK IF IT S AN EXISTING RUN
var lastRun *sdk.WorkflowRun
if opts.Number != nil {
@@ -899,6 +860,13 @@ func (api *API) postWorkflowRunHandler() service.Handler {
var wf *sdk.Workflow
// IF CONTINUE EXISTING RUN
if lastRun != nil {
if opts != nil && opts.Manual != nil && opts.Manual.Resync {
log.Debug("Resync workflow %d for run %d", lastRun.Workflow.ID, lastRun.ID)
if err := workflow.Resync(ctx, api.mustDB(), api.Cache, p, lastRun); err != nil {
return err
}
}

wf = &lastRun.Workflow
// Check workflow name in case of rename
if wf.Name != name {
Expand Down