Skip to content
24 changes: 23 additions & 1 deletion src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Worker
const EXIT_SUCCESS = 0;
const EXIT_ERROR = 1;
const EXIT_MEMORY_LIMIT = 12;
const EXIT_CACHE_FAILED = 13;

/**
* The name of the worker.
Expand Down Expand Up @@ -92,13 +93,27 @@ class Worker
*/
public $paused = false;

/**
* Indicates if the worker restart cache retrieval fails.
*
* @var bool
*/
public $cacheFailed = false;

/**
* The callbacks used to pop jobs from queues.
*
* @var callable[]
*/
protected static $popCallbacks = [];

/**
* The custom exit code to be used when cache retrieval fails.
*
* @var int|null
*/
public static $cacheFailedExitCode;

/**
* The custom exit code to be used when memory is exceeded.
*
Expand Down Expand Up @@ -283,6 +298,7 @@ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $que
{
return ! ((($this->isDownForMaintenance)() && ! $options->force) ||
$this->paused ||
$this->cacheFailed ||
$this->events->until(new Looping($connectionName, $queue)) === false);
}

Expand Down Expand Up @@ -315,6 +331,7 @@ protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $startT
return match (true) {
$this->shouldQuit => static::EXIT_SUCCESS,
$this->memoryExceeded($options->memory) => static::$memoryExceededExitCode ?? static::EXIT_MEMORY_LIMIT,
$this->cacheFailed => static::$cacheFailedExitCode ?? static::EXIT_CACHE_FAILED,
$this->queueShouldRestart($lastRestart) => static::EXIT_SUCCESS,
$options->stopWhenEmpty && is_null($job) => static::EXIT_SUCCESS,
$options->maxTime && hrtime(true) / 1e9 - $startTime >= $options->maxTime => static::EXIT_SUCCESS,
Expand Down Expand Up @@ -733,7 +750,12 @@ protected function queueShouldRestart($lastRestart)
protected function getTimestampOfLastQueueRestart()
{
if ($this->cache) {
return $this->cache->get('illuminate:queue:restart');
try {
return $this->cache->get('illuminate:queue:restart');
} catch (Throwable $e) {
$this->exceptions->report($e);
$this->cacheFailed = true;
}
}
}

Expand Down
36 changes: 36 additions & 0 deletions tests/Integration/Queue/WorkCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@

namespace Illuminate\Tests\Integration\Queue;

use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Cache\CacheManager;
use Illuminate\Cache\Repository;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\UniqueConstraintViolationException;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Foundation\Testing\DatabaseMigrations;
use Illuminate\Queue\Worker;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Exceptions;
use Illuminate\Support\Facades\Queue;
use Mockery as m;
use Orchestra\Testbench\Attributes\WithMigration;
use RuntimeException;

Expand Down Expand Up @@ -184,6 +189,37 @@ public function testMemoryExitCode()
Worker::$memoryExceededExitCode = null;
}

public function testCacheErrorExitCode()
{
$this->markTestSkippedWhenUsingQueueDrivers(['redis', 'beanstalkd']);

Worker::$cacheFailedExitCode = 100;

Queue::push(new FirstJob);

Cache::swap($cacheManager = m::mock(CacheManager::class)->makePartial());

$repository = m::mock(Repository::class);

$cacheManager->shouldReceive('driver')
->twice()
->andReturn($repository);

$repository->shouldReceive('get')
->once()
->with('illuminate:queue:restart')
->andThrow(new Exception('Cache read failed'));

$this->artisan('queue:work', [
'--daemon' => true,
])->assertExitCode(100);

$this->assertSame(1, Queue::size());
$this->assertFalse(FirstJob::$ran);

Worker::$cacheFailedExitCode = null;
}

public function testFailedJobListenerOnlyRunsOnce()
{
$this->markTestSkippedWhenUsingQueueDrivers(['redis', 'beanstalkd']);
Expand Down
30 changes: 25 additions & 5 deletions tests/Queue/QueueWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
namespace Illuminate\Tests\Queue;

use Exception;
use Illuminate\Cache\Repository;
use Illuminate\Container\Container;
use Illuminate\Contracts\Cache\Store;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Contracts\Queue\Job as QueueJobContract;
Expand Down Expand Up @@ -387,6 +389,29 @@ public function testWorkerPicksJobUsingCustomCallbacks()
Worker::popUsing('myworker', null);
}

public function testWorkerHandlesCacheFailed()
{
$workerOptions = new WorkerOptions();
$workerOptions->stopWhenEmpty = true;

$mockStore = m::mock(Store::class);

$mockStore->expects('get')
->with('illuminate:queue:restart')
->andThrow(new Exception('Cache read failed'));

$worker = $this->getWorker('default', ['queue' => [
$firstJob = new WorkerFakeJob,
]]);

$worker->setCache(new Repository($mockStore));

$worker->daemon('default', 'queue', $workerOptions);

$this->assertFalse($firstJob->fired);
$this->assertTrue($worker->cacheFailed);
}

public function testWorkerStartingIsDispatched()
{
$workerOptions = new WorkerOptions();
Expand Down Expand Up @@ -457,11 +482,6 @@ public function stop($status = 0, $options = null)
return $status;
}

public function daemonShouldRun(WorkerOptions $options, $connectionName, $queue)
{
return ! ($this->isDownForMaintenance)();
}

public function memoryExceeded($memoryLimit)
{
return $this->stopOnMemoryExceeded;
Expand Down