Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Add ability to clear queues #34330

Merged
merged 3 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Illuminate/Contracts/Queue/ClearableQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Illuminate\Contracts\Queue;

interface ClearableQueue
{
/**
* Clear all jobs from the queue.
*
* @param string $queue
* @return int
*/
public function clear($queue);
}
14 changes: 14 additions & 0 deletions src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
use Illuminate\Foundation\Console\ViewClearCommand;
use Illuminate\Notifications\Console\NotificationTableCommand;
use Illuminate\Queue\Console\BatchesTableCommand;
use Illuminate\Queue\Console\ClearCommand as QueueClearCommand;
use Illuminate\Queue\Console\FailedTableCommand;
use Illuminate\Queue\Console\FlushFailedCommand as FlushFailedQueueCommand;
use Illuminate\Queue\Console\ForgetFailedCommand as ForgetFailedQueueCommand;
Expand Down Expand Up @@ -96,6 +97,7 @@ class ArtisanServiceProvider extends ServiceProvider implements DeferrableProvid
'Optimize' => 'command.optimize',
'OptimizeClear' => 'command.optimize.clear',
'PackageDiscover' => 'command.package.discover',
'QueueClear' => 'command.queue.clear',
'QueueFailed' => 'command.queue.failed',
'QueueFlush' => 'command.queue.flush',
'QueueForget' => 'command.queue.forget',
Expand Down Expand Up @@ -712,6 +714,18 @@ protected function registerQueueWorkCommand()
});
}

/**
* Register the command.
*
* @return void
*/
protected function registerQueueClearCommand()
{
$this->app->singleton('command.queue.clear', function () {
return new QueueClearCommand;
});
}

/**
* Register the command.
*
Expand Down
73 changes: 73 additions & 0 deletions src/Illuminate/Queue/Console/ClearCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Console\ConfirmableTrait;
use Illuminate\Contracts\Queue\ClearableQueue;
use ReflectionClass;

class ClearCommand extends Command
{
use ConfirmableTrait;

/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:clear
{connection? : The name of the queue connection to clear}
{--queue= : The name of the queue to clear}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Clear the queue';

/**
* Execute the console command.
*
* @return int|null
*/
public function handle()
{
if (! $this->confirmToProceed()) {
return 1;
}

$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];

// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queueName = $this->getQueue($connection);
$queue = ($this->laravel['queue'])->connection($connection);

if ($queue instanceof ClearableQueue) {
$count = $queue->clear($queueName);

$this->line('<info>Cleared '.$count.' jobs from the '.$queueName.' queue</info> ');
} else {
$this->line('<error>Clearing queues is not supported on '.(new ReflectionClass($queue))->getShortName().'</error> ');
}

return 0;
}

/**
* Get the queue name to clear.
*
* @param string $connection
* @return string
*/
protected function getQueue($connection)
{
return $this->option('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);
}
}
16 changes: 15 additions & 1 deletion src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

namespace Illuminate\Queue;

use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Queue\Jobs\DatabaseJobRecord;
use Illuminate\Support\Carbon;
use PDO;

class DatabaseQueue extends Queue implements QueueContract
class DatabaseQueue extends Queue implements QueueContract, ClearableQueue
{
/**
* The database connection instance.
Expand Down Expand Up @@ -303,6 +304,19 @@ protected function markJobAsReserved($job)
return $job;
}

/**
* Clear all jobs from the queue.
*
* @param string $queue
* @return int
*/
public function clear($queue)
{
return $this->database->table($this->table)
->where('queue', $this->getQueue($queue))
->delete();
}

/**
* Delete a reserved job from the queue.
*
Expand Down
18 changes: 18 additions & 0 deletions src/Illuminate/Queue/LuaScripts.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@ public static function size()
LUA;
}

/**
* Get the Lua script for clearing the queue.
*
* KEYS[1] - The name of the primary queue
* KEYS[2] - The name of the "delayed" queue
* KEYS[3] - The name of the "reserved" queue
*
* @return string
*/
public static function clear()
{
return <<<'LUA'
local size = redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3])
redis.call('del', KEYS[1], KEYS[2], KEYS[3])
return size
LUA;
}

/**
* Get the Lua script for pushing jobs onto the queue.
*
Expand Down
18 changes: 17 additions & 1 deletion src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

namespace Illuminate\Queue;

use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Contracts\Redis\Factory as Redis;
use Illuminate\Queue\Jobs\RedisJob;
use Illuminate\Support\Str;

class RedisQueue extends Queue implements QueueContract
class RedisQueue extends Queue implements QueueContract, ClearableQueue
{
/**
* The Redis factory implementation.
Expand Down Expand Up @@ -256,6 +257,21 @@ protected function retrieveNextJob($queue, $block = true)
return [$job, $reserved];
}

/**
* Clear all jobs from the queue.
*
* @param string $queue
* @return int
*/
public function clear($queue)
{
$queue = $this->getQueue($queue);

return $this->getConnection()->eval(
LuaScripts::clear(), 3, $queue, $queue.':delayed', $queue.':reserved'
);
}

/**
* Delete a reserved job from the queue.
*
Expand Down
29 changes: 29 additions & 0 deletions tests/Queue/QueueDatabaseQueueIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,35 @@ public function testPoppedJobsIncrementAttempts()
$this->assertEquals(1, $popped_job->attempts(), 'The "attempts" attribute of the Job object was not updated by pop!');
}

/**
* Test that the queue can be cleared.
*/
public function testThatQueueCanBeCleared()
{
$this->connection()
->table('jobs')
->insert([[
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved_at' => Carbon::now()->addDay()->getTimestamp(),
'available_at' => Carbon::now()->subDay()->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
], [
'id' => 2,
'queue' => $mock_queue_name,
'payload' => 'mock_payload 2',
'attempts' => 0,
'reserved_at' => null,
'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]]);

$this->assertEquals(2, $this->queue->clear($mock_queue_name));
$this->assertEquals(0, $this->queue->size());
}

/**
* Test that jobs that are not reserved and have an available_at value in the future, are not popped.
*/
Expand Down
19 changes: 19 additions & 0 deletions tests/Queue/RedisQueueIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,25 @@ public function testDelete($driver)
$this->assertNull($this->queue->pop());
}

/**
* @dataProvider redisDriverProvider
*
* @param string $driver
*/
public function testClear($driver)
{
$this->setQueue($driver);

$job1 = new RedisQueueIntegrationTestJob(30);
$job2 = new RedisQueueIntegrationTestJob(40);

$this->queue->push($job1);
$this->queue->push($job2);

$this->assertEquals(2, $this->queue->clear(null));
$this->assertEquals(0, $this->queue->size());
}

/**
* @dataProvider redisDriverProvider
*
Expand Down