Skip to content

Commit

Permalink
fix(worker): work with booked job (#3397)
Browse files Browse the repository at this point in the history
  • Loading branch information
yesnault authored and fsamin committed Oct 4, 2018
1 parent 9bf9df8 commit 2469fbc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
50 changes: 28 additions & 22 deletions engine/worker/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,28 +118,6 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
if !w.disableOldWorkflows && w.bookedPBJobID != 0 {
w.processBookedPBJob(pbjobs)
}
var exceptJobID int64
if w.bookedWJobID != 0 {
if errP := w.processBookedWJob(wjobs); errP != nil {
// Unbook job
if errR := w.client.QueueJobRelease(true, w.bookedWJobID); errR != nil {
log.Error("runCmd> QueueJobRelease> Cannot release job")
}
exceptJobID = w.bookedWJobID
w.bookedWJobID = 0
} else {
exceptJobID = w.bookedWJobID
}
}
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
}

go func(ctx context.Context, exceptID *int64) {
if err := w.client.QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, 0, "", nil, exceptID); err != nil {
log.Info("Queues polling stopped: %v", err)
}
}(ctx, &exceptJobID)

//Definition of the function which must be called to stop the worker
var endFunc = func() {
Expand Down Expand Up @@ -169,6 +147,34 @@ func runCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
}
}

var exceptJobID int64
if w.bookedWJobID != 0 {
if errP := w.processBookedWJob(wjobs); errP != nil {
// Unbook job
if errR := w.client.QueueJobRelease(true, w.bookedWJobID); errR != nil {
log.Error("runCmd> QueueJobRelease> Cannot release job")
}
exceptJobID = w.bookedWJobID
w.bookedWJobID = 0
// this worker was spawned for a job
// this job can't be process (errP != nil)
// so, call endFunc() now, this worker don't have to work
// on another job
endFunc()
return
}
exceptJobID = w.bookedWJobID
}
if err := w.client.WorkerSetStatus(sdk.StatusWaiting); err != nil {
log.Error("WorkerSetStatus> error on WorkerSetStatus(sdk.StatusWaiting): %s", err)
}

go func(ctx context.Context, exceptID *int64) {
if err := w.client.QueuePolling(ctx, wjobs, pbjobs, errs, 2*time.Second, 0, "", nil, exceptID); err != nil {
log.Info("Queues polling stopped: %v", err)
}
}(ctx, &exceptJobID)

// Errors check loops
go func(errs chan error) {
for {
Expand Down
3 changes: 3 additions & 0 deletions engine/worker/take_workflow_node_run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
func (w *currentWorker) takeWorkflowJob(ctx context.Context, job sdk.WorkflowNodeJobRun) (bool, error) {
info, err := w.client.QueueTakeJob(job, w.bookedWJobID == job.ID)
if err != nil {
if w.bookedWJobID == job.ID {
return false, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can't work on another job.")
}
return true, sdk.WrapError(err, "takeWorkflowJob> Unable to take workflow node run job. This worker can work on another job.")
}
t := ""
Expand Down

0 comments on commit 2469fbc

Please sign in to comment.