From a766dfda2b95b08d29cee959dbd975b8154e7c60 Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Wed, 3 Oct 2018 17:54:06 +0200 Subject: [PATCH 1/3] fix(worker): worker life don't allow worker to work on another job if it was created to work on one specified job this will fix a bunch of issue on worker using pre-requisite service & model options Signed-off-by: Yvonnick Esnault --- engine/worker/take_workflow_node_run_job.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/engine/worker/take_workflow_node_run_job.go b/engine/worker/take_workflow_node_run_job.go index de30a5c670..7defc320a1 100644 --- a/engine/worker/take_workflow_node_run_job.go +++ b/engine/worker/take_workflow_node_run_job.go @@ -26,6 +26,8 @@ func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNod t := "" if w.bookedWJobID == job.ID { t = ", this was my booked job" + } else if w.bookedWJobID > 0 { + return false, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker didn't find booked job, can't work on another job.") } log.Info("takeWorkflowJob> Job %d taken%s", job.ID, t) From a2039e715dccf35a54bd02fcd520588500741d8e Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Wed, 3 Oct 2018 21:06:22 +0200 Subject: [PATCH 2/3] cr Signed-off-by: Yvonnick Esnault --- engine/worker/cmd_run.go | 50 ++++++++++++--------- engine/worker/take_workflow_node_run_job.go | 8 ++-- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/engine/worker/cmd_run.go b/engine/worker/cmd_run.go index f29340405b..f66839bf9e 100644 --- a/engine/worker/cmd_run.go +++ b/engine/worker/cmd_run.go @@ -118,28 +118,6 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { if !w.disableOldWorkflows && w.bookedPBJobID != 0 { w.processBookedPBJob(pbjobs) } - var exceptJobID int64 - if w.bookedWJobID != 0 { - if errP := w.processBookedWJob(wjobs); errP != nil { - // Unbook job - if errR := w.client.QueueJobRelease(true, w.bookedWJobID); errR != nil { - log.Error("runCmd> QueueJobRelease> Cannot release job") - } - exceptJobID = w.bookedWJobID - w.bookedWJobID = 0 - } else { - exceptJobID = w.bookedWJobID - } - } - if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { - log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) - } - - go func(ctx context.Context, exceptID *int64) { - if err := w.client.QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, 0, "", nil, exceptID); err != nil { - log.Info("Queues polling stopped: %v", err) - } - }(ctx, &exceptJobID) //Definition of the function which must be called to stop the worker var endFunc = func() { @@ -169,6 +147,34 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) { } } + var exceptJobID int64 + if w.bookedWJobID != 0 { + if errP := w.processBookedWJob(wjobs); errP != nil { + // Unbook job + if errR := w.client.QueueJobRelease(true, w.bookedWJobID); errR != nil { + log.Error("runCmd> QueueJobRelease> Cannot release job") + } + exceptJobID = w.bookedWJobID + w.bookedWJobID = 0 + // this worker was spawned for a job + // this job can't be process (errP != nil) + // so, call endFunc() now, this worker don't have to work + // on another job + endFunc() + return + } + exceptJobID = w.bookedWJobID + } + if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil { + log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err) + } + + go func(ctx context.Context, exceptID *int64) { + if err := w.client.QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, 0, "", nil, exceptID); err != nil { + log.Info("Queues polling stopped: %v", err) + } + }(ctx, &exceptJobID) + // Errors check loops go func(errs chan error) { for { diff --git a/engine/worker/take_workflow_node_run_job.go b/engine/worker/take_workflow_node_run_job.go index 7defc320a1..bb0eff7fb4 100644 --- a/engine/worker/take_workflow_node_run_job.go +++ b/engine/worker/take_workflow_node_run_job.go @@ -21,13 +21,15 @@ import ( func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (bool, error) { info, err := w.client.QueueTakeJob(job, w.bookedWJobID == job.ID) if err != nil { - return true, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can work on another job.") + if w.bookedWJobID == job.ID { + return false, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can't work on another job.") + } else { + return true, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can work on another job.") + } } t := "" if w.bookedWJobID == job.ID { t = ", this was my booked job" - } else if w.bookedWJobID > 0 { - return false, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker didn't find booked job, can't work on another job.") } log.Info("takeWorkflowJob> Job %d taken%s", job.ID, t) From 8f7561a10bfd8f328935808498e642a7e889aa4f Mon Sep 17 00:00:00 2001 From: Yvonnick Esnault Date: Thu, 4 Oct 2018 10:05:08 +0200 Subject: [PATCH 3/3] cr Signed-off-by: Yvonnick Esnault --- engine/worker/take_workflow_node_run_job.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/engine/worker/take_workflow_node_run_job.go b/engine/worker/take_workflow_node_run_job.go index bb0eff7fb4..e43fab99f4 100644 --- a/engine/worker/take_workflow_node_run_job.go +++ b/engine/worker/take_workflow_node_run_job.go @@ -23,9 +23,8 @@ func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNod if err != nil { if w.bookedWJobID == job.ID { return false, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can't work on another job.") - } else { - return true, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can work on another job.") } + return true, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can work on another job.") } t := "" if w.bookedWJobID == job.ID {