Skip to content

Commit

Permalink
Check jobs before working to see if they have already been received t…
Browse files Browse the repository at this point in the history
…oo many times.

Resolves an issue with the `--timeout` feature where jobs the repeatedly timed out would never be marked as `failed`, as the worker process would be killed before
it could reach the failing logic.

To maintain compatibility there are now two checks against the number of attempts a job has had, one before working the job and one in the case of an job raising an exception.

see laravel#15317 for more details.
  • Loading branch information
Max Brokman committed Sep 7, 2016
1 parent e68358d commit de2e30a
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 3 deletions.
12 changes: 12 additions & 0 deletions src/Illuminate/Queue/AttemptsExceededException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php


namespace Illuminate\Queue;


use RuntimeException;

class AttemptsExceededException extends RuntimeException
{
//
}
43 changes: 43 additions & 0 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ public function process($connectionName, $job, WorkerOptions $options)
try {
$this->raiseBeforeJobEvent($connectionName, $job);

// Check if this job has already been received too many times
$this->markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $options->maxTries);

// Here we will fire off the job and let it process. We will catch any exceptions so
// they can be reported to the developers logs, etc. Once the job is finished the
// proper events will be fired to let any listeners know this job has finished.
Expand Down Expand Up @@ -250,6 +253,29 @@ protected function handleJobException($connectionName, $job, WorkerOptions $opti
throw $e;
}

/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts. This will likely be because
* the job previously exceeded a timeout.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param int $maxTries
* @return void
*/
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
if ($maxTries === 0 || $job->attempts() <= $maxTries) {
return;
}

$e = new AttemptsExceededException(
"Queue job has already been attempted more than maxTries, it may have previously timed out");

$this->failJob($connectionName, $job, $e);

throw $e;
}

/**
* Mark the given job as failed if it has exceeded the maximum allowed attempts.
*
Expand All @@ -266,6 +292,23 @@ protected function markJobAsFailedIfHasExceededMaxAttempts(
return;
}

$this->failJob($connectionName, $job, $e);
}

/**
* Mark the given job as failed and raise the relevant event.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param \Exception $e
* @return void
*/
protected function failJob($connectionName, $job, $e)
{
if ($job->isDeleted()) {
return;
}

// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
Expand Down
30 changes: 27 additions & 3 deletions tests/Queue/QueueWorkerTest.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?php

use Illuminate\Queue\AttemptsExceededException;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
Expand Down Expand Up @@ -89,10 +90,14 @@ public function test_job_is_not_released_if_it_has_exceeded_max_attempts()
{
$e = new RuntimeException;

$job = new WorkerFakeJob(function () use ($e) {
$job = new WorkerFakeJob(function ($job) use ($e) {

// In normal use this would be incremented by being popped off the queue
$job->attempts++;

throw $e;
});
$job->attempts = 5;
$job->attempts = 1;

$worker = $this->getWorker('default', ['queue' => [$job]]);
$worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));
Expand All @@ -106,6 +111,25 @@ public function test_job_is_not_released_if_it_has_exceeded_max_attempts()
$this->events->shouldNotHaveReceived('fire', [Mockery::type(JobProcessed::class)]);
}

public function test_job_is_failed_if_it_has_already_exceeded_max_attempts()
{
$job = new WorkerFakeJob(function ($job) {
$job->attempts++;
});
$job->attempts = 2;

$worker = $this->getWorker('default', ['queue' => [$job]]);
$worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));

$this->assertNull($job->releaseAfter);
$this->assertTrue($job->deleted);
$this->assertInstanceOf(AttemptsExceededException::class, $job->failedWith);
$this->exceptionHandler->shouldHaveReceived('report')->with(Mockery::type(AttemptsExceededException::class));
$this->events->shouldHaveReceived('fire')->with(Mockery::type(JobExceptionOccurred::class))->once();
$this->events->shouldHaveReceived('fire')->with(Mockery::type(JobFailed::class))->once();
$this->events->shouldNotHaveReceived('fire', [Mockery::type(JobProcessed::class)]);
}

/**
* Helpers...
*/
Expand Down Expand Up @@ -212,7 +236,7 @@ public function __construct($callback = null)
public function fire()
{
$this->fired = true;
$this->callback->__invoke();
$this->callback->__invoke($this);
}

public function payload()
Expand Down

0 comments on commit de2e30a

Please sign in to comment.