Skip to content

Commit 4389d5a

Browse files
authored
fix(api,worker): send spawn infos when job is failed to show the reason (#5124)
1 parent 2130ed2 commit 4389d5a

File tree

16 files changed

+74
-41
lines changed

16 files changed

+74
-41
lines changed

engine/api/api_routes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func (api *API) InitRouter() {
318318
r.Handle("/queue/workflows/{permJobID}/book", Scope(sdk.AuthConsumerScopeRunExecution), r.POST(api.postBookWorkflowJobHandler, EnableTracing(), MaintenanceAware()), r.DELETE(api.deleteBookWorkflowJobHandler, EnableTracing(), MaintenanceAware()))
319319
r.Handle("/queue/workflows/{permJobID}/infos", Scope(sdk.AuthConsumerScopeRunExecution), r.GET(api.getWorkflowJobHandler, EnableTracing(), MaintenanceAware()))
320320
r.Handle("/queue/workflows/{permJobID}/vulnerability", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postVulnerabilityReportHandler, EnableTracing(), MaintenanceAware()))
321-
r.Handle("/queue/workflows/{permJobID}/spawn/infos", Scope(sdk.AuthConsumerScopeRunExecution), r.POST(r.Asynchronous(api.postSpawnInfosWorkflowJobHandler, 1), EnableTracing(), MaintenanceAware()))
321+
r.Handle("/queue/workflows/{permJobID}/spawn/infos", Scope(sdk.AuthConsumerScopeRunExecution), r.POST(api.postSpawnInfosWorkflowJobHandler, EnableTracing(), MaintenanceAware()))
322322
r.Handle("/queue/workflows/{permJobID}/result", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobResultHandler, EnableTracing(), MaintenanceAware()))
323323
r.Handle("/queue/workflows/{permJobID}/log", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(api.postWorkflowJobLogsHandler, MaintenanceAware()))
324324
r.Handle("/queue/workflows/log/service", Scope(sdk.AuthConsumerScopeRunExecution), r.POSTEXECUTE(r.Asynchronous(api.postWorkflowJobServiceLogsHandler, 1), MaintenanceAware()))

engine/api/router_middleware_auth.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ func (api *API) authMiddleware(ctx context.Context, w http.ResponseWriter, req *
5555
if ok {
5656
claims := jwt.Claims.(*sdk.AuthSessionJWTClaims)
5757
sessionID := claims.StandardClaims.Id
58-
log.Debug("authMiddleware> try to retrieve session for given jwt with id: %s", sessionID)
5958
// Check for session based on jwt from context
6059
session, err = authentication.CheckSession(ctx, api.mustDB(), sessionID)
6160
if err != nil {
@@ -65,7 +64,6 @@ func (api *API) authMiddleware(ctx context.Context, w http.ResponseWriter, req *
6564

6665
if session != nil {
6766
ctx = context.WithValue(ctxWithJWT, contextSession, session)
68-
log.Debug("authMiddleware> try to retrieve consumer for given session with id: %s", session.ConsumerID)
6967
// Load auth consumer for current session in database with authentified user and contacts
7068
c, err := authentication.LoadConsumerByID(ctx, api.mustDB(), session.ConsumerID,
7169
authentication.LoadConsumerOptions.WithAuthentifiedUser)
@@ -102,8 +100,6 @@ func (api *API) authMiddleware(ctx context.Context, w http.ResponseWriter, req *
102100
}
103101

104102
if consumer != nil {
105-
log.Debug("authMiddleware> check scope for current consumer")
106-
107103
ctx = context.WithValue(ctx, contextAPIConsumer, consumer)
108104

109105
// Checks scopes, one of expected scopes should be in actual scopes

engine/api/workflow/execute_node_job_run.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,13 @@ func UpdateNodeJobRunStatus(ctx context.Context, db gorp.SqlExecutor, store cach
198198
report.Merge(ctx, r)
199199
return report, err
200200
}
201+
202+
spawnInfos, err := LoadNodeRunJobInfo(ctx, db, job.ID)
203+
if err != nil {
204+
return report, sdk.WrapError(err, "unable to load spawn infos for runJob: %d", job.ID)
205+
}
206+
job.SpawnInfos = spawnInfos
207+
201208
syncJobInNodeRun(nodeRun, job, stageIndex)
202209

203210
if job.Status != sdk.StatusStopped {
@@ -209,8 +216,10 @@ func UpdateNodeJobRunStatus(ctx context.Context, db gorp.SqlExecutor, store cach
209216
}
210217

211218
// AddSpawnInfosNodeJobRun saves spawn info before starting worker
212-
func AddSpawnInfosNodeJobRun(db gorp.SqlExecutor, jobID int64, infos []sdk.SpawnInfo) error {
219+
func AddSpawnInfosNodeJobRun(db gorp.SqlExecutor, nodeID, jobID int64, infos []sdk.SpawnInfo) error {
220+
213221
wnjri := &sdk.WorkflowNodeJobRunInfo{
222+
WorkflowNodeRunID: nodeID,
214223
WorkflowNodeJobRunID: jobID,
215224
SpawnInfos: PrepareSpawnInfos(infos),
216225
}
@@ -276,7 +285,7 @@ func TakeNodeJobRun(ctx context.Context, db gorp.SqlExecutor, store cache.Store,
276285
return nil, nil, sdk.WrapError(err, "cannot update worker_id in node job run %d", jobID)
277286
}
278287

279-
if err := AddSpawnInfosNodeJobRun(db, jobID, PrepareSpawnInfos(infos)); err != nil {
288+
if err := AddSpawnInfosNodeJobRun(db, job.WorkflowNodeRunID, jobID, PrepareSpawnInfos(infos)); err != nil {
280289
return nil, nil, sdk.WrapError(err, "cannot save spawn info on node job run %d", jobID)
281290
}
282291

engine/api/workflow/execute_node_run.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func syncJobInNodeRun(n *sdk.WorkflowNodeRun, j *sdk.WorkflowNodeJobRun, stageIn
3636
rj.Job = j.Job
3737
rj.Header = j.Header
3838
rj.Parameters = j.Parameters
39+
rj.SpawnInfos = j.SpawnInfos
3940
}
4041
}
4142
}
@@ -535,7 +536,7 @@ jobLoop:
535536
}
536537
next()
537538

538-
if err := AddSpawnInfosNodeJobRun(db, wjob.ID, PrepareSpawnInfos(wjob.SpawnInfos)); err != nil {
539+
if err := AddSpawnInfosNodeJobRun(db, wjob.WorkflowNodeRunID, wjob.ID, PrepareSpawnInfos(wjob.SpawnInfos)); err != nil {
539540
return nil, sdk.WrapError(err, "cannot save spawn info job %d", wjob.ID)
540541
}
541542

@@ -932,7 +933,7 @@ func stopWorkflowNodeJobRun(ctx context.Context, dbFunc func() *gorp.DbMap, stor
932933
return report
933934
}
934935

935-
if err := AddSpawnInfosNodeJobRun(tx, njr.ID, []sdk.SpawnInfo{stopInfos}); err != nil {
936+
if err := AddSpawnInfosNodeJobRun(tx, njr.WorkflowNodeRunID, njr.ID, []sdk.SpawnInfo{stopInfos}); err != nil {
936937
chanErr <- sdk.WrapError(err, "Cannot save spawn info job %d", njr.ID)
937938
tx.Rollback()
938939
wg.Done()

engine/api/workflow/run_workflow_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ queueRun:
589589
}
590590

591591
//AddSpawnInfosNodeJobRun
592-
err := workflow.AddSpawnInfosNodeJobRun(db, j.ID, []sdk.SpawnInfo{
592+
err := workflow.AddSpawnInfosNodeJobRun(db, j.WorkflowNodeRunID, j.ID, []sdk.SpawnInfo{
593593
{
594594
APITime: time.Now(),
595595
RemoteTime: time.Now(),

engine/api/workflow_queue.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -294,14 +294,14 @@ func (api *API) postVulnerabilityReportHandler() service.Handler {
294294
}
295295
}
296296

297-
func (api *API) postSpawnInfosWorkflowJobHandler() service.AsynchronousHandler {
298-
return func(ctx context.Context, r *http.Request) error {
297+
func (api *API) postSpawnInfosWorkflowJobHandler() service.Handler {
298+
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
299299
id, err := requestVarInt(r, "permJobID")
300300
if err != nil {
301301
return sdk.WrapError(err, "invalid id")
302302
}
303303

304-
if ok := isHatchery(ctx); !ok {
304+
if ok := isHatchery(ctx) || isWorker(ctx); !ok {
305305
return sdk.WithStack(sdk.ErrForbidden)
306306
}
307307

@@ -318,13 +318,12 @@ func (api *API) postSpawnInfosWorkflowJobHandler() service.AsynchronousHandler {
318318
}
319319
defer tx.Rollback() // nolint
320320

321-
if _, err := workflow.LoadNodeJobRun(ctx, tx, api.Cache, id); err != nil {
322-
if !sdk.ErrorIs(err, sdk.ErrWorkflowNodeRunJobNotFound) {
323-
return err
324-
}
325-
return nil
321+
jobRun, err := workflow.LoadNodeJobRun(ctx, tx, api.Cache, id)
322+
if err != nil {
323+
return err
326324
}
327-
if err := workflow.AddSpawnInfosNodeJobRun(tx, id, s); err != nil {
325+
326+
if err := workflow.AddSpawnInfosNodeJobRun(tx, jobRun.WorkflowNodeRunID, jobRun.ID, s); err != nil {
328327
return err
329328
}
330329

@@ -435,7 +434,7 @@ func postJobResult(ctx context.Context, dbFunc func(context.Context) *gorp.DbMap
435434
Message: sdk.SpawnMsg{ID: sdk.MsgSpawnInfoWorkerEnd.ID, Args: []interface{}{wr.Name, res.Duration}},
436435
}}
437436

438-
if err := workflow.AddSpawnInfosNodeJobRun(tx, job.ID, workflow.PrepareSpawnInfos(infos)); err != nil {
437+
if err := workflow.AddSpawnInfosNodeJobRun(tx, job.WorkflowNodeRunID, job.ID, workflow.PrepareSpawnInfos(infos)); err != nil {
439438
return nil, sdk.WrapError(err, "Cannot save spawn info job %d", job.ID)
440439
}
441440

engine/api/workflow_queue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ func Test_postWorkflowJobTestsResultsHandler(t *testing.T) {
591591
req := assets.NewJWTAuthentifiedRequest(t, ctx.hatcheryToken, "POST", uri, info)
592592
rec := httptest.NewRecorder()
593593
router.Mux.ServeHTTP(rec, req)
594-
require.Equal(t, 202, rec.Code)
594+
require.Equal(t, 204, rec.Code)
595595

596596
//spawn
597597
uri = router.GetRoute("POST", api.postTakeWorkflowJobHandler, map[string]string{

engine/worker/internal/action/test_helper.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ func (_ TestWorker) Register(ctx context.Context) error {
7171
func (_ TestWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) error {
7272
return nil
7373
}
74-
func (_ TestWorker) ProcessJob(job sdk.WorkflowNodeJobRunData) (sdk.Result, error) {
75-
return sdk.Result{}, nil
74+
func (_ TestWorker) ProcessJob(job sdk.WorkflowNodeJobRunData) sdk.Result {
75+
return sdk.Result{}
7676
}
7777
func (w TestWorker) SendLog(ctx context.Context, level workerruntime.Level, format string) {
7878
w.t.Log("SendLog> [" + string(level) + "] " + format)

engine/worker/internal/run.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func processActionVariables(a *sdk.Action, parent *sdk.Action, jobParameters []s
9090
// replaces placeholder in all children recursively
9191
for i := range a.Actions {
9292
if err := processActionVariables(&a.Actions[i], a, jobParameters, secrets); err != nil {
93-
return nil
93+
return err
9494
}
9595
}
9696

@@ -109,7 +109,7 @@ func (w *CurrentWorker) replaceVariablesPlaceholder(a *sdk.Action, params []sdk.
109109
return nil
110110
}
111111

112-
func (w *CurrentWorker) runJob(ctx context.Context, a *sdk.Action, jobID int64, secrets []sdk.Variable) (sdk.Result, error) {
112+
func (w *CurrentWorker) runJob(ctx context.Context, a *sdk.Action, jobID int64, secrets []sdk.Variable) sdk.Result {
113113
log.Info(ctx, "runJob> start job %s (%d)", a.Name, jobID)
114114
defer func() { log.Info(ctx, "runJob> job %s (%d)", a.Name, jobID) }()
115115

@@ -124,7 +124,7 @@ func (w *CurrentWorker) runJob(ctx context.Context, a *sdk.Action, jobID int64,
124124
if err := w.updateStepStatus(ctx, jobID, jobStepIndex, sdk.StatusBuilding); err != nil {
125125
jobResult.Status = sdk.StatusFail
126126
jobResult.Reason = fmt.Sprintf("Cannot update step (%d) status (%s): %v", jobStepIndex, sdk.StatusBuilding, err)
127-
return jobResult, err
127+
return jobResult
128128
}
129129
var stepResult = sdk.Result{
130130
Status: sdk.StatusNeverBuilt,
@@ -163,7 +163,7 @@ func (w *CurrentWorker) runJob(ctx context.Context, a *sdk.Action, jobID int64,
163163
if err := w.updateStepStatus(ctx, jobID, jobStepIndex, stepResult.Status); err != nil {
164164
jobResult.Status = sdk.StatusFail
165165
jobResult.Reason = fmt.Sprintf("Cannot update step (%d) status (%s): %v", jobStepIndex, sdk.StatusBuilding, err)
166-
return jobResult, err
166+
return jobResult
167167
}
168168
}
169169

@@ -178,7 +178,7 @@ func (w *CurrentWorker) runJob(ctx context.Context, a *sdk.Action, jobID int64,
178178
if nCriticalFailed > 0 {
179179
jobResult.Status = sdk.StatusFail
180180
}
181-
return jobResult, nil
181+
return jobResult
182182
}
183183

184184
func (w *CurrentWorker) runAction(ctx context.Context, a sdk.Action, jobID int64, secrets []sdk.Variable, actionName string) sdk.Result {
@@ -459,7 +459,7 @@ func (w *CurrentWorker) setupKeysDirectory(ctx context.Context, jobInfo sdk.Work
459459
return kdFile, kdAbs, nil
460460
}
461461

462-
func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (sdk.Result, error) {
462+
func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (res sdk.Result) {
463463
ctx := w.currentJob.context
464464
t0 := time.Now()
465465

@@ -472,6 +472,8 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (sdk.Resu
472472
defer cancel()
473473

474474
ctx = workerruntime.SetJobID(ctx, jobInfo.NodeJobRun.ID)
475+
ctx = workerruntime.SetStepOrder(ctx, 0)
476+
475477
// start logger routine with a large buffer
476478
w.logger.logChan = make(chan sdk.Log, 100000)
477479
go func() {
@@ -484,13 +486,16 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (sdk.Resu
484486
log.Error(ctx, "processJob> Drain logs error: %v", err)
485487
}
486488
}()
489+
defer func() {
490+
log.Error(ctx, "processJob> Status: %s | Reason: %s", res.Status, res.Reason)
491+
}()
487492

488493
wdFile, wdAbs, err := w.setupWorkingDirectory(ctx, jobInfo)
489494
if err != nil {
490495
return sdk.Result{
491496
Status: sdk.StatusFail,
492497
Reason: fmt.Sprintf("Error: unable to setup workfing directory: %v", err),
493-
}, err
498+
}
494499
}
495500
ctx = workerruntime.SetWorkingDirectory(ctx, wdFile)
496501
log.Debug("processJob> Setup workspace - %s", wdFile.Name())
@@ -500,7 +505,7 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (sdk.Resu
500505
return sdk.Result{
501506
Status: sdk.StatusFail,
502507
Reason: fmt.Sprintf("Error: unable to setup keys directory: %v", err),
503-
}, err
508+
}
504509
}
505510
ctx = workerruntime.SetKeysDirectory(ctx, kdFile)
506511
log.Debug("processJob> Setup key directory - %s", kdFile.Name())
@@ -527,8 +532,8 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (sdk.Resu
527532
if err := processVariablesAndParameters(&jobInfo.NodeJobRun.Job.Action, jobParameters, jobInfo.Secrets); err != nil {
528533
return sdk.Result{
529534
Status: sdk.StatusFail,
530-
Reason: fmt.Sprintf("Error: cannot process job %s parameters", jobInfo.NodeJobRun.Job.Action.Name),
531-
}, err
535+
Reason: fmt.Sprintf("unable to process job %s: %v", jobInfo.NodeJobRun.Job.Action.Name, err),
536+
}
532537
}
533538

534539
// Add secrets as string or password in ActionBuild.Args
@@ -544,7 +549,7 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (sdk.Resu
544549

545550
w.currentJob.params = jobParameters
546551

547-
res, err := w.runJob(ctx, &jobInfo.NodeJobRun.Job.Action, jobInfo.NodeJobRun.ID, jobInfo.Secrets)
552+
res = w.runJob(ctx, &jobInfo.NodeJobRun.Job.Action, jobInfo.NodeJobRun.ID, jobInfo.Secrets)
548553

549554
if len(res.NewVariables) > 0 {
550555
log.Debug("processJob> new variables: %v", res.NewVariables)
@@ -563,5 +568,5 @@ func (w *CurrentWorker) ProcessJob(jobInfo sdk.WorkflowNodeJobRunData) (sdk.Resu
563568
log.Error(ctx, "Cannot remove basedir content: %s", err)
564569
}
565570

566-
return res, err
571+
return res
567572
}

engine/worker/internal/take.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er
8282
}(cancel, job.ID, tick)
8383

8484
//Run !
85-
res, err := w.ProcessJob(*info)
86-
// We keep the err for later usage
85+
res := w.ProcessJob(*info)
8786
tick.Stop()
8887

8988
res.RemoteTime = time.Now()
@@ -92,6 +91,17 @@ func (w *CurrentWorker) Take(ctx context.Context, job sdk.WorkflowNodeJobRun) er
9291
//Wait until the logchannel is empty
9392
res.BuildID = job.ID
9493

94+
// Send the reason as a spawninfo
95+
if res.Status != sdk.StatusSuccess && res.Reason != "" {
96+
infos := []sdk.SpawnInfo{{
97+
RemoteTime: time.Now(),
98+
Message: sdk.SpawnMsg{ID: sdk.MsgWorkflowError.ID, Args: []interface{}{res.Reason}},
99+
}}
100+
if err := w.Client().QueueJobSendSpawnInfo(ctx, job.ID, infos); err != nil {
101+
log.Error(ctx, "processJob> Unable to send spawn info: %v", err)
102+
}
103+
}
104+
95105
var lasterr error
96106
for try := 1; try <= 10; try++ {
97107
log.Info(ctx, "takeWorkflowJob> Sending build result...")

0 commit comments

Comments
 (0)