Skip to content

Commit

Permalink
fix(api): fix send pullrequest report + do not load node from old mod…
Browse files Browse the repository at this point in the history
…el (#3722)
  • Loading branch information
sguiheux authored and yesnault committed Dec 13, 2018
1 parent 608a8c2 commit 10e9ed3
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 77 deletions.
17 changes: 13 additions & 4 deletions cli/cdsctl/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,20 @@ func workflowRunManualRun(v cli.Values) error {
}
for _, wnrs := range wr.WorkflowNodeRuns {
for _, wnr := range wnrs {
wn := wr.Workflow.GetNode(wnr.WorkflowNodeID)
if wn.Name == v.GetString("node-name") {
fromNodeID = wnr.WorkflowNodeID
break
if wr.Version > 1 {
wn := wr.Workflow.WorkflowData.NodeByID(wnr.WorkflowNodeID)
if wn.Name == v.GetString("node-name") {
fromNodeID = wnr.WorkflowNodeID
break
}
} else {
wn := wr.Workflow.GetNode(wnr.WorkflowNodeID)
if wn.Name == v.GetString("node-name") {
fromNodeID = wnr.WorkflowNodeID
break
}
}

}
}
}
Expand Down
5 changes: 2 additions & 3 deletions cli/cdsctl/workflow_run_interactive.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ func workflowRunInteractive(v cli.Values, w *sdk.WorkflowRun, baseURL string) er
failedOn = ""
for _, wnrs := range wo.WorkflowNodeRuns {
for _, wnr := range wnrs {
wn := w.Workflow.GetNode(wnr.WorkflowNodeID)
for _, stage := range wnr.Stages {
for _, job := range stage.RunJobs {
status, _ := statusShort(job.Status)
Expand All @@ -39,7 +38,7 @@ func workflowRunInteractive(v cli.Values, w *sdk.WorkflowRun, baseURL string) er
end = fmt.Sprintf(" end:%s", job.Done)
}

jobLine := fmt.Sprintf("%s %s/%s/%s/%s %s %s \n", status, v[_WorkflowName], wn.Name, stage.Name, job.Job.Action.Name, start, end)
jobLine := fmt.Sprintf("%s %s/%s/%s/%s %s %s \n", status, v[_WorkflowName], wnr.WorkflowNodeName, stage.Name, job.Job.Action.Name, start, end)
if job.Status == sdk.StatusFail.String() {
newOutput += fmt.Sprintf(tm.Color(tm.Bold(jobLine), tm.RED))
} else {
Expand All @@ -66,7 +65,7 @@ func workflowRunInteractive(v cli.Values, w *sdk.WorkflowRun, baseURL string) er
if step.Status == sdk.StatusFail.String() {
if !failedOnStepKnowned {
// hide "Starting" text on resume
failedOn = fmt.Sprintf("%s%s / %s / %s / %s %s \n", failedOn, v[_WorkflowName], wn.Name, stage.Name, titleStep, strings.Replace(line, "Starting", "", 1))
failedOn = fmt.Sprintf("%s%s / %s / %s / %s %s \n", failedOn, v[_WorkflowName], wnr.WorkflowNodeName, stage.Name, titleStep, strings.Replace(line, "Starting", "", 1))
}
failedOnStepKnowned = true
titleStep = fmt.Sprintf(tm.Color(titleStep, tm.RED))
Expand Down
4 changes: 2 additions & 2 deletions engine/api/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,11 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
for {
select {
case <-ctx.Done():
log.Info("events.Http: context done")
log.Debug("events.Http: context done")
b.chanRemoveClient <- client.UUID
break leave
case <-r.Context().Done():
log.Info("events.Http: client disconnected")
log.Debug("events.Http: client disconnected")
b.chanRemoveClient <- client.UUID
break leave
case <-tick.C:
Expand Down
6 changes: 3 additions & 3 deletions engine/api/workflow/dao_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func GetNodeRunBuildCommits(ctx context.Context, db gorp.SqlExecutor, store cach
}

// PreviousNodeRun find previous node run
func PreviousNodeRun(db gorp.SqlExecutor, nr sdk.WorkflowNodeRun, n sdk.WorkflowNode, workflowID int64) (sdk.WorkflowNodeRun, error) {
func PreviousNodeRun(db gorp.SqlExecutor, nr sdk.WorkflowNodeRun, nodeName string, workflowID int64) (sdk.WorkflowNodeRun, error) {
var nodeRun sdk.WorkflowNodeRun
// check the first run of a workflow, no need to check previous
if nr.Number == 1 && nr.SubNumber == 0 {
Expand All @@ -713,8 +713,8 @@ func PreviousNodeRun(db gorp.SqlExecutor, nr sdk.WorkflowNodeRun, n sdk.Workflow
`, nodeRunFields)

var rr = NodeRun{}
if err := db.SelectOne(&rr, query, workflowID, n.Name, nr.VCSBranch, nr.VCSTag, nr.Number, nr.ID); err != nil {
return nodeRun, sdk.WrapError(err, "Cannot load previous run on workflow %d node %s nr.VCSBranch:%s nr.VCSTag:%s nr.Number:%d nr.ID:%d ", workflowID, n.Name, nr.VCSBranch, nr.VCSTag, nr.Number, nr.ID)
if err := db.SelectOne(&rr, query, workflowID, nodeName, nr.VCSBranch, nr.VCSTag, nr.Number, nr.ID); err != nil {
return nodeRun, sdk.WrapError(err, "Cannot load previous run on workflow %d node %s nr.VCSBranch:%s nr.VCSTag:%s nr.Number:%d nr.ID:%d ", workflowID, nodeName, nr.VCSBranch, nr.VCSTag, nr.Number, nr.ID)
}
pNodeRun, errF := fromDBNodeRun(rr, LoadRunOptions{})
if errF != nil {
Expand Down
20 changes: 14 additions & 6 deletions engine/api/workflow/dao_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,24 +440,32 @@ func CanBeRun(workflowRun *sdk.WorkflowRun, workflowNodeRun *sdk.WorkflowNodeRun
if workflowRun == nil {
return false
}
node := workflowRun.Workflow.GetNode(workflowNodeRun.WorkflowNodeID)
if node == nil {
return true

var ancestorsID []int64
if workflowRun.Version < 2 {
node := workflowRun.Workflow.GetNode(workflowNodeRun.WorkflowNodeID)
if node == nil {
return true
}
ancestorsID = node.Ancestors(&workflowRun.Workflow, true)
} else {
node := workflowRun.Workflow.WorkflowData.NodeByID(workflowNodeRun.WorkflowNodeID)
if node == nil {
return true
}
ancestorsID = node.Ancestors(workflowRun.Workflow.WorkflowData)
}

ancestorsID := node.Ancestors(&workflowRun.Workflow, true)
if ancestorsID == nil || len(ancestorsID) == 0 {
return true
}

for _, ancestorID := range ancestorsID {
nodeRuns, ok := workflowRun.WorkflowNodeRuns[ancestorID]
if ok && (len(nodeRuns) == 0 || !sdk.StatusIsTerminated(nodeRuns[0].Status) ||
nodeRuns[0].Status == "" || nodeRuns[0].Status == sdk.StatusNeverBuilt.String()) {
return false
}
}

return true
}

Expand Down
11 changes: 0 additions & 11 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,6 @@ func UpdateNodeJobRunStatus(ctx context.Context, dbFunc func() *gorp.DbMap, db g
var errReport error
report, errReport = report.Merge(execute(ctx, db, store, proj, nodeRun, runContext))

//Start a goroutine to update commit statuses in repositories manager
go func(wfRun *sdk.WorkflowRun) {
//The function could be called with nil project so we need to test if project is not nil
if sdk.StatusIsTerminated(wfRun.Status) && proj != nil {
wr.LastExecution = time.Now()
if err := ResyncCommitStatus(context.Background(), dbFunc(), store, proj, wfRun); err != nil {
log.Error("workflow.UpdateNodeJobRunStatus> %v", err)
}
}
}(wr)

return report, errReport
}

Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func NodeBuildParametersFromWorkflow(ctx context.Context, db gorp.SqlExecutor, s

// Process ancestor
for _, aID := range ancestorsIds {
ancestor := wf.GetNode(aID)
ancestor := wf.WorkflowData.NodeByID(aID)
if ancestor == nil {
continue
}
Expand Down
20 changes: 19 additions & 1 deletion engine/api/workflow/resync_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func Resync(db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.W
return sdk.WrapError(errW, "Resync> Cannot load workflow")
}

// Resync old model
if err := resyncNode(wr.Workflow.Root, *wf); err != nil {
return err
}
Expand All @@ -35,8 +36,25 @@ func Resync(db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.W
}
}

//Resync pipelines
// Resync new model
oldNode := wr.Workflow.WorkflowData.Array()
for i := range oldNode {
nodeToUpdate := oldNode[i]
for _, n := range wf.WorkflowData.Array() {
if nodeToUpdate.Name == n.Name {
nodeToUpdate.Context = n.Context
break
}
}
}

//Resync map
wr.Workflow.Pipelines = wf.Pipelines
wr.Workflow.Applications = wf.Applications
wr.Workflow.Environments = wf.Environments
wr.Workflow.ProjectPlatforms = wf.ProjectPlatforms
wr.Workflow.HookModels = wf.HookModels
wr.Workflow.OutGoingHookModels = wf.OutGoingHookModels

return UpdateWorkflowRun(nil, db, wr)
}
Expand Down
98 changes: 66 additions & 32 deletions engine/api/workflow/workflow_run_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,10 @@ func SendEvent(db gorp.SqlExecutor, key string, report *ProcessorReport) {
if wnr.SubNumber > 0 {
previousNodeRun = wnr
} else {
// Load previous run on current node
node := wr.Workflow.GetNode(wnr.WorkflowNodeID)
if node != nil {
var errN error
previousNodeRun, errN = PreviousNodeRun(db, wnr, *node, wr.WorkflowID)
if errN != nil {
log.Warning("SendEvent.workflow> Cannot load previous node run: %s", errN)
}
} else {
log.Info("SendEvent.workflow.previousNodeRun > Unable to find node %d in workflow", wnr.WorkflowNodeID)
var errN error
previousNodeRun, errN = PreviousNodeRun(db, wnr, wnr.WorkflowNodeName, wr.WorkflowID)
if errN != nil {
log.Warning("SendEvent.workflow> Cannot load previous node run: %s", errN)
}
}

Expand All @@ -73,28 +67,44 @@ func SendEvent(db gorp.SqlExecutor, key string, report *ProcessorReport) {

// ResyncCommitStatus resync commit status for a workflow run
func ResyncCommitStatus(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun) error {

_, end := observability.Span(ctx, "workflow.resyncCommitStatus",
observability.Tag(observability.TagWorkflow, wr.Workflow.Name),
observability.Tag(observability.TagWorkflowRun, wr.Number),
)
defer end()

for nodeID, nodeRuns := range wr.WorkflowNodeRuns {

sort.Slice(nodeRuns, func(i, j int) bool {
return nodeRuns[i].SubNumber >= nodeRuns[j].SubNumber
})

nodeRun := nodeRuns[0]

if !sdk.StatusIsTerminated(nodeRun.Status) {
continue
}

node := wr.Workflow.GetNode(nodeID)
if !node.IsLinkedToRepo() {
return nil
var vcsServerName string
var repoFullName string
if wr.Version > 1 {
node := wr.Workflow.WorkflowData.NodeByID(nodeID)
if !node.IsLinkedToRepo(&wr.Workflow) {
return nil
}
vcsServerName = wr.Workflow.Applications[node.Context.ApplicationID].VCSServer
repoFullName = wr.Workflow.Applications[node.Context.ApplicationID].RepositoryFullname
} else {
node := wr.Workflow.GetNode(nodeID)
if !node.IsLinkedToRepo() {
return nil
}
vcsServerName = node.Context.Application.VCSServer
repoFullName = node.Context.Application.RepositoryFullname
}

vcsServer := repositoriesmanager.GetProjectVCSServer(proj, node.Context.Application.VCSServer)
vcsServer := repositoriesmanager.GetProjectVCSServer(proj, vcsServerName)
if vcsServer == nil {
return nil
}
Expand All @@ -111,14 +121,15 @@ func ResyncCommitStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
if nodeRun.VCSTag != "" {
ref = nodeRun.VCSTag
}
statuses, errStatuses := client.ListStatuses(ctx, node.Context.Application.RepositoryFullname, ref)

statuses, errStatuses := client.ListStatuses(ctx, repoFullName, ref)
if errStatuses != nil {
return sdk.WrapError(errStatuses, "resyncCommitStatus> Cannot get statuses %s", details)
}

var statusFound *sdk.VCSCommitStatus
expected := sdk.VCSCommitStatusDescription(proj.Key, wr.Workflow.Name, sdk.EventRunWorkflowNode{
NodeName: node.Name,
NodeName: nodeRun.WorkflowNodeName,
})

for i, status := range statuses {
Expand Down Expand Up @@ -159,7 +170,6 @@ func ResyncCommitStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
}
continue
}

case sdk.StatusFail.String():
switch nodeRun.Status {
case sdk.StatusFail.String():
Expand Down Expand Up @@ -190,12 +200,36 @@ func ResyncCommitStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
func sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, nodeRun *sdk.WorkflowNodeRun) error {
log.Debug("Send status for node run %d", nodeRun.ID)

node := wr.Workflow.GetNode(nodeRun.WorkflowNodeID)
if !node.IsLinkedToRepo() {
return nil
var app sdk.Application
var pip sdk.Pipeline
var env sdk.Environment
if wr.Version > 1 {
node := wr.Workflow.WorkflowData.NodeByID(nodeRun.WorkflowNodeID)
if !node.IsLinkedToRepo(&wr.Workflow) {
return nil
}
app = wr.Workflow.Applications[node.Context.ApplicationID]
if node.Context.PipelineID > 0 {
pip = wr.Workflow.Pipelines[node.Context.PipelineID]
}
if node.Context.EnvironmentID > 0 {
env = wr.Workflow.Environments[node.Context.EnvironmentID]
}
} else {
node := wr.Workflow.GetNode(nodeRun.WorkflowNodeID)
if !node.IsLinkedToRepo() {
return nil
}
app = *node.Context.Application
if node.PipelineID > 0 {
pip = wr.Workflow.Pipelines[node.PipelineID]
}
if node.Context.EnvironmentID > 0 {
env = *node.Context.Environment
}
}

vcsServer := repositoriesmanager.GetProjectVCSServer(proj, node.Context.Application.VCSServer)
vcsServer := repositoriesmanager.GetProjectVCSServer(proj, app.VCSServer)
if vcsServer == nil {
return nil
}
Expand All @@ -222,7 +256,7 @@ func sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
NodeID: nodeRun.WorkflowNodeID,
RunID: nodeRun.WorkflowRunID,
StagesSummary: make([]sdk.StageSummary, len(nodeRun.Stages)),
NodeName: node.Name,
NodeName: nodeRun.WorkflowNodeName,
}

for i := range nodeRun.Stages {
Expand All @@ -231,13 +265,13 @@ func sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St

var pipName, appName, envName string

pipName = node.PipelineName
appName = node.Context.Application.Name
eventWNR.RepositoryManagerName = node.Context.Application.VCSServer
eventWNR.RepositoryFullName = node.Context.Application.RepositoryFullname
pipName = pip.Name
appName = app.Name
eventWNR.RepositoryManagerName = app.VCSServer
eventWNR.RepositoryFullName = app.RepositoryFullname

if node.Context.Environment != nil {
envName = node.Context.Environment.Name
if env.Name != "" {
envName = env.Name
}

evt := sdk.Event{
Expand All @@ -256,24 +290,24 @@ func sendVCSEventStatus(ctx context.Context, db gorp.SqlExecutor, store cache.St
}

//Check if this branch and this commit is a pullrequest
prs, err := client.PullRequests(ctx, node.Context.Application.RepositoryFullname)
prs, err := client.PullRequests(ctx, app.RepositoryFullname)
if err != nil {
log.Error("sendVCSEventStatus> unable to get pull requests on repo %s: %v", node.Context.Application.RepositoryFullname, err)
log.Error("sendVCSEventStatus> unable to get pull requests on repo %s: %v", app.RepositoryFullname, err)
return nil
}

//Send comment on pull request
for _, pr := range prs {
if pr.Head.Branch.DisplayID == nodeRun.VCSBranch && pr.Head.Branch.LatestCommit == nodeRun.VCSHash {
if nodeRun.Status != sdk.StatusFail.String() {
if nodeRun.Status != sdk.StatusFail.String() && nodeRun.Status != sdk.StatusStopped.String() {
continue
}
report, err := nodeRun.Report()
if err != nil {
log.Error("sendVCSEventStatus> unable to compute node run report%v", err)
return nil
}
if err := client.PullRequestComment(ctx, node.Context.Application.RepositoryFullname, pr.ID, report); err != nil {
if err := client.PullRequestComment(ctx, app.RepositoryFullname, pr.ID, report); err != nil {
log.Error("sendVCSEventStatus> unable to send PR report%v", err)
return nil
}
Expand Down
Loading

0 comments on commit 10e9ed3

Please sign in to comment.