diff --git a/engine/worker/cmd_run.go b/engine/worker/cmd_run.go index f29340405b..f66839bf9e 100644 --- a/engine/worker/cmd_run.go +++ b/engine/worker/cmd_run.go @@ -118,28 +118,6 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { if !w.disableOldWorkflows && w.bookedPBJobID != 0 { w.processBookedPBJob(pbjobs) } - var exceptJobID int64 - if w.bookedWJobID != 0 { - if errP := w.processBookedWJob(wjobs); errP != nil { - // Unbook job - if errR := w.client.QueueJobRelease(true, w.bookedWJobID); errR != nil { - log.Error("runCmd> QueueJobRelease> Cannot release job") - } - exceptJobID = w.bookedWJobID - w.bookedWJobID = 0 - } else { - exceptJobID = w.bookedWJobID - } - } - if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) - } - - go func(ctx context.Context, exceptID *int64) { - if err := w.client.QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, 0, "", nil, exceptID); err != nil { - log.Info("Queues polling stopped: %v", err) - } - }(ctx, &exceptJobID) //Definition of the function which must be called to stop the worker var endFunc = func() { @@ -169,6 +147,34 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { } } + var exceptJobID int64 + if w.bookedWJobID != 0 { + if errP := w.processBookedWJob(wjobs); errP != nil { + // Unbook job + if errR := w.client.QueueJobRelease(true, w.bookedWJobID); errR != nil { + log.Error("runCmd> QueueJobRelease> Cannot release job") + } + exceptJobID = w.bookedWJobID + w.bookedWJobID = 0 + // this worker was spawned for a job + // this job can't be process (errP != nil) + // so, call endFunc() now, this worker don't have to work + // on another job + endFunc() + return + } + exceptJobID = w.bookedWJobID + } + if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) + } + + go func(ctx context.Context, exceptID *int64) { + if err := w.client.QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, 0, "", nil, exceptID); err != nil { + log.Info("Queues polling stopped: %v", err) + } + }(ctx, &exceptJobID) + // Errors check loops go func(errs chan error) { for { diff --git a/engine/worker/take_workflow_node_run_job.go b/engine/worker/take_workflow_node_run_job.go index de30a5c670..e43fab99f4 100644 --- a/engine/worker/take_workflow_node_run_job.go +++ b/engine/worker/take_workflow_node_run_job.go @@ -21,6 +21,9 @@ import ( func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (bool, error) { info, err := w.client.QueueTakeJob(job, w.bookedWJobID == job.ID) if err != nil { + if w.bookedWJobID == job.ID { + return false, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can't work on another job.") + } return true, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can work on another job.") } t := ""