Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): release mutex when stopping a node run #5216

Merged
merged 4 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,13 @@ func LoadNodeRunByID(db gorp.SqlExecutor, id int64, loadOpts LoadRunOptions) (*s
testsField = withLightNodeRunTestsField
}

query := fmt.Sprintf(`select %s %s
from workflow_node_run
where workflow_node_run.id = $1`, nodeRunFields, testsField)
query := fmt.Sprintf(`
SELECT %s %s
FROM workflow_node_run
WHERE workflow_node_run.id = $1
`, nodeRunFields, testsField)
if err := db.SelectOne(&rr, query, id); err != nil {
return nil, sdk.WrapError(err, "Unable to load workflow_node_run node=%d", id)
return nil, sdk.WrapError(err, "unable to load workflow_node_run with id %d", id)
}

r, err := fromDBNodeRun(rr, loadOpts)
Expand All @@ -207,23 +209,22 @@ func LoadNodeRunByID(db gorp.SqlExecutor, id int64, loadOpts LoadRunOptions) (*s
}

if loadOpts.WithArtifacts {
arts, errA := loadArtifactByNodeRunID(db, r.ID)
if errA != nil {
return nil, sdk.WrapError(errA, "LoadNodeRunByID>Error loading artifacts for run %d", r.ID)
arts, err := loadArtifactByNodeRunID(db, r.ID)
if err != nil {
return nil, sdk.WrapError(err, "cannot load artifacts for workflow node run %d", r.ID)
}
r.Artifacts = arts
}

if loadOpts.WithStaticFiles {
staticFiles, errS := loadStaticFilesByNodeRunID(db, r.ID)
if errS != nil {
return nil, sdk.WrapError(errS, "LoadNodeRunByID>Error loading static files for run %d", r.ID)
staticFiles, err := loadStaticFilesByNodeRunID(db, r.ID)
if err != nil {
return nil, sdk.WrapError(err, "cannot load static files for workflow node run %d", r.ID)
}
r.StaticFiles = staticFiles
}

return r, nil

}

//insertWorkflowNodeRun insert in table workflow_node_run
Expand Down
159 changes: 92 additions & 67 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/fsamin/go-dump"
"github.com/go-gorp/gorp"
"github.com/sirupsen/logrus"

"github.com/ovh/cds/engine/api/action"
"github.com/ovh/cds/engine/api/cache"
Expand Down Expand Up @@ -285,7 +286,7 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
}

// If pipeline build succeed, reprocess the workflow (in the same transaction)
//Delete jobs only when node is over
// Delete jobs only when node is over
if sdk.StatusIsTerminated(nr.Status) {
if nr.Status != sdk.StatusStopped {
r1, _, err := processWorkflowDataRun(ctx, db, store, proj, updatedWorkflowRun, nil, nil, nil)
Expand All @@ -295,82 +296,100 @@ func executeNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
report.Merge(ctx, r1)
}

//Delete the line in workflow_node_run_job
// Delete the line in workflow_node_run_job
if err := DeleteNodeJobRuns(db, nr.ID); err != nil {
return nil, sdk.WrapError(err, "unable to delete node %d job runs", nr.ID)
}

var hasMutex bool
var nodeName string

node := updatedWorkflowRun.Workflow.WorkflowData.NodeByID(nr.WorkflowNodeID)
if node != nil && node.Context != nil && node.Context.Mutex {
hasMutex = node.Context.Mutex
nodeName = node.Name
}

//Do we release a mutex ?
//Try to find one node run of the same node from the same workflow at status Waiting
// If current node has a mutex, we want to trigger another node run that can be waiting for the mutex
if hasMutex {
_, next := observability.Span(ctx, "workflow.releaseMutex")

mutexQuery := `select workflow_node_run.id
from workflow_node_run
join workflow_run on workflow_run.id = workflow_node_run.workflow_run_id
join workflow on workflow.id = workflow_run.workflow_id
where workflow.id = $1
and workflow_node_run.workflow_node_name = $2
and workflow_node_run.status = $3
order by workflow_node_run.start asc
limit 1`
waitingRunID, errID := db.SelectInt(mutexQuery, updatedWorkflowRun.WorkflowID, nodeName, string(sdk.StatusWaiting))
if errID != nil && errID != sql.ErrNoRows {
log.Error(ctx, "workflow.execute> Unable to load mutex-locked workflow node run ID: %v", errID)
return report, nil
}
//If not more run is found, stop the loop
if waitingRunID == 0 {
return report, nil
}
waitingRun, errRun := LoadNodeRunByID(db, waitingRunID, LoadRunOptions{})
if errRun != nil && sdk.Cause(errRun) != sql.ErrNoRows {
log.Error(ctx, "workflow.execute> Unable to load mutex-locked workflow rnode un: %v", errRun)
return report, nil
}
//If not more run is found, stop the loop
if waitingRun == nil {
return report, nil
}

//Here we are loading another workflow run
workflowRun, errWRun := LoadRunByID(db, waitingRun.WorkflowRunID, LoadRunOptions{})
if errWRun != nil {
log.Error(ctx, "workflow.execute> Unable to load mutex-locked workflow rnode un: %v", errWRun)
return report, nil
}
AddWorkflowRunInfo(workflowRun, sdk.SpawnMsg{
ID: sdk.MsgWorkflowNodeMutexRelease.ID,
Args: []interface{}{waitingRun.WorkflowNodeName},
Type: sdk.MsgWorkflowNodeMutexRelease.Type,
})

if err := UpdateWorkflowRun(ctx, db, workflowRun); err != nil {
return nil, sdk.WrapError(err, "unable to update workflow run %d after mutex release", workflowRun.ID)
}

log.Debug("workflow.execute> process the node run %d because mutex has been released", waitingRun.ID)
r, err := executeNodeRun(ctx, db, store, proj, waitingRun)
r, err := releaseMutex(ctx, db, store, proj, updatedWorkflowRun.WorkflowID, nodeName)
report.Merge(ctx, r)
if err != nil {
return nil, sdk.WrapError(err, "unable to reprocess workflow")
return report, err
}

next()
}
}
return report, nil
}

func releaseMutex(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj sdk.Project, workflowID int64, nodeName string) (*ProcessorReport, error) {
_, next := observability.Span(ctx, "workflow.releaseMutex")
defer next()

mutexQuery := `
SELECT workflow_node_run.id
FROM workflow_node_run
JOIN workflow_run on workflow_run.id = workflow_node_run.workflow_run_id
JOIN workflow on workflow.id = workflow_run.workflow_id
WHERE workflow.id = $1
AND workflow_node_run.workflow_node_name = $2
AND workflow_node_run.status = $3
ORDER BY workflow_run.num ASC
LIMIT 1
`
waitingRunID, err := db.SelectInt(mutexQuery, workflowID, nodeName, string(sdk.StatusWaiting))
if err != nil && err != sql.ErrNoRows {
err = sdk.WrapError(err, "unable to load mutex-locked workflow node run id")
log.ErrorWithFields(ctx, logrus.Fields{
"stack_trace": fmt.Sprintf("%+v", err),
}, "%s", err)
return nil, nil
}
if waitingRunID == 0 {
return nil, nil
}

// Load the workflow node run that is waiting for the mutex
waitingRun, errRun := LoadNodeRunByID(db, waitingRunID, LoadRunOptions{})
if errRun != nil && sdk.Cause(errRun) != sql.ErrNoRows {
err = sdk.WrapError(err, "unable to load mutex-locked workflow node run")
log.ErrorWithFields(ctx, logrus.Fields{
"stack_trace": fmt.Sprintf("%+v", err),
}, "%s", err)
return nil, nil
}
if waitingRun == nil {
return nil, nil
}

// Load the workflow run that is waiting for the mutex
workflowRun, err := LoadRunByID(db, waitingRun.WorkflowRunID, LoadRunOptions{})
if err != nil {
err = sdk.WrapError(err, "unable to load mutex-locked workflow run")
log.ErrorWithFields(ctx, logrus.Fields{
"stack_trace": fmt.Sprintf("%+v", err),
}, "%s", err)
return nil, nil
}

// Add a spawn info on the workflow run
AddWorkflowRunInfo(workflowRun, sdk.SpawnMsg{
ID: sdk.MsgWorkflowNodeMutexRelease.ID,
Args: []interface{}{waitingRun.WorkflowNodeName},
Type: sdk.MsgWorkflowNodeMutexRelease.Type,
})
if err := UpdateWorkflowRun(ctx, db, workflowRun); err != nil {
return nil, sdk.WrapError(err, "unable to update workflow run %d after mutex release", workflowRun.ID)
}

log.Debug("workflow.execute> process the node run %d because mutex has been released", waitingRun.ID)
r, err := executeNodeRun(ctx, db, store, proj, waitingRun)
if err != nil {
return r, sdk.WrapError(err, "unable to reprocess workflow")
}

return r, nil
}

func checkRunOnlyFailedJobs(wr *sdk.WorkflowRun, nr *sdk.WorkflowNodeRun) (*sdk.WorkflowNodeRun, error) {
var previousNR *sdk.WorkflowNodeRun
nrs, ok := wr.WorkflowNodeRuns[nr.WorkflowNodeID]
Expand Down Expand Up @@ -853,27 +872,33 @@ func stopWorkflowNodeOutGoingHook(ctx context.Context, dbFunc func() *gorp.DbMap
}

// StopWorkflowNodeRun to stop a workflow node run with a specific spawn info
func StopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, proj sdk.Project, nodeRun sdk.WorkflowNodeRun, stopInfos sdk.SpawnInfo) (*ProcessorReport, error) {
func StopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, proj sdk.Project, workflowRun sdk.WorkflowRun, workflowNodeRun sdk.WorkflowNodeRun, stopInfos sdk.SpawnInfo) (*ProcessorReport, error) {
var end func()
ctx, end = observability.Span(ctx, "workflow.StopWorkflowNodeRun")
defer end()

report := new(ProcessorReport)

var r1 *ProcessorReport
var errS error
if nodeRun.Stages != nil && len(nodeRun.Stages) > 0 {
r1, errS = stopWorkflowNodePipeline(ctx, dbFunc, store, proj, &nodeRun, stopInfos)
var r *ProcessorReport
var err error
if workflowNodeRun.Stages != nil && len(workflowNodeRun.Stages) > 0 {
r, err = stopWorkflowNodePipeline(ctx, dbFunc, store, proj, &workflowNodeRun, stopInfos)
}
if nodeRun.OutgoingHook != nil {
errS = stopWorkflowNodeOutGoingHook(ctx, dbFunc, &nodeRun)
if workflowNodeRun.OutgoingHook != nil {
err = stopWorkflowNodeOutGoingHook(ctx, dbFunc, &workflowNodeRun)
}
if errS != nil {
return report, sdk.WrapError(errS, "unable to stop workflow node run")
if err != nil {
return report, sdk.WrapError(err, "unable to stop workflow node run")
}

report.Merge(ctx, r1)
report.Add(ctx, nodeRun)
report.Merge(ctx, r)
report.Add(ctx, workflowNodeRun)

r, err = releaseMutex(ctx, dbFunc(), store, proj, workflowNodeRun.WorkflowID, workflowNodeRun.WorkflowNodeName)
report.Merge(ctx, r)
if err != nil {
return report, err
}

return report, nil
}
Expand Down
Loading