Skip to content

Commit

Permalink
feat(api): add metadata on workflowRunEvent
Browse files Browse the repository at this point in the history
Signed-off-by: Yvonnick Esnault <yvonnick@esnau.lt>
  • Loading branch information
yesnault committed Jun 28, 2019
1 parent 3bdc0b0 commit 6b6b493
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 107 deletions.
9 changes: 7 additions & 2 deletions engine/api/event/publish_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ func publishRunWorkflow(payload interface{}, key, workflowName, appName, pipName
}

// PublishWorkflowRun publish event on a workflow run
func PublishWorkflowRun(wr sdk.WorkflowRun, projectKey string) {
func PublishWorkflowRun(wr sdk.WorkflowRun, proj sdk.Project) {
metadata := proj.Metadata
for k, v := range wr.Workflow.Metadata {
metadata[k] = v
}
e := sdk.EventRunWorkflow{
ID: wr.ID,
Number: wr.Number,
Expand All @@ -43,8 +47,9 @@ func PublishWorkflowRun(wr sdk.WorkflowRun, projectKey string) {
LastModified: wr.LastModified.Unix(),
LastModifiedNano: wr.LastModified.UnixNano(),
Tags: wr.Tags,
Metadata: metadata,
}
publishRunWorkflow(e, projectKey, wr.Workflow.Name, "", "", "", wr.Number, wr.LastSubNumber, wr.Status, wr.Tags)
publishRunWorkflow(e, proj.Key, wr.Workflow.Name, "", "", "", wr.Number, wr.LastSubNumber, wr.Status, wr.Tags)
}

// PublishWorkflowNodeRun publish event on a workflow node run
Expand Down
2 changes: 1 addition & 1 deletion engine/api/grpc_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *grpcHandlers) SendResult(c context.Context, res *sdk.Result) (*empty.Em
}

workflow.ResyncNodeRunsWithCommits(db, h.store, p, report)
go workflow.SendEvent(db, p.Key, report)
go workflow.SendEvent(db, report)

return new(empty.Empty), nil
}
11 changes: 6 additions & 5 deletions engine/api/workflow/execute_node_job_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ProcessorReport struct {
jobs []sdk.WorkflowNodeJobRun
nodes []sdk.WorkflowNodeRun
workflows []sdk.WorkflowRun
Project *sdk.Project
errors []error
}

Expand Down Expand Up @@ -109,7 +110,7 @@ func UpdateNodeJobRunStatus(ctx context.Context, dbFunc func() *gorp.DbMap, db g
)
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

log.Debug("UpdateNodeJobRunStatus> job.ID=%d status=%s", job.ID, status.String())

Expand Down Expand Up @@ -185,7 +186,7 @@ func UpdateNodeJobRunStatus(ctx context.Context, dbFunc func() *gorp.DbMap, db g
if errNR != nil {
return nil, sdk.WrapError(errNR, "Cannot LoadNodeRunByID node run %d", nodeRun.ID)
}
return report.Merge(syncTakeJobInNodeRun(ctx, db, nodeRun, job, stageIndex))
return report.Merge(syncTakeJobInNodeRun(ctx, db, proj, nodeRun, job, stageIndex))
}

_, next = observability.Span(ctx, "workflow.LoadRunByID")
Expand Down Expand Up @@ -258,12 +259,12 @@ func PrepareSpawnInfos(infos []sdk.SpawnInfo) []sdk.SpawnInfo {
}

// TakeNodeJobRun Take an a job run for update
func TakeNodeJobRun(ctx context.Context, dbFunc func() *gorp.DbMap, db gorp.SqlExecutor, store cache.Store, p *sdk.Project, jobID int64, workerModel string, workerName string, workerID string, infos []sdk.SpawnInfo) (*sdk.WorkflowNodeJobRun, *ProcessorReport, error) {
func TakeNodeJobRun(ctx context.Context, dbFunc func() *gorp.DbMap, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, jobID int64, workerModel string, workerName string, workerID string, infos []sdk.SpawnInfo) (*sdk.WorkflowNodeJobRun, *ProcessorReport, error) {
var end func()
ctx, end = observability.Span(ctx, "workflow.TakeNodeJobRun")
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

// first load without FOR UPDATE WAIT to quick check status
currentStatus, errS := db.SelectStr(`SELECT status FROM workflow_node_run_job WHERE id = $1`, jobID)
Expand Down Expand Up @@ -302,7 +303,7 @@ func TakeNodeJobRun(ctx context.Context, dbFunc func() *gorp.DbMap, db gorp.SqlE
}

var err error
report, err = report.Merge(UpdateNodeJobRunStatus(ctx, dbFunc, db, store, p, job, sdk.StatusBuilding))
report, err = report.Merge(UpdateNodeJobRunStatus(ctx, dbFunc, db, store, proj, job, sdk.StatusBuilding))
if err != nil {
log.Debug("TakeNodeJobRun> call UpdateNodeJobRunStatus on job %d set status from %s to %s", job.ID, job.Status, sdk.StatusBuilding)
return nil, nil, sdk.WrapError(err, "Cannot update node job run %d", jobID)
Expand Down
18 changes: 9 additions & 9 deletions engine/api/workflow/execute_node_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"github.com/ovh/cds/sdk/log"
)

func syncTakeJobInNodeRun(ctx context.Context, db gorp.SqlExecutor, n *sdk.WorkflowNodeRun, j *sdk.WorkflowNodeJobRun, stageIndex int) (*ProcessorReport, error) {
func syncTakeJobInNodeRun(ctx context.Context, db gorp.SqlExecutor, proj *sdk.Project, n *sdk.WorkflowNodeRun, j *sdk.WorkflowNodeJobRun, stageIndex int) (*ProcessorReport, error) {
_, end := observability.Span(ctx, "workflow.syncTakeJobInNodeRun")
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

//If status is not waiting neither build: nothing to do
if sdk.StatusIsTerminated(n.Status) {
Expand Down Expand Up @@ -92,7 +92,7 @@ func execute(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *
return nil, sdk.WrapError(errWr, "workflow.execute> unable to load workflow run ID %d", nr.WorkflowRunID)
}

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}
defer func(wNr *sdk.WorkflowNodeRun) {
report.Add(*wNr)
}(nr)
Expand Down Expand Up @@ -130,7 +130,7 @@ func execute(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *
//Insert data in workflow_node_run_job
log.Debug("workflow.execute> stage %s call addJobsToQueue", stage.Name)
var err error
report, err = report.Merge(addJobsToQueue(ctx, db, stage, wr, nr, runContext))
report, err = report.Merge(addJobsToQueue(ctx, db, proj, stage, wr, nr, runContext))
if err != nil {
return report, err
}
Expand Down Expand Up @@ -324,12 +324,12 @@ func execute(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *
return report, nil
}

func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, stage *sdk.Stage, wr *sdk.WorkflowRun, run *sdk.WorkflowNodeRun, runContext nodeRunContext) (*ProcessorReport, error) {
func addJobsToQueue(ctx context.Context, db gorp.SqlExecutor, proj *sdk.Project, stage *sdk.Stage, wr *sdk.WorkflowRun, run *sdk.WorkflowNodeRun, runContext nodeRunContext) (*ProcessorReport, error) {
var end func()
ctx, end = observability.Span(ctx, "workflow.addJobsToQueue")
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

_, next := observability.Span(ctx, "checkCondition")
conditionsOK := checkCondition(wr, stage.Conditions, run.BuildParameters)
Expand Down Expand Up @@ -666,7 +666,7 @@ func stopWorkflowNodePipeline(ctx context.Context, dbFunc func() *gorp.DbMap, st
ctx, end = observability.Span(ctx, "workflow.stopWorkflowNodePipeline")
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

const stopWorkflowNodeRunNBWorker = 5
var wg sync.WaitGroup
Expand Down Expand Up @@ -757,7 +757,7 @@ func StopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store c
ctx, end = observability.Span(ctx, "workflow.StopWorkflowNodeRun")
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

var r1 *ProcessorReport
var errS error
Expand Down Expand Up @@ -815,7 +815,7 @@ func stopWorkflowNodeJobRun(ctx context.Context, dbFunc func() *gorp.DbMap, stor
ctx, end = observability.Span(ctx, "workflow.stopWorkflowNodeJobRun")
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

for njrID := range chanNjrID {
tx, errTx := dbFunc().Begin()
Expand Down
6 changes: 3 additions & 3 deletions engine/api/workflow/execute_outgoing_hook_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func UpdateOutgoingHookRunStatus(ctx context.Context, db gorp.SqlExecutor, store
ctx, end := observability.Span(ctx, "workflow.UpdateOutgoingHookRunStatus")
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

//Checking if the hook is still at status waiting or building
pendingOutgoingHooks := wr.PendingOutgoingHook()
Expand Down Expand Up @@ -50,7 +50,7 @@ func UpdateOutgoingHookRunStatus(ctx context.Context, db gorp.SqlExecutor, store
}

oldStatus := wr.Status
r1, err := computeAndUpdateWorkflowRunStatus(ctx, db, wr)
r1, err := computeAndUpdateWorkflowRunStatus(ctx, db, proj, wr)
if err != nil {
return report, sdk.WrapError(err, "processNodeOutGoingHook> Unable to compute workflow run status")
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func UpdateParentWorkflowRun(ctx context.Context, dbFunc func() *gorp.DbMap, sto
return sdk.WrapError(err, "Unable to commit transaction")
}

go SendEvent(dbFunc(), parentProj.Key, report)
go SendEvent(dbFunc(), report)

return nil
}
13 changes: 7 additions & 6 deletions engine/api/workflow/process_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func processWorkflowDataRun(ctx context.Context, db gorp.SqlExecutor, store cach

//// Process Report
oldStatus := wr.Status
report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}
defer func(oldStatus string, wr *sdk.WorkflowRun) {
if oldStatus != wr.Status {
report.Add(*wr)
Expand All @@ -68,7 +68,7 @@ func processWorkflowDataRun(ctx context.Context, db gorp.SqlExecutor, store cach
}
report, _ = report.Merge(r1, nil)

r2, err := computeAndUpdateWorkflowRunStatus(ctx, db, wr)
r2, err := computeAndUpdateWorkflowRunStatus(ctx, db, proj, wr)
if err != nil {
return nil, false, sdk.WrapError(err, "unable to compute workflow run status")
}
Expand All @@ -85,7 +85,7 @@ func processWorkflowDataRun(ctx context.Context, db gorp.SqlExecutor, store cach
}
report, _ = report.Merge(r1, nil)

r2, err := computeAndUpdateWorkflowRunStatus(ctx, db, wr)
r2, err := computeAndUpdateWorkflowRunStatus(ctx, db, proj, wr)
if err != nil {
return nil, false, sdk.WrapError(err, "unable to compute workflow run status")
}
Expand All @@ -106,7 +106,7 @@ func processWorkflowDataRun(ctx context.Context, db gorp.SqlExecutor, store cach
}
report, _ = report.Merge(r2, nil)

r1, err := computeAndUpdateWorkflowRunStatus(ctx, db, wr)
r1, err := computeAndUpdateWorkflowRunStatus(ctx, db, proj, wr)
if err != nil {
return nil, false, sdk.WrapError(err, "unable to compute workflow run status")
}
Expand All @@ -115,8 +115,9 @@ func processWorkflowDataRun(ctx context.Context, db gorp.SqlExecutor, store cach
return report, true, nil
}

func computeAndUpdateWorkflowRunStatus(ctx context.Context, db gorp.SqlExecutor, wr *sdk.WorkflowRun) (*ProcessorReport, error) {
report := new(ProcessorReport)
func computeAndUpdateWorkflowRunStatus(ctx context.Context, db gorp.SqlExecutor, proj *sdk.Project, wr *sdk.WorkflowRun) (*ProcessorReport, error) {
report := &ProcessorReport{Project: proj}

// Recompute status counter, it's mandatory to resync
// the map of workflow node runs of the workflow run to get the right statuses
// After resync, recompute all status counter compute the workflow status
Expand Down
6 changes: 3 additions & 3 deletions engine/api/workflow/process_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

func processNodeTriggers(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, parentNodeRun []*sdk.WorkflowNodeRun, node *sdk.Node, parentSubNumber int) (*ProcessorReport, error) {
report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

for j := range node.Triggers {
t := &node.Triggers[j]
Expand Down Expand Up @@ -50,7 +50,7 @@ func processNodeTriggers(ctx context.Context, db gorp.SqlExecutor, store cache.S
}

func processNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, n *sdk.Node, subNumber int, parentNodeRuns []*sdk.WorkflowNodeRun, hookEvent *sdk.WorkflowNodeRunHookEvent, manual *sdk.WorkflowNodeRunManual) (*ProcessorReport, bool, error) {
report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}
exist, errN := nodeRunExist(db, wr.ID, n.ID, wr.Number, subNumber)
if errN != nil {
return nil, false, sdk.WrapError(errN, "processNodeRun> unable to check if node run exist")
Expand Down Expand Up @@ -98,7 +98,7 @@ func processNodeRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
}

func processNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, n *sdk.Node, subNumber int, parents []*sdk.WorkflowNodeRun, hookEvent *sdk.WorkflowNodeRunHookEvent, manual *sdk.WorkflowNodeRunManual) (*ProcessorReport, bool, error) {
report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

//TODO: Check user for manual done but check permission also for automatic trigger and hooks (with system to authenticate a webhook)
if n.Context == nil {
Expand Down
2 changes: 1 addition & 1 deletion engine/api/workflow/process_outgoinghook.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func processNodeOutGoingHook(ctx context.Context, db gorp.SqlExecutor, store cac
ctx, end := observability.Span(ctx, "workflow.processNodeOutGoingHook")
defer end()

report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}

//Check if the WorkflowNodeOutgoingHookRun already exist with the same subnumber
nrs, ok := wr.WorkflowNodeRuns[node.ID]
Expand Down
8 changes: 4 additions & 4 deletions engine/api/workflow/process_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

func processStartFromNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, startingFromNode *int64, maxsn int64, hookEvent *sdk.WorkflowNodeRunHookEvent, manual *sdk.WorkflowNodeRunManual) (*ProcessorReport, bool, error) {
report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}
start := mapNodes[*startingFromNode]
if start == nil {
return nil, false, sdk.ErrWorkflowNodeNotFound
Expand Down Expand Up @@ -51,7 +51,7 @@ func processStartFromNode(ctx context.Context, db gorp.SqlExecutor, store cache.

func processStartFromRootNode(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node, hookEvent *sdk.WorkflowNodeRunHookEvent, manual *sdk.WorkflowNodeRunManual) (*ProcessorReport, bool, error) {
log.Debug("processWorkflowRun> starting from the root : %d (pipeline %s)", wr.Workflow.WorkflowData.Node.ID, wr.Workflow.Pipelines[wr.Workflow.WorkflowData.Node.Context.ID].Name)
report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}
//Run the root: manual or from an event
AddWorkflowRunInfo(wr, false, sdk.SpawnMsg{
ID: sdk.MsgWorkflowStarting.ID,
Expand All @@ -70,7 +70,7 @@ func processStartFromRootNode(ctx context.Context, db gorp.SqlExecutor, store ca
}

func processAllNodesTriggers(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node) (*ProcessorReport, error) {
report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}
//Checks the triggers
for k := range wr.WorkflowNodeRuns {
// only check the last node run
Expand All @@ -88,7 +88,7 @@ func processAllNodesTriggers(ctx context.Context, db gorp.SqlExecutor, store cac
}

func processAllJoins(ctx context.Context, db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.WorkflowRun, mapNodes map[int64]*sdk.Node) (*ProcessorReport, error) {
report := new(ProcessorReport)
report := &ProcessorReport{Project: proj}
//Checks the joins
for i := range wr.Workflow.WorkflowData.Joins {
j := &wr.Workflow.WorkflowData.Joins[i]
Expand Down
5 changes: 3 additions & 2 deletions engine/api/workflow/resync_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ func Resync(db gorp.SqlExecutor, store cache.Store, proj *sdk.Project, wr *sdk.W
}

//ResyncWorkflowRunStatus resync the status of workflow if you stop a node run when workflow run is building
func ResyncWorkflowRunStatus(db gorp.SqlExecutor, wr *sdk.WorkflowRun) (*ProcessorReport, error) {
report := new(ProcessorReport)
func ResyncWorkflowRunStatus(db gorp.SqlExecutor, proj *sdk.Project, wr *sdk.WorkflowRun) (*ProcessorReport, error) {
report := &ProcessorReport{Project: proj}

var counterStatus statusCounter
for _, wnrs := range wr.WorkflowNodeRuns {
for _, wnr := range wnrs {
Expand Down
Loading

0 comments on commit 6b6b493

Please sign in to comment.